b83d40a833
Rename batch de la Fase A del PLAN_MACRO: - 25 crates ente-* → arje-* (protocol/init/runtime/compat). El linaje arje (init Linux) queda con prefijo coherente. - vista → revista (revista-core + revista-web). - pluma → fana (fana-md + fana-md-reader-web). fana absorbe el linaje markdown de pluma; será el writer DAG editor (prioridad alta). Cambios: - git mv de 29 crate dirs + 2 SDDs - package/lib/bin names + path refs + imports .rs reescritos - workspace Cargo.toml + comentarios de sección - SDDs de init/runtime/compat/protocol actualizados a arje- - SDD de revista + SDD de fana (reescrito: writer DAG editor) - docs/STATUS.md, ROADMAP.md, PLAN_MACRO.md, arje-boot.md, arje-replace-systemd.md actualizados - docs/changelog/akasha.md → chasqui.md scripts/rename-fase-a.py idempotente (--dry-run soportado). cargo check --workspace verde. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
792 lines
30 KiB
Rust
792 lines
30 KiB
Rust
//! Ente #0 — the first Ente. PID 1 of the fractal.
//!
//! Non-negotiable rules:
//!   1. NEVER put service logic here. Only: read the Seed, reap zombies,
//!      mediate capabilities, propagate events.
//!   2. Single-threaded. Any parallelism is delegated to worker Entes.
//!      A panic on a thread of PID 1 = kernel panic.
//!   3. Errors from children are *events* on `graph_tx`, not a propagated
//!      `Result`.
//!
//! This file is wiring only. The logic lives in:
//!   - `seed`     : construction/restoration of the Seed Card
//!   - `bus`      : Unix listener + auth via SO_PEERCRED
//!   - `graph::*` : state of the fractal (lifecycle, topology, shutdown,
//!                  bus_mediator, devices, capabilities)
//!   - `events`   : event types of the primordial loop
//!   - external workspace crates for CAS, soma, wasm, snapshot, kernel.
|
|
mod brain_glue;
|
|
mod bus;
|
|
mod events;
|
|
mod graph;
|
|
mod keypair_store;
|
|
mod seed;
|
|
|
|
use anyhow::Context;
|
|
use arje_brain::{BrainState, IntrospectServer};
|
|
use arje_kernel::{become_child_subreaper, bootstrap_kernel_surface, spawn_sigchld_stream, spawn_uevent_stream};
|
|
use events::{ExitStatus, GraphEvent, ShutdownReason};
|
|
use graph::EnteGraph;
|
|
use nix::errno::Errno;
|
|
use nix::sys::wait::{waitpid, WaitPidFlag, WaitStatus};
|
|
use nix::unistd::{getpid, Pid};
|
|
use std::path::PathBuf;
|
|
use std::time::Duration;
|
|
use tokio::sync::mpsc;
|
|
use tracing::{error, info, warn};
|
|
|
|
/// Command-line options accepted by the init binary.
///
/// Every option is optional: a field stays `None` when its flag is absent
/// from the command line (see `parse_args`).
#[derive(Debug, Default)]
struct CliArgs {
    /// `--checkpoint <path>`: file the graph snapshot is written to when a
    /// shutdown event is dispatched.
    checkpoint: Option<PathBuf>,
    /// `--restore <path>`: snapshot handed to `seed::load`; also used to
    /// locate the companion brain snapshot.
    restore: Option<PathBuf>,
    /// `--rules <path>`: rules file loaded into the brain engine at start-up.
    rules: Option<PathBuf>,
    /// `--rules-out <path>`: forwarded to `BrainState::with_rules_out`.
    rules_out: Option<PathBuf>,
    /// `--audit-head <path>`: head pointer for the audit log; enables the
    /// periodic audit flush.
    audit_head: Option<PathBuf>,
    /// `--metrics-addr <addr>`: socket address for the metrics endpoint.
    metrics_addr: Option<String>,
    /// `--brain-half-life <secs>`: half-life handed to the brain observer.
    brain_half_life: Option<f64>,
    /// `--autopromote-secs <secs>`: interval of the auto-promote loop.
    autopromote_secs: Option<u64>,
}
|
|
|
|
fn parse_args() -> CliArgs {
|
|
let mut args = std::env::args().skip(1);
|
|
let mut checkpoint = None;
|
|
let mut restore = None;
|
|
let mut rules = None;
|
|
let mut rules_out = None;
|
|
let mut audit_head = None;
|
|
let mut metrics_addr = None;
|
|
let mut brain_half_life = None;
|
|
let mut autopromote_secs = None;
|
|
while let Some(a) = args.next() {
|
|
match a.as_str() {
|
|
"--checkpoint" => checkpoint = args.next().map(PathBuf::from),
|
|
"--restore" => restore = args.next().map(PathBuf::from),
|
|
"--rules" => rules = args.next().map(PathBuf::from),
|
|
"--rules-out" => rules_out = args.next().map(PathBuf::from),
|
|
"--audit-head" => audit_head = args.next().map(PathBuf::from),
|
|
"--metrics-addr" => metrics_addr = args.next(),
|
|
"--brain-half-life" => brain_half_life = args.next().and_then(|s| s.parse().ok()),
|
|
"--autopromote-secs" => autopromote_secs = args.next().and_then(|s| s.parse().ok()),
|
|
other => warn!(arg = %other, "argumento desconocido, ignorado"),
|
|
}
|
|
}
|
|
CliArgs {
|
|
checkpoint, restore, rules, rules_out, audit_head,
|
|
metrics_addr, brain_half_life, autopromote_secs,
|
|
}
|
|
}
|
|
|
|
fn main() -> anyhow::Result<()> {
|
|
init_tracing();
|
|
let cli = parse_args();
|
|
let pid = getpid();
|
|
let dev_mode = pid != Pid::from_raw(1);
|
|
|
|
if dev_mode {
|
|
warn!(?pid, "ente-zero corriendo en DEV MODE (no PID 1) — kernel surface no se monta");
|
|
} else {
|
|
info!("ente-zero despierta como PID 1");
|
|
bootstrap_kernel_surface().context("bootstrap kernel surface")?;
|
|
become_child_subreaper().context("PR_SET_CHILD_SUBREAPER")?;
|
|
}
|
|
|
|
let card = seed::load(dev_mode, cli.restore.as_deref())?;
|
|
|
|
// current_thread runtime: ver doctrina al inicio del módulo.
|
|
let rt = tokio::runtime::Builder::new_current_thread()
|
|
.enable_io()
|
|
.enable_time()
|
|
.build()?;
|
|
|
|
rt.block_on(primordial_loop(
|
|
card, dev_mode,
|
|
cli.checkpoint, cli.restore, cli.rules, cli.rules_out,
|
|
cli.audit_head, cli.metrics_addr, cli.brain_half_life,
|
|
cli.autopromote_secs,
|
|
))
|
|
}
|
|
|
|
async fn primordial_loop(
|
|
seed_card: arje_card::EntityCard,
|
|
dev_mode: bool,
|
|
checkpoint_path: Option<PathBuf>,
|
|
restore_path: Option<PathBuf>,
|
|
rules_path: Option<PathBuf>,
|
|
rules_out: Option<PathBuf>,
|
|
audit_head: Option<PathBuf>,
|
|
metrics_addr: Option<String>,
|
|
brain_half_life: Option<f64>,
|
|
autopromote_secs: Option<u64>,
|
|
) -> anyhow::Result<()> {
|
|
info!(seed_id = %seed_card.id, label = %seed_card.label, "Ente #0 entra al bucle primordial");
|
|
|
|
let (graph_tx, mut graph_rx) = mpsc::channel::<GraphEvent>(64);
|
|
let mut sigchld = spawn_sigchld_stream()?;
|
|
// Uevents puede fallar en dev (sin CAP_NET_ADMIN). Degradamos a un
|
|
// canal nunca-listo en lugar de abortar el bucle primordial.
|
|
let mut uevents = match spawn_uevent_stream() {
|
|
Ok(rx) => rx,
|
|
Err(e) => {
|
|
warn!(?e, "uevents deshabilitados (probablemente falta CAP_NET_ADMIN)");
|
|
let (_keep_tx, rx) = mpsc::channel::<arje_kernel::UEvent>(1);
|
|
std::mem::forget(_keep_tx);
|
|
rx
|
|
}
|
|
};
|
|
|
|
// Bus interno: listener antes de spawn de hijos para que su Announce
|
|
// tenga adónde llegar. Su path se inyecta en ENTE_BUS_SOCK por soma.
|
|
let bus_sock = bus::default_socket_path();
|
|
let bus_path = bus::spawn_bus(bus_sock, graph_tx.clone())?;
|
|
arje_soma::set_bus_sock(bus_path.to_string_lossy().into_owned());
|
|
|
|
// Brahman protocol: handshake socket + broker compartido.
|
|
//
|
|
// Es un canal paralelo al ente-bus, dedicado a módulos "brahman
|
|
// conscientes" que se presentan con una Card y declaran flujos
|
|
// tipados. Si el bind falla (socket en uso, FS no escribible),
|
|
// degradamos a "modo bus-only" — la doctrina de PID 1 no rompe
|
|
// por subsistemas opcionales.
|
|
// Contexto operativo del broker: configurable por env var. Útil para
|
|
// distinguir test/prod/foreground sin recompilar. Sin la var, los
|
|
// biases per-contexto declarados en las Cards quedan inactivos.
|
|
let broker_context = std::env::var("BRAHMAN_BROKER_CONTEXT").ok();
|
|
if let Some(ctx) = &broker_context {
|
|
info!(context = %ctx, "brahman broker bajo contexto operativo");
|
|
}
|
|
let brahman_broker = std::sync::Arc::new(tokio::sync::Mutex::new(
|
|
brahman_broker::Broker::new(brahman_broker::BrokerConfig {
|
|
strategy: brahman_broker::MatchStrategy::default(),
|
|
current_context: broker_context.clone(),
|
|
}),
|
|
));
|
|
|
|
// Brahman-net opcional: si BRAHMAN_LISTEN_MULTIADDR está set,
|
|
// levantamos la malla P2P y la pasamos como ServerConfig.net (Fase
|
|
// 2 wire) para que cada Card con outputs se anuncie al DHT y
|
|
// pueda ser descubierta por nodos remotos. Identidad libp2p
|
|
// persistida en disco vía keypair_store (peer_id estable across
|
|
// reboots).
|
|
let brahman_net = setup_brahman_net(dev_mode).await;
|
|
|
|
// Política opcional de peers libp2p: allowlist + denylist + hot
|
|
// reload. Activada si BRAHMAN_PEER_ALLOWLIST o BRAHMAN_PEER_DENYLIST
|
|
// están set. Sin ninguna, modo totalmente abierto (Fase 3 sin
|
|
// restricción adicional). El watcher se queda vivo en background
|
|
// observando los archivos para hot reload.
|
|
let (brahman_policy, _policy_watcher) = setup_brahman_policy();
|
|
|
|
// Si tenemos AMBOS net y policy, attachamos: el deny de la
|
|
// policy se proyecta al block_list del swarm para rechazar
|
|
// conexiones ANTES del Noise handshake (más eficiente que
|
|
// rechazar en el handshake brahman). Cada hot-reload de la
|
|
// policy también re-sincroniza vía diff.
|
|
if let (Some(net), Some(policy)) = (&brahman_net, &brahman_policy) {
|
|
policy.attach_to_net(net.clone());
|
|
let (allow, deny) = policy.sizes();
|
|
info!(
|
|
allow = ?allow,
|
|
deny = deny,
|
|
"policy attached al swarm — denies enforcedeados a nivel libp2p"
|
|
);
|
|
}
|
|
|
|
let brahman_sock = brahman_handshake::transport::default_socket_path();
|
|
match brahman_handshake::server::Server::bind(
|
|
&brahman_sock,
|
|
brahman_handshake::server::ServerConfig {
|
|
init_attached: true,
|
|
broker: Some(brahman_broker.clone()),
|
|
net: brahman_net.clone(),
|
|
policy: brahman_policy.clone(),
|
|
},
|
|
) {
|
|
Ok(server) => {
|
|
info!(socket = %brahman_sock.display(), "brahman handshake escuchando (Unix)");
|
|
// Si hay malla P2P, además del Unix accept loop levantamos
|
|
// el accept loop libp2p sobre el mismo Server compartido.
|
|
// Las sesiones locales y remotas conviven en las mismas
|
|
// tablas (sessions, push_table, broker).
|
|
let server = std::sync::Arc::new(server);
|
|
if let Some(net) = brahman_net.clone() {
|
|
let s_libp2p = server.clone();
|
|
let n_libp2p = net.clone();
|
|
tokio::spawn(async move {
|
|
if let Err(e) = brahman_handshake::network::run_libp2p_accept_loop(
|
|
s_libp2p, n_libp2p,
|
|
)
|
|
.await
|
|
{
|
|
warn!(?e, "brahman handshake libp2p accept loop cayó");
|
|
}
|
|
});
|
|
info!(
|
|
"brahman handshake escuchando también vía libp2p (peer_id {})",
|
|
net.peer_id
|
|
);
|
|
}
|
|
// Unix accept loop: usa Arc<Server> en lugar del consume
|
|
// de run() para coexistir con el libp2p accept loop.
|
|
let s_unix = server.clone();
|
|
tokio::spawn(async move {
|
|
loop {
|
|
match s_unix.accept_one().await {
|
|
Ok(session) => {
|
|
tokio::spawn(async move {
|
|
if let Err(e) = session.handle().await {
|
|
warn!(?e, "session Unix terminó con error");
|
|
}
|
|
});
|
|
}
|
|
Err(e) => {
|
|
warn!(?e, "brahman handshake accept_one Unix falló");
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
});
|
|
}
|
|
Err(e) => {
|
|
warn!(?e, socket = %brahman_sock.display(), "brahman handshake deshabilitado");
|
|
}
|
|
}
|
|
|
|
// Brahman admin: socket separado para snapshots de estado (sesiones +
|
|
// matches del broker). Misma política de degradación grácil.
|
|
let admin_sock = brahman_admin::transport::default_socket_path();
|
|
match brahman_admin::server::AdminServer::bind(
|
|
&admin_sock,
|
|
brahman_broker.clone(),
|
|
brahman_admin::server::AdminConfig {
|
|
init_attached: true,
|
|
current_context: broker_context.clone(),
|
|
},
|
|
) {
|
|
Ok(admin) => {
|
|
info!(socket = %admin_sock.display(), "brahman admin escuchando");
|
|
tokio::spawn(async move {
|
|
if let Err(e) = admin.run().await {
|
|
warn!(?e, "brahman admin server cayó");
|
|
}
|
|
});
|
|
}
|
|
Err(e) => {
|
|
warn!(?e, socket = %admin_sock.display(), "brahman admin deshabilitado");
|
|
}
|
|
}
|
|
|
|
let mut graph = EnteGraph::new(seed_card);
|
|
graph.instantiate_seed_dependencies(&graph_tx).await?;
|
|
|
|
// Cerebro: BrainState compartido + servidor de introspección.
|
|
// Window de 1024 eventos — suficiente para correlaciones interesantes
|
|
// sin gastar memoria de PID 1. En dev bajamos el umbral de cristalización
|
|
// para que el demo (pocos eventos) produzca cristales observables.
|
|
let mut brain = if dev_mode {
|
|
// Umbrales relajados para que el demo (pocos eventos) produzca
|
|
// cristales observables. Con P(b|a) normalizada a [0,1], los
|
|
// valores típicos en muestras pequeñas son 0.2-0.5.
|
|
BrainState::with_params(1024, arje_brain::CrystallizationParams {
|
|
min_support: 2,
|
|
min_conditional_prob: 0.3,
|
|
min_pmi: 1.0,
|
|
})
|
|
} else {
|
|
BrainState::new(1024)
|
|
};
|
|
if let Some(out_path) = rules_out {
|
|
brain = brain.with_rules_out(out_path);
|
|
}
|
|
if let Some(hl) = brain_half_life {
|
|
let mut obs = brain.observer.write().await;
|
|
// Reemplazar con un observer nuevo que tenga half-life. Estado
|
|
// anterior (vacío en este punto) descartado.
|
|
*obs = arje_brain::Observer::new(1024).with_half_life(hl);
|
|
info!(hl_secs = hl, "observer con time-decay activo");
|
|
}
|
|
if let Some(secs) = autopromote_secs {
|
|
arje_brain::spawn_autopromote_loop(
|
|
brain.clone(),
|
|
arje_brain::AutopromoteParams {
|
|
interval_secs: secs,
|
|
threshold: brain.params, // mismo threshold que crystals manuales
|
|
},
|
|
);
|
|
}
|
|
|
|
// Brain restore: si hay --restore <path>, cargamos el snapshot adjunto
|
|
// <path>.brain.json. Counters preservados across reboots.
|
|
if let Some(rpath) = &restore_path {
|
|
let brain_path = rpath.with_extension("brain.json");
|
|
if brain_path.exists() {
|
|
match read_brain_snapshot(&brain_path) {
|
|
Ok(snap) => {
|
|
let total = snap.total;
|
|
let kinds = snap.marginal.len();
|
|
let restored = arje_brain::Observer::from_snapshot(snap);
|
|
*brain.observer.write().await = restored;
|
|
info!(
|
|
path = %brain_path.display(),
|
|
total, kinds,
|
|
"brain snapshot restaurado"
|
|
);
|
|
}
|
|
Err(e) => warn!(?e, path = %brain_path.display(), "brain snapshot read falló"),
|
|
}
|
|
}
|
|
}
|
|
// Si --audit-head, configuramos el head pointer y arrancamos auto-flush.
|
|
if let Some(head_path) = audit_head {
|
|
// Re-creamos el AuditLog con head pointer.
|
|
let new_audit = arje_brain::audit::AuditLog::new()
|
|
.with_head_pointer(head_path);
|
|
*brain.audit.write().await = new_audit;
|
|
spawn_audit_auto_flush(brain.clone());
|
|
}
|
|
|
|
// Carga inicial de reglas desde JSON/JSONL si --rules path proporcionado.
|
|
if let Some(path) = &rules_path {
|
|
match arje_brain::load_rules_file(path) {
|
|
Ok(rules) => {
|
|
let mut engine = brain.engine.write().await;
|
|
for r in rules {
|
|
engine.insert(r);
|
|
}
|
|
info!(count = engine.len(), path = %path.display(), "reglas cargadas");
|
|
}
|
|
Err(e) => warn!(?e, path = %path.display(), "carga de reglas falló"),
|
|
}
|
|
}
|
|
|
|
// Endpoint Prometheus opcional. En dev por defecto en 127.0.0.1:9911 si
|
|
// el flag no se pasó.
|
|
let metrics_addr = metrics_addr.or_else(|| {
|
|
if dev_mode { Some("127.0.0.1:9911".to_string()) } else { None }
|
|
});
|
|
if let Some(addr_s) = metrics_addr {
|
|
match addr_s.parse::<std::net::SocketAddr>() {
|
|
Ok(addr) => {
|
|
let s = brain.clone();
|
|
tokio::spawn(async move {
|
|
if let Err(e) = arje_brain::serve_metrics(s, addr).await {
|
|
warn!(?e, "metrics server cayó");
|
|
}
|
|
});
|
|
}
|
|
Err(e) => warn!(?e, addr = %addr_s, "metrics-addr inválido"),
|
|
}
|
|
}
|
|
spawn_brain_introspect(brain.clone());
|
|
let brain_sink = brain_glue::GraphSink {
|
|
graph_tx: graph_tx.clone(),
|
|
// Spawns auto-disparados desde reglas usan la identidad de la Semilla
|
|
// (único Ente con Capability::Spawn por construcción).
|
|
requester: graph.seed_id(),
|
|
};
|
|
|
|
// Demo automático del forwarding (sólo dev, sólo si el binario existe).
|
|
if dev_mode && std::path::Path::new("target/debug/ente-echo").exists() {
|
|
spawn_echo_smoke_test(bus_path.clone());
|
|
}
|
|
|
|
// En dev mode no tenemos hijos por defecto y el bucle se quedaría inerte.
|
|
let dev_exit = if dev_mode {
|
|
Some(tokio::time::sleep(Duration::from_secs(4)))
|
|
} else {
|
|
None
|
|
};
|
|
tokio::pin!(dev_exit);
|
|
|
|
// GC de capability grants expirados — corre cada 10 segundos.
|
|
let mut grant_purge = tokio::time::interval(Duration::from_secs(10));
|
|
grant_purge.tick().await; // descartar primer tick inmediato
|
|
|
|
loop {
|
|
tokio::select! {
|
|
biased;
|
|
|
|
Some(_) = sigchld.recv() => {
|
|
reap_until_empty(&mut graph, &graph_tx).await;
|
|
}
|
|
|
|
Some(uevt) = uevents.recv() => {
|
|
graph.on_uevent(uevt, &graph_tx).await;
|
|
}
|
|
|
|
Some(evt) = graph_rx.recv() => {
|
|
// Cerebro observa antes que el grafo mute. Snapshot del
|
|
// SubjectInfo se hace contra el estado pre-mutación.
|
|
feed_brain(&brain, &brain_sink, &graph, &evt).await;
|
|
if dispatch_graph_event(&mut graph, evt, &graph_tx, &checkpoint_path, &brain).await {
|
|
return Ok(());
|
|
}
|
|
}
|
|
|
|
_ = grant_purge.tick() => {
|
|
let n = graph.purge_expired_grants();
|
|
if n > 0 {
|
|
info!(purged = n, active = graph.active_grants_count(), "GC capability grants");
|
|
}
|
|
}
|
|
|
|
_ = async { dev_exit.as_mut().as_pin_mut().unwrap().await }, if dev_mode => {
|
|
info!("dev mode: timer expirado, cerrando bucle primordial");
|
|
let _ = graph_tx.send(GraphEvent::Shutdown {
|
|
reason: ShutdownReason::SeedRequested,
|
|
}).await;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
/// Devuelve `true` si el bucle primordial debe terminar.
|
|
async fn dispatch_graph_event(
|
|
graph: &mut EnteGraph,
|
|
evt: GraphEvent,
|
|
tx: &mpsc::Sender<GraphEvent>,
|
|
checkpoint: &Option<PathBuf>,
|
|
brain: &BrainState,
|
|
) -> bool {
|
|
match evt {
|
|
GraphEvent::EnteDied { id, status } => {
|
|
graph.on_death(id, status, tx).await;
|
|
}
|
|
GraphEvent::CapabilityRequested { from, cap, reply } => {
|
|
graph.mediate_capability(from, cap, reply).await;
|
|
}
|
|
GraphEvent::SpawnRequest { card, requester } => {
|
|
if let Err(e) = graph.authorize_and_spawn(card, requester).await {
|
|
warn!(?e, "spawn request error");
|
|
}
|
|
}
|
|
GraphEvent::BusRequest { peer, from, request, outbound, reply } => {
|
|
graph.on_bus_request(peer, from, request, outbound, reply).await;
|
|
}
|
|
GraphEvent::BusResponse { seq, response } => {
|
|
graph.on_bus_response(seq, response).await;
|
|
}
|
|
GraphEvent::BusConnClosed { ente_id } => {
|
|
graph.on_bus_conn_closed(ente_id).await;
|
|
}
|
|
GraphEvent::Shutdown { reason } => {
|
|
warn!(?reason, "shutdown del fractal");
|
|
if let Some(path) = checkpoint.as_ref() {
|
|
// Snapshot del grafo
|
|
let snap = graph.snapshot();
|
|
match snap.write(path) {
|
|
Ok(()) => info!(path = %path.display(), entes = snap.entes.len(), "snapshot fractal persistido"),
|
|
Err(e) => warn!(?e, "snapshot write falló"),
|
|
}
|
|
// Snapshot del cerebro (observer state) en archivo adjunto
|
|
let brain_path = path.with_extension("brain.json");
|
|
let obs_snap = brain.observer.write().await.snapshot();
|
|
match write_brain_snapshot(&brain_path, &obs_snap) {
|
|
Ok(()) => info!(
|
|
path = %brain_path.display(),
|
|
total = obs_snap.total,
|
|
kinds = obs_snap.marginal.len(),
|
|
"snapshot brain persistido"
|
|
),
|
|
Err(e) => warn!(?e, "brain snapshot write falló"),
|
|
}
|
|
}
|
|
graph.cascade_shutdown().await;
|
|
return true;
|
|
}
|
|
}
|
|
false
|
|
}
|
|
|
|
async fn reap_until_empty(graph: &mut EnteGraph, tx: &mpsc::Sender<GraphEvent>) {
|
|
loop {
|
|
match waitpid(None, Some(WaitPidFlag::WNOHANG)) {
|
|
Ok(WaitStatus::StillAlive) => return,
|
|
Ok(WaitStatus::Exited(pid, code)) => {
|
|
emit_death(graph, tx, pid, ExitStatus::Exit(code)).await;
|
|
}
|
|
Ok(WaitStatus::Signaled(pid, sig, _core)) => {
|
|
emit_death(graph, tx, pid, ExitStatus::Killed(sig)).await;
|
|
}
|
|
Ok(_) => continue, // Stopped/Continued — irrelevantes
|
|
Err(Errno::ECHILD) => return,
|
|
Err(e) => {
|
|
error!(?e, "waitpid fallo no recuperable en bucle de reaping");
|
|
return;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
async fn emit_death(
|
|
graph: &EnteGraph,
|
|
tx: &mpsc::Sender<GraphEvent>,
|
|
pid: Pid,
|
|
status: ExitStatus,
|
|
) {
|
|
let id = match graph.lookup_pid(pid) {
|
|
Some(id) => id,
|
|
None => {
|
|
// Proceso adoptado (subreaper): no está en nuestro grafo.
|
|
info!(?pid, ?status, "huérfano cosechado (no en grafo)");
|
|
return;
|
|
}
|
|
};
|
|
let _ = tx.send(GraphEvent::EnteDied { id, status }).await;
|
|
}
|
|
|
|
fn spawn_echo_smoke_test(bus_path: PathBuf) {
|
|
tokio::spawn(async move {
|
|
tokio::time::sleep(Duration::from_millis(300)).await;
|
|
match arje_bus::BusClient::connect(&bus_path).await {
|
|
Ok(mut client) => {
|
|
let req = arje_bus::BusRequest::Invoke {
|
|
cap: arje_echo::echo_capability(),
|
|
blob: b"hola fractal forwardeado".to_vec(),
|
|
};
|
|
match client.call(req).await {
|
|
Ok(arje_bus::BusResponse::Invoked { result }) => {
|
|
info!(echo = %String::from_utf8_lossy(&result), "Invoke ECHO round-trip OK");
|
|
}
|
|
Ok(other) => warn!(?other, "Invoke ECHO respuesta inesperada"),
|
|
Err(e) => warn!(?e, "Invoke ECHO falló"),
|
|
}
|
|
}
|
|
Err(e) => warn!(?e, "no se pudo conectar al bus para test"),
|
|
}
|
|
});
|
|
}
|
|
|
|
fn write_brain_snapshot(path: &std::path::Path, snap: &arje_brain::observer::ObserverSnapshot) -> anyhow::Result<()> {
|
|
let bytes = serde_json::to_vec_pretty(snap)?;
|
|
if let Some(parent) = path.parent() { let _ = std::fs::create_dir_all(parent); }
|
|
let tmp = path.with_extension("tmp");
|
|
std::fs::write(&tmp, &bytes)?;
|
|
std::fs::rename(&tmp, path)?;
|
|
Ok(())
|
|
}
|
|
|
|
fn read_brain_snapshot(path: &std::path::Path) -> anyhow::Result<arje_brain::observer::ObserverSnapshot> {
|
|
let bytes = std::fs::read(path)?;
|
|
let snap: arje_brain::observer::ObserverSnapshot = serde_json::from_slice(&bytes)?;
|
|
Ok(snap)
|
|
}
|
|
|
|
fn init_tracing() {
|
|
use tracing_subscriber::{fmt, EnvFilter};
|
|
let filter = EnvFilter::try_from_default_env()
|
|
.unwrap_or_else(|_| EnvFilter::new("arje_zero=debug,info"));
|
|
fmt().with_env_filter(filter).with_target(true).init();
|
|
}
|
|
|
|
/// Resolves the path of the brain introspection socket.
///
/// Precedence: `ENTE_BRAIN_SOCK` verbatim if set; otherwise
/// `<dir>/ente-brain.sock`, where `<dir>` is `XDG_RUNTIME_DIR`, then
/// `TMPDIR`, then `/tmp`.
fn brain_introspect_path() -> PathBuf {
    match std::env::var("ENTE_BRAIN_SOCK") {
        Ok(explicit) => PathBuf::from(explicit),
        Err(_) => {
            let base = std::env::var("XDG_RUNTIME_DIR")
                .or_else(|_| std::env::var("TMPDIR"))
                .unwrap_or_else(|_| String::from("/tmp"));
            PathBuf::from(format!("{base}/ente-brain.sock"))
        }
    }
}
|
|
|
|
/// Auto-flush del audit log a CAS cada 10 segundos. Ejecuta best-effort:
|
|
/// si el flush falla lo logeamos pero no abortamos. La integridad del log
|
|
/// queda garantizada por su hash chain — re-flushar es idempotente.
|
|
fn spawn_audit_auto_flush(state: BrainState) {
|
|
tokio::spawn(async move {
|
|
let mut tick = tokio::time::interval(std::time::Duration::from_secs(10));
|
|
tick.tick().await; // descartar primer tick inmediato
|
|
loop {
|
|
tick.tick().await;
|
|
let mut audit = state.audit.write().await;
|
|
match audit.flush_to_cas() {
|
|
Ok(0) => {} // nada nuevo
|
|
Ok(n) => info!(written = n, total = audit.flushed_count(), "audit auto-flush"),
|
|
Err(e) => warn!(?e, "audit auto-flush falló"),
|
|
}
|
|
}
|
|
});
|
|
}
|
|
|
|
fn spawn_brain_introspect(state: BrainState) {
|
|
let path = brain_introspect_path();
|
|
tokio::spawn(async move {
|
|
let server = IntrospectServer::new(state);
|
|
if let Err(e) = server.serve(&path).await {
|
|
warn!(?e, "introspect server cayó");
|
|
}
|
|
});
|
|
}
|
|
|
|
/// Registra el evento en el observer y dispatcha cualquier regla matched.
|
|
/// Para reglas Sequence: pasamos los últimos N eventos del observer como
|
|
/// history al engine.
|
|
async fn feed_brain(
|
|
brain: &BrainState,
|
|
sink: &brain_glue::GraphSink,
|
|
graph: &EnteGraph,
|
|
evt: &GraphEvent,
|
|
) {
|
|
let Some((kind, subj)) = brain_glue::graph_event_to_brain(evt, graph) else { return };
|
|
let history: Vec<arje_brain::TimedEvent> = {
|
|
let mut obs = brain.observer.write().await;
|
|
obs.record(kind.clone());
|
|
// Snapshot de los últimos 16 eventos — suficiente para cualquier
|
|
// Sequence pattern razonable. Clone hace una sola alocación.
|
|
obs.recent(16).cloned().collect()
|
|
};
|
|
let rules = {
|
|
let engine = brain.engine.read().await;
|
|
engine.dispatch(&kind, &subj, &history)
|
|
};
|
|
if !rules.is_empty() {
|
|
arje_brain::dispatch_actions(&rules, sink).await;
|
|
}
|
|
}
|
|
|
|
/// Inicializa la malla `brahman-net` opcional. Activa sólo si
|
|
/// `BRAHMAN_LISTEN_MULTIADDR` está set. Identidad libp2p persistente
|
|
/// vía `keypair_store`. Bootstrap del DHT vía `BRAHMAN_BOOTSTRAP_PEERS`
|
|
/// (lista coma-separada de multiaddrs, opcional).
|
|
///
|
|
/// Toda fase de setup degrada grácilmente: si la keypair no carga,
|
|
/// si el listen falla, si bootstrap dial falla — loggea y devuelve
|
|
/// `None`. El Init sigue funcionando en modo Unix-only.
|
|
async fn setup_brahman_net(
|
|
dev_mode: bool,
|
|
) -> Option<std::sync::Arc<brahman_net::BrahmanNet>> {
|
|
let listen_addr = match std::env::var("BRAHMAN_LISTEN_MULTIADDR") {
|
|
Ok(s) if !s.is_empty() => s,
|
|
_ => {
|
|
tracing::debug!(
|
|
"brahman-net deshabilitado (sin BRAHMAN_LISTEN_MULTIADDR)"
|
|
);
|
|
return None;
|
|
}
|
|
};
|
|
|
|
let multiaddr: brahman_net::Multiaddr = match listen_addr.parse() {
|
|
Ok(m) => m,
|
|
Err(e) => {
|
|
warn!(addr = %listen_addr, ?e, "BRAHMAN_LISTEN_MULTIADDR inválido — net deshabilitado");
|
|
return None;
|
|
}
|
|
};
|
|
|
|
let keypair_path = keypair_store::default_path(dev_mode);
|
|
let (keypair, loaded) = match keypair_store::load_or_generate(&keypair_path) {
|
|
Ok(kp) => kp,
|
|
Err(e) => {
|
|
warn!(path = %keypair_path.display(), ?e, "no pude cargar/generar keypair libp2p — net deshabilitado");
|
|
return None;
|
|
}
|
|
};
|
|
info!(
|
|
path = %keypair_path.display(),
|
|
peer_id = %keypair.public().to_peer_id(),
|
|
loaded = loaded,
|
|
"identidad libp2p {}",
|
|
if loaded { "cargada" } else { "generada y persistida" }
|
|
);
|
|
|
|
let net = match brahman_net::BrahmanNet::with_keypair(keypair) {
|
|
Ok(n) => std::sync::Arc::new(n),
|
|
Err(e) => {
|
|
warn!(?e, "BrahmanNet::with_keypair falló — net deshabilitado");
|
|
return None;
|
|
}
|
|
};
|
|
|
|
let actual = net.listen(multiaddr).await;
|
|
info!(addr = %actual, peer_id = %net.peer_id, "brahman-net escuchando");
|
|
|
|
// Bootstrap opcional: dial-ar a peers conocidos para entrar al
|
|
// DHT. Sin bootstrap, el nodo arranca aislado hasta que alguien
|
|
// se conecte a él.
|
|
if let Ok(bootstrap) = std::env::var("BRAHMAN_BOOTSTRAP_PEERS") {
|
|
let mut dialed = 0usize;
|
|
for entry in bootstrap.split(',').filter(|s| !s.is_empty()) {
|
|
match entry.parse::<brahman_net::Multiaddr>() {
|
|
Ok(addr) => {
|
|
net.dial(addr.clone());
|
|
dialed += 1;
|
|
tracing::debug!(peer = %addr, "dial bootstrap");
|
|
}
|
|
Err(e) => {
|
|
warn!(entry = %entry, ?e, "bootstrap multiaddr inválido — saltado");
|
|
}
|
|
}
|
|
}
|
|
if dialed > 0 {
|
|
info!(count = dialed, "bootstrap peers dial-eados");
|
|
}
|
|
}
|
|
|
|
Some(net)
|
|
}
|
|
|
|
/// Carga la política de peers libp2p desde los archivos apuntados por
|
|
/// `BRAHMAN_PEER_ALLOWLIST` y/o `BRAHMAN_PEER_DENYLIST`, y arranca un
|
|
/// watcher para hot reload sobre cualquier cambio.
|
|
///
|
|
/// - Sin ninguna env: `(None, None)` → modo totalmente abierto.
|
|
/// - Con cualquiera (o ambas) set: política activa + watcher vivo.
|
|
/// - Si los archivos fallan al cargar: degrada a `(None, None)`,
|
|
/// loggea, NO rompe el bucle primordial (doctrina PID 1).
|
|
///
|
|
/// Devuelve la política y el `JoinHandle` del watcher (que el caller
|
|
/// debe mantener para que el thread no se aborte). Si no hay paths,
|
|
/// el watcher es un no-op que termina inmediato.
|
|
fn setup_brahman_policy() -> (
|
|
Option<brahman_handshake::peer_policy::PeerPolicy>,
|
|
Option<std::thread::JoinHandle<()>>,
|
|
) {
|
|
let allow_path = std::env::var("BRAHMAN_PEER_ALLOWLIST")
|
|
.ok()
|
|
.filter(|s| !s.is_empty());
|
|
let deny_path = std::env::var("BRAHMAN_PEER_DENYLIST")
|
|
.ok()
|
|
.filter(|s| !s.is_empty());
|
|
|
|
if allow_path.is_none() && deny_path.is_none() {
|
|
tracing::debug!(
|
|
"BRAHMAN_PEER_ALLOWLIST y BRAHMAN_PEER_DENYLIST no set — modo abierto (todo peer pasa)"
|
|
);
|
|
return (None, None);
|
|
}
|
|
|
|
let allow_pb = allow_path.as_deref().map(std::path::Path::new);
|
|
let deny_pb = deny_path.as_deref().map(std::path::Path::new);
|
|
|
|
let policy = match brahman_handshake::peer_policy::PeerPolicy::from_files(allow_pb, deny_pb) {
|
|
Ok(p) => p,
|
|
Err(e) => {
|
|
warn!(
|
|
?e,
|
|
allow = ?allow_path,
|
|
deny = ?deny_path,
|
|
"policy de peers inválida — degradando a modo abierto (sin restricción)"
|
|
);
|
|
return (None, None);
|
|
}
|
|
};
|
|
|
|
let (allow_count, deny_count) = policy.sizes();
|
|
info!(
|
|
allow = ?allow_count,
|
|
deny = deny_count,
|
|
allow_path = ?allow_path,
|
|
deny_path = ?deny_path,
|
|
"policy de peers libp2p cargada"
|
|
);
|
|
|
|
// Spawn watcher para hot reload. Errores aquí no son fatales —
|
|
// tendrías política sin reload, que es razonable.
|
|
let watcher = match policy.spawn_watcher() {
|
|
Ok(h) => Some(h),
|
|
Err(e) => {
|
|
warn!(?e, "policy watcher no se pudo crear — hot reload deshabilitado");
|
|
None
|
|
}
|
|
};
|
|
|
|
(Some(policy), watcher)
|
|
}
|