Cristales temporales, replay, lease/renew y audit streaming

- GapHistogram añade sum_squares_secs → stddev en O(1). GapStats serializable
  con count/mean/stddev/max.
- Crystal incluye gap_stats?: GapStats. crystal_to_rule emite Sequence con
  within_ms = (mean+2σ)*1000 cuando gap_stats.count >= 4; fallback a Single.
- audit::collect_chain_from_cas() recoge la cadena en orden cronológico.
  replay_chain() reconstruye RuleEngine aplicando PromoteCrystal/RemoveRule.
  Endpoint ReplayAudit + brainctl replay.
- GrantedCapability con expires_at: Instant. DEFAULT_GRANT_TTL = 60s.
  EnteGraph::renew_grant + purge_expired_grants. Tick cada 10s en el bucle
  primordial.
- AuditLog::subscribe() entrega un mpsc::UnboundedReceiver. append() empuja
  a todos los subscribers, purgando los muertos. IntrospectRequest::StreamAudit
  toma posesión de la conn y envía AuditStreamFrame en bucle. brainctl
  stream-audit imprime entries en directo.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Sergio
2026-05-03 23:51:36 +00:00
parent badf4257ec
commit ca75ba185f
8 changed files with 340 additions and 21 deletions
+63 -1
View File
@@ -10,7 +10,9 @@
//! Path del socket: $ENTE_BRAIN_SOCK o $XDG_RUNTIME_DIR/ente-brain.sock
use ente_brain::introspect::{call, IntrospectRequest, IntrospectResponse};
use std::path::PathBuf;
use std::path::{Path, PathBuf};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::UnixStream;
fn socket_path() -> PathBuf {
if let Ok(p) = std::env::var("ENTE_BRAIN_SOCK") {
@@ -26,6 +28,12 @@ async fn main() -> anyhow::Result<()> {
let args: Vec<String> = std::env::args().collect();
let cmd = args.get(1).map(|s| s.as_str()).unwrap_or("entropy");
// Comando especial: streaming. Mantiene la conn abierta y lee frames
// hasta Ctrl-C o EOF del servidor.
if cmd == "stream-audit" || cmd == "stream" {
return run_stream_audit(socket_path()).await;
}
let req = match cmd {
"list-rules" | "rules" => IntrospectRequest::ListRules,
"entropy" => IntrospectRequest::EntropySnapshot,
@@ -53,6 +61,7 @@ async fn main() -> anyhow::Result<()> {
}
"flush-audit" => IntrospectRequest::FlushAudit,
"audit-verify" | "verify" => IntrospectRequest::VerifyAudit,
"replay" => IntrospectRequest::ReplayAudit,
"reload" => {
let path = args.get(2).cloned();
IntrospectRequest::ReloadRules { path }
@@ -131,6 +140,14 @@ fn print_response(r: &IntrospectResponse) {
IntrospectResponse::Reloaded { count } => {
println!("reload OK: {count} reglas activas tras reload");
}
IntrospectResponse::Replayed(rep) => {
if let Some(e) = &rep.error {
println!("✗ replay falló: {e}");
} else {
println!("✓ replay completo — {} actions aplicadas, {} reglas finales",
rep.applied, rep.final_rule_count);
}
}
IntrospectResponse::AuditVerified(rep) => {
if let Some(seq) = rep.broken_at_seq {
println!("✗ verificación FALLÓ tras seq={seq}");
@@ -141,6 +158,11 @@ fn print_response(r: &IntrospectResponse) {
if let Some(g) = rep.genesis_sha { println!(" genesis: {}", hex_long(g)); }
}
}
IntrospectResponse::AuditStreamFrame(_) => {
// En modo request/response no debería llegar; solo aparece en
// run_stream_audit. Si llega aquí es un bug del servidor.
eprintln!("frame de stream recibido fuera de stream-audit (bug)");
}
IntrospectResponse::Error(e) => eprintln!("error: {e}"),
}
}
@@ -152,3 +174,43 @@ fn hex_short(sha: [u8; 32]) -> String {
fn hex_long(sha: [u8; 32]) -> String {
sha.iter().map(|b| format!("{:02x}", b)).collect()
}
async fn run_stream_audit(path: PathBuf) -> anyhow::Result<()> {
let mut stream = UnixStream::connect(&path).await?;
let req = IntrospectRequest::StreamAudit;
let buf = bincode::serialize(&req)?;
stream.write_u32(buf.len() as u32).await?;
stream.write_all(&buf).await?;
eprintln!("audit stream conectado a {} — Ctrl-C para salir", path.display());
loop {
let mut len_buf = [0u8; 4];
if stream.read_exact(&mut len_buf).await.is_err() {
eprintln!("\nstream cerrado por el servidor");
return Ok(());
}
let len = u32::from_be_bytes(len_buf) as usize;
if len > 4 * 1024 * 1024 { anyhow::bail!("frame oversize"); }
let mut buf = vec![0u8; len];
stream.read_exact(&mut buf).await?;
let resp: IntrospectResponse = bincode::deserialize(&buf)?;
match resp {
IntrospectResponse::AuditStreamFrame(entry) => {
let prev = entry.prev_sha
.map(|s| s[..4].iter().map(|b| format!("{:02x}", b)).collect::<String>() + "..")
.unwrap_or_else(|| "".into());
let sha = entry.sha[..4].iter().map(|b| format!("{:02x}", b))
.collect::<String>() + "..";
println!("[stream] seq={} prev={} sha={} {:?}",
entry.seq, prev, sha, entry.action);
}
other => {
eprintln!("frame no esperado en stream: {other:?}");
return Ok(());
}
}
}
}
#[allow(dead_code)]
fn _suppress(_: &Path) {} // mantener Path import si compilador se queja
+85
View File
@@ -45,6 +45,9 @@ pub struct AuditLog {
last_flushed_sha: Option<[u8; 32]>,
/// Path opcional donde escribir el head pointer tras cada flush.
head_pointer_path: Option<std::path::PathBuf>,
/// Subscribers a entries en tiempo real. Cada `append` empuja a todos.
/// Subscribers cuyo receiver se dropeó se purgan en el siguiente push.
subscribers: Vec<tokio::sync::mpsc::UnboundedSender<AuditEntry>>,
}
impl AuditLog {
@@ -60,9 +63,21 @@ impl AuditLog {
flushed_count: 0,
last_flushed_sha: None,
head_pointer_path: None,
subscribers: Vec::new(),
}
}
/// Registra un nuevo subscriber. El receiver recibe cada `AuditEntry`
/// futuro hasta que el receiver se dropee (subscriber se purga al
/// siguiente `append`).
pub fn subscribe(&mut self) -> tokio::sync::mpsc::UnboundedReceiver<AuditEntry> {
let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
self.subscribers.push(tx);
rx
}
pub fn subscriber_count(&self) -> usize { self.subscribers.len() }
pub fn with_head_pointer(mut self, path: std::path::PathBuf) -> Self {
self.head_pointer_path = Some(path);
self
@@ -86,6 +101,8 @@ impl AuditLog {
self.entries.pop_front();
}
self.entries.push_back(entry.clone());
// Empujar a subscribers, purgando los muertos in-place.
self.subscribers.retain(|tx| tx.send(entry.clone()).is_ok());
entry
}
@@ -164,6 +181,14 @@ pub struct AuditHeadPointer {
pub timestamp_ms: u64,
}
/// Reporte de un replay: número de actions aplicadas + reglas finales.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ReplayReport {
pub applied: u64,
pub final_rule_count: usize,
pub error: Option<String>,
}
/// Reporte de verificación de la cadena audit.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct VerificationReport {
@@ -241,6 +266,66 @@ pub fn verify_chain_from_cas(start_sha: [u8; 32]) -> VerificationReport {
}
}
/// Recorre la cadena entera (head→genesis) y reconstruye la lista de
/// actions en orden cronológico (oldest first). Útil tanto para replay
/// como para auditoría retrospectiva.
pub fn collect_chain_from_cas(start_sha: [u8; 32]) -> anyhow::Result<Vec<AuditEntry>> {
let mut entries = Vec::new();
let mut current = Some(start_sha);
while let Some(sha) = current {
let path = ente_cas::cas_root().join(ente_cas::hex(&sha));
let bytes = std::fs::read(&path)?;
let mut entry: AuditEntry = serde_json::from_slice(&bytes)?;
entry.sha = sha;
let prev = entry.prev_sha;
entries.push(entry);
current = prev;
}
// entries está en orden head→genesis. Reverse para chronological.
entries.reverse();
Ok(entries)
}
/// Aplica las actions de la cadena en orden cronológico contra un engine
/// fresco. PromoteCrystal → insert. RemoveRule → remove. LoadRulesFile →
/// log informativo (los archivos pueden no existir en el ambiente actual).
pub fn replay_chain(
start_sha: [u8; 32],
engine: &mut crate::engine::RuleEngine,
) -> ReplayReport {
let entries = match collect_chain_from_cas(start_sha) {
Ok(es) => es,
Err(e) => return ReplayReport {
applied: 0, final_rule_count: engine.len(),
error: Some(format!("collect chain: {e}")),
},
};
let mut applied = 0u64;
for entry in &entries {
match &entry.action {
AuditAction::PromoteCrystal { rule_id, crystal } => {
let mut rule = crate::crystallize::crystal_to_rule(crystal);
rule.id = *rule_id; // preservar identidad histórica
engine.insert(rule);
}
AuditAction::RemoveRule { rule_id } => {
engine.remove(*rule_id);
}
AuditAction::LoadRulesFile { path: _, count: _ } => {
// Los archivos referenciados por path pueden haber cambiado
// o no existir. Log y skip — el replay sólo reconstruye
// promotes/removes que tienen estado en CAS.
}
}
applied += 1;
}
ReplayReport {
applied,
final_rule_count: engine.len(),
error: None,
}
}
impl Default for AuditLog {
fn default() -> Self { Self::new() }
}
+40 -13
View File
@@ -8,7 +8,7 @@
//! Cada cristal puede emitirse como snippet KCL (texto humano-readable) o
//! como `Rule` ejecutable directamente por el motor.
use crate::observer::Observer;
use crate::observer::{GapStats, Observer};
use crate::rules::{Action, EventKind, EventPattern, LogLevel, Rule, Scope};
use serde::{Deserialize, Serialize};
use ulid::Ulid;
@@ -20,6 +20,11 @@ pub struct Crystal {
pub conditional_prob: f64,
pub pmi: f64,
pub support: u64,
/// Estadísticas del gap temporal entre antecedent → consequent.
/// None si no hay histograma. Habilita generación de reglas Sequence
/// con `within_ms = (mean + 2σ) * 1000`.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub gap_stats: Option<GapStats>,
}
#[derive(Debug, Clone, Copy)]
@@ -47,15 +52,19 @@ pub fn detect_crystals(obs: &Observer, params: &CrystallizationParams) -> Vec<Cr
if cp < params.min_conditional_prob { continue; }
let mi = obs.pmi(a, b);
if mi < params.min_pmi { continue; }
// Stats del histograma si existen para este par.
let gap_stats = obs.gap_histograms()
.get(&(a.clone(), b.clone()))
.map(|h| h.stats());
out.push(Crystal {
antecedent: a.clone(),
consequent: b.clone(),
conditional_prob: cp,
pmi: mi,
support: count,
gap_stats,
});
}
// Orden estable: por confianza descendente para fácil inspección.
out.sort_by(|x, y| y.conditional_prob.partial_cmp(&x.conditional_prob).unwrap_or(std::cmp::Ordering::Equal));
out
}
@@ -117,22 +126,40 @@ fn kind_extra(k: &EventKind) -> String {
}
}
/// Convierte un cristal a una `Rule` ejecutable por el motor. Útil para
/// "auto-aprendizaje" donde cristales se promueven a reglas vivas tras
/// validar con el operador.
/// Convierte un cristal a una `Rule` ejecutable. Si hay gap_stats con
/// muestras suficientes (≥ 4), genera una regla `Sequence` con
/// `within_ms = (mean + 2σ) * 1000`. 2σ cubre ~95% de la distribución
/// asumiendo normalidad — captura el "tiempo típico de respuesta" del
/// patrón observado. Si no hay stats, fallback a `Single { antecedent }`.
pub fn crystal_to_rule(c: &Crystal) -> Rule {
let when = match &c.gap_stats {
Some(s) if s.count >= 4 => {
// Mínimo 1ms para evitar within_ms=0 cuando varianza colapsa.
let bound_secs = (s.mean_secs + 2.0 * s.stddev_secs).max(0.001);
EventPattern::Sequence {
kinds: vec![c.antecedent.clone(), c.consequent.clone()],
within_ms: (bound_secs * 1000.0).ceil() as u64,
}
}
_ => EventPattern::Single { kind: c.antecedent.clone() },
};
let message = match &c.gap_stats {
Some(s) if s.count >= 4 => format!(
"crystal seq: {:?}{:?} (P={:.2}, PMI={:.2}, gap={:.3}±{:.3}s)",
c.antecedent, c.consequent, c.conditional_prob, c.pmi,
s.mean_secs, s.stddev_secs,
),
_ => format!(
"crystal: {:?}{:?} (P={:.2}, PMI={:.2}, n={})",
c.antecedent, c.consequent, c.conditional_prob, c.pmi, c.support
),
};
Rule {
id: Ulid::new(),
priority: 5,
when: EventPattern::Single { kind: c.antecedent.clone() },
when,
scope: Scope::default(),
then: vec![Action::Log {
level: LogLevel::Info,
message: format!(
"crystal: {:?} → {:?} (P={:.2}, PMI={:.2}, n={})",
c.antecedent, c.consequent, c.conditional_prob, c.pmi, c.support
),
}],
then: vec![Action::Log { level: LogLevel::Info, message }],
}
}
+52
View File
@@ -106,6 +106,13 @@ pub enum IntrospectRequest {
/// Verifica la cadena audit recorriendo prev_sha hasta el genesis,
/// validando integridad de cada entry contra el CAS.
VerifyAudit,
/// Reconstruye el engine desde la cadena audit. Vacía engine y aplica
/// PromoteCrystal/RemoveRule en orden cronológico.
ReplayAudit,
/// Mantiene la conexión abierta y empuja cada `AuditEntry` nuevo en
/// frames `IntrospectResponse::AuditStreamFrame` hasta que el cliente
/// cierra. Tras esta request no se aceptan más requests en la misma conn.
StreamAudit,
}
#[derive(Debug, Serialize, Deserialize)]
@@ -129,6 +136,10 @@ pub enum IntrospectResponse {
Reloaded { count: usize },
/// Resultado de VerifyAudit.
AuditVerified(crate::audit::VerificationReport),
/// Resultado de ReplayAudit.
Replayed(crate::audit::ReplayReport),
/// Frame de streaming. El cliente lee estos en bucle hasta EOF.
AuditStreamFrame(crate::audit::AuditEntry),
Error(String),
}
@@ -201,6 +212,11 @@ impl IntrospectServer {
let req: IntrospectRequest = bincode::deserialize(&buf)?;
debug!(?req, "introspect request");
// StreamAudit toma posesión de la conn — no más requests aquí.
if matches!(req, IntrospectRequest::StreamAudit) {
return self.stream_audit(stream).await;
}
let resp = self.dispatch(req).await;
let out = bincode::serialize(&resp)?;
@@ -209,6 +225,22 @@ impl IntrospectServer {
}
}
/// Modo streaming: subscribe al audit log y empuja cada entry como
/// frame `AuditStreamFrame`. La función retorna cuando el cliente
/// cierra (write falla) o el subscriber se desconecta.
async fn stream_audit(self: Arc<Self>, mut stream: UnixStream) -> anyhow::Result<()> {
let mut rx = self.state.audit.write().await.subscribe();
info!("audit stream client conectado");
while let Some(entry) = rx.recv().await {
let frame = IntrospectResponse::AuditStreamFrame(entry);
let bytes = bincode::serialize(&frame)?;
if stream.write_u32(bytes.len() as u32).await.is_err() { break; }
if stream.write_all(&bytes).await.is_err() { break; }
}
info!("audit stream client desconectado");
Ok(())
}
async fn dispatch(&self, req: IntrospectRequest) -> IntrospectResponse {
match req {
IntrospectRequest::ListRules => {
@@ -333,6 +365,26 @@ impl IntrospectServer {
let report = crate::audit::verify_chain_from_cas(head);
IntrospectResponse::AuditVerified(report)
}
IntrospectRequest::StreamAudit => {
// Inalcanzable por construcción: handle() detecta StreamAudit
// antes de llamar a dispatch(). Pero el match exige cubrir.
IntrospectResponse::Error(
"StreamAudit no debe llegar a dispatch — bug del handler".into()
)
}
IntrospectRequest::ReplayAudit => {
let head = self.state.audit.read().await.last_flushed_sha();
let head = match head {
Some(h) => h,
None => return IntrospectResponse::Error(
"audit log sin entries flushadas — nada que replayar".into()
),
};
let mut engine = self.state.engine.write().await;
*engine = crate::engine::RuleEngine::empty();
let report = crate::audit::replay_chain(head, &mut engine);
IntrospectResponse::Replayed(report)
}
IntrospectRequest::ReloadRules { path } => {
// Path explícito gana sobre el rules_out configurado.
let resolved = path.map(std::path::PathBuf::from)
+33 -1
View File
@@ -21,7 +21,7 @@ pub struct TimedEvent {
}
/// Histograma de gaps temporales con buckets exponenciales en segundos.
/// Cubre 10 órdenes de magnitud: 1ms hasta 1000s.
/// Cubre 6 órdenes de magnitud: 1ms hasta 1000s.
#[derive(Debug, Clone, Default)]
pub struct GapHistogram {
/// Buckets cumulativos (Prometheus-style): cada índice cuenta eventos
@@ -29,6 +29,17 @@ pub struct GapHistogram {
pub buckets: [u64; 7],
pub count: u64,
pub sum_secs: f64,
/// Suma de cuadrados — permite calcular varianza/stddev en O(1).
pub sum_squares_secs: f64,
pub max_secs: f64,
}
/// Estadísticas resumidas de un GapHistogram, usables en cristales temporales.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct GapStats {
pub count: u64,
pub mean_secs: f64,
pub stddev_secs: f64,
pub max_secs: f64,
}
@@ -45,6 +56,7 @@ impl GapHistogram {
}
self.count += 1;
self.sum_secs += gap_secs;
self.sum_squares_secs += gap_secs * gap_secs;
if gap_secs > self.max_secs { self.max_secs = gap_secs; }
}
@@ -52,6 +64,26 @@ impl GapHistogram {
if self.count == 0 { 0.0 } else { self.sum_secs / self.count as f64 }
}
/// Desviación estándar muestral. Computada vía `sum_squares - n*mean²`
/// para precisión razonable sin almacenar las muestras.
pub fn stddev_secs(&self) -> f64 {
if self.count < 2 { return 0.0; }
let n = self.count as f64;
let mean = self.mean_secs();
let var = (self.sum_squares_secs - n * mean * mean) / (n - 1.0);
// Numerical floor: var puede ser ligeramente negativo por float ε.
if var <= 0.0 { 0.0 } else { var.sqrt() }
}
pub fn stats(&self) -> GapStats {
GapStats {
count: self.count,
mean_secs: self.mean_secs(),
stddev_secs: self.stddev_secs(),
max_secs: self.max_secs,
}
}
pub fn bucket_limits() -> &'static [f64; 7] { &GAP_BUCKET_LIMITS_SECS }
}
+50 -6
View File
@@ -1,13 +1,15 @@
//! Mediación de capabilities: emisión y revocación de tokens.
//! Mediación de capabilities: emisión, renovación, revocación de tokens.
//!
//! El Init no fuerza políticas — sólo verifica que el proveedor existe y
//! emite tokens. Las políticas reales (quién puede pedir qué, rate limits,
//! audit) se aplican en capas superiores.
//! Los grants tienen TTL (`DEFAULT_GRANT_TTL`). El cliente debe renovarlos
//! periódicamente con `renew_grant(token)`; en caso contrario, el background
//! task `purge_expired_grants` los revoca al vencimiento.
use super::{EnteGraph, GrantedCapability};
use super::{EnteGraph, GrantedCapability, DEFAULT_GRANT_TTL};
use crate::events::CapabilityGrant;
use ente_card::Capability;
use std::time::Instant;
use tokio::sync::oneshot;
use tracing::debug;
use ulid::Ulid;
impl EnteGraph {
@@ -22,12 +24,54 @@ impl EnteGraph {
Some(provider) => {
let token = self.next_token;
self.next_token += 1;
let expires_at = Instant::now() + DEFAULT_GRANT_TTL;
self.grants.insert(token, GrantedCapability {
cap: cap.clone(), provider, holder: from,
cap: cap.clone(),
provider,
holder: from,
expires_at,
});
CapabilityGrant::Granted { token }
}
};
let _ = reply.send(grant);
}
/// Extiende un grant existente. Devuelve `true` si renovó. Si el token
/// no existe o ya expiró, `false` (el cliente debe re-acquire).
pub fn renew_grant(&mut self, token: u64) -> bool {
let now = Instant::now();
if let Some(g) = self.grants.get_mut(&token) {
if g.expires_at > now {
g.expires_at = now + DEFAULT_GRANT_TTL;
return true;
}
// Expired — purgamos aquí mismo.
self.grants.remove(&token);
}
false
}
/// GC: elimina grants vencidos. Devuelve cuántos fueron purgados.
pub fn purge_expired_grants(&mut self) -> usize {
let now = Instant::now();
let expired: Vec<u64> = self.grants.iter()
.filter(|(_, g)| g.expires_at <= now)
.map(|(t, _)| *t)
.collect();
for t in &expired {
self.grants.remove(t);
}
if !expired.is_empty() {
debug!(count = expired.len(), "grants expirados purgados");
}
expired.len()
}
/// Cuenta de grants vivos (no expirados). Usado por métricas.
pub fn active_grants_count(&self) -> usize {
let now = Instant::now();
self.grants.values().filter(|g| g.expires_at > now).count()
}
}
+6
View File
@@ -66,8 +66,14 @@ pub(in crate::graph) struct GrantedCapability {
pub cap: Capability,
pub provider: Ulid,
pub holder: Ulid,
/// Instante en el que el grant deja de ser válido. El garbage collector
/// del cerebro purga grants con `Instant::now() > expires_at`.
pub expires_at: std::time::Instant,
}
/// TTL default para nuevos grants. Configurable por bus en el futuro.
pub const DEFAULT_GRANT_TTL: std::time::Duration = std::time::Duration::from_secs(60);
impl EnteGraph {
pub fn new(mut seed: EntityCard) -> Self {
// Extraemos genesis antes de almacenar la Semilla — evita duplicación
+11
View File
@@ -237,6 +237,10 @@ async fn primordial_loop(
};
tokio::pin!(dev_exit);
// GC de capability grants expirados — corre cada 10 segundos.
let mut grant_purge = tokio::time::interval(Duration::from_secs(10));
grant_purge.tick().await; // descartar primer tick inmediato
loop {
tokio::select! {
biased;
@@ -258,6 +262,13 @@ async fn primordial_loop(
}
}
_ = grant_purge.tick() => {
let n = graph.purge_expired_grants();
if n > 0 {
info!(purged = n, active = graph.active_grants_count(), "GC capability grants");
}
}
_ = async { dev_exit.as_mut().as_pin_mut().unwrap().await }, if dev_mode => {
info!("dev mode: timer expirado, cerrando bucle primordial");
let _ = graph_tx.send(GraphEvent::Shutdown {