Pausa: 11 crates del fractal Ente #0 con cerebro completo

PID 1 boot + bus interno autenticado + cerebro KCL/Rust:
- 6 lib crates de infra (card, bus, cas, kernel, soma, wasm, snapshot)
- ente-brain: motor de reglas O(1), observer Shannon, cristalización,
  audit hash-chain, persistencia rules.k, Prometheus /metrics
- KCL schemas card.k + rule.k como gramática autoritativa
- compat-logind D-Bus, ente-echo demo provider, ente-zero PID 1
- 22 tests OK, ~3.8k LOC Rust + ~300 LOC KCL

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Sergio
2026-05-03 22:57:44 +00:00
parent dc9c99528d
commit d6b8f18b43
53 changed files with 7753 additions and 0 deletions
+330
View File
@@ -0,0 +1,330 @@
//! Introspect API. Unix Domain Socket + framing length-prefijo + bincode.
//!
//! Una herramienta externa (ej. `brainctl`) puede consultar el estado del
//! cerebro sin tocar el bus interno del fractal. Esto separa observación de
//! ejecución — la introspección es read-only por diseño.
use crate::crystallize::{detect_crystals, Crystal, CrystallizationParams};
use crate::engine::RuleEngine;
use crate::observer::Observer;
use crate::rules::Rule;
use serde::{Deserialize, Serialize};
use std::io::Write;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::{UnixListener, UnixStream};
use tokio::sync::RwLock;
use tracing::{debug, info, trace, warn};
use ulid::Ulid;
const MAX_FRAME: usize = 4 * 1024 * 1024; // 4 MiB — correlation matrices crecen
/// Estado compartido entre el bucle del Init y el servidor de introspección.
/// `Arc<RwLock<...>>` permite muchos lectores concurrentes (introspect) y
/// un escritor (el dispatcher de eventos en el bucle primordial).
#[derive(Clone)]
pub struct BrainState {
pub engine: Arc<RwLock<RuleEngine>>,
pub observer: Arc<RwLock<Observer>>,
pub params: CrystallizationParams,
/// Path opcional donde apendear reglas promovidas como KCL. Si Some,
/// cada PromoteCrystal añade el snippet al archivo (append-only).
pub rules_out: Option<Arc<PathBuf>>,
/// Audit log en memoria. Cada promote/remove deja huella aquí.
pub audit: Arc<RwLock<crate::audit::AuditLog>>,
}
impl BrainState {
pub fn new(window_size: usize) -> Self {
Self::with_params(window_size, CrystallizationParams::default())
}
pub fn with_params(window_size: usize, params: CrystallizationParams) -> Self {
Self {
engine: Arc::new(RwLock::new(RuleEngine::empty())),
observer: Arc::new(RwLock::new(Observer::new(window_size))),
params,
rules_out: None,
audit: Arc::new(RwLock::new(crate::audit::AuditLog::new())),
}
}
pub fn with_rules_out(mut self, path: PathBuf) -> Self {
self.rules_out = Some(Arc::new(path));
self
}
}
/// Append-only writer del KCL snippet a `rules_out`. Crea el archivo con
/// header si no existe; en caso contrario sólo apendea.
pub fn append_kcl_snippet(path: &Path, snippet: &str) -> std::io::Result<()> {
if let Some(parent) = path.parent() {
std::fs::create_dir_all(parent)?;
}
let exists = path.exists();
let mut file = std::fs::OpenOptions::new()
.create(true)
.append(true)
.open(path)?;
if !exists {
writeln!(file, "# Reglas promovidas automáticamente desde cristales.")?;
writeln!(file, "# Cada bloque proviene de PromoteCrystal vía brainctl.")?;
writeln!(file)?;
}
writeln!(file, "{snippet}")?;
Ok(())
}
#[derive(Debug, Serialize, Deserialize)]
pub enum IntrospectRequest {
/// Lista resumida de reglas vivas.
ListRules,
/// Detalle de una regla concreta.
GetRule(Ulid),
/// Snapshot de la entropía y conteos básicos.
EntropySnapshot,
/// Top N pares (a, b) por co-ocurrencia.
TopCorrelations { n: usize },
/// Cristales detectados con los parámetros del BrainState.
Crystals,
/// Genera el snippet KCL de un cristal específico (índice tras Crystals).
CrystalKcl { index: usize },
/// Promueve el cristal #index a regla viva en el motor. Devuelve el
/// rule_id asignado y el snippet KCL para auditoría/persistencia.
PromoteCrystal { index: usize },
/// Elimina una regla viva por id. Útil para revertir un promote.
RemoveRule { id: Ulid },
/// Lista las últimas N entradas del audit log. limit=0 = todas.
ListAudit { limit: usize },
}
#[derive(Debug, Serialize, Deserialize)]
pub enum IntrospectResponse {
Rules(Vec<RuleSummary>),
Rule(Option<Rule>),
Entropy { value_bits: f64, sample_size: u64, distinct_kinds: usize, window_full: bool },
Correlations(Vec<CorrelationEntry>),
Crystals(Vec<Crystal>),
Kcl(String),
/// Resultado de PromoteCrystal: id de la regla creada + snippet KCL para
/// que el operador lo persista en disco si quiere.
Promoted { rule_id: Ulid, kcl_snippet: String },
/// Resultado de RemoveRule: true si existía, false si ya no.
Removed(bool),
/// Entradas del audit log (más recientes al final).
AuditEntries(Vec<crate::audit::AuditEntry>),
Error(String),
}
#[derive(Debug, Serialize, Deserialize)]
pub struct RuleSummary {
pub id: Ulid,
pub priority: u8,
pub event_kind_tag: String,
pub action_count: usize,
pub scope_wildcard: bool,
}
#[derive(Debug, Serialize, Deserialize)]
pub struct CorrelationEntry {
pub a: String,
pub b: String,
pub joint_count: u64,
pub conditional_prob: f64,
pub pmi_bits: f64,
}
pub struct IntrospectServer {
state: BrainState,
}
impl IntrospectServer {
pub fn new(state: BrainState) -> Self { Self { state } }
/// Spawn del listener. Devuelve cuando bind() falla; en caso contrario
/// corre indefinidamente.
pub async fn serve(self, path: &Path) -> anyhow::Result<()> {
let _ = std::fs::remove_file(path);
if let Some(parent) = path.parent() {
let _ = std::fs::create_dir_all(parent);
}
let listener = UnixListener::bind(path)?;
info!(path = %path.display(), "brain introspect escuchando");
let arc_self = Arc::new(self);
loop {
match listener.accept().await {
Ok((stream, _)) => {
trace!("introspect conn aceptada");
let me = arc_self.clone();
tokio::spawn(async move {
if let Err(e) = me.handle(stream).await {
warn!(?e, "introspect conn ended");
}
});
}
Err(e) => {
warn!(?e, "introspect accept failed");
return Ok(());
}
}
}
}
async fn handle(self: Arc<Self>, mut stream: UnixStream) -> anyhow::Result<()> {
loop {
let mut len_buf = [0u8; 4];
if stream.read_exact(&mut len_buf).await.is_err() {
return Ok(()); // EOF
}
let len = u32::from_be_bytes(len_buf) as usize;
if len > MAX_FRAME {
anyhow::bail!("frame oversize: {len}");
}
let mut buf = vec![0u8; len];
stream.read_exact(&mut buf).await?;
let req: IntrospectRequest = bincode::deserialize(&buf)?;
debug!(?req, "introspect request");
let resp = self.dispatch(req).await;
let out = bincode::serialize(&resp)?;
stream.write_u32(out.len() as u32).await?;
stream.write_all(&out).await?;
}
}
async fn dispatch(&self, req: IntrospectRequest) -> IntrospectResponse {
match req {
IntrospectRequest::ListRules => {
let engine = self.state.engine.read().await;
let rules = engine.rules()
.map(|r| RuleSummary {
id: r.id,
priority: r.priority,
event_kind_tag: format!("{:?}", r.when),
action_count: r.then.len(),
scope_wildcard: r.scope.is_wildcard(),
})
.collect();
IntrospectResponse::Rules(rules)
}
IntrospectRequest::GetRule(id) => {
let engine = self.state.engine.read().await;
let found = engine.rules()
.find(|r| r.id == id)
.map(|r| Rule::clone(r));
IntrospectResponse::Rule(found)
}
IntrospectRequest::EntropySnapshot => {
let obs = self.state.observer.read().await;
IntrospectResponse::Entropy {
value_bits: obs.shannon_entropy(),
sample_size: obs.total(),
distinct_kinds: obs.marginals().len(),
window_full: obs.current_window() >= obs.window_size(),
}
}
IntrospectRequest::TopCorrelations { n } => {
let obs = self.state.observer.read().await;
let mut entries: Vec<CorrelationEntry> = obs.cooccurrences().iter()
.map(|((a, b), &joint)| CorrelationEntry {
a: format!("{:?}", a),
b: format!("{:?}", b),
joint_count: joint,
conditional_prob: obs.conditional_prob(a, b),
pmi_bits: obs.pmi(a, b),
})
.collect();
entries.sort_by(|x, y| y.joint_count.cmp(&x.joint_count));
entries.truncate(n);
IntrospectResponse::Correlations(entries)
}
IntrospectRequest::Crystals => {
let obs = self.state.observer.read().await;
let crystals = detect_crystals(&obs, &self.state.params);
IntrospectResponse::Crystals(crystals)
}
IntrospectRequest::CrystalKcl { index } => {
let obs = self.state.observer.read().await;
let crystals = detect_crystals(&obs, &self.state.params);
match crystals.get(index) {
Some(c) => IntrospectResponse::Kcl(crate::crystallize::crystal_to_kcl(c)),
None => IntrospectResponse::Error(format!("no crystal at index {index}")),
}
}
IntrospectRequest::PromoteCrystal { index } => {
let crystals = {
let obs = self.state.observer.read().await;
detect_crystals(&obs, &self.state.params)
};
match crystals.get(index) {
Some(c) => {
let rule = crate::crystallize::crystal_to_rule(c);
let snippet = crate::crystallize::crystal_to_kcl(c);
let rule_id = rule.id;
self.state.engine.write().await.insert(rule);
// Persistencia opcional al archivo KCL.
if let Some(path) = self.state.rules_out.as_ref() {
if let Err(e) = append_kcl_snippet(path, &snippet) {
warn!(?e, path = %path.display(), "rules_out append falló");
} else {
info!(path = %path.display(), %rule_id, "regla persistida a .k");
}
}
// Audit entry
self.state.audit.write().await.append(
crate::audit::AuditAction::PromoteCrystal {
rule_id, crystal: c.clone(),
}
);
IntrospectResponse::Promoted { rule_id, kcl_snippet: snippet }
}
None => IntrospectResponse::Error(format!("no crystal at index {index}")),
}
}
IntrospectRequest::RemoveRule { id } => {
let removed = self.state.engine.write().await.remove(id);
if removed {
self.state.audit.write().await.append(
crate::audit::AuditAction::RemoveRule { rule_id: id }
);
}
IntrospectResponse::Removed(removed)
}
IntrospectRequest::ListAudit { limit } => {
let audit = self.state.audit.read().await;
IntrospectResponse::AuditEntries(audit.recent(limit).cloned().collect())
}
}
}
}
// Cliente helper para tools externos (brainctl).
pub async fn call(path: &Path, req: IntrospectRequest) -> anyhow::Result<IntrospectResponse> {
let mut stream = UnixStream::connect(path).await?;
let buf = bincode::serialize(&req)?;
stream.write_u32(buf.len() as u32).await?;
stream.write_all(&buf).await?;
let mut len_buf = [0u8; 4];
stream.read_exact(&mut len_buf).await?;
let len = u32::from_be_bytes(len_buf) as usize;
if len > MAX_FRAME {
anyhow::bail!("response oversize: {len}");
}
let mut buf = vec![0u8; len];
stream.read_exact(&mut buf).await?;
Ok(bincode::deserialize(&buf)?)
}
/// Consume la lista marginal del observer para humanos. Suprime el detalle
/// crudo de `EventKind` (ej. payloads largos en BusInvokeOf).
pub fn marginal_summary(obs: &Observer) -> Vec<(String, u64)> {
let mut entries: Vec<(String, u64)> = obs.marginals().iter()
.map(|(k, &c)| (format!("{:?}", k), c))
.collect();
entries.sort_by(|x, y| y.1.cmp(&x.1));
entries
}