diff --git a/crates/ente-brain/examples/brainctl.rs b/crates/ente-brain/examples/brainctl.rs index 6330564..14a376c 100644 --- a/crates/ente-brain/examples/brainctl.rs +++ b/crates/ente-brain/examples/brainctl.rs @@ -10,7 +10,9 @@ //! Path del socket: $ENTE_BRAIN_SOCK o $XDG_RUNTIME_DIR/ente-brain.sock use ente_brain::introspect::{call, IntrospectRequest, IntrospectResponse}; -use std::path::PathBuf; +use std::path::{Path, PathBuf}; +use tokio::io::{AsyncReadExt, AsyncWriteExt}; +use tokio::net::UnixStream; fn socket_path() -> PathBuf { if let Ok(p) = std::env::var("ENTE_BRAIN_SOCK") { @@ -26,6 +28,12 @@ async fn main() -> anyhow::Result<()> { let args: Vec = std::env::args().collect(); let cmd = args.get(1).map(|s| s.as_str()).unwrap_or("entropy"); + // Comando especial: streaming. Mantiene la conn abierta y lee frames + // hasta Ctrl-C o EOF del servidor. + if cmd == "stream-audit" || cmd == "stream" { + return run_stream_audit(socket_path()).await; + } + let req = match cmd { "list-rules" | "rules" => IntrospectRequest::ListRules, "entropy" => IntrospectRequest::EntropySnapshot, @@ -53,6 +61,7 @@ async fn main() -> anyhow::Result<()> { } "flush-audit" => IntrospectRequest::FlushAudit, "audit-verify" | "verify" => IntrospectRequest::VerifyAudit, + "replay" => IntrospectRequest::ReplayAudit, "reload" => { let path = args.get(2).cloned(); IntrospectRequest::ReloadRules { path } @@ -131,6 +140,14 @@ fn print_response(r: &IntrospectResponse) { IntrospectResponse::Reloaded { count } => { println!("reload OK: {count} reglas activas tras reload"); } + IntrospectResponse::Replayed(rep) => { + if let Some(e) = &rep.error { + println!("✗ replay falló: {e}"); + } else { + println!("✓ replay completo — {} actions aplicadas, {} reglas finales", + rep.applied, rep.final_rule_count); + } + } IntrospectResponse::AuditVerified(rep) => { if let Some(seq) = rep.broken_at_seq { println!("✗ verificación FALLÓ tras seq={seq}"); @@ -141,6 +158,11 @@ fn print_response(r: &IntrospectResponse) { if let Some(g) = rep.genesis_sha { println!(" genesis: {}", hex_long(g)); } } } + IntrospectResponse::AuditStreamFrame(_) => { + // En modo request/response no debería llegar; solo aparece en + // run_stream_audit. Si llega aquí es un bug del servidor. + eprintln!("frame de stream recibido fuera de stream-audit (bug)"); + } IntrospectResponse::Error(e) => eprintln!("error: {e}"), } } @@ -152,3 +174,43 @@ fn hex_short(sha: [u8; 32]) -> String { fn hex_long(sha: [u8; 32]) -> String { sha.iter().map(|b| format!("{:02x}", b)).collect() } + +async fn run_stream_audit(path: PathBuf) -> anyhow::Result<()> { + let mut stream = UnixStream::connect(&path).await?; + let req = IntrospectRequest::StreamAudit; + let buf = bincode::serialize(&req)?; + stream.write_u32(buf.len() as u32).await?; + stream.write_all(&buf).await?; + eprintln!("audit stream conectado a {} — Ctrl-C para salir", path.display()); + + loop { + let mut len_buf = [0u8; 4]; + if stream.read_exact(&mut len_buf).await.is_err() { + eprintln!("\nstream cerrado por el servidor"); + return Ok(()); + } + let len = u32::from_be_bytes(len_buf) as usize; + if len > 4 * 1024 * 1024 { anyhow::bail!("frame oversize"); } + let mut buf = vec![0u8; len]; + stream.read_exact(&mut buf).await?; + let resp: IntrospectResponse = bincode::deserialize(&buf)?; + match resp { + IntrospectResponse::AuditStreamFrame(entry) => { + let prev = entry.prev_sha + .map(|s| s[..4].iter().map(|b| format!("{:02x}", b)).collect::() + "..") + .unwrap_or_else(|| "—".into()); + let sha = entry.sha[..4].iter().map(|b| format!("{:02x}", b)) + .collect::() + ".."; + println!("[stream] seq={} prev={} sha={} {:?}", + entry.seq, prev, sha, entry.action); + } + other => { + eprintln!("frame no esperado en stream: {other:?}"); + return Ok(()); + } + } + } +} + +#[allow(dead_code)] +fn _suppress(_: &Path) {} // mantener Path import si compilador se queja diff --git a/crates/ente-brain/src/audit.rs b/crates/ente-brain/src/audit.rs index f32d1b2..680c9a8 100644 --- a/crates/ente-brain/src/audit.rs +++ b/crates/ente-brain/src/audit.rs @@ -45,6 +45,9 @@ pub struct AuditLog { last_flushed_sha: Option<[u8; 32]>, /// Path opcional donde escribir el head pointer tras cada flush. head_pointer_path: Option, + /// Subscribers a entries en tiempo real. Cada `append` empuja a todos. + /// Subscribers cuyo receiver se dropeó se purgan en el siguiente push. + subscribers: Vec>, } impl AuditLog { @@ -60,9 +63,21 @@ impl AuditLog { flushed_count: 0, last_flushed_sha: None, head_pointer_path: None, + subscribers: Vec::new(), } } + /// Registra un nuevo subscriber. El receiver recibe cada `AuditEntry` + /// futuro hasta que el receiver se dropee (subscriber se purga al + /// siguiente `append`). + pub fn subscribe(&mut self) -> tokio::sync::mpsc::UnboundedReceiver { + let (tx, rx) = tokio::sync::mpsc::unbounded_channel(); + self.subscribers.push(tx); + rx + } + + pub fn subscriber_count(&self) -> usize { self.subscribers.len() } + pub fn with_head_pointer(mut self, path: std::path::PathBuf) -> Self { self.head_pointer_path = Some(path); self @@ -86,6 +101,8 @@ impl AuditLog { self.entries.pop_front(); } self.entries.push_back(entry.clone()); + // Empujar a subscribers, purgando los muertos in-place. + self.subscribers.retain(|tx| tx.send(entry.clone()).is_ok()); entry } @@ -164,6 +181,14 @@ pub struct AuditHeadPointer { pub timestamp_ms: u64, } +/// Reporte de un replay: número de actions aplicadas + reglas finales. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ReplayReport { + pub applied: u64, + pub final_rule_count: usize, + pub error: Option, +} + /// Reporte de verificación de la cadena audit. #[derive(Debug, Clone, Serialize, Deserialize)] pub struct VerificationReport { @@ -241,6 +266,66 @@ pub fn verify_chain_from_cas(start_sha: [u8; 32]) -> VerificationReport { } } +/// Recorre la cadena entera (head→genesis) y reconstruye la lista de +/// actions en orden cronológico (oldest first). Útil tanto para replay +/// como para auditoría retrospectiva. +pub fn collect_chain_from_cas(start_sha: [u8; 32]) -> anyhow::Result> { + let mut entries = Vec::new(); + let mut current = Some(start_sha); + while let Some(sha) = current { + let path = ente_cas::cas_root().join(ente_cas::hex(&sha)); + let bytes = std::fs::read(&path)?; + let mut entry: AuditEntry = serde_json::from_slice(&bytes)?; + entry.sha = sha; + let prev = entry.prev_sha; + entries.push(entry); + current = prev; + } + // entries está en orden head→genesis. Reverse para chronological. + entries.reverse(); + Ok(entries) +} + +/// Aplica las actions de la cadena en orden cronológico contra un engine +/// fresco. PromoteCrystal → insert. RemoveRule → remove. LoadRulesFile → +/// log informativo (los archivos pueden no existir en el ambiente actual). +pub fn replay_chain( + start_sha: [u8; 32], + engine: &mut crate::engine::RuleEngine, +) -> ReplayReport { + let entries = match collect_chain_from_cas(start_sha) { + Ok(es) => es, + Err(e) => return ReplayReport { + applied: 0, final_rule_count: engine.len(), + error: Some(format!("collect chain: {e}")), + }, + }; + let mut applied = 0u64; + for entry in &entries { + match &entry.action { + AuditAction::PromoteCrystal { rule_id, crystal } => { + let mut rule = crate::crystallize::crystal_to_rule(crystal); + rule.id = *rule_id; // preservar identidad histórica + engine.insert(rule); + } + AuditAction::RemoveRule { rule_id } => { + engine.remove(*rule_id); + } + AuditAction::LoadRulesFile { path: _, count: _ } => { + // Los archivos referenciados por path pueden haber cambiado + // o no existir. Log y skip — el replay sólo reconstruye + // promotes/removes que tienen estado en CAS. + } + } + applied += 1; + } + ReplayReport { + applied, + final_rule_count: engine.len(), + error: None, + } +} + impl Default for AuditLog { fn default() -> Self { Self::new() } } diff --git a/crates/ente-brain/src/crystallize.rs b/crates/ente-brain/src/crystallize.rs index 0957e8b..bfd0c04 100644 --- a/crates/ente-brain/src/crystallize.rs +++ b/crates/ente-brain/src/crystallize.rs @@ -8,7 +8,7 @@ //! Cada cristal puede emitirse como snippet KCL (texto humano-readable) o //! como `Rule` ejecutable directamente por el motor. -use crate::observer::Observer; +use crate::observer::{GapStats, Observer}; use crate::rules::{Action, EventKind, EventPattern, LogLevel, Rule, Scope}; use serde::{Deserialize, Serialize}; use ulid::Ulid; @@ -20,6 +20,11 @@ pub struct Crystal { pub conditional_prob: f64, pub pmi: f64, pub support: u64, + /// Estadísticas del gap temporal entre antecedent → consequent. + /// None si no hay histograma. Habilita generación de reglas Sequence + /// con `within_ms = (mean + 2σ) * 1000`. + #[serde(default, skip_serializing_if = "Option::is_none")] + pub gap_stats: Option, } #[derive(Debug, Clone, Copy)] @@ -47,15 +52,19 @@ pub fn detect_crystals(obs: &Observer, params: &CrystallizationParams) -> Vec String { } } -/// Convierte un cristal a una `Rule` ejecutable por el motor. Útil para -/// "auto-aprendizaje" donde cristales se promueven a reglas vivas tras -/// validar con el operador. +/// Convierte un cristal a una `Rule` ejecutable. Si hay gap_stats con +/// muestras suficientes (≥ 4), genera una regla `Sequence` con +/// `within_ms = (mean + 2σ) * 1000`. 2σ cubre ~95% de la distribución +/// asumiendo normalidad — captura el "tiempo típico de respuesta" del +/// patrón observado. Si no hay stats, fallback a `Single { antecedent }`. pub fn crystal_to_rule(c: &Crystal) -> Rule { + let when = match &c.gap_stats { + Some(s) if s.count >= 4 => { + // Mínimo 1ms para evitar within_ms=0 cuando varianza colapsa. + let bound_secs = (s.mean_secs + 2.0 * s.stddev_secs).max(0.001); + EventPattern::Sequence { + kinds: vec![c.antecedent.clone(), c.consequent.clone()], + within_ms: (bound_secs * 1000.0).ceil() as u64, + } + } + _ => EventPattern::Single { kind: c.antecedent.clone() }, + }; + let message = match &c.gap_stats { + Some(s) if s.count >= 4 => format!( + "crystal seq: {:?} → {:?} (P={:.2}, PMI={:.2}, gap={:.3}±{:.3}s)", + c.antecedent, c.consequent, c.conditional_prob, c.pmi, + s.mean_secs, s.stddev_secs, + ), + _ => format!( + "crystal: {:?} → {:?} (P={:.2}, PMI={:.2}, n={})", + c.antecedent, c.consequent, c.conditional_prob, c.pmi, c.support + ), + }; Rule { id: Ulid::new(), priority: 5, - when: EventPattern::Single { kind: c.antecedent.clone() }, + when, scope: Scope::default(), - then: vec![Action::Log { - level: LogLevel::Info, - message: format!( - "crystal: {:?} → {:?} (P={:.2}, PMI={:.2}, n={})", - c.antecedent, c.consequent, c.conditional_prob, c.pmi, c.support - ), - }], + then: vec![Action::Log { level: LogLevel::Info, message }], } } diff --git a/crates/ente-brain/src/introspect.rs b/crates/ente-brain/src/introspect.rs index bdc821e..029bab3 100644 --- a/crates/ente-brain/src/introspect.rs +++ b/crates/ente-brain/src/introspect.rs @@ -106,6 +106,13 @@ pub enum IntrospectRequest { /// Verifica la cadena audit recorriendo prev_sha hasta el genesis, /// validando integridad de cada entry contra el CAS. VerifyAudit, + /// Reconstruye el engine desde la cadena audit. Vacía engine y aplica + /// PromoteCrystal/RemoveRule en orden cronológico. + ReplayAudit, + /// Mantiene la conexión abierta y empuja cada `AuditEntry` nuevo en + /// frames `IntrospectResponse::AuditStreamFrame` hasta que el cliente + /// cierra. Tras esta request no se aceptan más requests en la misma conn. + StreamAudit, } #[derive(Debug, Serialize, Deserialize)] @@ -129,6 +136,10 @@ pub enum IntrospectResponse { Reloaded { count: usize }, /// Resultado de VerifyAudit. AuditVerified(crate::audit::VerificationReport), + /// Resultado de ReplayAudit. + Replayed(crate::audit::ReplayReport), + /// Frame de streaming. El cliente lee estos en bucle hasta EOF. + AuditStreamFrame(crate::audit::AuditEntry), Error(String), } @@ -201,6 +212,11 @@ impl IntrospectServer { let req: IntrospectRequest = bincode::deserialize(&buf)?; debug!(?req, "introspect request"); + // StreamAudit toma posesión de la conn — no más requests aquí. + if matches!(req, IntrospectRequest::StreamAudit) { + return self.stream_audit(stream).await; + } + let resp = self.dispatch(req).await; let out = bincode::serialize(&resp)?; @@ -209,6 +225,22 @@ impl IntrospectServer { } } + /// Modo streaming: subscribe al audit log y empuja cada entry como + /// frame `AuditStreamFrame`. La función retorna cuando el cliente + /// cierra (write falla) o el subscriber se desconecta. + async fn stream_audit(self: Arc, mut stream: UnixStream) -> anyhow::Result<()> { + let mut rx = self.state.audit.write().await.subscribe(); + info!("audit stream client conectado"); + while let Some(entry) = rx.recv().await { + let frame = IntrospectResponse::AuditStreamFrame(entry); + let bytes = bincode::serialize(&frame)?; + if stream.write_u32(bytes.len() as u32).await.is_err() { break; } + if stream.write_all(&bytes).await.is_err() { break; } + } + info!("audit stream client desconectado"); + Ok(()) + } + async fn dispatch(&self, req: IntrospectRequest) -> IntrospectResponse { match req { IntrospectRequest::ListRules => { @@ -333,6 +365,26 @@ impl IntrospectServer { let report = crate::audit::verify_chain_from_cas(head); IntrospectResponse::AuditVerified(report) } + IntrospectRequest::StreamAudit => { + // Inalcanzable por construcción: handle() detecta StreamAudit + // antes de llamar a dispatch(). Pero el match exige cubrir. + IntrospectResponse::Error( + "StreamAudit no debe llegar a dispatch — bug del handler".into() + ) + } + IntrospectRequest::ReplayAudit => { + let head = self.state.audit.read().await.last_flushed_sha(); + let head = match head { + Some(h) => h, + None => return IntrospectResponse::Error( + "audit log sin entries flushadas — nada que replayar".into() + ), + }; + let mut engine = self.state.engine.write().await; + *engine = crate::engine::RuleEngine::empty(); + let report = crate::audit::replay_chain(head, &mut engine); + IntrospectResponse::Replayed(report) + } IntrospectRequest::ReloadRules { path } => { // Path explícito gana sobre el rules_out configurado. let resolved = path.map(std::path::PathBuf::from) diff --git a/crates/ente-brain/src/observer.rs b/crates/ente-brain/src/observer.rs index f46527b..d7e5c20 100644 --- a/crates/ente-brain/src/observer.rs +++ b/crates/ente-brain/src/observer.rs @@ -21,7 +21,7 @@ pub struct TimedEvent { } /// Histograma de gaps temporales con buckets exponenciales en segundos. -/// Cubre 10 órdenes de magnitud: 1ms hasta 1000s. +/// Cubre 6 órdenes de magnitud: 1ms hasta 1000s. #[derive(Debug, Clone, Default)] pub struct GapHistogram { /// Buckets cumulativos (Prometheus-style): cada índice cuenta eventos @@ -29,6 +29,17 @@ pub struct GapHistogram { pub buckets: [u64; 7], pub count: u64, pub sum_secs: f64, + /// Suma de cuadrados — permite calcular varianza/stddev en O(1). + pub sum_squares_secs: f64, + pub max_secs: f64, +} + +/// Estadísticas resumidas de un GapHistogram, usables en cristales temporales. +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +pub struct GapStats { + pub count: u64, + pub mean_secs: f64, + pub stddev_secs: f64, pub max_secs: f64, } @@ -45,6 +56,7 @@ impl GapHistogram { } self.count += 1; self.sum_secs += gap_secs; + self.sum_squares_secs += gap_secs * gap_secs; if gap_secs > self.max_secs { self.max_secs = gap_secs; } } @@ -52,6 +64,26 @@ impl GapHistogram { if self.count == 0 { 0.0 } else { self.sum_secs / self.count as f64 } } + /// Desviación estándar muestral. Computada vía `sum_squares - n*mean²` + /// para precisión razonable sin almacenar las muestras. + pub fn stddev_secs(&self) -> f64 { + if self.count < 2 { return 0.0; } + let n = self.count as f64; + let mean = self.mean_secs(); + let var = (self.sum_squares_secs - n * mean * mean) / (n - 1.0); + // Numerical floor: var puede ser ligeramente negativo por float ε. + if var <= 0.0 { 0.0 } else { var.sqrt() } + } + + pub fn stats(&self) -> GapStats { + GapStats { + count: self.count, + mean_secs: self.mean_secs(), + stddev_secs: self.stddev_secs(), + max_secs: self.max_secs, + } + } + pub fn bucket_limits() -> &'static [f64; 7] { &GAP_BUCKET_LIMITS_SECS } } diff --git a/crates/ente-zero/src/graph/capabilities.rs b/crates/ente-zero/src/graph/capabilities.rs index 049ed1d..af449ad 100644 --- a/crates/ente-zero/src/graph/capabilities.rs +++ b/crates/ente-zero/src/graph/capabilities.rs @@ -1,13 +1,15 @@ -//! Mediación de capabilities: emisión y revocación de tokens. +//! Mediación de capabilities: emisión, renovación, revocación de tokens. //! -//! El Init no fuerza políticas — sólo verifica que el proveedor existe y -//! emite tokens. Las políticas reales (quién puede pedir qué, rate limits, -//! audit) se aplican en capas superiores. +//! Los grants tienen TTL (`DEFAULT_GRANT_TTL`). El cliente debe renovarlos +//! periódicamente con `renew_grant(token)`; en caso contrario, el background +//! task `purge_expired_grants` los revoca al vencimiento. -use super::{EnteGraph, GrantedCapability}; +use super::{EnteGraph, GrantedCapability, DEFAULT_GRANT_TTL}; use crate::events::CapabilityGrant; use ente_card::Capability; +use std::time::Instant; use tokio::sync::oneshot; +use tracing::debug; use ulid::Ulid; impl EnteGraph { @@ -22,12 +24,54 @@ impl EnteGraph { Some(provider) => { let token = self.next_token; self.next_token += 1; + let expires_at = Instant::now() + DEFAULT_GRANT_TTL; self.grants.insert(token, GrantedCapability { - cap: cap.clone(), provider, holder: from, + cap: cap.clone(), + provider, + holder: from, + expires_at, }); CapabilityGrant::Granted { token } } }; let _ = reply.send(grant); } + + /// Extiende un grant existente. Devuelve `true` si renovó. Si el token + /// no existe o ya expiró, `false` (el cliente debe re-acquire). + pub fn renew_grant(&mut self, token: u64) -> bool { + let now = Instant::now(); + if let Some(g) = self.grants.get_mut(&token) { + if g.expires_at > now { + g.expires_at = now + DEFAULT_GRANT_TTL; + return true; + } + // Expired — purgamos aquí mismo. + self.grants.remove(&token); + } + false + } + + /// GC: elimina grants vencidos. Devuelve cuántos fueron purgados. + pub fn purge_expired_grants(&mut self) -> usize { + let now = Instant::now(); + let expired: Vec = self.grants.iter() + .filter(|(_, g)| g.expires_at <= now) + .map(|(t, _)| *t) + .collect(); + for t in &expired { + self.grants.remove(t); + } + if !expired.is_empty() { + debug!(count = expired.len(), "grants expirados purgados"); + } + expired.len() + } + + /// Cuenta de grants vivos (no expirados). Usado por métricas. + pub fn active_grants_count(&self) -> usize { + let now = Instant::now(); + self.grants.values().filter(|g| g.expires_at > now).count() + } } + diff --git a/crates/ente-zero/src/graph/mod.rs b/crates/ente-zero/src/graph/mod.rs index 2b99a75..9582ab7 100644 --- a/crates/ente-zero/src/graph/mod.rs +++ b/crates/ente-zero/src/graph/mod.rs @@ -66,8 +66,14 @@ pub(in crate::graph) struct GrantedCapability { pub cap: Capability, pub provider: Ulid, pub holder: Ulid, + /// Instante en el que el grant deja de ser válido. El garbage collector + /// del cerebro purga grants con `Instant::now() > expires_at`. + pub expires_at: std::time::Instant, } +/// TTL default para nuevos grants. Configurable por bus en el futuro. +pub const DEFAULT_GRANT_TTL: std::time::Duration = std::time::Duration::from_secs(60); + impl EnteGraph { pub fn new(mut seed: EntityCard) -> Self { // Extraemos genesis antes de almacenar la Semilla — evita duplicación diff --git a/crates/ente-zero/src/main.rs b/crates/ente-zero/src/main.rs index 17dbf2e..1367042 100644 --- a/crates/ente-zero/src/main.rs +++ b/crates/ente-zero/src/main.rs @@ -237,6 +237,10 @@ async fn primordial_loop( }; tokio::pin!(dev_exit); + // GC de capability grants expirados — corre cada 10 segundos. + let mut grant_purge = tokio::time::interval(Duration::from_secs(10)); + grant_purge.tick().await; // descartar primer tick inmediato + loop { tokio::select! { biased; @@ -258,6 +262,13 @@ async fn primordial_loop( } } + _ = grant_purge.tick() => { + let n = graph.purge_expired_grants(); + if n > 0 { + info!(purged = n, active = graph.active_grants_count(), "GC capability grants"); + } + } + _ = async { dev_exit.as_mut().as_pin_mut().unwrap().await }, if dev_mode => { info!("dev mode: timer expirado, cerrando bucle primordial"); let _ = graph_tx.send(GraphEvent::Shutdown {