diff --git a/crates/ente-brain/src/audit.rs b/crates/ente-brain/src/audit.rs index 680c9a8..d6cdf19 100644 --- a/crates/ente-brain/src/audit.rs +++ b/crates/ente-brain/src/audit.rs @@ -48,6 +48,8 @@ pub struct AuditLog { /// Subscribers a entries en tiempo real. Cada `append` empuja a todos. /// Subscribers cuyo receiver se dropeó se purgan en el siguiente push. subscribers: Vec>, + /// Wall-clock del último flush exitoso a CAS. None si aún no se flush. + last_flush_at_ms: Option, } impl AuditLog { @@ -64,6 +66,7 @@ impl AuditLog { last_flushed_sha: None, head_pointer_path: None, subscribers: Vec::new(), + last_flush_at_ms: None, } } @@ -147,6 +150,9 @@ impl AuditLog { } self.flushed_count += written as u64; self.last_flushed_sha = last_sha; + if written > 0 { + self.last_flush_at_ms = Some(now_ms()); + } // Persistir head pointer si está configurado. if let (Some(path), Some(sha)) = (&self.head_pointer_path, last_sha) { let pointer = AuditHeadPointer { @@ -167,6 +173,14 @@ impl AuditLog { pub fn flushed_count(&self) -> u64 { self.flushed_count } pub fn last_flushed_sha(&self) -> Option<[u8; 32]> { self.last_flushed_sha } + pub fn last_flush_at_ms(&self) -> Option { self.last_flush_at_ms } + + /// Segundos transcurridos desde el último flush. None si nunca se flush. + pub fn last_flush_age_secs(&self) -> Option { + let then = self.last_flush_at_ms?; + let now = now_ms(); + Some((now.saturating_sub(then)) as f64 / 1000.0) + } } /// Pointer al head del audit log — escrito atómicamente en disco tras cada @@ -387,4 +401,132 @@ mod tests { // El primer seq superviviente debe ser 2. assert_eq!(log.recent(0).next().unwrap().seq, 2); } + + // ---------- Tests de integración con CAS real (en directorio temporal) ---------- + + use crate::engine::RuleEngine; + use std::sync::Mutex; + + /// Lock para serializar tests que mutan ENTE_CAS_ROOT (test threads + /// comparten env vars). Sin esto, dos tests en paralelo pisan el path. + static CAS_TEST_LOCK: Mutex<()> = Mutex::new(()); + + fn with_temp_cas(f: F) { + let _guard = CAS_TEST_LOCK.lock().unwrap(); + let dir = std::env::temp_dir().join(format!("ente-cas-test-{}", Ulid::new())); + std::env::set_var("ENTE_CAS_ROOT", &dir); + let _cleanup = scopeguard(&dir); + f(); + } + + fn scopeguard(dir: &std::path::Path) -> impl Drop + '_ { + struct G<'a>(&'a std::path::Path); + impl<'a> Drop for G<'a> { + fn drop(&mut self) { + std::env::remove_var("ENTE_CAS_ROOT"); + let _ = std::fs::remove_dir_all(self.0); + } + } + G(dir) + } + + fn dummy_crystal(ant: EventKind, con: EventKind) -> Crystal { + Crystal { + antecedent: ant, + consequent: con, + conditional_prob: 0.9, + pmi: 1.5, + support: 7, + gap_stats: None, + } + } + + use crate::rules::EventKind; + + #[test] + fn flush_round_trip_preserves_chain() { + with_temp_cas(|| { + let mut log = AuditLog::new(); + let id1 = Ulid::new(); + let id2 = Ulid::new(); + log.append(AuditAction::PromoteCrystal { + rule_id: id1, + crystal: dummy_crystal(EventKind::EnteSpawned, EventKind::EnteDied), + }); + log.append(AuditAction::PromoteCrystal { + rule_id: id2, + crystal: dummy_crystal(EventKind::BusAnnounce, EventKind::BusInvoke), + }); + log.append(AuditAction::RemoveRule { rule_id: id1 }); + + assert_eq!(log.flush_to_cas().unwrap(), 3); + let head = log.last_flushed_sha().expect("head set"); + let report = verify_chain_from_cas(head); + assert!(report.error.is_none(), "verification failed: {:?}", report.error); + assert_eq!(report.verified, 3); + }); + } + + #[test] + fn replay_reconstructs_engine_state() { + with_temp_cas(|| { + let mut log = AuditLog::new(); + let id1: Ulid = "01KQR3000000000000000000A1".parse().unwrap(); + let id2: Ulid = "01KQR3000000000000000000A2".parse().unwrap(); + let id3: Ulid = "01KQR3000000000000000000A3".parse().unwrap(); + log.append(AuditAction::PromoteCrystal { + rule_id: id1, + crystal: dummy_crystal(EventKind::EnteSpawned, EventKind::EnteDied), + }); + log.append(AuditAction::PromoteCrystal { + rule_id: id2, + crystal: dummy_crystal(EventKind::BusAnnounce, EventKind::BusInvoke), + }); + log.append(AuditAction::PromoteCrystal { + rule_id: id3, + crystal: dummy_crystal(EventKind::DeviceAdded, EventKind::DeviceRemoved), + }); + log.append(AuditAction::RemoveRule { rule_id: id2 }); + log.flush_to_cas().unwrap(); + let head = log.last_flushed_sha().unwrap(); + + let mut engine = RuleEngine::empty(); + let rep = replay_chain(head, &mut engine); + assert!(rep.error.is_none(), "replay error: {:?}", rep.error); + assert_eq!(rep.applied, 4); + assert_eq!(engine.len(), 2, "id2 should be removed, id1 + id3 remain"); + // Ulids preservados + let ids: Vec = engine.rules().map(|r| r.id).collect(); + assert!(ids.contains(&id1)); + assert!(!ids.contains(&id2)); + assert!(ids.contains(&id3)); + }); + } + + #[test] + fn replay_after_eviction_still_works() { + with_temp_cas(|| { + // Cap pequeño: la mayoría de entries se evictan de memoria pero + // siguen en CAS. Replay debe poder reconstruir desde CAS solo. + let mut log = AuditLog::with_cap(2); + let mut ids = Vec::new(); + for _ in 0..6 { + let id = Ulid::new(); + ids.push(id); + log.append(AuditAction::PromoteCrystal { + rule_id: id, + crystal: dummy_crystal(EventKind::EnteSpawned, EventKind::EnteDied), + }); + log.flush_to_cas().unwrap(); + } + assert_eq!(log.len(), 2, "cap eviction limita memoria"); + let head = log.last_flushed_sha().unwrap(); + + let mut engine = RuleEngine::empty(); + let rep = replay_chain(head, &mut engine); + assert!(rep.error.is_none()); + assert_eq!(rep.applied, 6); + assert_eq!(engine.len(), 6); + }); + } } diff --git a/crates/ente-brain/src/metrics.rs b/crates/ente-brain/src/metrics.rs index 10b9a29..ad44be2 100644 --- a/crates/ente-brain/src/metrics.rs +++ b/crates/ente-brain/src/metrics.rs @@ -58,6 +58,7 @@ async fn handle_scrape(mut stream: TcpStream, state: BrainState) -> anyhow::Resu async fn format_metrics(state: &BrainState) -> String { let obs = state.observer.read().await; let engine = state.engine.read().await; + let audit = state.audit.read().await; let mut out = String::with_capacity(2048); @@ -102,6 +103,35 @@ async fn format_metrics(state: &BrainState) -> String { out.push_str("# TYPE ente_brain_crystals_total gauge\n"); out.push_str(&format!("ente_brain_crystals_total {}\n", crystals.len())); + // ---- Audit log ---- + out.push_str("# HELP ente_brain_audit_chain_length Total entries persisted to CAS.\n"); + out.push_str("# TYPE ente_brain_audit_chain_length counter\n"); + out.push_str(&format!("ente_brain_audit_chain_length {}\n", audit.flushed_count())); + + out.push_str("# HELP ente_brain_audit_in_memory Entries currently in the in-memory ring.\n"); + out.push_str("# TYPE ente_brain_audit_in_memory gauge\n"); + out.push_str(&format!("ente_brain_audit_in_memory {}\n", audit.len())); + + out.push_str("# HELP ente_brain_audit_subscribers Active stream-audit subscribers.\n"); + out.push_str("# TYPE ente_brain_audit_subscribers gauge\n"); + out.push_str(&format!("ente_brain_audit_subscribers {}\n", audit.subscriber_count())); + + if let Some(age) = audit.last_flush_age_secs() { + out.push_str("# HELP ente_brain_audit_last_flush_age_seconds Time since last flush to CAS.\n"); + out.push_str("# TYPE ente_brain_audit_last_flush_age_seconds gauge\n"); + out.push_str(&format!("ente_brain_audit_last_flush_age_seconds {:.3}\n", age)); + } + if let Some(sha) = audit.last_flushed_sha() { + // Info-style metric con head sha como label. Útil para dashboards + // que quieran mostrar "current head". + out.push_str("# HELP ente_brain_audit_head_info Current head SHA of the audit chain.\n"); + out.push_str("# TYPE ente_brain_audit_head_info gauge\n"); + out.push_str(&format!( + "ente_brain_audit_head_info{{sha=\"{}\"}} 1\n", + ente_cas::hex(&sha) + )); + } + // ---- Histogramas de gaps temporales (top-32 pares más frecuentes) ---- out.push_str("# HELP ente_brain_pair_gap_seconds Time gap between correlated events.\n"); out.push_str("# TYPE ente_brain_pair_gap_seconds histogram\n"); diff --git a/crates/ente-brain/src/observer.rs b/crates/ente-brain/src/observer.rs index d7e5c20..d1c0f8d 100644 --- a/crates/ente-brain/src/observer.rs +++ b/crates/ente-brain/src/observer.rs @@ -22,7 +22,7 @@ pub struct TimedEvent { /// Histograma de gaps temporales con buckets exponenciales en segundos. /// Cubre 6 órdenes de magnitud: 1ms hasta 1000s. -#[derive(Debug, Clone, Default)] +#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize)] pub struct GapHistogram { /// Buckets cumulativos (Prometheus-style): cada índice cuenta eventos /// con gap ≤ ese límite. Limites: 1ms, 10ms, 100ms, 1s, 10s, 100s, 1000s. @@ -262,6 +262,78 @@ impl Observer { pairs.truncate(k); pairs } + + /// Snapshot serializable del estado estadístico (sin Instants — no son + /// portables a través de reboots). El window deslizante se descarta — + /// se reconstruye desde cero al restore. + pub fn snapshot(&self) -> ObserverSnapshot { + ObserverSnapshot { + schema_version: OBSERVER_SCHEMA_VERSION, + window_size: self.window_size, + half_life_secs: self.half_life_secs, + total: self.total, + marginal: self.marginal.iter() + .map(|(k, v)| (k.clone(), *v)) + .collect(), + cooccur: self.cooccur.iter() + .map(|((a, b), c)| (a.clone(), b.clone(), *c)) + .collect(), + gap_histograms: self.gap_histograms.iter() + .map(|((a, b), h)| (a.clone(), b.clone(), h.clone())) + .collect(), + } + } + + /// Reconstruye Observer desde un snapshot. El window queda vacío; + /// last_seen_* se inicializa en `now()` para que el decay arranque + /// "ahora" para todos los counts (aproximación razonable post-reboot). + pub fn from_snapshot(snap: ObserverSnapshot) -> Self { + let now = Instant::now(); + let mut marginal = HashMap::new(); + let mut last_seen_marginal = HashMap::new(); + for (k, v) in snap.marginal { + last_seen_marginal.insert(k.clone(), now); + marginal.insert(k, v); + } + let mut cooccur = HashMap::new(); + let mut last_seen_cooccur = HashMap::new(); + for (a, b, c) in snap.cooccur { + last_seen_cooccur.insert((a.clone(), b.clone()), now); + cooccur.insert((a, b), c); + } + let gap_histograms = snap.gap_histograms.into_iter() + .map(|(a, b, h)| ((a, b), h)) + .collect(); + Self { + window: VecDeque::with_capacity(snap.window_size), + window_size: snap.window_size, + marginal, + cooccur, + total: snap.total, + last_seen_marginal, + last_seen_cooccur, + half_life_secs: snap.half_life_secs, + gap_histograms, + } + } +} + +const OBSERVER_SCHEMA_VERSION: u16 = 1; + +/// Snapshot serializable. Se persiste a JSON en disco y se restaura al +/// reboot para preservar contadores, co-ocurrencias e histogramas. +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +pub struct ObserverSnapshot { + pub schema_version: u16, + pub window_size: usize, + pub half_life_secs: Option, + pub total: u64, + /// Marginales serializados como Vec porque HashMap usa + /// EventKind como key — y EventKind tiene variantes con payloads que + /// no son JSON-key-serializable (BusInvokeOf, Custom). + pub marginal: Vec<(EventKind, u64)>, + pub cooccur: Vec<(EventKind, EventKind, u64)>, + pub gap_histograms: Vec<(EventKind, EventKind, GapHistogram)>, } #[cfg(test)] diff --git a/crates/ente-zero/src/graph/capabilities.rs b/crates/ente-zero/src/graph/capabilities.rs index af449ad..1103ad4 100644 --- a/crates/ente-zero/src/graph/capabilities.rs +++ b/crates/ente-zero/src/graph/capabilities.rs @@ -4,7 +4,7 @@ //! periódicamente con `renew_grant(token)`; en caso contrario, el background //! task `purge_expired_grants` los revoca al vencimiento. -use super::{EnteGraph, GrantedCapability, DEFAULT_GRANT_TTL}; +use super::{ttl_for_capability, EnteGraph, GrantedCapability}; use crate::events::CapabilityGrant; use ente_card::Capability; use std::time::Instant; @@ -24,7 +24,10 @@ impl EnteGraph { Some(provider) => { let token = self.next_token; self.next_token += 1; - let expires_at = Instant::now() + DEFAULT_GRANT_TTL; + // TTL específico por variante de capability — caps escaladas + // (Spawn, FilesystemRoot) viven menos. + let ttl = ttl_for_capability(&cap); + let expires_at = Instant::now() + ttl; self.grants.insert(token, GrantedCapability { cap: cap.clone(), provider, @@ -39,11 +42,12 @@ impl EnteGraph { /// Extiende un grant existente. Devuelve `true` si renovó. Si el token /// no existe o ya expiró, `false` (el cliente debe re-acquire). + /// Usa el TTL específico de la cap del grant. pub fn renew_grant(&mut self, token: u64) -> bool { let now = Instant::now(); if let Some(g) = self.grants.get_mut(&token) { if g.expires_at > now { - g.expires_at = now + DEFAULT_GRANT_TTL; + g.expires_at = now + ttl_for_capability(&g.cap); return true; } // Expired — purgamos aquí mismo. diff --git a/crates/ente-zero/src/graph/mod.rs b/crates/ente-zero/src/graph/mod.rs index 9582ab7..8051530 100644 --- a/crates/ente-zero/src/graph/mod.rs +++ b/crates/ente-zero/src/graph/mod.rs @@ -71,9 +71,32 @@ pub(in crate::graph) struct GrantedCapability { pub expires_at: std::time::Instant, } -/// TTL default para nuevos grants. Configurable por bus en el futuro. +/// TTL default para grants cuando la cap no tiene override. 60s es un +/// compromiso: largo enough para evitar churn en patrones interactivos, +/// corto enough para que credenciales filtradas expiren rápidamente. pub const DEFAULT_GRANT_TTL: std::time::Duration = std::time::Duration::from_secs(60); +/// TTL específico por variante de Capability. Caps de mayor riesgo / costo +/// (Spawn, FilesystemRoot) tienen TTL más corto; caps "logging" como +/// Journal pueden vivir más. +/// +/// Cualquier cap no listada cae al `DEFAULT_GRANT_TTL`. +pub fn ttl_for_capability(cap: &Capability) -> std::time::Duration { + use std::time::Duration; + match cap { + // Caps escaladas: TTL corto para forzar renovación frecuente. + Capability::Spawn => Duration::from_secs(30), + Capability::FilesystemRoot => Duration::from_secs(30), + Capability::Device { .. } => Duration::from_secs(60), + // Caps de propósito general. + Capability::Endpoint { .. } => Duration::from_secs(300), // 5 min + Capability::KernelNetlink(_) => Duration::from_secs(300), + Capability::LegacyLogind => Duration::from_secs(300), + // Logging puede vivir mucho. + Capability::Journal => Duration::from_secs(3600), // 1h + } +} + impl EnteGraph { pub fn new(mut seed: EntityCard) -> Self { // Extraemos genesis antes de almacenar la Semilla — evita duplicación diff --git a/crates/ente-zero/src/main.rs b/crates/ente-zero/src/main.rs index 1367042..2e3fcb2 100644 --- a/crates/ente-zero/src/main.rs +++ b/crates/ente-zero/src/main.rs @@ -98,7 +98,7 @@ fn main() -> anyhow::Result<()> { rt.block_on(primordial_loop( card, dev_mode, - cli.checkpoint, cli.rules, cli.rules_out, + cli.checkpoint, cli.restore, cli.rules, cli.rules_out, cli.audit_head, cli.metrics_addr, cli.brain_half_life, cli.autopromote_secs, )) @@ -108,6 +108,7 @@ async fn primordial_loop( seed_card: ente_card::EntityCard, dev_mode: bool, checkpoint_path: Option, + restore_path: Option, rules_path: Option, rules_out: Option, audit_head: Option, @@ -175,6 +176,28 @@ async fn primordial_loop( }, ); } + + // Brain restore: si hay --restore , cargamos el snapshot adjunto + // .brain.json. Counters preservados across reboots. + if let Some(rpath) = &restore_path { + let brain_path = rpath.with_extension("brain.json"); + if brain_path.exists() { + match read_brain_snapshot(&brain_path) { + Ok(snap) => { + let total = snap.total; + let kinds = snap.marginal.len(); + let restored = ente_brain::Observer::from_snapshot(snap); + *brain.observer.write().await = restored; + info!( + path = %brain_path.display(), + total, kinds, + "brain snapshot restaurado" + ); + } + Err(e) => warn!(?e, path = %brain_path.display(), "brain snapshot read falló"), + } + } + } // Si --audit-head, configuramos el head pointer y arrancamos auto-flush. if let Some(head_path) = audit_head { // Re-creamos el AuditLog con head pointer. @@ -257,7 +280,7 @@ async fn primordial_loop( // Cerebro observa antes que el grafo mute. Snapshot del // SubjectInfo se hace contra el estado pre-mutación. feed_brain(&brain, &brain_sink, &graph, &evt).await; - if dispatch_graph_event(&mut graph, evt, &graph_tx, &checkpoint_path).await { + if dispatch_graph_event(&mut graph, evt, &graph_tx, &checkpoint_path, &brain).await { return Ok(()); } } @@ -285,6 +308,7 @@ async fn dispatch_graph_event( evt: GraphEvent, tx: &mpsc::Sender, checkpoint: &Option, + brain: &BrainState, ) -> bool { match evt { GraphEvent::EnteDied { id, status } => { @@ -310,11 +334,24 @@ async fn dispatch_graph_event( GraphEvent::Shutdown { reason } => { warn!(?reason, "shutdown del fractal"); if let Some(path) = checkpoint.as_ref() { + // Snapshot del grafo let snap = graph.snapshot(); match snap.write(path) { - Ok(()) => info!(path = %path.display(), entes = snap.entes.len(), "snapshot persistido"), + Ok(()) => info!(path = %path.display(), entes = snap.entes.len(), "snapshot fractal persistido"), Err(e) => warn!(?e, "snapshot write falló"), } + // Snapshot del cerebro (observer state) en archivo adjunto + let brain_path = path.with_extension("brain.json"); + let obs_snap = brain.observer.read().await.snapshot(); + match write_brain_snapshot(&brain_path, &obs_snap) { + Ok(()) => info!( + path = %brain_path.display(), + total = obs_snap.total, + kinds = obs_snap.marginal.len(), + "snapshot brain persistido" + ), + Err(e) => warn!(?e, "brain snapshot write falló"), + } } graph.cascade_shutdown().await; return true; @@ -382,6 +419,21 @@ fn spawn_echo_smoke_test(bus_path: PathBuf) { }); } +fn write_brain_snapshot(path: &std::path::Path, snap: &ente_brain::observer::ObserverSnapshot) -> anyhow::Result<()> { + let bytes = serde_json::to_vec_pretty(snap)?; + if let Some(parent) = path.parent() { let _ = std::fs::create_dir_all(parent); } + let tmp = path.with_extension("tmp"); + std::fs::write(&tmp, &bytes)?; + std::fs::rename(&tmp, path)?; + Ok(()) +} + +fn read_brain_snapshot(path: &std::path::Path) -> anyhow::Result { + let bytes = std::fs::read(path)?; + let snap: ente_brain::observer::ObserverSnapshot = serde_json::from_slice(&bytes)?; + Ok(snap) +} + fn init_tracing() { use tracing_subscriber::{fmt, EnvFilter}; let filter = EnvFilter::try_from_default_env()