Files
arje/crates/ente-brain/examples/brainctl.rs
T
Sergio f4eb7dd944 Capability quotas, CAS gc, patrones burst/silence, snapshot incremental
- quota_for_capability(cap) tabla por variante: Spawn 2, FilesystemRoot 2,
  Endpoint 16, Journal 32. mediate_capability rechaza con QuotaExceeded
  si holder ya tiene N tokens activos.
- ente_cas::list_all_shas() + gc(reachable). audit::reachable_from_head()
  walks la cadena. Endpoint GcCas con extra_roots para Wasm SHAs.
  brainctl gc-cas.
- PatternCrystal::Burst (kind, count, freq_hz) y Silence (kind, since_secs).
  detect_pattern_crystals + endpoint PatternCrystals + brainctl patterns.
- Observer.dirty_marginal/dirty_cooccur tracking. snapshot() marca
  consumo (clears dirty); snapshot_delta() emite sólo lo cambiado.
  apply_delta() merges incremental sobre estado existente. Útil para
  checkpoints frecuentes con poco overhead.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-04 00:43:28 +00:00

235 lines
9.9 KiB
Rust

//! brainctl: cliente CLI del introspect API.
//!
//! Uso:
//! cargo run --example brainctl -p ente-brain -- list-rules
//! cargo run --example brainctl -p ente-brain -- entropy
//! cargo run --example brainctl -p ente-brain -- top 10
//! cargo run --example brainctl -p ente-brain -- crystals
//! cargo run --example brainctl -p ente-brain -- crystal-kcl 0
//!
//! Path del socket: $ENTE_BRAIN_SOCK o $XDG_RUNTIME_DIR/ente-brain.sock
use ente_brain::introspect::{call, IntrospectRequest, IntrospectResponse};
use std::path::{Path, PathBuf};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::UnixStream;
fn socket_path() -> PathBuf {
if let Ok(p) = std::env::var("ENTE_BRAIN_SOCK") {
return p.into();
}
let runtime = std::env::var("XDG_RUNTIME_DIR")
.unwrap_or_else(|_| std::env::var("TMPDIR").unwrap_or_else(|_| "/tmp".into()));
format!("{runtime}/ente-brain.sock").into()
}
#[tokio::main(flavor = "current_thread")]
async fn main() -> anyhow::Result<()> {
let args: Vec<String> = std::env::args().collect();
let cmd = args.get(1).map(|s| s.as_str()).unwrap_or("entropy");
// Comando especial: streaming. Mantiene la conn abierta y lee frames
// hasta Ctrl-C o EOF del servidor.
if cmd == "stream-audit" || cmd == "stream" {
return run_stream_audit(socket_path()).await;
}
let req = match cmd {
"list-rules" | "rules" => IntrospectRequest::ListRules,
"entropy" => IntrospectRequest::EntropySnapshot,
"top" => {
let n: usize = args.get(2).and_then(|s| s.parse().ok()).unwrap_or(10);
IntrospectRequest::TopCorrelations { n }
}
"crystals" => IntrospectRequest::Crystals,
"crystal-kcl" => {
let i: usize = args.get(2).and_then(|s| s.parse().ok()).unwrap_or(0);
IntrospectRequest::CrystalKcl { index: i }
}
"promote" => {
let i: usize = args.get(2).and_then(|s| s.parse().ok()).unwrap_or(0);
IntrospectRequest::PromoteCrystal { index: i }
}
"remove" => {
let id_s = args.get(2).ok_or_else(|| anyhow::anyhow!("se requiere <ulid>"))?;
let id: ulid::Ulid = id_s.parse()?;
IntrospectRequest::RemoveRule { id }
}
"audit" => {
let limit: usize = args.get(2).and_then(|s| s.parse().ok()).unwrap_or(20);
IntrospectRequest::ListAudit { limit }
}
"flush-audit" => IntrospectRequest::FlushAudit,
"audit-verify" | "verify" => IntrospectRequest::VerifyAudit,
"replay" => IntrospectRequest::ReplayAudit,
"gc-cas" => IntrospectRequest::GcCas { extra_roots: Vec::new() },
"patterns" => IntrospectRequest::PatternCrystals,
"reload" => {
let path = args.get(2).cloned();
IntrospectRequest::ReloadRules { path }
}
other => {
eprintln!("subcomando desconocido: {other}");
eprintln!("válidos: list-rules | entropy | top <n> | crystals | crystal-kcl <i> | promote <i> | remove <ulid> | audit <limit> | flush-audit | reload [path]");
std::process::exit(2);
}
};
let path = socket_path();
let resp = call(&path, req).await?;
print_response(&resp);
Ok(())
}
fn print_response(r: &IntrospectResponse) {
match r {
IntrospectResponse::Rules(rs) => {
println!("{} reglas vivas:", rs.len());
for r in rs {
println!(" {} prio={} kind={} actions={} wildcard={}",
r.id, r.priority, r.event_kind_tag, r.action_count, r.scope_wildcard);
}
}
IntrospectResponse::Rule(rule) => match rule {
Some(r) => println!("{r:#?}"),
None => println!("regla no encontrada"),
},
IntrospectResponse::Entropy { value_bits, sample_size, distinct_kinds, window_full } => {
println!("Shannon entropy : {value_bits:.4} bits");
println!("Sample size : {sample_size}");
println!("Distinct kinds : {distinct_kinds}");
println!("Window full : {window_full}");
}
IntrospectResponse::Correlations(entries) => {
println!("{} pares (top, ordenado por co-ocurrencia):", entries.len());
for e in entries {
println!(" n={:>4} P(b|a)={:.3} PMI={:>6.3}b {}{}",
e.joint_count, e.conditional_prob, e.pmi_bits, e.a, e.b);
}
}
IntrospectResponse::Crystals(cs) => {
println!("{} cristales detectados:", cs.len());
for (i, c) in cs.iter().enumerate() {
println!(" [{i}] {:?}{:?} P={:.3} PMI={:.3}b n={}",
c.antecedent, c.consequent, c.conditional_prob, c.pmi, c.support);
}
}
IntrospectResponse::Kcl(s) => println!("{s}"),
IntrospectResponse::Promoted { rule_id, kcl_snippet } => {
println!("regla creada: {rule_id}");
println!("--- KCL para auditoría / persistencia ---");
println!("{kcl_snippet}");
}
IntrospectResponse::Removed(was_present) => {
if *was_present { println!("regla eliminada"); }
else { println!("regla no encontrada"); }
}
IntrospectResponse::AuditEntries(entries) => {
println!("{} entries de audit log:", entries.len());
for e in entries {
let prev = e.prev_sha.map(hex_short).unwrap_or_else(|| "".into());
let sha = hex_short(e.sha);
println!(" seq={:>4} t={} prev={} sha={} {:?}",
e.seq, e.timestamp_ms, prev, sha, e.action);
}
}
IntrospectResponse::Flushed { written, head_sha, total_flushed } => {
println!("flushed: {written} entries esta pasada, total acumulado: {total_flushed}");
if let Some(sha) = head_sha {
println!("head sha: {}", hex_long(*sha));
}
}
IntrospectResponse::Reloaded { count } => {
println!("reload OK: {count} reglas activas tras reload");
}
IntrospectResponse::Replayed(rep) => {
if let Some(e) = &rep.error {
println!("✗ replay falló: {e}");
} else {
println!("✓ replay completo — {} actions aplicadas, {} reglas finales",
rep.applied, rep.final_rule_count);
}
}
IntrospectResponse::AuditVerified(rep) => {
if let Some(seq) = rep.broken_at_seq {
println!("✗ verificación FALLÓ tras seq={seq}");
if let Some(e) = &rep.error { println!(" motivo: {e}"); }
println!(" entries verificadas: {}", rep.verified);
} else {
println!("✓ chain verificada — {} entries íntegras", rep.verified);
if let Some(g) = rep.genesis_sha { println!(" genesis: {}", hex_long(g)); }
}
}
IntrospectResponse::Patterns(ps) => {
println!("{} cristales pattern detectados:", ps.len());
for p in ps {
match p {
ente_brain::crystallize::PatternCrystal::Burst { kind, count, frequency_per_sec } => {
println!(" burst: {kind:?} count={count} freq={frequency_per_sec:.2} Hz");
}
ente_brain::crystallize::PatternCrystal::Silence { kind, last_count, since_secs } => {
println!(" silence: {kind:?} last_count={last_count} ausente={since_secs:.1}s");
}
}
}
}
IntrospectResponse::GcResult { deleted, freed_bytes } => {
println!("CAS gc: {deleted} blobs eliminados, {freed_bytes} bytes liberados");
}
IntrospectResponse::AuditStreamFrame(_) => {
// En modo request/response no debería llegar; solo aparece en
// run_stream_audit. Si llega aquí es un bug del servidor.
eprintln!("frame de stream recibido fuera de stream-audit (bug)");
}
IntrospectResponse::Error(e) => eprintln!("error: {e}"),
}
}
fn hex_short(sha: [u8; 32]) -> String {
sha[..4].iter().map(|b| format!("{:02x}", b)).collect::<String>() + ".."
}
fn hex_long(sha: [u8; 32]) -> String {
sha.iter().map(|b| format!("{:02x}", b)).collect()
}
async fn run_stream_audit(path: PathBuf) -> anyhow::Result<()> {
let mut stream = UnixStream::connect(&path).await?;
let req = IntrospectRequest::StreamAudit;
let buf = bincode::serialize(&req)?;
stream.write_u32(buf.len() as u32).await?;
stream.write_all(&buf).await?;
eprintln!("audit stream conectado a {} — Ctrl-C para salir", path.display());
loop {
let mut len_buf = [0u8; 4];
if stream.read_exact(&mut len_buf).await.is_err() {
eprintln!("\nstream cerrado por el servidor");
return Ok(());
}
let len = u32::from_be_bytes(len_buf) as usize;
if len > 4 * 1024 * 1024 { anyhow::bail!("frame oversize"); }
let mut buf = vec![0u8; len];
stream.read_exact(&mut buf).await?;
let resp: IntrospectResponse = bincode::deserialize(&buf)?;
match resp {
IntrospectResponse::AuditStreamFrame(entry) => {
let prev = entry.prev_sha
.map(|s| s[..4].iter().map(|b| format!("{:02x}", b)).collect::<String>() + "..")
.unwrap_or_else(|| "".into());
let sha = entry.sha[..4].iter().map(|b| format!("{:02x}", b))
.collect::<String>() + "..";
println!("[stream] seq={} prev={} sha={} {:?}",
entry.seq, prev, sha, entry.action);
}
other => {
eprintln!("frame no esperado en stream: {other:?}");
return Ok(());
}
}
}
}
#[allow(dead_code)]
fn _suppress(_: &Path) {} // mantener Path import si compilador se queja