//! Introspect API. Unix Domain Socket + framing length-prefijo + bincode. //! //! Una herramienta externa (ej. `brainctl`) puede consultar el estado del //! cerebro sin tocar el bus interno del fractal. Esto separa observación de //! ejecución — la introspección es read-only por diseño. use crate::crystallize::{detect_crystals, Crystal, CrystallizationParams}; use crate::engine::RuleEngine; use crate::observer::Observer; use crate::rules::Rule; use serde::{Deserialize, Serialize}; use std::io::Write; use std::path::{Path, PathBuf}; use std::sync::Arc; use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio::net::{UnixListener, UnixStream}; use tokio::sync::RwLock; use tracing::{debug, info, trace, warn}; use ulid::Ulid; const MAX_FRAME: usize = 4 * 1024 * 1024; // 4 MiB — correlation matrices crecen /// Estado compartido entre el bucle del Init y el servidor de introspección. /// `Arc>` permite muchos lectores concurrentes (introspect) y /// un escritor (el dispatcher de eventos en el bucle primordial). #[derive(Clone)] pub struct BrainState { pub engine: Arc>, pub observer: Arc>, pub params: CrystallizationParams, /// Path opcional donde apendear reglas promovidas como KCL. Si Some, /// cada PromoteCrystal añade el snippet al archivo (append-only). pub rules_out: Option>, /// Audit log en memoria. Cada promote/remove deja huella aquí. pub audit: Arc>, } impl BrainState { pub fn new(window_size: usize) -> Self { Self::with_params(window_size, CrystallizationParams::default()) } pub fn with_params(window_size: usize, params: CrystallizationParams) -> Self { Self { engine: Arc::new(RwLock::new(RuleEngine::empty())), observer: Arc::new(RwLock::new(Observer::new(window_size))), params, rules_out: None, audit: Arc::new(RwLock::new(crate::audit::AuditLog::new())), } } pub fn with_rules_out(mut self, path: PathBuf) -> Self { self.rules_out = Some(Arc::new(path)); self } } /// Append-only writer del KCL snippet a `rules_out`. Crea el archivo con /// header si no existe; en caso contrario sólo apendea. pub fn append_kcl_snippet(path: &Path, snippet: &str) -> std::io::Result<()> { if let Some(parent) = path.parent() { std::fs::create_dir_all(parent)?; } let exists = path.exists(); let mut file = std::fs::OpenOptions::new() .create(true) .append(true) .open(path)?; if !exists { writeln!(file, "# Reglas promovidas automáticamente desde cristales.")?; writeln!(file, "# Cada bloque proviene de PromoteCrystal vía brainctl.")?; writeln!(file)?; } writeln!(file, "{snippet}")?; Ok(()) } #[derive(Debug, Serialize, Deserialize)] pub enum IntrospectRequest { /// Lista resumida de reglas vivas. ListRules, /// Detalle de una regla concreta. GetRule(Ulid), /// Snapshot de la entropía y conteos básicos. EntropySnapshot, /// Top N pares (a, b) por co-ocurrencia. TopCorrelations { n: usize }, /// Cristales detectados con los parámetros del BrainState. Crystals, /// Genera el snippet KCL de un cristal específico (índice tras Crystals). CrystalKcl { index: usize }, /// Promueve el cristal #index a regla viva en el motor. Devuelve el /// rule_id asignado y el snippet KCL para auditoría/persistencia. PromoteCrystal { index: usize }, /// Elimina una regla viva por id. Útil para revertir un promote. RemoveRule { id: Ulid }, /// Lista las últimas N entradas del audit log. limit=0 = todas. ListAudit { limit: usize }, /// Persiste todas las entries pendientes al CAS y actualiza el head /// pointer si el log lo tiene configurado. FlushAudit, /// Recarga reglas desde el archivo configurado por --rules-out (o el /// path provisto). Vacía el engine antes de cargar. ReloadRules { path: Option }, } #[derive(Debug, Serialize, Deserialize)] pub enum IntrospectResponse { Rules(Vec), Rule(Option), Entropy { value_bits: f64, sample_size: u64, distinct_kinds: usize, window_full: bool }, Correlations(Vec), Crystals(Vec), Kcl(String), /// Resultado de PromoteCrystal: id de la regla creada + snippet KCL para /// que el operador lo persista en disco si quiere. Promoted { rule_id: Ulid, kcl_snippet: String }, /// Resultado de RemoveRule: true si existía, false si ya no. Removed(bool), /// Entradas del audit log (más recientes al final). AuditEntries(Vec), /// Resultado de FlushAudit: cuántas entries se escribieron y SHA del head. Flushed { written: usize, head_sha: Option<[u8; 32]>, total_flushed: u64 }, /// Resultado de ReloadRules: número total de reglas tras el reload. Reloaded { count: usize }, Error(String), } #[derive(Debug, Serialize, Deserialize)] pub struct RuleSummary { pub id: Ulid, pub priority: u8, pub event_kind_tag: String, pub action_count: usize, pub scope_wildcard: bool, } #[derive(Debug, Serialize, Deserialize)] pub struct CorrelationEntry { pub a: String, pub b: String, pub joint_count: u64, pub conditional_prob: f64, pub pmi_bits: f64, } pub struct IntrospectServer { state: BrainState, } impl IntrospectServer { pub fn new(state: BrainState) -> Self { Self { state } } /// Spawn del listener. Devuelve cuando bind() falla; en caso contrario /// corre indefinidamente. pub async fn serve(self, path: &Path) -> anyhow::Result<()> { let _ = std::fs::remove_file(path); if let Some(parent) = path.parent() { let _ = std::fs::create_dir_all(parent); } let listener = UnixListener::bind(path)?; info!(path = %path.display(), "brain introspect escuchando"); let arc_self = Arc::new(self); loop { match listener.accept().await { Ok((stream, _)) => { trace!("introspect conn aceptada"); let me = arc_self.clone(); tokio::spawn(async move { if let Err(e) = me.handle(stream).await { warn!(?e, "introspect conn ended"); } }); } Err(e) => { warn!(?e, "introspect accept failed"); return Ok(()); } } } } async fn handle(self: Arc, mut stream: UnixStream) -> anyhow::Result<()> { loop { let mut len_buf = [0u8; 4]; if stream.read_exact(&mut len_buf).await.is_err() { return Ok(()); // EOF } let len = u32::from_be_bytes(len_buf) as usize; if len > MAX_FRAME { anyhow::bail!("frame oversize: {len}"); } let mut buf = vec![0u8; len]; stream.read_exact(&mut buf).await?; let req: IntrospectRequest = bincode::deserialize(&buf)?; debug!(?req, "introspect request"); let resp = self.dispatch(req).await; let out = bincode::serialize(&resp)?; stream.write_u32(out.len() as u32).await?; stream.write_all(&out).await?; } } async fn dispatch(&self, req: IntrospectRequest) -> IntrospectResponse { match req { IntrospectRequest::ListRules => { let engine = self.state.engine.read().await; let rules = engine.rules() .map(|r| RuleSummary { id: r.id, priority: r.priority, event_kind_tag: format!("{:?}", r.when), action_count: r.then.len(), scope_wildcard: r.scope.is_wildcard(), }) .collect(); IntrospectResponse::Rules(rules) } IntrospectRequest::GetRule(id) => { let engine = self.state.engine.read().await; let found = engine.rules() .find(|r| r.id == id) .map(|r| Rule::clone(r)); IntrospectResponse::Rule(found) } IntrospectRequest::EntropySnapshot => { let obs = self.state.observer.read().await; IntrospectResponse::Entropy { value_bits: obs.shannon_entropy(), sample_size: obs.total(), distinct_kinds: obs.marginals().len(), window_full: obs.current_window() >= obs.window_size(), } } IntrospectRequest::TopCorrelations { n } => { let obs = self.state.observer.read().await; let mut entries: Vec = obs.cooccurrences().iter() .map(|((a, b), &joint)| CorrelationEntry { a: format!("{:?}", a), b: format!("{:?}", b), joint_count: joint, conditional_prob: obs.conditional_prob(a, b), pmi_bits: obs.pmi(a, b), }) .collect(); entries.sort_by(|x, y| y.joint_count.cmp(&x.joint_count)); entries.truncate(n); IntrospectResponse::Correlations(entries) } IntrospectRequest::Crystals => { let obs = self.state.observer.read().await; let crystals = detect_crystals(&obs, &self.state.params); IntrospectResponse::Crystals(crystals) } IntrospectRequest::CrystalKcl { index } => { let obs = self.state.observer.read().await; let crystals = detect_crystals(&obs, &self.state.params); match crystals.get(index) { Some(c) => IntrospectResponse::Kcl(crate::crystallize::crystal_to_kcl(c)), None => IntrospectResponse::Error(format!("no crystal at index {index}")), } } IntrospectRequest::PromoteCrystal { index } => { let crystals = { let obs = self.state.observer.read().await; detect_crystals(&obs, &self.state.params) }; match crystals.get(index) { Some(c) => { let rule = crate::crystallize::crystal_to_rule(c); let snippet = crate::crystallize::crystal_to_kcl(c); let rule_id = rule.id; self.state.engine.write().await.insert(rule); // Persistencia opcional al archivo KCL. if let Some(path) = self.state.rules_out.as_ref() { if let Err(e) = append_kcl_snippet(path, &snippet) { warn!(?e, path = %path.display(), "rules_out append falló"); } else { info!(path = %path.display(), %rule_id, "regla persistida a .k"); } } // Audit entry self.state.audit.write().await.append( crate::audit::AuditAction::PromoteCrystal { rule_id, crystal: c.clone(), } ); IntrospectResponse::Promoted { rule_id, kcl_snippet: snippet } } None => IntrospectResponse::Error(format!("no crystal at index {index}")), } } IntrospectRequest::RemoveRule { id } => { let removed = self.state.engine.write().await.remove(id); if removed { self.state.audit.write().await.append( crate::audit::AuditAction::RemoveRule { rule_id: id } ); } IntrospectResponse::Removed(removed) } IntrospectRequest::ListAudit { limit } => { let audit = self.state.audit.read().await; IntrospectResponse::AuditEntries(audit.recent(limit).cloned().collect()) } IntrospectRequest::FlushAudit => { let mut audit = self.state.audit.write().await; match audit.flush_to_cas() { Ok(written) => IntrospectResponse::Flushed { written, head_sha: audit.last_flushed_sha(), total_flushed: audit.flushed_count(), }, Err(e) => IntrospectResponse::Error(format!("flush_to_cas: {e}")), } } IntrospectRequest::ReloadRules { path } => { // Path explícito gana sobre el rules_out configurado. let resolved = path.map(std::path::PathBuf::from) .or_else(|| self.state.rules_out.as_ref().map(|p| p.as_path().to_path_buf())); let path = match resolved { Some(p) => p, None => return IntrospectResponse::Error( "ReloadRules sin path y sin rules_out configurado".into() ), }; let rules = match crate::kcl_loader::load_rules_file(&path) { Ok(r) => r, Err(e) => return IntrospectResponse::Error(format!("load: {e}")), }; // Vaciamos el engine antes de re-cargar — semántica clean-slate. let mut engine = self.state.engine.write().await; *engine = crate::engine::RuleEngine::empty(); let count = rules.len(); for r in rules { engine.insert(r); } drop(engine); self.state.audit.write().await.append( crate::audit::AuditAction::LoadRulesFile { path: path.to_string_lossy().into_owned(), count, } ); IntrospectResponse::Reloaded { count } } } } } // Cliente helper para tools externos (brainctl). pub async fn call(path: &Path, req: IntrospectRequest) -> anyhow::Result { let mut stream = UnixStream::connect(path).await?; let buf = bincode::serialize(&req)?; stream.write_u32(buf.len() as u32).await?; stream.write_all(&buf).await?; let mut len_buf = [0u8; 4]; stream.read_exact(&mut len_buf).await?; let len = u32::from_be_bytes(len_buf) as usize; if len > MAX_FRAME { anyhow::bail!("response oversize: {len}"); } let mut buf = vec![0u8; len]; stream.read_exact(&mut buf).await?; Ok(bincode::deserialize(&buf)?) } /// Consume la lista marginal del observer para humanos. Suprime el detalle /// crudo de `EventKind` (ej. payloads largos en BusInvokeOf). pub fn marginal_summary(obs: &Observer) -> Vec<(String, u64)> { let mut entries: Vec<(String, u64)> = obs.marginals().iter() .map(|(k, &c)| (format!("{:?}", k), c)) .collect(); entries.sort_by(|x, y| y.1.cmp(&x.1)); entries }