Tests audit replay, métricas chain, TTL por cap, brain snapshot

- 3 tests integración audit con CAS aislado por test (ENTE_CAS_ROOT en
  tempdir): flush_round_trip_preserves_chain, replay_reconstructs_engine_state,
  replay_after_eviction_still_works.
- AuditLog tracks last_flush_at_ms + subscriber_count. metrics expone:
  audit_chain_length, audit_in_memory, audit_subscribers,
  audit_last_flush_age_seconds, audit_head_info{sha=...}.
- ttl_for_capability() tabla per-variant: Spawn/FilesystemRoot 30s,
  Endpoint/KernelNetlink/LegacyLogind 5min, Journal 1h.
  mediate_capability y renew_grant consultan la tabla.
- ObserverSnapshot serializable (sin Instants — last_seen se ancla a
  now() al restaurar). Counters, coocurrencias e histogramas persistidos.
  Snapshot adjunto al fractal: <checkpoint>.brain.json. --restore lo
  carga si existe.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Sergio
2026-05-04 00:18:55 +00:00
parent ca75ba185f
commit 6aee9254d4
6 changed files with 331 additions and 8 deletions
+142
View File
@@ -48,6 +48,8 @@ pub struct AuditLog {
/// Subscribers a entries en tiempo real. Cada `append` empuja a todos.
/// Subscribers cuyo receiver se dropeó se purgan en el siguiente push.
subscribers: Vec<tokio::sync::mpsc::UnboundedSender<AuditEntry>>,
/// Wall-clock del último flush exitoso a CAS. None si aún no se flush.
last_flush_at_ms: Option<u64>,
}
impl AuditLog {
@@ -64,6 +66,7 @@ impl AuditLog {
last_flushed_sha: None,
head_pointer_path: None,
subscribers: Vec::new(),
last_flush_at_ms: None,
}
}
@@ -147,6 +150,9 @@ impl AuditLog {
}
self.flushed_count += written as u64;
self.last_flushed_sha = last_sha;
if written > 0 {
self.last_flush_at_ms = Some(now_ms());
}
// Persistir head pointer si está configurado.
if let (Some(path), Some(sha)) = (&self.head_pointer_path, last_sha) {
let pointer = AuditHeadPointer {
@@ -167,6 +173,14 @@ impl AuditLog {
/// Running total of entries written by flushes (incremented by the number
/// written on each flush).
pub fn flushed_count(&self) -> u64 { self.flushed_count }
/// SHA recorded by the most recent flush; `None` on a freshly created log.
pub fn last_flushed_sha(&self) -> Option<[u8; 32]> { self.last_flushed_sha }
/// Millisecond timestamp taken when a flush last wrote at least one entry;
/// `None` until that has happened.
pub fn last_flush_at_ms(&self) -> Option<u64> { self.last_flush_at_ms }
/// Seconds elapsed since the last recorded flush, with millisecond resolution.
///
/// Returns `None` when no flush timestamp has been recorded yet. If the
/// current clock reading is earlier than the stored timestamp, the age
/// saturates to `0.0` instead of underflowing.
pub fn last_flush_age_secs(&self) -> Option<f64> {
    self.last_flush_at_ms.map(|flushed_at_ms| {
        let elapsed_ms = now_ms().saturating_sub(flushed_at_ms);
        elapsed_ms as f64 / 1000.0
    })
}
}
/// Pointer al head del audit log — escrito atómicamente en disco tras cada
@@ -387,4 +401,132 @@ mod tests {
// El primer seq superviviente debe ser 2.
assert_eq!(log.recent(0).next().unwrap().seq, 2);
}
// ---------- Tests de integración con CAS real (en directorio temporal) ----------
use crate::engine::RuleEngine;
use std::sync::Mutex;
/// Lock that serializes tests mutating `ENTE_CAS_ROOT`. Test threads share
/// the process environment, so without it two tests running in parallel
/// would overwrite each other's path.
static CAS_TEST_LOCK: Mutex<()> = Mutex::new(());
/// Runs `f` with `ENTE_CAS_ROOT` pointing at a fresh, uniquely named path
/// under the system temp directory.
///
/// The global test lock is held for the whole call so tests that touch the
/// environment variable never overlap. The cleanup guard removes the variable
/// and the directory when `f` returns or panics.
fn with_temp_cas<F: FnOnce()>(f: F) {
    // A test that panics inside `f` poisons the mutex. The guarded value is
    // `()`, so there is no state that could be left inconsistent: recover the
    // guard instead of unwrapping, otherwise one failing test would make every
    // later CAS test fail with an unrelated `PoisonError`.
    let _guard = CAS_TEST_LOCK
        .lock()
        .unwrap_or_else(|poisoned| poisoned.into_inner());
    let dir = std::env::temp_dir().join(format!("ente-cas-test-{}", Ulid::new()));
    std::env::set_var("ENTE_CAS_ROOT", &dir);
    // Declared after the lock guard so it is dropped first: the environment
    // is cleaned up before the next test can acquire the lock.
    let _cleanup = scopeguard(&dir);
    f();
}
/// Builds a guard that, when dropped, unsets `ENTE_CAS_ROOT` and then
/// recursively deletes `dir`.
///
/// Directory removal is best-effort: any error (for example, the directory
/// was never created) is ignored.
fn scopeguard(dir: &std::path::Path) -> impl Drop + '_ {
    struct CasCleanup<'p> {
        root: &'p std::path::Path,
    }

    impl<'p> Drop for CasCleanup<'p> {
        fn drop(&mut self) {
            // Unset the variable first, then delete the directory it pointed to.
            std::env::remove_var("ENTE_CAS_ROOT");
            let _ = std::fs::remove_dir_all(self.root);
        }
    }

    CasCleanup { root: dir }
}
/// Test fixture: a `Crystal` linking `ant` to `con` with fixed placeholder
/// statistics and no gap statistics.
fn dummy_crystal(ant: EventKind, con: EventKind) -> Crystal {
    // Fixed placeholder values, identical for every fixture crystal.
    let (conditional_prob, pmi, support) = (0.9, 1.5, 7);
    Crystal {
        antecedent: ant,
        consequent: con,
        conditional_prob,
        pmi,
        support,
        gap_stats: None,
    }
}
use crate::rules::EventKind;
/// Appends three actions, flushes them to an isolated CAS and checks that the
/// chain reachable from the flushed head verifies with exactly three entries.
#[test]
fn flush_round_trip_preserves_chain() {
    with_temp_cas(|| {
        let mut log = AuditLog::new();
        let first = Ulid::new();
        let second = Ulid::new();

        // Two promotions followed by the removal of the first rule.
        let actions = [
            AuditAction::PromoteCrystal {
                rule_id: first,
                crystal: dummy_crystal(EventKind::EnteSpawned, EventKind::EnteDied),
            },
            AuditAction::PromoteCrystal {
                rule_id: second,
                crystal: dummy_crystal(EventKind::BusAnnounce, EventKind::BusInvoke),
            },
            AuditAction::RemoveRule { rule_id: first },
        ];
        for action in actions {
            log.append(action);
        }

        let written = log.flush_to_cas().unwrap();
        assert_eq!(written, 3);

        let head = log.last_flushed_sha().expect("head set");
        let report = verify_chain_from_cas(head);
        assert!(report.error.is_none(), "verification failed: {:?}", report.error);
        assert_eq!(report.verified, 3);
    });
}
/// Promotes three rules, removes the middle one, flushes, and checks that
/// replaying the chain into an empty engine yields exactly the two surviving
/// rules with their original ids.
#[test]
fn replay_reconstructs_engine_state() {
    with_temp_cas(|| {
        let mut log = AuditLog::new();
        let id1: Ulid = "01KQR3000000000000000000A1".parse().unwrap();
        let id2: Ulid = "01KQR3000000000000000000A2".parse().unwrap();
        let id3: Ulid = "01KQR3000000000000000000A3".parse().unwrap();

        let promotions = [
            (id1, EventKind::EnteSpawned, EventKind::EnteDied),
            (id2, EventKind::BusAnnounce, EventKind::BusInvoke),
            (id3, EventKind::DeviceAdded, EventKind::DeviceRemoved),
        ];
        for (rule_id, antecedent, consequent) in promotions {
            log.append(AuditAction::PromoteCrystal {
                rule_id,
                crystal: dummy_crystal(antecedent, consequent),
            });
        }
        log.append(AuditAction::RemoveRule { rule_id: id2 });
        log.flush_to_cas().unwrap();

        let head = log.last_flushed_sha().unwrap();
        let mut engine = RuleEngine::empty();
        let rep = replay_chain(head, &mut engine);
        assert!(rep.error.is_none(), "replay error: {:?}", rep.error);
        assert_eq!(rep.applied, 4);
        assert_eq!(engine.len(), 2, "id2 should be removed, id1 + id3 remain");

        // Rule ids must survive the round trip through CAS.
        let surviving: Vec<Ulid> = engine.rules().map(|rule| rule.id).collect();
        assert!(surviving.contains(&id1));
        assert!(surviving.contains(&id3));
        assert!(!surviving.contains(&id2));
    });
}
/// With a tiny in-memory cap most entries are evicted from memory, yet every
/// one of them was flushed to CAS. Replay must rebuild the full engine state
/// from CAS alone, including the id of every promoted rule.
#[test]
fn replay_after_eviction_still_works() {
    with_temp_cas(|| {
        // Small cap: only the last two entries stay in memory.
        let mut log = AuditLog::with_cap(2);
        let mut ids = Vec::new();
        for _ in 0..6 {
            let id = Ulid::new();
            ids.push(id);
            log.append(AuditAction::PromoteCrystal {
                rule_id: id,
                crystal: dummy_crystal(EventKind::EnteSpawned, EventKind::EnteDied),
            });
            // Flush after every append so each entry reaches CAS before it
            // can be evicted from the in-memory ring.
            log.flush_to_cas().unwrap();
        }
        assert_eq!(log.len(), 2, "cap eviction limita memoria");

        let head = log.last_flushed_sha().unwrap();
        let mut engine = RuleEngine::empty();
        let rep = replay_chain(head, &mut engine);
        assert!(rep.error.is_none(), "replay error: {:?}", rep.error);
        assert_eq!(rep.applied, 6);
        assert_eq!(engine.len(), 6);

        // Counting rules is not enough: confirm each promoted id, including
        // those already evicted from memory, was restored by the replay.
        let replayed: Vec<Ulid> = engine.rules().map(|r| r.id).collect();
        for id in &ids {
            assert!(replayed.contains(id), "rule {:?} missing after replay", id);
        }
    });
}
}
+30
View File
@@ -58,6 +58,7 @@ async fn handle_scrape(mut stream: TcpStream, state: BrainState) -> anyhow::Resu
async fn format_metrics(state: &BrainState) -> String {
let obs = state.observer.read().await;
let engine = state.engine.read().await;
let audit = state.audit.read().await;
let mut out = String::with_capacity(2048);
@@ -102,6 +103,35 @@ async fn format_metrics(state: &BrainState) -> String {
out.push_str("# TYPE ente_brain_crystals_total gauge\n");
out.push_str(&format!("ente_brain_crystals_total {}\n", crystals.len()));
// ---- Audit log ----
out.push_str("# HELP ente_brain_audit_chain_length Total entries persisted to CAS.\n");
out.push_str("# TYPE ente_brain_audit_chain_length counter\n");
out.push_str(&format!("ente_brain_audit_chain_length {}\n", audit.flushed_count()));
out.push_str("# HELP ente_brain_audit_in_memory Entries currently in the in-memory ring.\n");
out.push_str("# TYPE ente_brain_audit_in_memory gauge\n");
out.push_str(&format!("ente_brain_audit_in_memory {}\n", audit.len()));
out.push_str("# HELP ente_brain_audit_subscribers Active stream-audit subscribers.\n");
out.push_str("# TYPE ente_brain_audit_subscribers gauge\n");
out.push_str(&format!("ente_brain_audit_subscribers {}\n", audit.subscriber_count()));
if let Some(age) = audit.last_flush_age_secs() {
out.push_str("# HELP ente_brain_audit_last_flush_age_seconds Time since last flush to CAS.\n");
out.push_str("# TYPE ente_brain_audit_last_flush_age_seconds gauge\n");
out.push_str(&format!("ente_brain_audit_last_flush_age_seconds {:.3}\n", age));
}
if let Some(sha) = audit.last_flushed_sha() {
// Info-style metric con head sha como label. Útil para dashboards
// que quieran mostrar "current head".
out.push_str("# HELP ente_brain_audit_head_info Current head SHA of the audit chain.\n");
out.push_str("# TYPE ente_brain_audit_head_info gauge\n");
out.push_str(&format!(
"ente_brain_audit_head_info{{sha=\"{}\"}} 1\n",
ente_cas::hex(&sha)
));
}
// ---- Histogramas de gaps temporales (top-32 pares más frecuentes) ----
out.push_str("# HELP ente_brain_pair_gap_seconds Time gap between correlated events.\n");
out.push_str("# TYPE ente_brain_pair_gap_seconds histogram\n");
+73 -1
View File
@@ -22,7 +22,7 @@ pub struct TimedEvent {
/// Histograma de gaps temporales con buckets exponenciales en segundos.
/// Cubre 6 órdenes de magnitud: 1ms hasta 1000s.
#[derive(Debug, Clone, Default)]
#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize)]
pub struct GapHistogram {
/// Buckets cumulativos (Prometheus-style): cada índice cuenta eventos
/// con gap ≤ ese límite. Limites: 1ms, 10ms, 100ms, 1s, 10s, 100s, 1000s.
@@ -262,6 +262,78 @@ impl Observer {
pairs.truncate(k);
pairs
}
/// Captures the statistical state as a serializable `ObserverSnapshot`.
///
/// `Instant` values are left out because they are not portable across
/// reboots, and the sliding window is dropped: it is rebuilt from scratch
/// on restore.
pub fn snapshot(&self) -> ObserverSnapshot {
    // Flatten each map into a list of tuples; see `ObserverSnapshot` for the
    // field layout.
    let marginal = self
        .marginal
        .iter()
        .map(|(kind, &count)| (kind.clone(), count))
        .collect();
    let cooccur = self
        .cooccur
        .iter()
        .map(|((first, second), &count)| (first.clone(), second.clone(), count))
        .collect();
    let gap_histograms = self
        .gap_histograms
        .iter()
        .map(|((first, second), histogram)| (first.clone(), second.clone(), histogram.clone()))
        .collect();

    ObserverSnapshot {
        schema_version: OBSERVER_SCHEMA_VERSION,
        window_size: self.window_size,
        half_life_secs: self.half_life_secs,
        total: self.total,
        marginal,
        cooccur,
        gap_histograms,
    }
}
/// Rebuilds an `Observer` from a snapshot.
///
/// The sliding window starts empty. Every `last_seen_*` entry is initialised
/// to the current `Instant`, so decay starts "now" for all restored counts
/// (a reasonable approximation after a reboot).
///
/// Snapshots are read back from disk, so `window_size` is treated as
/// untrusted for allocation purposes: it is stored verbatim in the observer
/// but only a bounded amount of window storage is reserved up front.
///
/// NOTE(review): `snap.schema_version` is not inspected here; callers that
/// need to reject incompatible snapshots must check it before calling.
pub fn from_snapshot(snap: ObserverSnapshot) -> Self {
    // Cap on eager window preallocation. A corrupt or hand-edited snapshot
    // with a huge `window_size` would otherwise make `VecDeque::with_capacity`
    // panic or abort on restore. The deque still grows on demand past this.
    const MAX_WINDOW_PREALLOC: usize = 4096;

    let now = Instant::now();

    // Map sizes are known from the snapshot data, so reserve them once.
    let mut marginal = HashMap::with_capacity(snap.marginal.len());
    let mut last_seen_marginal = HashMap::with_capacity(snap.marginal.len());
    for (k, v) in snap.marginal {
        last_seen_marginal.insert(k.clone(), now);
        marginal.insert(k, v);
    }

    let mut cooccur = HashMap::with_capacity(snap.cooccur.len());
    let mut last_seen_cooccur = HashMap::with_capacity(snap.cooccur.len());
    for (a, b, c) in snap.cooccur {
        last_seen_cooccur.insert((a.clone(), b.clone()), now);
        cooccur.insert((a, b), c);
    }

    let gap_histograms = snap
        .gap_histograms
        .into_iter()
        .map(|(a, b, h)| ((a, b), h))
        .collect();

    Self {
        window: VecDeque::with_capacity(snap.window_size.min(MAX_WINDOW_PREALLOC)),
        window_size: snap.window_size,
        marginal,
        cooccur,
        total: snap.total,
        last_seen_marginal,
        last_seen_cooccur,
        half_life_secs: snap.half_life_secs,
        gap_histograms,
    }
}
}
/// Schema version stamped into every `ObserverSnapshot` produced by
/// `Observer::snapshot`.
const OBSERVER_SCHEMA_VERSION: u16 = 1;
/// Serializable snapshot. It is persisted to disk as JSON and restored on
/// reboot to preserve counters, co-occurrences and histograms.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct ObserverSnapshot {
    /// Format version; set to `OBSERVER_SCHEMA_VERSION` when the snapshot is taken.
    pub schema_version: u16,
    /// The observer's `window_size` at snapshot time (the window contents
    /// themselves are not stored).
    pub window_size: usize,
    /// The observer's `half_life_secs` setting, copied verbatim.
    pub half_life_secs: Option<f64>,
    /// The observer's `total` counter, copied verbatim.
    pub total: u64,
    /// Marginal counts, serialized as a Vec because `HashMap<EventKind, _>`
    /// would use `EventKind` as the key, and `EventKind` has variants with
    /// payloads that cannot be serialized as JSON keys (BusInvokeOf, Custom).
    pub marginal: Vec<(EventKind, u64)>,
    /// Co-occurrence counts, one `(first, second, count)` tuple per pair.
    pub cooccur: Vec<(EventKind, EventKind, u64)>,
    /// Gap histograms, one `(first, second, histogram)` tuple per pair.
    pub gap_histograms: Vec<(EventKind, EventKind, GapHistogram)>,
}
#[cfg(test)]