diff --git a/crates/ente-brain/examples/brainctl.rs b/crates/ente-brain/examples/brainctl.rs index 14a376c..b781e8e 100644 --- a/crates/ente-brain/examples/brainctl.rs +++ b/crates/ente-brain/examples/brainctl.rs @@ -62,6 +62,8 @@ async fn main() -> anyhow::Result<()> { "flush-audit" => IntrospectRequest::FlushAudit, "audit-verify" | "verify" => IntrospectRequest::VerifyAudit, "replay" => IntrospectRequest::ReplayAudit, + "gc-cas" => IntrospectRequest::GcCas { extra_roots: Vec::new() }, + "patterns" => IntrospectRequest::PatternCrystals, "reload" => { let path = args.get(2).cloned(); IntrospectRequest::ReloadRules { path } @@ -158,6 +160,22 @@ fn print_response(r: &IntrospectResponse) { if let Some(g) = rep.genesis_sha { println!(" genesis: {}", hex_long(g)); } } } + IntrospectResponse::Patterns(ps) => { + println!("{} cristales pattern detectados:", ps.len()); + for p in ps { + match p { + ente_brain::crystallize::PatternCrystal::Burst { kind, count, frequency_per_sec } => { + println!(" burst: {kind:?} count={count} freq={frequency_per_sec:.2} Hz"); + } + ente_brain::crystallize::PatternCrystal::Silence { kind, last_count, since_secs } => { + println!(" silence: {kind:?} last_count={last_count} ausente={since_secs:.1}s"); + } + } + } + } + IntrospectResponse::GcResult { deleted, freed_bytes } => { + println!("CAS gc: {deleted} blobs eliminados, {freed_bytes} bytes liberados"); + } IntrospectResponse::AuditStreamFrame(_) => { // En modo request/response no debería llegar; solo aparece en // run_stream_audit. Si llega aquí es un bug del servidor. diff --git a/crates/ente-brain/src/audit.rs b/crates/ente-brain/src/audit.rs index d6cdf19..a95e80f 100644 --- a/crates/ente-brain/src/audit.rs +++ b/crates/ente-brain/src/audit.rs @@ -280,6 +280,24 @@ pub fn verify_chain_from_cas(start_sha: [u8; 32]) -> VerificationReport { } } +/// Devuelve el set de SHAs alcanzables desde `start_sha` siguiendo +/// `prev_sha` hasta el genesis. Usado por el GC del CAS para construir +/// las "raíces vivas" del audit log. +pub fn reachable_from_head(start_sha: [u8; 32]) -> std::collections::HashSet<[u8; 32]> { + let mut set = std::collections::HashSet::new(); + let mut current = Some(start_sha); + while let Some(sha) = current { + if !set.insert(sha) { break; } // ciclo (no debería pasar) — corta + let path = ente_cas::cas_root().join(ente_cas::hex(&sha)); + let bytes = match std::fs::read(&path) { Ok(b) => b, Err(_) => break }; + let entry: AuditEntry = match serde_json::from_slice(&bytes) { + Ok(e) => e, Err(_) => break, + }; + current = entry.prev_sha; + } + set +} + /// Recorre la cadena entera (head→genesis) y reconstruye la lista de /// actions en orden cronológico (oldest first). Útil tanto para replay /// como para auditoría retrospectiva. diff --git a/crates/ente-brain/src/crystallize.rs b/crates/ente-brain/src/crystallize.rs index bfd0c04..660ed4c 100644 --- a/crates/ente-brain/src/crystallize.rs +++ b/crates/ente-brain/src/crystallize.rs @@ -11,6 +11,7 @@ use crate::observer::{GapStats, Observer}; use crate::rules::{Action, EventKind, EventPattern, LogLevel, Rule, Scope}; use serde::{Deserialize, Serialize}; +use std::time::Instant; use ulid::Ulid; #[derive(Debug, Clone, Serialize, Deserialize)] @@ -164,6 +165,99 @@ pub fn crystal_to_rule(c: &Crystal) -> Rule { } +// ============================================================================ +// Patrones extendidos: Burst (alta frecuencia) y Silence (ausencia prolongada). +// Estos cristales son sobre un único kind, no pares — capturan dinámicas +// temporales de eventos individuales. +// ============================================================================ + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub enum PatternCrystal { + /// Mismo evento aparece con frecuencia alta. `frequency_per_sec` se + /// estima sobre el window de observación. + Burst { + kind: EventKind, + count: u64, + frequency_per_sec: f64, + }, + /// Evento que dejó de aparecer. `since_secs` es el tiempo desde la + /// última observación. + Silence { + kind: EventKind, + last_count: u64, + since_secs: f64, + }, +} + +#[derive(Debug, Clone, Copy)] +pub struct PatternParams { + /// Mínimo de ocurrencias para considerar Burst. + pub burst_min_count: u64, + /// Frecuencia mínima (eventos por segundo) para considerar Burst. + pub burst_min_freq_hz: f64, + /// Tiempo desde última ocurrencia para considerar Silence. + pub silence_min_secs: f64, + /// Mínimo total previo para considerar Silence (eventos < N son ruido). + pub silence_min_prior_count: u64, +} + +impl Default for PatternParams { + fn default() -> Self { + Self { + burst_min_count: 10, + burst_min_freq_hz: 5.0, + silence_min_secs: 30.0, + silence_min_prior_count: 3, + } + } +} + +/// Detecta Bursts y Silences sobre la distribución marginal del observer. +/// La frecuencia de un Burst se aproxima asumiendo que la observación cubre +/// el rango entre `last_seen` y `Instant::now()` para ese kind. +pub fn detect_pattern_crystals(obs: &Observer, params: &PatternParams) -> Vec { + let mut out = Vec::new(); + let now = Instant::now(); + for (kind, &count) in obs.marginals() { + let last_seen = obs.last_seen_marginal(kind); + // ---- Burst ---- + if count >= params.burst_min_count { + // Aproximación: si vimos `count` eventos hasta `last_seen`, y el + // primer evento sucedió en algún momento del window, la freq es + // count / window_age. Sin tiempo del primer evento, usamos + // last_seen → now como denominador (subestima freq) o asumimos + // ventana fija de 60s. Usamos la última como aproximación. + let elapsed = last_seen + .map(|t| now.saturating_duration_since(t).as_secs_f64().max(0.001)) + .unwrap_or(60.0); + // Estimación conservadora: count / max(window_age, 1s). + // Si tenemos histograma, podríamos refinar — TODO. + let freq = count as f64 / elapsed.max(1.0); + if freq >= params.burst_min_freq_hz { + out.push(PatternCrystal::Burst { + kind: kind.clone(), + count, + frequency_per_sec: freq, + }); + } + } + // ---- Silence ---- + if count >= params.silence_min_prior_count { + if let Some(t) = last_seen { + let since = now.saturating_duration_since(t).as_secs_f64(); + if since >= params.silence_min_secs { + out.push(PatternCrystal::Silence { + kind: kind.clone(), + last_count: count, + since_secs: since, + }); + } + } + } + } + out +} + #[cfg(test)] mod tests { use super::*; diff --git a/crates/ente-brain/src/introspect.rs b/crates/ente-brain/src/introspect.rs index 029bab3..5f8ad7c 100644 --- a/crates/ente-brain/src/introspect.rs +++ b/crates/ente-brain/src/introspect.rs @@ -113,6 +113,12 @@ pub enum IntrospectRequest { /// frames `IntrospectResponse::AuditStreamFrame` hasta que el cliente /// cierra. Tras esta request no se aceptan más requests en la misma conn. StreamAudit, + /// Garbage-collect el CAS. Considera reachable: todo lo alcanzable desde + /// el head del audit log. Cualquier blob extra (Wasm modules referenciados + /// por Cards) debe haberse pasado en `extra_roots` por el caller. + GcCas { extra_roots: Vec<[u8; 32]> }, + /// Detecta cristales de patrones temporales (Burst, Silence). + PatternCrystals, } #[derive(Debug, Serialize, Deserialize)] @@ -140,6 +146,10 @@ pub enum IntrospectResponse { Replayed(crate::audit::ReplayReport), /// Frame de streaming. El cliente lee estos en bucle hasta EOF. AuditStreamFrame(crate::audit::AuditEntry), + /// Resultado de GcCas: cuántos blobs eliminados y bytes liberados. + GcResult { deleted: usize, freed_bytes: u64 }, + /// Cristales de Burst/Silence detectados. + Patterns(Vec), Error(String), } @@ -372,6 +382,24 @@ impl IntrospectServer { "StreamAudit no debe llegar a dispatch — bug del handler".into() ) } + IntrospectRequest::PatternCrystals => { + let obs = self.state.observer.read().await; + let params = crate::crystallize::PatternParams::default(); + let patterns = crate::crystallize::detect_pattern_crystals(&obs, ¶ms); + IntrospectResponse::Patterns(patterns) + } + IntrospectRequest::GcCas { extra_roots } => { + // Reachable = audit chain desde head + extra_roots provistos. + let mut reachable = std::collections::HashSet::new(); + if let Some(head) = self.state.audit.read().await.last_flushed_sha() { + reachable.extend(crate::audit::reachable_from_head(head)); + } + reachable.extend(extra_roots); + match ente_cas::gc(&reachable) { + Ok((deleted, freed_bytes)) => IntrospectResponse::GcResult { deleted, freed_bytes }, + Err(e) => IntrospectResponse::Error(format!("gc: {e}")), + } + } IntrospectRequest::ReplayAudit => { let head = self.state.audit.read().await.last_flushed_sha(); let head = match head { diff --git a/crates/ente-brain/src/observer.rs b/crates/ente-brain/src/observer.rs index d1c0f8d..2dbf7b5 100644 --- a/crates/ente-brain/src/observer.rs +++ b/crates/ente-brain/src/observer.rs @@ -102,6 +102,10 @@ pub struct Observer { half_life_secs: Option, /// Histograma de gaps temporales por par (a, b). Capturado al `record()`. gap_histograms: HashMap<(EventKind, EventKind), GapHistogram>, + /// Sets de "qué cambió desde el último snapshot". Se vacían en + /// `snapshot()` y `snapshot_delta()`. Usado para escritura incremental. + dirty_marginal: std::collections::HashSet, + dirty_cooccur: std::collections::HashSet<(EventKind, EventKind)>, } impl Observer { @@ -116,6 +120,8 @@ impl Observer { last_seen_cooccur: HashMap::new(), half_life_secs: None, gap_histograms: HashMap::new(), + dirty_marginal: std::collections::HashSet::new(), + dirty_cooccur: std::collections::HashSet::new(), } } @@ -143,7 +149,8 @@ impl Observer { *self.cooccur.entry(key.clone()).or_insert(0) += 1; self.last_seen_cooccur.insert(key.clone(), now); let gap_secs = now.duration_since(w.at).as_secs_f64(); - self.gap_histograms.entry(key).or_default().observe(gap_secs); + self.gap_histograms.entry(key.clone()).or_default().observe(gap_secs); + self.dirty_cooccur.insert(key); } self.window.push_back(timed); @@ -152,7 +159,8 @@ impl Observer { } *self.marginal.entry(kind.clone()).or_insert(0) += 1; - self.last_seen_marginal.insert(kind, now); + self.last_seen_marginal.insert(kind.clone(), now); + self.dirty_marginal.insert(kind); self.total += 1; } @@ -238,6 +246,12 @@ impl Observer { } pub fn marginals(&self) -> &HashMap { &self.marginal } + + /// Última vez que se vio un kind. None si nunca o si fue restaurado + /// desde snapshot (los Instants no portables se descartan). + pub fn last_seen_marginal(&self, kind: &EventKind) -> Option { + self.last_seen_marginal.get(kind).copied() + } pub fn cooccurrences(&self) -> &HashMap<(EventKind, EventKind), u64> { &self.cooccur } pub fn total(&self) -> u64 { self.total } pub fn window_size(&self) -> usize { self.window_size } @@ -263,12 +277,15 @@ impl Observer { pairs } - /// Snapshot serializable del estado estadístico (sin Instants — no son - /// portables a través de reboots). El window deslizante se descarta — - /// se reconstruye desde cero al restore. - pub fn snapshot(&self) -> ObserverSnapshot { + /// Snapshot full: estado estadístico completo. Limpia los sets dirty + /// como side-effect — los próximos `snapshot_delta()` cubren sólo los + /// cambios posteriores. + pub fn snapshot(&mut self) -> ObserverSnapshot { + self.dirty_marginal.clear(); + self.dirty_cooccur.clear(); ObserverSnapshot { schema_version: OBSERVER_SCHEMA_VERSION, + is_delta: false, window_size: self.window_size, half_life_secs: self.half_life_secs, total: self.total, @@ -284,6 +301,64 @@ impl Observer { } } + /// Snapshot incremental: sólo incluye los kinds y pares que cambiaron + /// desde el último `snapshot()` o `snapshot_delta()`. Útil para + /// checkpoints frecuentes con poco overhead. Limpia los sets dirty. + pub fn snapshot_delta(&mut self) -> ObserverSnapshot { + let marginal: Vec<_> = self.dirty_marginal.iter() + .filter_map(|k| self.marginal.get(k).map(|v| (k.clone(), *v))) + .collect(); + let cooccur: Vec<_> = self.dirty_cooccur.iter() + .filter_map(|(a, b)| { + self.cooccur.get(&(a.clone(), b.clone())) + .map(|c| (a.clone(), b.clone(), *c)) + }) + .collect(); + // Para histogramas: incluimos los pares cuyo cooccur cambió. + let gap_histograms: Vec<_> = self.dirty_cooccur.iter() + .filter_map(|(a, b)| { + self.gap_histograms.get(&(a.clone(), b.clone())) + .map(|h| (a.clone(), b.clone(), h.clone())) + }) + .collect(); + self.dirty_marginal.clear(); + self.dirty_cooccur.clear(); + ObserverSnapshot { + schema_version: OBSERVER_SCHEMA_VERSION, + is_delta: true, + window_size: self.window_size, + half_life_secs: self.half_life_secs, + total: self.total, + marginal, cooccur, gap_histograms, + } + } + + /// Aplica un delta sobre el estado actual. Para `is_delta=true`, los + /// valores en marginal/cooccur sobrescriben las entradas existentes. + /// Si `is_delta=false`, equivale a `from_snapshot` pero in-place. + pub fn apply_delta(&mut self, delta: ObserverSnapshot) { + let now = Instant::now(); + if !delta.is_delta { + // Full: reset state. + *self = Self::from_snapshot(delta); + return; + } + // Incremental merge. + for (k, v) in delta.marginal { + self.last_seen_marginal.insert(k.clone(), now); + self.marginal.insert(k, v); + } + for (a, b, c) in delta.cooccur { + self.last_seen_cooccur.insert((a.clone(), b.clone()), now); + self.cooccur.insert((a, b), c); + } + for (a, b, h) in delta.gap_histograms { + self.gap_histograms.insert((a, b), h); + } + // total: sólo subimos (el delta podría estar atrasado). + if delta.total > self.total { self.total = delta.total; } + } + /// Reconstruye Observer desde un snapshot. El window queda vacío; /// last_seen_* se inicializa en `now()` para que el decay arranque /// "ahora" para todos los counts (aproximación razonable post-reboot). @@ -314,6 +389,8 @@ impl Observer { last_seen_cooccur, half_life_secs: snap.half_life_secs, gap_histograms, + dirty_marginal: std::collections::HashSet::new(), + dirty_cooccur: std::collections::HashSet::new(), } } } @@ -325,6 +402,10 @@ const OBSERVER_SCHEMA_VERSION: u16 = 1; #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] pub struct ObserverSnapshot { pub schema_version: u16, + /// `true` si sólo contiene los cambios desde el último snapshot. + /// `false` = full state, sobreescribe el observer al aplicar. + #[serde(default)] + pub is_delta: bool, pub window_size: usize, pub half_life_secs: Option, pub total: u64, diff --git a/crates/ente-cas/src/lib.rs b/crates/ente-cas/src/lib.rs index 7e99700..91de37a 100644 --- a/crates/ente-cas/src/lib.rs +++ b/crates/ente-cas/src/lib.rs @@ -36,6 +36,55 @@ pub fn hex(sha: &[u8; 32]) -> String { s } +/// Lista todos los SHAs presentes en el CAS. Cada entrada del directorio +/// con nombre de 64 chars hex se considera un blob válido. +pub fn list_all_shas() -> anyhow::Result> { + let root = cas_root(); + if !root.exists() { return Ok(Vec::new()); } + let mut out = Vec::new(); + for entry in std::fs::read_dir(&root)? { + let e = entry?; + let name = e.file_name(); + let s = match name.to_str() { + Some(s) if s.len() == 64 => s, + _ => continue, + }; + let mut sha = [0u8; 32]; + let mut ok = true; + for i in 0..32 { + match u8::from_str_radix(&s[i*2..i*2+2], 16) { + Ok(b) => sha[i] = b, + Err(_) => { ok = false; break; } + } + } + if ok { out.push(sha); } + } + Ok(out) +} + +/// Garbage collector. Borra todos los blobs que no están en `reachable`. +/// Devuelve (deleted_count, freed_bytes). El caller construye `reachable` +/// caminando todas las raíces (audit chain head, Wasm SHAs en Cards, etc). +/// +/// Idempotente: re-correr no hace nada si el set no cambió. +pub fn gc(reachable: &std::collections::HashSet<[u8; 32]>) -> anyhow::Result<(usize, u64)> { + let root = cas_root(); + let mut deleted = 0usize; + let mut freed = 0u64; + for sha in list_all_shas()? { + if reachable.contains(&sha) { continue; } + let path = root.join(hex(&sha)); + if let Ok(meta) = std::fs::metadata(&path) { + freed += meta.len(); + } + if std::fs::remove_file(&path).is_ok() { + deleted += 1; + tracing::debug!(sha = %hex(&sha), "CAS gc removed"); + } + } + Ok((deleted, freed)) +} + pub fn resolve(sha: &[u8; 32]) -> anyhow::Result> { let path = cas_root().join(hex(sha)); let bytes = std::fs::read(&path) diff --git a/crates/ente-zero/src/events.rs b/crates/ente-zero/src/events.rs index a9ed86f..9928324 100644 --- a/crates/ente-zero/src/events.rs +++ b/crates/ente-zero/src/events.rs @@ -52,4 +52,7 @@ pub enum CapabilityGrant { Granted { token: u64 }, NoProvider, Denied { reason: &'static str }, + /// El holder ya tiene el máximo de tokens activos para esta cap. + /// Debe esperar a que alguno expire o renovar uno existente. + QuotaExceeded { active: u32, limit: u32 }, } diff --git a/crates/ente-zero/src/graph/capabilities.rs b/crates/ente-zero/src/graph/capabilities.rs index 1103ad4..26da392 100644 --- a/crates/ente-zero/src/graph/capabilities.rs +++ b/crates/ente-zero/src/graph/capabilities.rs @@ -4,7 +4,7 @@ //! periódicamente con `renew_grant(token)`; en caso contrario, el background //! task `purge_expired_grants` los revoca al vencimiento. -use super::{ttl_for_capability, EnteGraph, GrantedCapability}; +use super::{quota_for_capability, ttl_for_capability, EnteGraph, GrantedCapability}; use crate::events::CapabilityGrant; use ente_card::Capability; use std::time::Instant; @@ -22,24 +22,38 @@ impl EnteGraph { let grant = match self.providers.get(&cap).and_then(|s| s.iter().next().copied()) { None => CapabilityGrant::NoProvider, Some(provider) => { - let token = self.next_token; - self.next_token += 1; - // TTL específico por variante de capability — caps escaladas - // (Spawn, FilesystemRoot) viven menos. - let ttl = ttl_for_capability(&cap); - let expires_at = Instant::now() + ttl; - self.grants.insert(token, GrantedCapability { - cap: cap.clone(), - provider, - holder: from, - expires_at, - }); - CapabilityGrant::Granted { token } + // Quota: contar tokens vivos para (from, cap). Si excede, + // rechazar antes de emitir uno nuevo. + let limit = quota_for_capability(&cap); + let active = self.active_tokens_for(from, &cap); + if active >= limit { + CapabilityGrant::QuotaExceeded { active, limit } + } else { + let token = self.next_token; + self.next_token += 1; + let ttl = ttl_for_capability(&cap); + let expires_at = Instant::now() + ttl; + self.grants.insert(token, GrantedCapability { + cap: cap.clone(), + provider, + holder: from, + expires_at, + }); + CapabilityGrant::Granted { token } + } } }; let _ = reply.send(grant); } + /// Cuenta tokens vivos (no expirados) emitidos a un holder para una cap. + pub fn active_tokens_for(&self, holder: Ulid, cap: &Capability) -> u32 { + let now = Instant::now(); + self.grants.values() + .filter(|g| g.holder == holder && &g.cap == cap && g.expires_at > now) + .count() as u32 + } + /// Extiende un grant existente. Devuelve `true` si renovó. Si el token /// no existe o ya expiró, `false` (el cliente debe re-acquire). /// Usa el TTL específico de la cap del grant. diff --git a/crates/ente-zero/src/graph/mod.rs b/crates/ente-zero/src/graph/mod.rs index 8051530..37f2a6a 100644 --- a/crates/ente-zero/src/graph/mod.rs +++ b/crates/ente-zero/src/graph/mod.rs @@ -76,6 +76,24 @@ pub(in crate::graph) struct GrantedCapability { /// corto enough para que credenciales filtradas expiren rápidamente. pub const DEFAULT_GRANT_TTL: std::time::Duration = std::time::Duration::from_secs(60); +/// Quota máxima de tokens activos por (holder, cap). Caps escaladas tienen +/// quota baja para limitar fugas de credenciales; caps de uso frecuente +/// (Endpoint, Journal) son más laxas. +pub fn quota_for_capability(cap: &Capability) -> u32 { + match cap { + // Caps escaladas: pocos tokens, fuerza patrón request-act-release. + Capability::Spawn => 2, + Capability::FilesystemRoot => 2, + Capability::Device { .. } => 4, + // Caps de propósito general. + Capability::Endpoint { .. } => 16, + Capability::KernelNetlink(_) => 4, + Capability::LegacyLogind => 8, + // Logging: hasta 32 streams. + Capability::Journal => 32, + } +} + /// TTL específico por variante de Capability. Caps de mayor riesgo / costo /// (Spawn, FilesystemRoot) tienen TTL más corto; caps "logging" como /// Journal pueden vivir más. diff --git a/crates/ente-zero/src/main.rs b/crates/ente-zero/src/main.rs index 2e3fcb2..de641b3 100644 --- a/crates/ente-zero/src/main.rs +++ b/crates/ente-zero/src/main.rs @@ -342,7 +342,7 @@ async fn dispatch_graph_event( } // Snapshot del cerebro (observer state) en archivo adjunto let brain_path = path.with_extension("brain.json"); - let obs_snap = brain.observer.read().await.snapshot(); + let obs_snap = brain.observer.write().await.snapshot(); match write_brain_snapshot(&brain_path, &obs_snap) { Ok(()) => info!( path = %brain_path.display(),