Capability quotas, CAS gc, patrones burst/silence, snapshot incremental

- quota_for_capability(cap) tabla por variante: Spawn 2, FilesystemRoot 2,
  Endpoint 16, Journal 32. mediate_capability rechaza con QuotaExceeded
  si holder ya tiene N tokens activos.
- ente_cas::list_all_shas() + gc(reachable). audit::reachable_from_head()
  walks la cadena. Endpoint GcCas con extra_roots para Wasm SHAs.
  brainctl gc-cas.
- PatternCrystal::Burst (kind, count, freq_hz) y Silence (kind, since_secs).
  detect_pattern_crystals + endpoint PatternCrystals + brainctl patterns.
- Observer.dirty_marginal/dirty_cooccur tracking. snapshot() marca
  consumo (clears dirty); snapshot_delta() emite sólo lo cambiado.
  apply_delta() merges incremental sobre estado existente. Útil para
  checkpoints frecuentes con poco overhead.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Sergio
2026-05-04 00:43:28 +00:00
parent 6aee9254d4
commit f4eb7dd944
10 changed files with 344 additions and 21 deletions
+18
View File
@@ -62,6 +62,8 @@ async fn main() -> anyhow::Result<()> {
"flush-audit" => IntrospectRequest::FlushAudit,
"audit-verify" | "verify" => IntrospectRequest::VerifyAudit,
"replay" => IntrospectRequest::ReplayAudit,
"gc-cas" => IntrospectRequest::GcCas { extra_roots: Vec::new() },
"patterns" => IntrospectRequest::PatternCrystals,
"reload" => {
let path = args.get(2).cloned();
IntrospectRequest::ReloadRules { path }
@@ -158,6 +160,22 @@ fn print_response(r: &IntrospectResponse) {
if let Some(g) = rep.genesis_sha { println!(" genesis: {}", hex_long(g)); }
}
}
IntrospectResponse::Patterns(ps) => {
println!("{} cristales pattern detectados:", ps.len());
for p in ps {
match p {
ente_brain::crystallize::PatternCrystal::Burst { kind, count, frequency_per_sec } => {
println!(" burst: {kind:?} count={count} freq={frequency_per_sec:.2} Hz");
}
ente_brain::crystallize::PatternCrystal::Silence { kind, last_count, since_secs } => {
println!(" silence: {kind:?} last_count={last_count} ausente={since_secs:.1}s");
}
}
}
}
IntrospectResponse::GcResult { deleted, freed_bytes } => {
println!("CAS gc: {deleted} blobs eliminados, {freed_bytes} bytes liberados");
}
IntrospectResponse::AuditStreamFrame(_) => {
// En modo request/response no debería llegar; solo aparece en
// run_stream_audit. Si llega aquí es un bug del servidor.
+18
View File
@@ -280,6 +280,24 @@ pub fn verify_chain_from_cas(start_sha: [u8; 32]) -> VerificationReport {
}
}
/// Devuelve el set de SHAs alcanzables desde `start_sha` siguiendo
/// `prev_sha` hasta el genesis. Usado por el GC del CAS para construir
/// las "raíces vivas" del audit log.
pub fn reachable_from_head(start_sha: [u8; 32]) -> std::collections::HashSet<[u8; 32]> {
let mut set = std::collections::HashSet::new();
let mut current = Some(start_sha);
while let Some(sha) = current {
if !set.insert(sha) { break; } // ciclo (no debería pasar) — corta
let path = ente_cas::cas_root().join(ente_cas::hex(&sha));
let bytes = match std::fs::read(&path) { Ok(b) => b, Err(_) => break };
let entry: AuditEntry = match serde_json::from_slice(&bytes) {
Ok(e) => e, Err(_) => break,
};
current = entry.prev_sha;
}
set
}
/// Recorre la cadena entera (head→genesis) y reconstruye la lista de
/// actions en orden cronológico (oldest first). Útil tanto para replay
/// como para auditoría retrospectiva.
+94
View File
@@ -11,6 +11,7 @@
use crate::observer::{GapStats, Observer};
use crate::rules::{Action, EventKind, EventPattern, LogLevel, Rule, Scope};
use serde::{Deserialize, Serialize};
use std::time::Instant;
use ulid::Ulid;
#[derive(Debug, Clone, Serialize, Deserialize)]
@@ -164,6 +165,99 @@ pub fn crystal_to_rule(c: &Crystal) -> Rule {
}
// ============================================================================
// Patrones extendidos: Burst (alta frecuencia) y Silence (ausencia prolongada).
// Estos cristales son sobre un único kind, no pares — capturan dinámicas
// temporales de eventos individuales.
// ============================================================================
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum PatternCrystal {
/// Mismo evento aparece con frecuencia alta. `frequency_per_sec` se
/// estima sobre el window de observación.
Burst {
kind: EventKind,
count: u64,
frequency_per_sec: f64,
},
/// Evento que dejó de aparecer. `since_secs` es el tiempo desde la
/// última observación.
Silence {
kind: EventKind,
last_count: u64,
since_secs: f64,
},
}
#[derive(Debug, Clone, Copy)]
pub struct PatternParams {
/// Mínimo de ocurrencias para considerar Burst.
pub burst_min_count: u64,
/// Frecuencia mínima (eventos por segundo) para considerar Burst.
pub burst_min_freq_hz: f64,
/// Tiempo desde última ocurrencia para considerar Silence.
pub silence_min_secs: f64,
/// Mínimo total previo para considerar Silence (eventos < N son ruido).
pub silence_min_prior_count: u64,
}
impl Default for PatternParams {
fn default() -> Self {
Self {
burst_min_count: 10,
burst_min_freq_hz: 5.0,
silence_min_secs: 30.0,
silence_min_prior_count: 3,
}
}
}
/// Detecta Bursts y Silences sobre la distribución marginal del observer.
/// La frecuencia de un Burst se aproxima asumiendo que la observación cubre
/// el rango entre `last_seen` y `Instant::now()` para ese kind.
pub fn detect_pattern_crystals(obs: &Observer, params: &PatternParams) -> Vec<PatternCrystal> {
let mut out = Vec::new();
let now = Instant::now();
for (kind, &count) in obs.marginals() {
let last_seen = obs.last_seen_marginal(kind);
// ---- Burst ----
if count >= params.burst_min_count {
// Aproximación: si vimos `count` eventos hasta `last_seen`, y el
// primer evento sucedió en algún momento del window, la freq es
// count / window_age. Sin tiempo del primer evento, usamos
// last_seen → now como denominador (subestima freq) o asumimos
// ventana fija de 60s. Usamos la última como aproximación.
let elapsed = last_seen
.map(|t| now.saturating_duration_since(t).as_secs_f64().max(0.001))
.unwrap_or(60.0);
// Estimación conservadora: count / max(window_age, 1s).
// Si tenemos histograma, podríamos refinar — TODO.
let freq = count as f64 / elapsed.max(1.0);
if freq >= params.burst_min_freq_hz {
out.push(PatternCrystal::Burst {
kind: kind.clone(),
count,
frequency_per_sec: freq,
});
}
}
// ---- Silence ----
if count >= params.silence_min_prior_count {
if let Some(t) = last_seen {
let since = now.saturating_duration_since(t).as_secs_f64();
if since >= params.silence_min_secs {
out.push(PatternCrystal::Silence {
kind: kind.clone(),
last_count: count,
since_secs: since,
});
}
}
}
}
out
}
#[cfg(test)]
mod tests {
use super::*;
+28
View File
@@ -113,6 +113,12 @@ pub enum IntrospectRequest {
/// frames `IntrospectResponse::AuditStreamFrame` hasta que el cliente
/// cierra. Tras esta request no se aceptan más requests en la misma conn.
StreamAudit,
/// Garbage-collect el CAS. Considera reachable: todo lo alcanzable desde
/// el head del audit log. Cualquier blob extra (Wasm modules referenciados
/// por Cards) debe haberse pasado en `extra_roots` por el caller.
GcCas { extra_roots: Vec<[u8; 32]> },
/// Detecta cristales de patrones temporales (Burst, Silence).
PatternCrystals,
}
#[derive(Debug, Serialize, Deserialize)]
@@ -140,6 +146,10 @@ pub enum IntrospectResponse {
Replayed(crate::audit::ReplayReport),
/// Frame de streaming. El cliente lee estos en bucle hasta EOF.
AuditStreamFrame(crate::audit::AuditEntry),
/// Resultado de GcCas: cuántos blobs eliminados y bytes liberados.
GcResult { deleted: usize, freed_bytes: u64 },
/// Cristales de Burst/Silence detectados.
Patterns(Vec<crate::crystallize::PatternCrystal>),
Error(String),
}
@@ -372,6 +382,24 @@ impl IntrospectServer {
"StreamAudit no debe llegar a dispatch — bug del handler".into()
)
}
IntrospectRequest::PatternCrystals => {
let obs = self.state.observer.read().await;
let params = crate::crystallize::PatternParams::default();
let patterns = crate::crystallize::detect_pattern_crystals(&obs, &params);
IntrospectResponse::Patterns(patterns)
}
IntrospectRequest::GcCas { extra_roots } => {
// Reachable = audit chain desde head + extra_roots provistos.
let mut reachable = std::collections::HashSet::new();
if let Some(head) = self.state.audit.read().await.last_flushed_sha() {
reachable.extend(crate::audit::reachable_from_head(head));
}
reachable.extend(extra_roots);
match ente_cas::gc(&reachable) {
Ok((deleted, freed_bytes)) => IntrospectResponse::GcResult { deleted, freed_bytes },
Err(e) => IntrospectResponse::Error(format!("gc: {e}")),
}
}
IntrospectRequest::ReplayAudit => {
let head = self.state.audit.read().await.last_flushed_sha();
let head = match head {
+87 -6
View File
@@ -102,6 +102,10 @@ pub struct Observer {
half_life_secs: Option<f64>,
/// Histograma de gaps temporales por par (a, b). Capturado al `record()`.
gap_histograms: HashMap<(EventKind, EventKind), GapHistogram>,
/// Sets de "qué cambió desde el último snapshot". Se vacían en
/// `snapshot()` y `snapshot_delta()`. Usado para escritura incremental.
dirty_marginal: std::collections::HashSet<EventKind>,
dirty_cooccur: std::collections::HashSet<(EventKind, EventKind)>,
}
impl Observer {
@@ -116,6 +120,8 @@ impl Observer {
last_seen_cooccur: HashMap::new(),
half_life_secs: None,
gap_histograms: HashMap::new(),
dirty_marginal: std::collections::HashSet::new(),
dirty_cooccur: std::collections::HashSet::new(),
}
}
@@ -143,7 +149,8 @@ impl Observer {
*self.cooccur.entry(key.clone()).or_insert(0) += 1;
self.last_seen_cooccur.insert(key.clone(), now);
let gap_secs = now.duration_since(w.at).as_secs_f64();
self.gap_histograms.entry(key).or_default().observe(gap_secs);
self.gap_histograms.entry(key.clone()).or_default().observe(gap_secs);
self.dirty_cooccur.insert(key);
}
self.window.push_back(timed);
@@ -152,7 +159,8 @@ impl Observer {
}
*self.marginal.entry(kind.clone()).or_insert(0) += 1;
self.last_seen_marginal.insert(kind, now);
self.last_seen_marginal.insert(kind.clone(), now);
self.dirty_marginal.insert(kind);
self.total += 1;
}
@@ -238,6 +246,12 @@ impl Observer {
}
pub fn marginals(&self) -> &HashMap<EventKind, u64> { &self.marginal }
/// Última vez que se vio un kind. None si nunca o si fue restaurado
/// desde snapshot (los Instants no portables se descartan).
pub fn last_seen_marginal(&self, kind: &EventKind) -> Option<Instant> {
self.last_seen_marginal.get(kind).copied()
}
pub fn cooccurrences(&self) -> &HashMap<(EventKind, EventKind), u64> { &self.cooccur }
pub fn total(&self) -> u64 { self.total }
pub fn window_size(&self) -> usize { self.window_size }
@@ -263,12 +277,15 @@ impl Observer {
pairs
}
/// Snapshot serializable del estado estadístico (sin Instants — no son
/// portables a través de reboots). El window deslizante se descarta —
/// se reconstruye desde cero al restore.
pub fn snapshot(&self) -> ObserverSnapshot {
/// Snapshot full: estado estadístico completo. Limpia los sets dirty
/// como side-effect — los próximos `snapshot_delta()` cubren sólo los
/// cambios posteriores.
pub fn snapshot(&mut self) -> ObserverSnapshot {
self.dirty_marginal.clear();
self.dirty_cooccur.clear();
ObserverSnapshot {
schema_version: OBSERVER_SCHEMA_VERSION,
is_delta: false,
window_size: self.window_size,
half_life_secs: self.half_life_secs,
total: self.total,
@@ -284,6 +301,64 @@ impl Observer {
}
}
/// Snapshot incremental: sólo incluye los kinds y pares que cambiaron
/// desde el último `snapshot()` o `snapshot_delta()`. Útil para
/// checkpoints frecuentes con poco overhead. Limpia los sets dirty.
pub fn snapshot_delta(&mut self) -> ObserverSnapshot {
let marginal: Vec<_> = self.dirty_marginal.iter()
.filter_map(|k| self.marginal.get(k).map(|v| (k.clone(), *v)))
.collect();
let cooccur: Vec<_> = self.dirty_cooccur.iter()
.filter_map(|(a, b)| {
self.cooccur.get(&(a.clone(), b.clone()))
.map(|c| (a.clone(), b.clone(), *c))
})
.collect();
// Para histogramas: incluimos los pares cuyo cooccur cambió.
let gap_histograms: Vec<_> = self.dirty_cooccur.iter()
.filter_map(|(a, b)| {
self.gap_histograms.get(&(a.clone(), b.clone()))
.map(|h| (a.clone(), b.clone(), h.clone()))
})
.collect();
self.dirty_marginal.clear();
self.dirty_cooccur.clear();
ObserverSnapshot {
schema_version: OBSERVER_SCHEMA_VERSION,
is_delta: true,
window_size: self.window_size,
half_life_secs: self.half_life_secs,
total: self.total,
marginal, cooccur, gap_histograms,
}
}
/// Aplica un delta sobre el estado actual. Para `is_delta=true`, los
/// valores en marginal/cooccur sobrescriben las entradas existentes.
/// Si `is_delta=false`, equivale a `from_snapshot` pero in-place.
pub fn apply_delta(&mut self, delta: ObserverSnapshot) {
let now = Instant::now();
if !delta.is_delta {
// Full: reset state.
*self = Self::from_snapshot(delta);
return;
}
// Incremental merge.
for (k, v) in delta.marginal {
self.last_seen_marginal.insert(k.clone(), now);
self.marginal.insert(k, v);
}
for (a, b, c) in delta.cooccur {
self.last_seen_cooccur.insert((a.clone(), b.clone()), now);
self.cooccur.insert((a, b), c);
}
for (a, b, h) in delta.gap_histograms {
self.gap_histograms.insert((a, b), h);
}
// total: sólo subimos (el delta podría estar atrasado).
if delta.total > self.total { self.total = delta.total; }
}
/// Reconstruye Observer desde un snapshot. El window queda vacío;
/// last_seen_* se inicializa en `now()` para que el decay arranque
/// "ahora" para todos los counts (aproximación razonable post-reboot).
@@ -314,6 +389,8 @@ impl Observer {
last_seen_cooccur,
half_life_secs: snap.half_life_secs,
gap_histograms,
dirty_marginal: std::collections::HashSet::new(),
dirty_cooccur: std::collections::HashSet::new(),
}
}
}
@@ -325,6 +402,10 @@ const OBSERVER_SCHEMA_VERSION: u16 = 1;
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct ObserverSnapshot {
pub schema_version: u16,
/// `true` si sólo contiene los cambios desde el último snapshot.
/// `false` = full state, sobreescribe el observer al aplicar.
#[serde(default)]
pub is_delta: bool,
pub window_size: usize,
pub half_life_secs: Option<f64>,
pub total: u64,