//! Ente #0 — el primer Ente. PID 1 del fractal. //! //! Reglas no negociables: //! 1. NUNCA lógica de servicio aquí. Sólo: leer Semilla, cosechar zombis, //! mediar capacidades, propagar eventos. //! 2. Single-threaded. Cualquier paralelismo se delega a Entes worker. //! Un panic en un thread de PID 1 = kernel panic. //! 3. Errores de hijos son *eventos* en `graph_tx`, no `Result` propagado. //! //! Este archivo es sólo wireup. La lógica vive en: //! - `seed` : construcción/restauración de la Tarjeta Semilla //! - `bus` : listener Unix + auth via SO_PEERCRED //! - `graph::*` : estado del fractal (lifecycle, topology, shutdown, //! bus_mediator, devices, capabilities) //! - `events` : tipos de eventos del bucle primordial //! - crates externos del workspace para CAS, soma, wasm, snapshot, kernel. mod brain_glue; mod bus; mod events; mod graph; mod keypair_store; mod seed; use anyhow::Context; use arje_brain::{BrainState, IntrospectServer}; use arje_kernel::{become_child_subreaper, bootstrap_kernel_surface, spawn_sigchld_stream, spawn_uevent_stream}; use events::{ExitStatus, GraphEvent, ShutdownReason}; use graph::EnteGraph; use nix::errno::Errno; use nix::sys::wait::{waitpid, WaitPidFlag, WaitStatus}; use nix::unistd::{getpid, Pid}; use std::path::PathBuf; use std::time::Duration; use tokio::sync::mpsc; use tracing::{error, info, warn}; struct CliArgs { checkpoint: Option, restore: Option, rules: Option, rules_out: Option, audit_head: Option, metrics_addr: Option, brain_half_life: Option, autopromote_secs: Option, } fn parse_args() -> CliArgs { let mut args = std::env::args().skip(1); let mut checkpoint = None; let mut restore = None; let mut rules = None; let mut rules_out = None; let mut audit_head = None; let mut metrics_addr = None; let mut brain_half_life = None; let mut autopromote_secs = None; while let Some(a) = args.next() { match a.as_str() { "--checkpoint" => checkpoint = args.next().map(PathBuf::from), "--restore" => restore = args.next().map(PathBuf::from), "--rules" => rules = args.next().map(PathBuf::from), "--rules-out" => rules_out = args.next().map(PathBuf::from), "--audit-head" => audit_head = args.next().map(PathBuf::from), "--metrics-addr" => metrics_addr = args.next(), "--brain-half-life" => brain_half_life = args.next().and_then(|s| s.parse().ok()), "--autopromote-secs" => autopromote_secs = args.next().and_then(|s| s.parse().ok()), other => warn!(arg = %other, "argumento desconocido, ignorado"), } } CliArgs { checkpoint, restore, rules, rules_out, audit_head, metrics_addr, brain_half_life, autopromote_secs, } } fn main() -> anyhow::Result<()> { init_tracing(); let cli = parse_args(); let pid = getpid(); let dev_mode = pid != Pid::from_raw(1); if dev_mode { warn!(?pid, "ente-zero corriendo en DEV MODE (no PID 1) — kernel surface no se monta"); } else { info!("ente-zero despierta como PID 1"); bootstrap_kernel_surface().context("bootstrap kernel surface")?; become_child_subreaper().context("PR_SET_CHILD_SUBREAPER")?; } let card = seed::load(dev_mode, cli.restore.as_deref())?; // current_thread runtime: ver doctrina al inicio del módulo. let rt = tokio::runtime::Builder::new_current_thread() .enable_io() .enable_time() .build()?; rt.block_on(primordial_loop( card, dev_mode, cli.checkpoint, cli.restore, cli.rules, cli.rules_out, cli.audit_head, cli.metrics_addr, cli.brain_half_life, cli.autopromote_secs, )) } async fn primordial_loop( seed_card: arje_card::EntityCard, dev_mode: bool, checkpoint_path: Option, restore_path: Option, rules_path: Option, rules_out: Option, audit_head: Option, metrics_addr: Option, brain_half_life: Option, autopromote_secs: Option, ) -> anyhow::Result<()> { info!(seed_id = %seed_card.id, label = %seed_card.label, "Ente #0 entra al bucle primordial"); let (graph_tx, mut graph_rx) = mpsc::channel::(64); let mut sigchld = spawn_sigchld_stream()?; // Uevents puede fallar en dev (sin CAP_NET_ADMIN). Degradamos a un // canal nunca-listo en lugar de abortar el bucle primordial. let mut uevents = match spawn_uevent_stream() { Ok(rx) => rx, Err(e) => { warn!(?e, "uevents deshabilitados (probablemente falta CAP_NET_ADMIN)"); let (_keep_tx, rx) = mpsc::channel::(1); std::mem::forget(_keep_tx); rx } }; // Bus interno: listener antes de spawn de hijos para que su Announce // tenga adónde llegar. Su path se inyecta en ENTE_BUS_SOCK por soma. let bus_sock = bus::default_socket_path(); let bus_path = bus::spawn_bus(bus_sock, graph_tx.clone())?; arje_soma::set_bus_sock(bus_path.to_string_lossy().into_owned()); // Brahman protocol: handshake socket + broker compartido. // // Es un canal paralelo al ente-bus, dedicado a módulos "brahman // conscientes" que se presentan con una Card y declaran flujos // tipados. Si el bind falla (socket en uso, FS no escribible), // degradamos a "modo bus-only" — la doctrina de PID 1 no rompe // por subsistemas opcionales. // Contexto operativo del broker: configurable por env var. Útil para // distinguir test/prod/foreground sin recompilar. Sin la var, los // biases per-contexto declarados en las Cards quedan inactivos. let broker_context = std::env::var("BRAHMAN_BROKER_CONTEXT").ok(); if let Some(ctx) = &broker_context { info!(context = %ctx, "brahman broker bajo contexto operativo"); } let brahman_broker = std::sync::Arc::new(tokio::sync::Mutex::new( brahman_broker::Broker::new(brahman_broker::BrokerConfig { strategy: brahman_broker::MatchStrategy::default(), current_context: broker_context.clone(), }), )); // Brahman-net opcional: si BRAHMAN_LISTEN_MULTIADDR está set, // levantamos la malla P2P y la pasamos como ServerConfig.net (Fase // 2 wire) para que cada Card con outputs se anuncie al DHT y // pueda ser descubierta por nodos remotos. Identidad libp2p // persistida en disco vía keypair_store (peer_id estable across // reboots). let brahman_net = setup_brahman_net(dev_mode).await; // Política opcional de peers libp2p: allowlist + denylist + hot // reload. Activada si BRAHMAN_PEER_ALLOWLIST o BRAHMAN_PEER_DENYLIST // están set. Sin ninguna, modo totalmente abierto (Fase 3 sin // restricción adicional). El watcher se queda vivo en background // observando los archivos para hot reload. let (brahman_policy, _policy_watcher) = setup_brahman_policy(); // Si tenemos AMBOS net y policy, attachamos: el deny de la // policy se proyecta al block_list del swarm para rechazar // conexiones ANTES del Noise handshake (más eficiente que // rechazar en el handshake brahman). Cada hot-reload de la // policy también re-sincroniza vía diff. if let (Some(net), Some(policy)) = (&brahman_net, &brahman_policy) { policy.attach_to_net(net.clone()); let (allow, deny) = policy.sizes(); info!( allow = ?allow, deny = deny, "policy attached al swarm — denies enforcedeados a nivel libp2p" ); } let brahman_sock = brahman_handshake::transport::default_socket_path(); match brahman_handshake::server::Server::bind( &brahman_sock, brahman_handshake::server::ServerConfig { init_attached: true, broker: Some(brahman_broker.clone()), net: brahman_net.clone(), policy: brahman_policy.clone(), }, ) { Ok(server) => { info!(socket = %brahman_sock.display(), "brahman handshake escuchando (Unix)"); // Si hay malla P2P, además del Unix accept loop levantamos // el accept loop libp2p sobre el mismo Server compartido. // Las sesiones locales y remotas conviven en las mismas // tablas (sessions, push_table, broker). let server = std::sync::Arc::new(server); if let Some(net) = brahman_net.clone() { let s_libp2p = server.clone(); let n_libp2p = net.clone(); tokio::spawn(async move { if let Err(e) = brahman_handshake::network::run_libp2p_accept_loop( s_libp2p, n_libp2p, ) .await { warn!(?e, "brahman handshake libp2p accept loop cayó"); } }); info!( "brahman handshake escuchando también vía libp2p (peer_id {})", net.peer_id ); } // Unix accept loop: usa Arc en lugar del consume // de run() para coexistir con el libp2p accept loop. let s_unix = server.clone(); tokio::spawn(async move { loop { match s_unix.accept_one().await { Ok(session) => { tokio::spawn(async move { if let Err(e) = session.handle().await { warn!(?e, "session Unix terminó con error"); } }); } Err(e) => { warn!(?e, "brahman handshake accept_one Unix falló"); break; } } } }); } Err(e) => { warn!(?e, socket = %brahman_sock.display(), "brahman handshake deshabilitado"); } } // Brahman admin: socket separado para snapshots de estado (sesiones + // matches del broker). Misma política de degradación grácil. let admin_sock = brahman_admin::transport::default_socket_path(); match brahman_admin::server::AdminServer::bind( &admin_sock, brahman_broker.clone(), brahman_admin::server::AdminConfig { init_attached: true, current_context: broker_context.clone(), }, ) { Ok(admin) => { info!(socket = %admin_sock.display(), "brahman admin escuchando"); tokio::spawn(async move { if let Err(e) = admin.run().await { warn!(?e, "brahman admin server cayó"); } }); } Err(e) => { warn!(?e, socket = %admin_sock.display(), "brahman admin deshabilitado"); } } let mut graph = EnteGraph::new(seed_card); graph.instantiate_seed_dependencies(&graph_tx).await?; // Cerebro: BrainState compartido + servidor de introspección. // Window de 1024 eventos — suficiente para correlaciones interesantes // sin gastar memoria de PID 1. En dev bajamos el umbral de cristalización // para que el demo (pocos eventos) produzca cristales observables. let mut brain = if dev_mode { // Umbrales relajados para que el demo (pocos eventos) produzca // cristales observables. Con P(b|a) normalizada a [0,1], los // valores típicos en muestras pequeñas son 0.2-0.5. BrainState::with_params(1024, arje_brain::CrystallizationParams { min_support: 2, min_conditional_prob: 0.3, min_pmi: 1.0, }) } else { BrainState::new(1024) }; if let Some(out_path) = rules_out { brain = brain.with_rules_out(out_path); } if let Some(hl) = brain_half_life { let mut obs = brain.observer.write().await; // Reemplazar con un observer nuevo que tenga half-life. Estado // anterior (vacío en este punto) descartado. *obs = arje_brain::Observer::new(1024).with_half_life(hl); info!(hl_secs = hl, "observer con time-decay activo"); } if let Some(secs) = autopromote_secs { arje_brain::spawn_autopromote_loop( brain.clone(), arje_brain::AutopromoteParams { interval_secs: secs, threshold: brain.params, // mismo threshold que crystals manuales }, ); } // Brain restore: si hay --restore , cargamos el snapshot adjunto // .brain.json. Counters preservados across reboots. if let Some(rpath) = &restore_path { let brain_path = rpath.with_extension("brain.json"); if brain_path.exists() { match read_brain_snapshot(&brain_path) { Ok(snap) => { let total = snap.total; let kinds = snap.marginal.len(); let restored = arje_brain::Observer::from_snapshot(snap); *brain.observer.write().await = restored; info!( path = %brain_path.display(), total, kinds, "brain snapshot restaurado" ); } Err(e) => warn!(?e, path = %brain_path.display(), "brain snapshot read falló"), } } } // Si --audit-head, configuramos el head pointer y arrancamos auto-flush. if let Some(head_path) = audit_head { // Re-creamos el AuditLog con head pointer. let new_audit = arje_brain::audit::AuditLog::new() .with_head_pointer(head_path); *brain.audit.write().await = new_audit; spawn_audit_auto_flush(brain.clone()); } // Carga inicial de reglas desde JSON/JSONL si --rules path proporcionado. if let Some(path) = &rules_path { match arje_brain::load_rules_file(path) { Ok(rules) => { let mut engine = brain.engine.write().await; for r in rules { engine.insert(r); } info!(count = engine.len(), path = %path.display(), "reglas cargadas"); } Err(e) => warn!(?e, path = %path.display(), "carga de reglas falló"), } } // Endpoint Prometheus opcional. En dev por defecto en 127.0.0.1:9911 si // el flag no se pasó. let metrics_addr = metrics_addr.or_else(|| { if dev_mode { Some("127.0.0.1:9911".to_string()) } else { None } }); if let Some(addr_s) = metrics_addr { match addr_s.parse::() { Ok(addr) => { let s = brain.clone(); tokio::spawn(async move { if let Err(e) = arje_brain::serve_metrics(s, addr).await { warn!(?e, "metrics server cayó"); } }); } Err(e) => warn!(?e, addr = %addr_s, "metrics-addr inválido"), } } spawn_brain_introspect(brain.clone()); let brain_sink = brain_glue::GraphSink { graph_tx: graph_tx.clone(), // Spawns auto-disparados desde reglas usan la identidad de la Semilla // (único Ente con Capability::Spawn por construcción). requester: graph.seed_id(), }; // Demo automático del forwarding (sólo dev, sólo si el binario existe). if dev_mode && std::path::Path::new("target/debug/ente-echo").exists() { spawn_echo_smoke_test(bus_path.clone()); } // En dev mode no tenemos hijos por defecto y el bucle se quedaría inerte. let dev_exit = if dev_mode { Some(tokio::time::sleep(Duration::from_secs(4))) } else { None }; tokio::pin!(dev_exit); // GC de capability grants expirados — corre cada 10 segundos. let mut grant_purge = tokio::time::interval(Duration::from_secs(10)); grant_purge.tick().await; // descartar primer tick inmediato loop { tokio::select! { biased; Some(_) = sigchld.recv() => { reap_until_empty(&mut graph, &graph_tx).await; } Some(uevt) = uevents.recv() => { graph.on_uevent(uevt, &graph_tx).await; } Some(evt) = graph_rx.recv() => { // Cerebro observa antes que el grafo mute. Snapshot del // SubjectInfo se hace contra el estado pre-mutación. feed_brain(&brain, &brain_sink, &graph, &evt).await; if dispatch_graph_event(&mut graph, evt, &graph_tx, &checkpoint_path, &brain).await { return Ok(()); } } _ = grant_purge.tick() => { let n = graph.purge_expired_grants(); if n > 0 { info!(purged = n, active = graph.active_grants_count(), "GC capability grants"); } } _ = async { dev_exit.as_mut().as_pin_mut().unwrap().await }, if dev_mode => { info!("dev mode: timer expirado, cerrando bucle primordial"); let _ = graph_tx.send(GraphEvent::Shutdown { reason: ShutdownReason::SeedRequested, }).await; } } } } /// Devuelve `true` si el bucle primordial debe terminar. async fn dispatch_graph_event( graph: &mut EnteGraph, evt: GraphEvent, tx: &mpsc::Sender, checkpoint: &Option, brain: &BrainState, ) -> bool { match evt { GraphEvent::EnteDied { id, status } => { graph.on_death(id, status, tx).await; } GraphEvent::CapabilityRequested { from, cap, reply } => { graph.mediate_capability(from, cap, reply).await; } GraphEvent::SpawnRequest { card, requester } => { if let Err(e) = graph.authorize_and_spawn(card, requester).await { warn!(?e, "spawn request error"); } } GraphEvent::BusRequest { peer, from, request, outbound, reply } => { graph.on_bus_request(peer, from, request, outbound, reply).await; } GraphEvent::BusResponse { seq, response } => { graph.on_bus_response(seq, response).await; } GraphEvent::BusConnClosed { ente_id } => { graph.on_bus_conn_closed(ente_id).await; } GraphEvent::Shutdown { reason } => { warn!(?reason, "shutdown del fractal"); if let Some(path) = checkpoint.as_ref() { // Snapshot del grafo let snap = graph.snapshot(); match snap.write(path) { Ok(()) => info!(path = %path.display(), entes = snap.entes.len(), "snapshot fractal persistido"), Err(e) => warn!(?e, "snapshot write falló"), } // Snapshot del cerebro (observer state) en archivo adjunto let brain_path = path.with_extension("brain.json"); let obs_snap = brain.observer.write().await.snapshot(); match write_brain_snapshot(&brain_path, &obs_snap) { Ok(()) => info!( path = %brain_path.display(), total = obs_snap.total, kinds = obs_snap.marginal.len(), "snapshot brain persistido" ), Err(e) => warn!(?e, "brain snapshot write falló"), } } graph.cascade_shutdown().await; return true; } } false } async fn reap_until_empty(graph: &mut EnteGraph, tx: &mpsc::Sender) { loop { match waitpid(None, Some(WaitPidFlag::WNOHANG)) { Ok(WaitStatus::StillAlive) => return, Ok(WaitStatus::Exited(pid, code)) => { emit_death(graph, tx, pid, ExitStatus::Exit(code)).await; } Ok(WaitStatus::Signaled(pid, sig, _core)) => { emit_death(graph, tx, pid, ExitStatus::Killed(sig)).await; } Ok(_) => continue, // Stopped/Continued — irrelevantes Err(Errno::ECHILD) => return, Err(e) => { error!(?e, "waitpid fallo no recuperable en bucle de reaping"); return; } } } } async fn emit_death( graph: &EnteGraph, tx: &mpsc::Sender, pid: Pid, status: ExitStatus, ) { let id = match graph.lookup_pid(pid) { Some(id) => id, None => { // Proceso adoptado (subreaper): no está en nuestro grafo. info!(?pid, ?status, "huérfano cosechado (no en grafo)"); return; } }; let _ = tx.send(GraphEvent::EnteDied { id, status }).await; } fn spawn_echo_smoke_test(bus_path: PathBuf) { tokio::spawn(async move { tokio::time::sleep(Duration::from_millis(300)).await; match arje_bus::BusClient::connect(&bus_path).await { Ok(mut client) => { let req = arje_bus::BusRequest::Invoke { cap: arje_echo::echo_capability(), blob: b"hola fractal forwardeado".to_vec(), }; match client.call(req).await { Ok(arje_bus::BusResponse::Invoked { result }) => { info!(echo = %String::from_utf8_lossy(&result), "Invoke ECHO round-trip OK"); } Ok(other) => warn!(?other, "Invoke ECHO respuesta inesperada"), Err(e) => warn!(?e, "Invoke ECHO falló"), } } Err(e) => warn!(?e, "no se pudo conectar al bus para test"), } }); } fn write_brain_snapshot(path: &std::path::Path, snap: &arje_brain::observer::ObserverSnapshot) -> anyhow::Result<()> { let bytes = serde_json::to_vec_pretty(snap)?; if let Some(parent) = path.parent() { let _ = std::fs::create_dir_all(parent); } let tmp = path.with_extension("tmp"); std::fs::write(&tmp, &bytes)?; std::fs::rename(&tmp, path)?; Ok(()) } fn read_brain_snapshot(path: &std::path::Path) -> anyhow::Result { let bytes = std::fs::read(path)?; let snap: arje_brain::observer::ObserverSnapshot = serde_json::from_slice(&bytes)?; Ok(snap) } fn init_tracing() { use tracing_subscriber::{fmt, EnvFilter}; let filter = EnvFilter::try_from_default_env() .unwrap_or_else(|_| EnvFilter::new("arje_zero=debug,info")); fmt().with_env_filter(filter).with_target(true).init(); } fn brain_introspect_path() -> PathBuf { if let Ok(p) = std::env::var("ENTE_BRAIN_SOCK") { return p.into(); } let runtime = std::env::var("XDG_RUNTIME_DIR") .unwrap_or_else(|_| std::env::var("TMPDIR").unwrap_or_else(|_| "/tmp".into())); format!("{runtime}/ente-brain.sock").into() } /// Auto-flush del audit log a CAS cada 10 segundos. Ejecuta best-effort: /// si el flush falla lo logeamos pero no abortamos. La integridad del log /// queda garantizada por su hash chain — re-flushar es idempotente. fn spawn_audit_auto_flush(state: BrainState) { tokio::spawn(async move { let mut tick = tokio::time::interval(std::time::Duration::from_secs(10)); tick.tick().await; // descartar primer tick inmediato loop { tick.tick().await; let mut audit = state.audit.write().await; match audit.flush_to_cas() { Ok(0) => {} // nada nuevo Ok(n) => info!(written = n, total = audit.flushed_count(), "audit auto-flush"), Err(e) => warn!(?e, "audit auto-flush falló"), } } }); } fn spawn_brain_introspect(state: BrainState) { let path = brain_introspect_path(); tokio::spawn(async move { let server = IntrospectServer::new(state); if let Err(e) = server.serve(&path).await { warn!(?e, "introspect server cayó"); } }); } /// Registra el evento en el observer y dispatcha cualquier regla matched. /// Para reglas Sequence: pasamos los últimos N eventos del observer como /// history al engine. async fn feed_brain( brain: &BrainState, sink: &brain_glue::GraphSink, graph: &EnteGraph, evt: &GraphEvent, ) { let Some((kind, subj)) = brain_glue::graph_event_to_brain(evt, graph) else { return }; let history: Vec = { let mut obs = brain.observer.write().await; obs.record(kind.clone()); // Snapshot de los últimos 16 eventos — suficiente para cualquier // Sequence pattern razonable. Clone hace una sola alocación. obs.recent(16).cloned().collect() }; let rules = { let engine = brain.engine.read().await; engine.dispatch(&kind, &subj, &history) }; if !rules.is_empty() { arje_brain::dispatch_actions(&rules, sink).await; } } /// Inicializa la malla `brahman-net` opcional. Activa sólo si /// `BRAHMAN_LISTEN_MULTIADDR` está set. Identidad libp2p persistente /// vía `keypair_store`. Bootstrap del DHT vía `BRAHMAN_BOOTSTRAP_PEERS` /// (lista coma-separada de multiaddrs, opcional). /// /// Toda fase de setup degrada grácilmente: si la keypair no carga, /// si el listen falla, si bootstrap dial falla — loggea y devuelve /// `None`. El Init sigue funcionando en modo Unix-only. async fn setup_brahman_net( dev_mode: bool, ) -> Option> { let listen_addr = match std::env::var("BRAHMAN_LISTEN_MULTIADDR") { Ok(s) if !s.is_empty() => s, _ => { tracing::debug!( "brahman-net deshabilitado (sin BRAHMAN_LISTEN_MULTIADDR)" ); return None; } }; let multiaddr: brahman_net::Multiaddr = match listen_addr.parse() { Ok(m) => m, Err(e) => { warn!(addr = %listen_addr, ?e, "BRAHMAN_LISTEN_MULTIADDR inválido — net deshabilitado"); return None; } }; let keypair_path = keypair_store::default_path(dev_mode); let (keypair, loaded) = match keypair_store::load_or_generate(&keypair_path) { Ok(kp) => kp, Err(e) => { warn!(path = %keypair_path.display(), ?e, "no pude cargar/generar keypair libp2p — net deshabilitado"); return None; } }; info!( path = %keypair_path.display(), peer_id = %keypair.public().to_peer_id(), loaded = loaded, "identidad libp2p {}", if loaded { "cargada" } else { "generada y persistida" } ); let net = match brahman_net::BrahmanNet::with_keypair(keypair) { Ok(n) => std::sync::Arc::new(n), Err(e) => { warn!(?e, "BrahmanNet::with_keypair falló — net deshabilitado"); return None; } }; let actual = net.listen(multiaddr).await; info!(addr = %actual, peer_id = %net.peer_id, "brahman-net escuchando"); // Bootstrap opcional: dial-ar a peers conocidos para entrar al // DHT. Sin bootstrap, el nodo arranca aislado hasta que alguien // se conecte a él. if let Ok(bootstrap) = std::env::var("BRAHMAN_BOOTSTRAP_PEERS") { let mut dialed = 0usize; for entry in bootstrap.split(',').filter(|s| !s.is_empty()) { match entry.parse::() { Ok(addr) => { net.dial(addr.clone()); dialed += 1; tracing::debug!(peer = %addr, "dial bootstrap"); } Err(e) => { warn!(entry = %entry, ?e, "bootstrap multiaddr inválido — saltado"); } } } if dialed > 0 { info!(count = dialed, "bootstrap peers dial-eados"); } } Some(net) } /// Carga la política de peers libp2p desde los archivos apuntados por /// `BRAHMAN_PEER_ALLOWLIST` y/o `BRAHMAN_PEER_DENYLIST`, y arranca un /// watcher para hot reload sobre cualquier cambio. /// /// - Sin ninguna env: `(None, None)` → modo totalmente abierto. /// - Con cualquiera (o ambas) set: política activa + watcher vivo. /// - Si los archivos fallan al cargar: degrada a `(None, None)`, /// loggea, NO rompe el bucle primordial (doctrina PID 1). /// /// Devuelve la política y el `JoinHandle` del watcher (que el caller /// debe mantener para que el thread no se aborte). Si no hay paths, /// el watcher es un no-op que termina inmediato. fn setup_brahman_policy() -> ( Option, Option>, ) { let allow_path = std::env::var("BRAHMAN_PEER_ALLOWLIST") .ok() .filter(|s| !s.is_empty()); let deny_path = std::env::var("BRAHMAN_PEER_DENYLIST") .ok() .filter(|s| !s.is_empty()); if allow_path.is_none() && deny_path.is_none() { tracing::debug!( "BRAHMAN_PEER_ALLOWLIST y BRAHMAN_PEER_DENYLIST no set — modo abierto (todo peer pasa)" ); return (None, None); } let allow_pb = allow_path.as_deref().map(std::path::Path::new); let deny_pb = deny_path.as_deref().map(std::path::Path::new); let policy = match brahman_handshake::peer_policy::PeerPolicy::from_files(allow_pb, deny_pb) { Ok(p) => p, Err(e) => { warn!( ?e, allow = ?allow_path, deny = ?deny_path, "policy de peers inválida — degradando a modo abierto (sin restricción)" ); return (None, None); } }; let (allow_count, deny_count) = policy.sizes(); info!( allow = ?allow_count, deny = deny_count, allow_path = ?allow_path, deny_path = ?deny_path, "policy de peers libp2p cargada" ); // Spawn watcher para hot reload. Errores aquí no son fatales — // tendrías política sin reload, que es razonable. let watcher = match policy.spawn_watcher() { Ok(h) => Some(h), Err(e) => { warn!(?e, "policy watcher no se pudo crear — hot reload deshabilitado"); None } }; (Some(policy), watcher) }