From a4fa42c781d76852355931d82a28e80230855af8 Mon Sep 17 00:00:00 2001 From: Sergio Date: Sun, 3 May 2026 23:16:41 +0000 Subject: [PATCH] =?UTF-8?q?Audit=E2=86=92CAS,=20reload=20rules,=20time-dec?= =?UTF-8?q?ay=20y=20forma=20can=C3=B3nica=20del=20hash=20chain?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - AuditLog::flush_to_cas() persiste entries pendientes con bytes canónicos (sha=[0;32]) para que CAS-sha == entry.sha. AuditHeadPointer en disco tras cada flush — verificación posterior sin escanear el log entero. - IntrospectRequest::FlushAudit / ReloadRules. brainctl flush-audit / reload. - Auto-flush task cada 10s cuando --audit-head está configurado. - ReloadRules { path? } vacía engine + carga (.k vía kcl CLI o .json). - Observer con time-decay opcional: count * 0.5^(age/half_life). conditional_prob y pmi consumen valores decayed transparentemente. - --brain-half-life flag CLI. - KCL Rust SDK descartado: kcl-* en crates.io son del proyecto KittyCAD, no KusionStack. Subprocess al CLI sigue siendo la vía canónica. Co-Authored-By: Claude Opus 4.7 (1M context) --- crates/ente-brain/examples/brainctl.rs | 20 +++++- crates/ente-brain/src/audit.rs | 96 +++++++++++++++++++++++--- crates/ente-brain/src/introspect.rs | 49 +++++++++++++ crates/ente-brain/src/kcl_loader.rs | 14 +++- crates/ente-brain/src/observer.rs | 93 +++++++++++++++++++------ crates/ente-zero/src/main.rs | 50 +++++++++++++- 6 files changed, 282 insertions(+), 40 deletions(-) diff --git a/crates/ente-brain/examples/brainctl.rs b/crates/ente-brain/examples/brainctl.rs index 814c2c7..3e1fb52 100644 --- a/crates/ente-brain/examples/brainctl.rs +++ b/crates/ente-brain/examples/brainctl.rs @@ -51,9 +51,14 @@ async fn main() -> anyhow::Result<()> { let limit: usize = args.get(2).and_then(|s| s.parse().ok()).unwrap_or(20); IntrospectRequest::ListAudit { limit } } + "flush-audit" => IntrospectRequest::FlushAudit, + "reload" => { + let path = args.get(2).cloned(); + IntrospectRequest::ReloadRules { path } + } other => { eprintln!("subcomando desconocido: {other}"); - eprintln!("válidos: list-rules | entropy | top | crystals | crystal-kcl | promote | remove | audit "); + eprintln!("válidos: list-rules | entropy | top | crystals | crystal-kcl | promote | remove | audit | flush-audit | reload [path]"); std::process::exit(2); } }; @@ -116,6 +121,15 @@ fn print_response(r: &IntrospectResponse) { e.seq, e.timestamp_ms, prev, sha, e.action); } } + IntrospectResponse::Flushed { written, head_sha, total_flushed } => { + println!("flushed: {written} entries esta pasada, total acumulado: {total_flushed}"); + if let Some(sha) = head_sha { + println!("head sha: {}", hex_long(*sha)); + } + } + IntrospectResponse::Reloaded { count } => { + println!("reload OK: {count} reglas activas tras reload"); + } IntrospectResponse::Error(e) => eprintln!("error: {e}"), } } @@ -123,3 +137,7 @@ fn print_response(r: &IntrospectResponse) { fn hex_short(sha: [u8; 32]) -> String { sha[..4].iter().map(|b| format!("{:02x}", b)).collect::() + ".." } + +fn hex_long(sha: [u8; 32]) -> String { + sha.iter().map(|b| format!("{:02x}", b)).collect() +} diff --git a/crates/ente-brain/src/audit.rs b/crates/ente-brain/src/audit.rs index b38e53d..2686fe6 100644 --- a/crates/ente-brain/src/audit.rs +++ b/crates/ente-brain/src/audit.rs @@ -39,6 +39,12 @@ pub struct AuditLog { next_seq: u64, /// Cap del log en memoria. Entries más viejos se descartan tras flush. cap: usize, + /// Total acumulado de entries flusheadas a CAS. + flushed_count: u64, + /// SHA del último entry persistido a CAS — el "head pointer" del log. + last_flushed_sha: Option<[u8; 32]>, + /// Path opcional donde escribir el head pointer tras cada flush. + head_pointer_path: Option, } impl AuditLog { @@ -47,7 +53,19 @@ impl AuditLog { } pub fn with_cap(cap: usize) -> Self { - Self { entries: VecDeque::new(), next_seq: 0, cap } + Self { + entries: VecDeque::new(), + next_seq: 0, + cap, + flushed_count: 0, + last_flushed_sha: None, + head_pointer_path: None, + } + } + + pub fn with_head_pointer(mut self, path: std::path::PathBuf) -> Self { + self.head_pointer_path = Some(path); + self } /// Apendea una acción. Calcula el SHA encadenado contra el último entry. @@ -90,6 +108,60 @@ impl AuditLog { let sha = ente_cas::store(&bytes)?; Ok(sha) } + + /// Persiste TODOS los entries actuales al CAS y actualiza el head pointer. + /// Idempotente: re-flushar dos veces da los mismos SHAs (CAS dedup). + /// Devuelve cuántas entries se flushearon en esta pasada. + /// + /// Forma canónica: serializamos `entry` con `sha = [0; 32]` (formato + /// pre-hash). El CAS computa sha256 sobre esos bytes y devuelve un SHA + /// que por construcción coincide con `entry.sha` calculado al append. + pub fn flush_to_cas(&mut self) -> anyhow::Result { + let mut written = 0; + let mut last_sha = self.last_flushed_sha; + for entry in &self.entries { + if entry.seq < self.flushed_count { continue; } + let bytes = canonical_bytes(entry); + let sha = ente_cas::store(&bytes)?; + debug_assert_eq!(sha, entry.sha, + "CAS sha != entry.sha — fórmula canónica rota"); + last_sha = Some(sha); + written += 1; + } + self.flushed_count += written as u64; + self.last_flushed_sha = last_sha; + // Persistir head pointer si está configurado. + if let (Some(path), Some(sha)) = (&self.head_pointer_path, last_sha) { + let pointer = AuditHeadPointer { + last_seq: self.next_seq.saturating_sub(1), + last_sha: sha, + flushed_count: self.flushed_count, + timestamp_ms: now_ms(), + }; + let json = serde_json::to_vec_pretty(&pointer)?; + // Escritura atómica: tmp + rename + let tmp = path.with_extension("tmp"); + if let Some(parent) = path.parent() { let _ = std::fs::create_dir_all(parent); } + std::fs::write(&tmp, json)?; + std::fs::rename(&tmp, path)?; + } + Ok(written) + } + + pub fn flushed_count(&self) -> u64 { self.flushed_count } + pub fn last_flushed_sha(&self) -> Option<[u8; 32]> { self.last_flushed_sha } +} + +/// Pointer al head del audit log — escrito atómicamente en disco tras cada +/// flush. Permite verificar la integridad del log sin escanearlo entero: +/// el cliente lee el head, recupera el blob desde CAS, valida `prev_sha` +/// recursivamente hasta el genesis. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct AuditHeadPointer { + pub last_seq: u64, + pub last_sha: [u8; 32], + pub flushed_count: u64, + pub timestamp_ms: u64, } impl Default for AuditLog { @@ -103,20 +175,22 @@ fn now_ms() -> u64 { .unwrap_or(0) } -/// SHA256 sobre el entry serializado, EXCLUYENDO el campo sha mismo -/// (que está en cero al momento del cálculo). Determinístico vía postcard -/// para que la verificación sea reproducible. +/// SHA256 sobre el entry en forma canónica (sha=[0;32]). Hash y CAS storage +/// ven los mismos bytes, así que `ente_cas::store(canonical)` devuelve el +/// mismo SHA que `compute_sha(entry)`. fn compute_sha(entry: &AuditEntry) -> [u8; 32] { - let bytes = postcard_or_json(entry); + let bytes = canonical_bytes(entry); ente_cas::sha256_of(&bytes) } -fn postcard_or_json(entry: &AuditEntry) -> Vec { - // Preferimos postcard por estabilidad bit-a-bit. Fallback JSON si falla. - match postcard::to_stdvec(entry) { - Ok(b) => b, - Err(_) => serde_json::to_vec(entry).unwrap_or_default(), - } +/// Forma canónica: el entry serializado JSON con `sha = [0; 32]`. +/// JSON sin pretty-print es determinístico para nuestros tipos. +fn canonical_bytes(entry: &AuditEntry) -> Vec { + let canonical = AuditEntry { + sha: [0u8; 32], + ..entry.clone() + }; + serde_json::to_vec(&canonical).unwrap_or_default() } #[cfg(test)] diff --git a/crates/ente-brain/src/introspect.rs b/crates/ente-brain/src/introspect.rs index 21a2c32..e1c54f7 100644 --- a/crates/ente-brain/src/introspect.rs +++ b/crates/ente-brain/src/introspect.rs @@ -97,6 +97,12 @@ pub enum IntrospectRequest { RemoveRule { id: Ulid }, /// Lista las últimas N entradas del audit log. limit=0 = todas. ListAudit { limit: usize }, + /// Persiste todas las entries pendientes al CAS y actualiza el head + /// pointer si el log lo tiene configurado. + FlushAudit, + /// Recarga reglas desde el archivo configurado por --rules-out (o el + /// path provisto). Vacía el engine antes de cargar. + ReloadRules { path: Option }, } #[derive(Debug, Serialize, Deserialize)] @@ -114,6 +120,10 @@ pub enum IntrospectResponse { Removed(bool), /// Entradas del audit log (más recientes al final). AuditEntries(Vec), + /// Resultado de FlushAudit: cuántas entries se escribieron y SHA del head. + Flushed { written: usize, head_sha: Option<[u8; 32]>, total_flushed: u64 }, + /// Resultado de ReloadRules: número total de reglas tras el reload. + Reloaded { count: usize }, Error(String), } @@ -296,6 +306,45 @@ impl IntrospectServer { let audit = self.state.audit.read().await; IntrospectResponse::AuditEntries(audit.recent(limit).cloned().collect()) } + IntrospectRequest::FlushAudit => { + let mut audit = self.state.audit.write().await; + match audit.flush_to_cas() { + Ok(written) => IntrospectResponse::Flushed { + written, + head_sha: audit.last_flushed_sha(), + total_flushed: audit.flushed_count(), + }, + Err(e) => IntrospectResponse::Error(format!("flush_to_cas: {e}")), + } + } + IntrospectRequest::ReloadRules { path } => { + // Path explícito gana sobre el rules_out configurado. + let resolved = path.map(std::path::PathBuf::from) + .or_else(|| self.state.rules_out.as_ref().map(|p| p.as_path().to_path_buf())); + let path = match resolved { + Some(p) => p, + None => return IntrospectResponse::Error( + "ReloadRules sin path y sin rules_out configurado".into() + ), + }; + let rules = match crate::kcl_loader::load_rules_file(&path) { + Ok(r) => r, + Err(e) => return IntrospectResponse::Error(format!("load: {e}")), + }; + // Vaciamos el engine antes de re-cargar — semántica clean-slate. + let mut engine = self.state.engine.write().await; + *engine = crate::engine::RuleEngine::empty(); + let count = rules.len(); + for r in rules { engine.insert(r); } + drop(engine); + self.state.audit.write().await.append( + crate::audit::AuditAction::LoadRulesFile { + path: path.to_string_lossy().into_owned(), + count, + } + ); + IntrospectResponse::Reloaded { count } + } } } } diff --git a/crates/ente-brain/src/kcl_loader.rs b/crates/ente-brain/src/kcl_loader.rs index 35acac1..cc447a6 100644 --- a/crates/ente-brain/src/kcl_loader.rs +++ b/crates/ente-brain/src/kcl_loader.rs @@ -1,8 +1,16 @@ //! Loader de reglas desde archivos `.k` vía subprocess al CLI de KCL. //! -//! No usamos el SDK Rust de KCL para no arrastrar la dependencia de Go runtime -//! ni cgo. El CLI `kcl` produce JSON validado contra el schema declarado -//! en el propio `.k` — equivalente funcional al SDK con coste cero de compile. +//! ## ¿Por qué subprocess y no SDK Rust? +//! +//! El SDK Rust de KusionStack KCL (en el monorepo `kcl-lang/kcl`) no se +//! publica como crate independiente en crates.io. Los crates `kcl-*` que +//! sí están publicados (kcl-lib, kcl-api, etc.) pertenecen al proyecto +//! KittyCAD — un lenguaje CAD distinto pese al nombre. Verificado 2026-05. +//! +//! Subprocess al CLI `kcl` (instalable vía `go install kcl-lang.io/cli/cmd/kcl@latest` +//! o desde el release de GitHub) es funcionalmente equivalente al SDK: +//! produce JSON validado contra el schema KCL declarado, sin dependencia +//! de Go runtime en el binario final del fractal. //! //! Si `kcl` no está en PATH, el caller decide: cargar JSON crudo (skip KCL), //! o fallar el boot. diff --git a/crates/ente-brain/src/observer.rs b/crates/ente-brain/src/observer.rs index df303fb..21cf7b7 100644 --- a/crates/ente-brain/src/observer.rs +++ b/crates/ente-brain/src/observer.rs @@ -26,6 +26,13 @@ pub struct Observer { marginal: HashMap, cooccur: HashMap<(EventKind, EventKind), u64>, total: u64, + /// Last-seen timestamps para aplicar decay en query time. None = sin + /// time-decay (modo tradicional). + last_seen_marginal: HashMap, + last_seen_cooccur: HashMap<(EventKind, EventKind), Instant>, + /// Half-life del decay exponencial en segundos. None = sin decay + /// (las consultas devuelven los counts crudos). + half_life_secs: Option, } impl Observer { @@ -36,9 +43,23 @@ impl Observer { marginal: HashMap::new(), cooccur: HashMap::new(), total: 0, + last_seen_marginal: HashMap::new(), + last_seen_cooccur: HashMap::new(), + half_life_secs: None, } } + /// Activa decay exponencial con half-life en segundos. λ = ln(2)/half_life. + /// Aplicado en query time sobre los counts crudos usando last_seen. + pub fn with_half_life(mut self, half_life_secs: f64) -> Self { + if half_life_secs > 0.0 { + self.half_life_secs = Some(half_life_secs); + } + self + } + + pub fn half_life(&self) -> Option { self.half_life_secs } + /// Registra un evento. Actualiza marginales y co-ocurrencias contra todo /// evento aún en la ventana. pub fn record(&mut self, kind: EventKind) { @@ -47,9 +68,9 @@ impl Observer { // Co-ocurrencias: este evento con cada uno previo en ventana. for w in &self.window { - *self.cooccur - .entry((w.kind.clone(), kind.clone())) - .or_insert(0) += 1; + let key = (w.kind.clone(), kind.clone()); + *self.cooccur.entry(key.clone()).or_insert(0) += 1; + self.last_seen_cooccur.insert(key, now); } self.window.push_back(timed); @@ -57,10 +78,37 @@ impl Observer { self.window.pop_front(); } - *self.marginal.entry(kind).or_insert(0) += 1; + *self.marginal.entry(kind.clone()).or_insert(0) += 1; + self.last_seen_marginal.insert(kind, now); self.total += 1; } + /// Aplica el decay sobre un count crudo dado el `last_seen` correspondiente. + /// Si half_life es None, devuelve el count tal cual (sin decay). + fn decay(&self, count: u64, last_seen: Option) -> f64 { + let raw = count as f64; + let (hl, last) = match (self.half_life_secs, last_seen) { + (Some(hl), Some(t)) => (hl, t), + _ => return raw, + }; + let age_secs = Instant::now().duration_since(last).as_secs_f64(); + raw * 0.5_f64.powf(age_secs / hl) + } + + /// Marginal con decay aplicado. + pub fn marginal_decayed(&self, k: &EventKind) -> f64 { + let raw = self.marginal.get(k).copied().unwrap_or(0); + let last = self.last_seen_marginal.get(k).copied(); + self.decay(raw, last) + } + + /// Cooccurrence con decay aplicado. + pub fn cooccur_decayed(&self, a: &EventKind, b: &EventKind) -> f64 { + let raw = self.cooccur.get(&(a.clone(), b.clone())).copied().unwrap_or(0); + let last = self.last_seen_cooccur.get(&(a.clone(), b.clone())).copied(); + self.decay(raw, last) + } + /// Entropía de Shannon de la distribución marginal de eventos. /// H(X) = −Σ p(x) log₂ p(x). Unidad: bits. pub fn shannon_entropy(&self) -> f64 { @@ -77,32 +125,31 @@ impl Observer { /// P(b | a) = "dado que algo siguió a `a` dentro del window, qué fracción /// fue `b`". Suma 1 sobre todos los b posibles para un a fijo. /// - /// Implementación: cooccur(a, b) / Σ_x cooccur(a, x). Esto da una - /// probabilidad condicional propia [0, 1]. + /// Implementación: cooccur_decayed(a, b) / Σ_x cooccur_decayed(a, x). + /// Si half_life is None, los decayed values son los counts crudos. pub fn conditional_prob(&self, a: &EventKind, b: &EventKind) -> f64 { - let joint = self.cooccur - .get(&(a.clone(), b.clone())) - .copied() - .unwrap_or(0) as f64; - let row_total: u64 = self.cooccur.iter() - .filter_map(|((x, _), c)| if x == a { Some(*c) } else { None }) + let joint = self.cooccur_decayed(a, b); + let row_total: f64 = self.cooccur.keys() + .filter(|(x, _)| x == a) + .map(|(x, y)| self.cooccur_decayed(x, y)) .sum(); - if row_total == 0 { 0.0 } else { joint / row_total as f64 } + if row_total <= 0.0 { 0.0 } else { joint / row_total } } - /// Información mutua puntual entre `a` y `b`: + /// Información mutua puntual entre `a` y `b` con decay aplicado: /// PMI(a, b) = log₂( P(a, b) / (P(a) · P(b)) ). /// Positivo → más correlacionados de lo que sugiere independencia. pub fn pmi(&self, a: &EventKind, b: &EventKind) -> f64 { - if self.total == 0 { return 0.0; } - let total = self.total as f64; - let joint = self.cooccur - .get(&(a.clone(), b.clone())) - .copied() - .unwrap_or(0) as f64 / total; - let pa = self.marginal.get(a).copied().unwrap_or(0) as f64 / total; - let pb = self.marginal.get(b).copied().unwrap_or(0) as f64 / total; - if joint == 0.0 || pa == 0.0 || pb == 0.0 { return 0.0; } + // Total decayed: suma de marginales con decay (no usamos self.total + // directo porque debería ser consistente con los decayed values). + let total_decayed: f64 = self.marginal.keys() + .map(|k| self.marginal_decayed(k)) + .sum(); + if total_decayed <= 0.0 { return 0.0; } + let joint = self.cooccur_decayed(a, b) / total_decayed; + let pa = self.marginal_decayed(a) / total_decayed; + let pb = self.marginal_decayed(b) / total_decayed; + if joint <= 0.0 || pa <= 0.0 || pb <= 0.0 { return 0.0; } (joint / (pa * pb)).log2() } diff --git a/crates/ente-zero/src/main.rs b/crates/ente-zero/src/main.rs index 5b19cb6..eb8f677 100644 --- a/crates/ente-zero/src/main.rs +++ b/crates/ente-zero/src/main.rs @@ -39,7 +39,9 @@ struct CliArgs { restore: Option, rules: Option, rules_out: Option, + audit_head: Option, metrics_addr: Option, + brain_half_life: Option, } fn parse_args() -> CliArgs { @@ -48,18 +50,22 @@ fn parse_args() -> CliArgs { let mut restore = None; let mut rules = None; let mut rules_out = None; + let mut audit_head = None; let mut metrics_addr = None; + let mut brain_half_life = None; while let Some(a) = args.next() { match a.as_str() { "--checkpoint" => checkpoint = args.next().map(PathBuf::from), "--restore" => restore = args.next().map(PathBuf::from), "--rules" => rules = args.next().map(PathBuf::from), "--rules-out" => rules_out = args.next().map(PathBuf::from), + "--audit-head" => audit_head = args.next().map(PathBuf::from), "--metrics-addr" => metrics_addr = args.next(), + "--brain-half-life" => brain_half_life = args.next().and_then(|s| s.parse().ok()), other => warn!(arg = %other, "argumento desconocido, ignorado"), } } - CliArgs { checkpoint, restore, rules, rules_out, metrics_addr } + CliArgs { checkpoint, restore, rules, rules_out, audit_head, metrics_addr, brain_half_life } } fn main() -> anyhow::Result<()> { @@ -84,7 +90,11 @@ fn main() -> anyhow::Result<()> { .enable_time() .build()?; - rt.block_on(primordial_loop(card, dev_mode, cli.checkpoint, cli.rules, cli.rules_out, cli.metrics_addr)) + rt.block_on(primordial_loop( + card, dev_mode, + cli.checkpoint, cli.rules, cli.rules_out, + cli.audit_head, cli.metrics_addr, cli.brain_half_life, + )) } async fn primordial_loop( @@ -93,7 +103,9 @@ async fn primordial_loop( checkpoint_path: Option, rules_path: Option, rules_out: Option, + audit_head: Option, metrics_addr: Option, + brain_half_life: Option, ) -> anyhow::Result<()> { info!(seed_id = %seed_card.id, label = %seed_card.label, "Ente #0 entra al bucle primordial"); @@ -139,6 +151,21 @@ async fn primordial_loop( if let Some(out_path) = rules_out { brain = brain.with_rules_out(out_path); } + if let Some(hl) = brain_half_life { + let mut obs = brain.observer.write().await; + // Reemplazar con un observer nuevo que tenga half-life. Estado + // anterior (vacío en este punto) descartado. + *obs = ente_brain::Observer::new(1024).with_half_life(hl); + info!(hl_secs = hl, "observer con time-decay activo"); + } + // Si --audit-head, configuramos el head pointer y arrancamos auto-flush. + if let Some(head_path) = audit_head { + // Re-creamos el AuditLog con head pointer. + let new_audit = ente_brain::audit::AuditLog::new() + .with_head_pointer(head_path); + *brain.audit.write().await = new_audit; + spawn_audit_auto_flush(brain.clone()); + } // Carga inicial de reglas vía KCL o JSON, si --rules path proporcionado. if let Some(path) = &rules_path { @@ -343,6 +370,25 @@ fn brain_introspect_path() -> PathBuf { format!("{runtime}/ente-brain.sock").into() } +/// Auto-flush del audit log a CAS cada 10 segundos. Ejecuta best-effort: +/// si el flush falla lo logeamos pero no abortamos. La integridad del log +/// queda garantizada por su hash chain — re-flushar es idempotente. +fn spawn_audit_auto_flush(state: BrainState) { + tokio::spawn(async move { + let mut tick = tokio::time::interval(std::time::Duration::from_secs(10)); + tick.tick().await; // descartar primer tick inmediato + loop { + tick.tick().await; + let mut audit = state.audit.write().await; + match audit.flush_to_cas() { + Ok(0) => {} // nada nuevo + Ok(n) => info!(written = n, total = audit.flushed_count(), "audit auto-flush"), + Err(e) => warn!(?e, "audit auto-flush falló"), + } + } + }); +} + fn spawn_brain_introspect(state: BrainState) { let path = brain_introspect_path(); tokio::spawn(async move {