refactor(naming): A1 — ente→arje, vista→revista, pluma→fana

Rename batch de la Fase A del PLAN_MACRO:
- 25 crates ente-* → arje-* (protocol/init/runtime/compat). El linaje
  arje (init Linux) queda con prefijo coherente.
- vista → revista (revista-core + revista-web).
- pluma → fana (fana-md + fana-md-reader-web). fana absorbe el linaje
  markdown de pluma; será el writer DAG editor (prioridad alta).

Cambios:
- git mv de 29 crate dirs + 2 SDDs
- package/lib/bin names + path refs + imports .rs reescritos
- workspace Cargo.toml + comentarios de sección
- SDDs de init/runtime/compat/protocol actualizados a arje-
- SDD de revista + SDD de fana (reescrito: writer DAG editor)
- docs/STATUS.md, ROADMAP.md, PLAN_MACRO.md, arje-boot.md,
  arje-replace-systemd.md actualizados
- docs/changelog/akasha.md → chasqui.md

scripts/rename-fase-a.py idempotente (--dry-run soportado).
cargo check --workspace verde.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
sergio
2026-05-20 00:10:14 +00:00
parent 3fc6dcfa72
commit b83d40a833
159 changed files with 2384 additions and 1111 deletions
@@ -0,0 +1,240 @@
//! Bus mediator: integración de `EnteGraph` con el bus interno.
//!
//! Responsabilidades:
//! - Auth de Announce (verificar identidad reclamada contra SO_PEERCRED)
//! - Registro de conexiones (`bus_connections` indexado por Ulid)
//! - Forwarding de Invokes a proveedores
//! - Tracking de invokes en vuelo (`pending_invokes` por seq)
//! - Cleanup en cierre de conexión
use super::{EnteGraph, SERVER_SEQ_FLAG};
use arje_bus::{BusMessage, BusPayload, BusRequest, BusResponse, EnteInfo, PeerCreds};
use arje_card::Capability;
use tokio::sync::{mpsc, oneshot};
use tracing::{debug, info, warn};
use ulid::Ulid;
/// Operaciones que requieren identidad verificada en el bus.
///
/// - `Announce`: establece bus_connections para forwarding.
/// - `UpdateCapabilities`: muta dynamic_provides del Ente — sólo el dueño.
///
/// Invoke, ListEntes y power-mgmt se aceptan anonymous — políticas por
/// capacidad se aplican aguas abajo, no aquí.
fn requires_auth(req: &BusRequest) -> bool {
matches!(
req,
BusRequest::Announce { .. } | BusRequest::UpdateCapabilities { .. }
)
}
impl EnteGraph {
pub async fn on_bus_request(
&mut self,
peer: PeerCreds,
from: Option<Ulid>,
request: BusRequest,
outbound: mpsc::Sender<BusMessage>,
reply: oneshot::Sender<BusResponse>,
) {
// ---- Auth: kernel-injected SO_PEERCRED vs identidad reclamada ----
let from_authenticated = match from {
None => None,
Some(claimed) => {
let expected = self.incarnated.get(&claimed).and_then(|i| i.pid);
match expected {
Some(p) if p.as_raw() == peer.pid => Some(claimed),
Some(p) => {
warn!(
claimed = %claimed, expected_pid = p.as_raw(),
actual_pid = peer.pid,
"identity mismatch — rechazando request"
);
let _ = reply.send(BusResponse::Error("identity mismatch".into()));
return;
}
None => {
warn!(?claimed, peer_pid = peer.pid, "Ente desconocido reclamando identidad");
let _ = reply.send(BusResponse::Error("unknown ente claimed".into()));
return;
}
}
}
};
if requires_auth(&request) && from_authenticated.is_none() {
let _ = reply.send(BusResponse::Error("auth required for this request".into()));
return;
}
// ---- Dispatch ----
match request {
BusRequest::Announce { capabilities } => {
let id = from_authenticated.expect("auth-required guarantees Some");
let label = self.incarnated.get(&id).map(|i| i.card.label.clone())
.unwrap_or_else(|| "anónimo".into());
info!(%id, %label, ?capabilities, peer_pid = peer.pid, "Announce autenticado");
self.bus_connections.insert(id, outbound);
let _ = reply.send(BusResponse::Ok);
}
BusRequest::ListEntes => {
let entes = self.incarnated.values()
.map(|i| EnteInfo {
id: i.card.id,
label: i.card.label.clone(),
provides: i.card.provides.iter().cloned().collect(),
pid: i.pid.map(|p| p.as_raw()),
})
.collect();
let _ = reply.send(BusResponse::Entes(entes));
}
BusRequest::PowerOff { interactive } => {
info!(?from_authenticated, interactive, peer_pid = peer.pid, "PowerOff via bus");
let _ = reply.send(BusResponse::Ok);
}
BusRequest::Reboot { interactive } => {
info!(?from_authenticated, interactive, "Reboot via bus");
let _ = reply.send(BusResponse::Ok);
}
BusRequest::Suspend { interactive } => {
info!(?from_authenticated, interactive, "Suspend via bus");
let _ = reply.send(BusResponse::Ok);
}
BusRequest::Hibernate { interactive } => {
info!(?from_authenticated, interactive, "Hibernate via bus");
let _ = reply.send(BusResponse::Ok);
}
BusRequest::Invoke { cap, blob } => {
self.forward_invoke(from_authenticated, cap, blob, reply).await;
}
BusRequest::UpdateCapabilities { adds, removes } => {
let id = from_authenticated.expect("auth-required guarantees Some");
self.apply_capability_update(id, adds, removes);
let _ = reply.send(BusResponse::Ok);
}
}
}
/// Muta `dynamic_provides` del Ente y actualiza el índice global de
/// providers. La Card original (immutable) no se toca.
fn apply_capability_update(
&mut self,
ente_id: Ulid,
adds: Vec<Capability>,
removes: Vec<Capability>,
) {
// Adiciones: dedupe contra Card.provides + dynamic_provides existentes.
let mut added = Vec::new();
let mut removed = Vec::new();
if let Some(inc) = self.incarnated.get_mut(&ente_id) {
for cap in adds {
if inc.card.provides.contains(&cap) || inc.dynamic_provides.contains(&cap) {
continue; // ya provista, no-op
}
inc.dynamic_provides.insert(cap.clone());
added.push(cap);
}
for cap in removes {
if inc.dynamic_provides.remove(&cap) {
removed.push(cap);
}
// Caps de la Card original no se pueden quitar — silenciosamente
// ignoradas. Una Card es contrato; sólo el dynamic es mutable.
}
}
// Actualizar índice global. Hacemos esto fuera del scope `inc` para
// evitar el doble-borrow de self.
for cap in &added {
self.register_dynamic_cap(ente_id, cap.clone());
}
for cap in &removed {
self.unregister_dynamic_cap(ente_id, cap);
// Revocar grants emitidos contra esta cap por este Ente.
let revoked: Vec<u64> = self.grants.iter()
.filter(|(_, g)| g.provider == ente_id && &g.cap == cap)
.map(|(t, _)| *t)
.collect();
for t in revoked {
self.grants.remove(&t);
}
}
info!(
%ente_id,
added_count = added.len(),
removed_count = removed.len(),
"capabilities actualizadas en runtime"
);
}
/// Enruta un Invoke al proveedor real de la capacidad. Aloca un seq
/// server-side, registra el reply oneshot en `pending_invokes`, y empuja
/// el request por la conexión del proveedor.
async fn forward_invoke(
&mut self,
from: Option<Ulid>,
cap: Capability,
blob: Vec<u8>,
reply: oneshot::Sender<BusResponse>,
) {
let provider_id = match self.pick_invokable_provider(&cap) {
Some(id) => id,
None => {
let _ = reply.send(BusResponse::Error(format!("sin proveedor invokable para {cap:?}")));
return;
}
};
let outbound = match self.bus_connections.get(&provider_id) {
Some(o) => o.clone(),
None => {
let _ = reply.send(BusResponse::Error("proveedor no conectado al bus".into()));
return;
}
};
let seq = self.alloc_invoke_seq();
self.pending_invokes.insert(seq, reply);
debug!(?from, ?cap, ?provider_id, seq, blob_len = blob.len(), "forwardeando Invoke");
let msg = BusMessage {
from: None,
seq,
payload: BusPayload::Request(BusRequest::Invoke { cap, blob }),
};
if outbound.send(msg).await.is_err() {
if let Some(orig) = self.pending_invokes.remove(&seq) {
let _ = orig.send(BusResponse::Error("conn del proveedor cerrada".into()));
}
}
}
fn pick_invokable_provider(&self, cap: &Capability) -> Option<Ulid> {
// Sólo proveedores con conexión al bus pueden recibir forwards.
// El propio Ente #0 está en `providers` para varias caps pero no
// debe recibir forwards — se filtra implícitamente porque la Semilla
// no tiene conexión al bus.
self.providers.get(cap)?
.iter()
.find(|id| self.bus_connections.contains_key(id))
.copied()
}
pub(in crate::graph) fn alloc_invoke_seq(&mut self) -> u64 {
self.next_invoke_seq = self.next_invoke_seq.wrapping_add(1);
SERVER_SEQ_FLAG | self.next_invoke_seq
}
pub async fn on_bus_response(&mut self, seq: u64, response: BusResponse) {
if let Some(orig) = self.pending_invokes.remove(&seq) {
let _ = orig.send(response);
} else {
warn!(seq, "Response sin pending invoke");
}
}
pub async fn on_bus_conn_closed(&mut self, ente_id: Option<Ulid>) {
if let Some(id) = ente_id {
self.bus_connections.remove(&id);
// No revocamos providers — la capacidad sigue declarada en su
// Card. Sólo perdimos el canal de invocación.
debug!(%id, "bus connection cerrada");
}
}
}
@@ -0,0 +1,98 @@
//! Mediación de capabilities: emisión, renovación, revocación de tokens.
//!
//! Los grants tienen TTL (`DEFAULT_GRANT_TTL`). El cliente debe renovarlos
//! periódicamente con `renew_grant(token)`; en caso contrario, el background
//! task `purge_expired_grants` los revoca al vencimiento.
use super::{quota_for_capability, ttl_for_capability, EnteGraph, GrantedCapability};
use crate::events::CapabilityGrant;
use arje_card::Capability;
use std::time::Instant;
use tokio::sync::oneshot;
use tracing::debug;
use ulid::Ulid;
impl EnteGraph {
pub async fn mediate_capability(
&mut self,
from: Ulid,
cap: Capability,
reply: oneshot::Sender<CapabilityGrant>,
) {
let grant = match self.providers.get(&cap).and_then(|s| s.iter().next().copied()) {
None => CapabilityGrant::NoProvider,
Some(provider) => {
// Quota: contar tokens vivos para (from, cap). Si excede,
// rechazar antes de emitir uno nuevo.
let limit = quota_for_capability(&cap);
let active = self.active_tokens_for(from, &cap);
if active >= limit {
CapabilityGrant::QuotaExceeded { active, limit }
} else {
let token = self.next_token;
self.next_token += 1;
let ttl = ttl_for_capability(&cap);
let expires_at = Instant::now() + ttl;
self.grants.insert(token, GrantedCapability {
cap: cap.clone(),
provider,
holder: from,
expires_at,
});
CapabilityGrant::Granted { token }
}
}
};
let _ = reply.send(grant);
}
/// Cuenta tokens vivos (no expirados) emitidos a un holder para una cap.
pub fn active_tokens_for(&self, holder: Ulid, cap: &Capability) -> u32 {
let now = Instant::now();
self.grants.values()
.filter(|g| g.holder == holder && &g.cap == cap && g.expires_at > now)
.count() as u32
}
/// Extiende un grant existente. Devuelve `true` si renovó. Si el token
/// no existe o ya expiró, `false` (el cliente debe re-acquire).
/// Usa el TTL específico de la cap del grant.
///
/// Reservado para el flujo de capability renewal (no cableado todavía).
#[allow(dead_code)]
pub fn renew_grant(&mut self, token: u64) -> bool {
let now = Instant::now();
if let Some(g) = self.grants.get_mut(&token) {
if g.expires_at > now {
g.expires_at = now + ttl_for_capability(&g.cap);
return true;
}
// Expired — purgamos aquí mismo.
self.grants.remove(&token);
}
false
}
/// GC: elimina grants vencidos. Devuelve cuántos fueron purgados.
pub fn purge_expired_grants(&mut self) -> usize {
let now = Instant::now();
let expired: Vec<u64> = self.grants.iter()
.filter(|(_, g)| g.expires_at <= now)
.map(|(t, _)| *t)
.collect();
for t in &expired {
self.grants.remove(t);
}
if !expired.is_empty() {
debug!(count = expired.len(), "grants expirados purgados");
}
expired.len()
}
/// Cuenta de grants vivos (no expirados). Usado por métricas.
pub fn active_grants_count(&self) -> usize {
let now = Instant::now();
self.grants.values().filter(|g| g.expires_at > now).count()
}
}
@@ -0,0 +1,60 @@
//! Device registry: mantiene el índice de dispositivos del kernel presentes,
//! traduce uevents en cambios de `Capability::Device { class }`.
use super::EnteGraph;
use crate::events::GraphEvent;
use arje_card::{Capability, DeviceClass};
use arje_kernel::{UAction, UEvent};
use tokio::sync::mpsc;
use tracing::{debug, info, warn};
impl EnteGraph {
pub async fn on_uevent(&mut self, evt: UEvent, _tx: &mpsc::Sender<GraphEvent>) {
let class = match &evt.device_class {
Some(c) => c.clone(),
None => return, // subsystems sin DeviceClass mapeada — ignoramos.
};
match evt.action {
UAction::Add | UAction::Bind | UAction::Online => {
let was_first = self.devices_of_class(&class) == 0;
self.devices.insert(evt.devpath.clone(), evt.clone());
if was_first {
// Primera instancia de la clase → la registramos como
// capacidad disponible. El "proveedor" virtual es el
// Ente #0 (kernel surface).
let cap = Capability::Device { class: class.clone() };
self.providers.entry(cap).or_default().insert(self.seed.id);
info!(?class, devpath = %evt.devpath, "device capability disponible");
}
}
UAction::Remove | UAction::Unbind | UAction::Offline => {
self.devices.remove(&evt.devpath);
if self.devices_of_class(&class) == 0 {
let cap = Capability::Device { class: class.clone() };
if let Some(set) = self.providers.get_mut(&cap) {
set.remove(&self.seed.id);
}
let revoked: Vec<u64> = self.grants.iter()
.filter(|(_, g)| g.cap == cap)
.map(|(t, _)| *t)
.collect();
for t in revoked {
self.grants.remove(&t);
}
warn!(?class, "última instancia removida — capacidad revocada");
}
}
UAction::Change | UAction::Move => {
self.devices.insert(evt.devpath.clone(), evt);
debug!(?class, "device modified");
}
UAction::Unknown => {}
}
}
fn devices_of_class(&self, class: &DeviceClass) -> usize {
self.devices.values()
.filter(|e| e.device_class.as_ref() == Some(class))
.count()
}
}
@@ -0,0 +1,154 @@
//! Encarnación, muerte y supervisión.
//!
//! Aquí vive el flujo: Card → autorizar → soma::incarnate / wasm → registro
//! en el grafo → SIGCHLD → on_death → Restart/OneShot/Delegate.
use super::{EnteGraph, Incarnated};
use crate::events::{ExitStatus, GraphEvent};
use arje_bus::{BusMessage, BusPayload, BusRequest};
use arje_card::{Capability, EntityCard, Payload, Supervision};
use tokio::sync::mpsc;
use tracing::{info, warn};
use ulid::Ulid;
impl EnteGraph {
/// Encarna las dependencias declaradas en la Semilla. Único punto donde
/// el Init "decide": después sólo reacciona.
pub async fn instantiate_seed_dependencies(
&mut self,
_tx: &mpsc::Sender<GraphEvent>,
) -> anyhow::Result<()> {
let cards = std::mem::take(&mut self.pending_genesis);
if cards.is_empty() {
info!(seed = %self.seed.label, "semilla sin genesis cards");
return Ok(());
}
info!(seed = %self.seed.label, count = cards.len(), "instanciando genesis");
let seed_id = self.seed.id;
for card in cards {
if let Err(e) = self.authorize_and_spawn(card, seed_id).await {
warn!(?e, "genesis card falló");
}
}
Ok(())
}
/// Spawn solicitado por un Ente con `Capability::Spawn`. Verifica auth,
/// requires del grafo, y delega la encarnación al backend correspondiente
/// (`arje_soma` para procesos, `arje_wasm` para Wasm).
pub async fn authorize_and_spawn(
&mut self,
mut card: EntityCard,
requester: Ulid,
) -> anyhow::Result<()> {
if !self.holder_has(requester, &Capability::Spawn) {
warn!(?requester, "spawn denied: lacks Capability::Spawn");
return Ok(());
}
if let Err(e) = card.validate() {
warn!(?e, label = %card.label, "card inválida, spawn rechazado");
return Ok(());
}
// Falla rápida sobre `requires` — mejor que daemons en bucle.
for req in &card.requires {
if !self.providers.contains_key(req) {
warn!(?req, label = %card.label, "requires no satisfecho");
return Ok(());
}
}
// Lineage por defecto = quien pidió el spawn.
if card.lineage.is_none() {
card.lineage = Some(requester);
}
let pid = match &card.payload {
Payload::Virtual => None,
Payload::Native { .. } | Payload::Legacy { .. } => {
Some(arje_soma::incarnate(&card)?)
}
Payload::Wasm { module_sha256, entry } => {
// Wasm: hilo dedicado, sin PID. Su muerte se observa por
// estado del runtime, no por SIGCHLD.
let bytes = arje_cas::resolve(module_sha256)
.map_err(|e| anyhow::anyhow!("CAS resolve para {}: {e}", card.label))?;
arje_wasm::incarnate_wasm(&card, bytes, entry.clone())?;
None
}
};
if let Some(p) = pid {
self.by_pid.insert(p.as_raw(), card.id);
}
self.register_provider(&card);
if let Some(parent) = card.lineage {
self.children.entry(parent).or_default().push(card.id);
}
info!(label = %card.label, ?pid, lineage = ?card.lineage, "Ente encarnado");
self.incarnated.insert(card.id, Incarnated {
card, pid,
dynamic_provides: std::collections::BTreeSet::new(),
});
Ok(())
}
pub async fn on_death(
&mut self,
id: Ulid,
status: ExitStatus,
_tx: &mpsc::Sender<GraphEvent>,
) {
let Some(inc) = self.incarnated.remove(&id) else { return };
if let Some(p) = inc.pid {
self.by_pid.remove(&p.as_raw());
}
self.unregister_provider(&inc.card);
if let Some(parent) = inc.card.lineage {
if let Some(siblings) = self.children.get_mut(&parent) {
siblings.retain(|c| c != &id);
}
}
info!(label = %inc.card.label, ?status, "Ente disuelto");
match inc.card.supervision.clone() {
Supervision::Restart { initial, max: _ } => {
// Backoff exponencial: TODO real con timer del runtime.
tokio::time::sleep(initial).await;
let new_card = EntityCard { id: Ulid::new(), ..inc.card };
if let Err(e) = self.authorize_and_spawn(new_card, self.seed.id).await {
warn!(?e, "restart falló");
}
}
Supervision::OneShot => {}
Supervision::Delegate => {
self.notify_lineage_of_death(&inc, &status);
}
}
}
/// Fire-and-forget: si el parent tiene conexión al bus, le forwardeamos
/// un Invoke con la muerte del hijo. Sin retry, sin backpressure.
fn notify_lineage_of_death(&mut self, inc: &Incarnated, status: &ExitStatus) {
let Some(parent) = inc.card.lineage else { return };
info!(
child = %inc.card.id, parent = %parent, label = %inc.card.label,
?status,
"Supervision::Delegate — muerte notificada al lineage"
);
if let Some(out) = self.bus_connections.get(&parent).cloned() {
let blob = format!("{}:{:?}", inc.card.id, status);
let seq = self.alloc_invoke_seq();
let msg = BusMessage {
from: None,
seq,
payload: BusPayload::Request(BusRequest::Invoke {
cap: Capability::Endpoint {
interface: arje_card::InterfaceId([0xde; 16]),
version: 1,
},
blob: blob.into_bytes(),
}),
};
let _ = out.try_send(msg);
}
}
}
+229
View File
@@ -0,0 +1,229 @@
//! `EnteGraph`: estado del fractal vivo en PID 1.
//!
//! Diseño:
//! - Submódulos por concern: lifecycle, topology, shutdown, bus_mediator,
//! devices, capabilities. Cada uno extiende `impl EnteGraph` con métodos
//! relacionados.
//! - Estado plano (no substructs todavía) — la separación es por
//! comportamiento, no por compartimentación de datos.
//! - Toda mutación pasa por el bucle primordial vía `GraphEvent`. Los
//! submódulos se llaman desde `main.rs::primordial_loop`.
mod bus_mediator;
mod capabilities;
mod devices;
mod lifecycle;
mod shutdown;
mod topology;
use arje_bus::{BusMessage, BusResponse};
use arje_card::{Capability, EntityCard};
use nix::unistd::Pid;
use std::collections::{BTreeMap, BTreeSet, HashMap};
use tokio::sync::{mpsc, oneshot};
use ulid::Ulid;
// `SHUTDOWN_GRACE` está re-exportado bajo `crate::graph::shutdown::SHUTDOWN_GRACE`
// directo; la re-export adicional aquí no se usa todavía.
/// Bit alto encendido en `seq` para invokes server-iniciados — evita choque
/// con secuencias allocadas por clientes.
pub(in crate::graph) const SERVER_SEQ_FLAG: u64 = 1u64 << 63;
pub struct EnteGraph {
pub(in crate::graph) seed: EntityCard,
/// Entes encarnados como proceso o nodo virtual. id↔pid bidireccional.
pub(in crate::graph) incarnated: HashMap<Ulid, Incarnated>,
pub(in crate::graph) by_pid: HashMap<i32, Ulid>,
/// Quién provee qué capacidad. Resuelve `requires` y `pick_invokable`.
pub(in crate::graph) providers: BTreeMap<Capability, BTreeSet<Ulid>>,
/// Tokens de capability emitidos. Revocables al morir el proveedor.
pub(in crate::graph) next_token: u64,
pub(in crate::graph) grants: HashMap<u64, GrantedCapability>,
/// Dispositivos del kernel presentes (devpath → última UEvent).
pub(in crate::graph) devices: HashMap<String, arje_kernel::UEvent>,
/// Cards genesis pendientes de instanciar (extraídas de la Semilla).
pub(in crate::graph) pending_genesis: Vec<EntityCard>,
/// Hijos directos por lineage. parent → [child, ...].
pub(in crate::graph) children: HashMap<Ulid, Vec<Ulid>>,
/// Conexiones del bus indexadas por la identidad anunciada y verificada
/// con SO_PEERCRED. El value es el extremo de escritura del writer task.
pub(in crate::graph) bus_connections: HashMap<Ulid, mpsc::Sender<BusMessage>>,
/// Invokes forwardeados pendientes de respuesta del proveedor.
pub(in crate::graph) pending_invokes: HashMap<u64, oneshot::Sender<BusResponse>>,
pub(in crate::graph) next_invoke_seq: u64,
}
pub(in crate::graph) struct Incarnated {
pub card: EntityCard,
pub pid: Option<Pid>,
/// Capacidades añadidas en runtime vía BusRequest::UpdateCapabilities.
/// La Card original es immutable; la "vista efectiva" del Ente es
/// `card.provides dynamic_provides`.
pub dynamic_provides: BTreeSet<Capability>,
}
pub(in crate::graph) struct GrantedCapability {
pub cap: Capability,
pub provider: Ulid,
pub holder: Ulid,
/// Instante en el que el grant deja de ser válido. El garbage collector
/// del cerebro purga grants con `Instant::now() > expires_at`.
pub expires_at: std::time::Instant,
}
/// TTL default para grants cuando la cap no tiene override. 60s es un
/// compromiso: largo enough para evitar churn en patrones interactivos,
/// corto enough para que credenciales filtradas expiren rápidamente.
///
/// Reservado para el flujo de capability granting (no cableado todavía).
#[allow(dead_code)]
pub const DEFAULT_GRANT_TTL: std::time::Duration = std::time::Duration::from_secs(60);
/// Quota máxima de tokens activos por (holder, cap). Caps escaladas tienen
/// quota baja para limitar fugas de credenciales; caps de uso frecuente
/// (Endpoint, Journal) son más laxas.
pub fn quota_for_capability(cap: &Capability) -> u32 {
match cap {
// Caps escaladas: pocos tokens, fuerza patrón request-act-release.
Capability::Spawn => 2,
Capability::FilesystemRoot => 2,
Capability::Device { .. } => 4,
// Caps de propósito general.
Capability::Endpoint { .. } => 16,
Capability::KernelNetlink(_) => 4,
Capability::LegacyLogind => 8,
// Logging: hasta 32 streams.
Capability::Journal => 32,
}
}
/// TTL específico por variante de Capability. Caps de mayor riesgo / costo
/// (Spawn, FilesystemRoot) tienen TTL más corto; caps "logging" como
/// Journal pueden vivir más.
///
/// Cualquier cap no listada cae al `DEFAULT_GRANT_TTL`.
pub fn ttl_for_capability(cap: &Capability) -> std::time::Duration {
use std::time::Duration;
match cap {
// Caps escaladas: TTL corto para forzar renovación frecuente.
Capability::Spawn => Duration::from_secs(30),
Capability::FilesystemRoot => Duration::from_secs(30),
Capability::Device { .. } => Duration::from_secs(60),
// Caps de propósito general.
Capability::Endpoint { .. } => Duration::from_secs(300), // 5 min
Capability::KernelNetlink(_) => Duration::from_secs(300),
Capability::LegacyLogind => Duration::from_secs(300),
// Logging puede vivir mucho.
Capability::Journal => Duration::from_secs(3600), // 1h
}
}
impl EnteGraph {
pub fn new(mut seed: EntityCard) -> Self {
// Extraemos genesis antes de almacenar la Semilla — evita duplicación
// en `incarnated[seed.id]`.
let pending_genesis = std::mem::take(&mut seed.genesis);
let mut g = Self {
seed: seed.clone(),
incarnated: HashMap::new(),
by_pid: HashMap::new(),
providers: BTreeMap::new(),
next_token: 1,
grants: HashMap::new(),
devices: HashMap::new(),
pending_genesis,
children: HashMap::new(),
bus_connections: HashMap::new(),
pending_invokes: HashMap::new(),
next_invoke_seq: 0,
};
// El Ente #0 se inscribe a sí mismo como proveedor de las capacidades
// que su Card declara — sólo así los hijos pueden requerirlas.
g.register_provider(&seed);
g.incarnated.insert(seed.id, Incarnated {
card: seed, pid: None,
dynamic_provides: BTreeSet::new(),
});
g
}
pub fn lookup_pid(&self, pid: Pid) -> Option<Ulid> {
self.by_pid.get(&pid.as_raw()).copied()
}
/// Acceso read-only a la Card de un Ente vivo. Usado por el cerebro
/// para hidratar `SubjectInfo` sin clonar todo el mapa.
pub fn peek_card(&self, id: &Ulid) -> Option<&EntityCard> {
self.incarnated.get(id).map(|i| &i.card)
}
/// Identidad de la Semilla. Usado como `requester` para spawns generados
/// por reglas auto-cristalizadas (única identidad con Capability::Spawn).
pub fn seed_id(&self) -> Ulid {
self.seed.id
}
/// Captura el estado live como snapshot serializable. Excluye la Semilla
/// (será re-sintetizada al restore con su seed_id preservado).
pub fn snapshot(&self) -> arje_snapshot::FractalSnapshot {
let entes: Vec<EntityCard> = self.incarnated.iter()
.filter(|(id, _)| **id != self.seed.id)
.map(|(_, inc)| inc.card.clone())
.collect();
arje_snapshot::FractalSnapshot {
version: arje_snapshot::SNAPSHOT_VERSION,
timestamp_ms: arje_snapshot::now_ms(),
seed_id: self.seed.id,
seed_label: self.seed.label.clone(),
entes,
}
}
pub(in crate::graph) fn register_provider(&mut self, card: &EntityCard) {
for cap in &card.provides {
self.providers.entry(cap.clone()).or_default().insert(card.id);
}
}
pub(in crate::graph) fn unregister_provider(&mut self, card: &EntityCard) {
for cap in &card.provides {
if let Some(set) = self.providers.get_mut(cap) {
set.remove(&card.id);
}
}
// Revocar grants emitidos por el Ente fallecido.
let revoked: Vec<u64> = self.grants.iter()
.filter(|(_, g)| g.provider == card.id)
.map(|(t, _)| *t)
.collect();
for t in revoked {
self.grants.remove(&t);
}
}
/// Quita una capacidad dinámica del índice de providers para un Ente
/// específico. Usado al recibir UpdateCapabilities con `removes`.
pub(in crate::graph) fn unregister_dynamic_cap(&mut self, ente_id: Ulid, cap: &Capability) {
if let Some(set) = self.providers.get_mut(cap) {
set.remove(&ente_id);
}
}
/// Inserta una capacidad dinámica al índice de providers para un Ente.
pub(in crate::graph) fn register_dynamic_cap(&mut self, ente_id: Ulid, cap: Capability) {
self.providers.entry(cap).or_default().insert(ente_id);
}
/// El Ente #0 (semilla) tiene todas sus capacidades declaradas. Otros
/// las tienen si su Card las declara o si poseen un grant vivo.
pub(in crate::graph) fn holder_has(&self, holder: Ulid, cap: &Capability) -> bool {
if holder == self.seed.id {
return self.seed.provides.contains(cap);
}
if let Some(inc) = self.incarnated.get(&holder) {
if inc.card.provides.contains(cap) { return true; }
}
self.grants.values().any(|g| g.holder == holder && &g.cap == cap)
}
}
+100
View File
@@ -0,0 +1,100 @@
//! Cascade shutdown: SIGTERM en orden topológico (hojas primero), grace
//! period, SIGKILL para stragglers, reap final.
use super::EnteGraph;
use nix::errno::Errno;
use nix::sys::signal::{kill, Signal};
use nix::sys::wait::{waitpid, WaitPidFlag, WaitStatus};
use nix::unistd::Pid;
use std::time::{Duration, Instant};
use tracing::{debug, info, warn};
/// Tiempo que damos a los Entes tras SIGTERM antes de escalar a SIGKILL.
pub const SHUTDOWN_GRACE: Duration = Duration::from_secs(2);
impl EnteGraph {
pub async fn cascade_shutdown(&mut self) {
let order = self.topo_order();
let pids: Vec<Pid> = order.iter()
.filter_map(|id| self.incarnated.get(id).and_then(|i| i.pid))
.collect();
if pids.is_empty() {
info!("cascade shutdown: ningún Ente encarnado, salida limpia");
return;
}
info!(
count = pids.len(), grace_ms = SHUTDOWN_GRACE.as_millis() as u64,
"SIGTERM cascade (topológico, hojas primero)"
);
for pid in &pids {
match kill(*pid, Signal::SIGTERM) {
Ok(()) => {}
Err(Errno::ESRCH) => {} // ya muerto, lo cosecharemos abajo
Err(e) => warn!(?pid, ?e, "kill SIGTERM falló"),
}
}
let deadline = Instant::now() + SHUTDOWN_GRACE;
while Instant::now() < deadline {
if !self.incarnated.values().any(|i| i.pid.is_some()) {
break;
}
match waitpid(None, Some(WaitPidFlag::WNOHANG)) {
Ok(WaitStatus::Exited(pid, code)) => {
self.reap_during_shutdown(pid);
debug!(?pid, code, "reaped (exited)");
}
Ok(WaitStatus::Signaled(pid, sig, _)) => {
self.reap_during_shutdown(pid);
debug!(?pid, ?sig, "reaped (signaled)");
}
Ok(WaitStatus::StillAlive) | Err(Errno::EINTR) => {
tokio::time::sleep(Duration::from_millis(50)).await;
}
Err(Errno::ECHILD) => return,
Ok(_) => {}
Err(e) => {
warn!(?e, "waitpid fallo en shutdown grace");
break;
}
}
}
let stragglers: Vec<Pid> = self.incarnated.values()
.filter_map(|i| i.pid)
.collect();
if stragglers.is_empty() {
info!("cascade shutdown completo (todos los Entes terminaron en gracia)");
return;
}
warn!(count = stragglers.len(), "stragglers post-SIGTERM, escalando a SIGKILL");
for pid in &stragglers {
let _ = kill(*pid, Signal::SIGKILL);
}
loop {
match waitpid(None, Some(WaitPidFlag::WNOHANG)) {
Ok(WaitStatus::Exited(pid, _)) | Ok(WaitStatus::Signaled(pid, _, _)) => {
self.reap_during_shutdown(pid);
}
Ok(WaitStatus::StillAlive) => {
std::thread::sleep(Duration::from_millis(20));
}
Err(Errno::ECHILD) => break,
_ => break,
}
if !self.incarnated.values().any(|i| i.pid.is_some()) { break; }
}
info!("cascade shutdown completo (con SIGKILL)");
}
fn reap_during_shutdown(&mut self, pid: Pid) {
let Some(id) = self.by_pid.remove(&pid.as_raw()) else { return };
if let Some(inc) = self.incarnated.remove(&id) {
self.unregister_provider(&inc.card);
}
}
}
@@ -0,0 +1,35 @@
//! Topología del fractal: índice de hijos por lineage y orden topológico
//! para shutdown.
use super::EnteGraph;
use std::collections::BTreeSet;
use ulid::Ulid;
impl EnteGraph {
/// DFS post-order desde la Semilla. Hojas primero, raíz al final.
/// Garantiza que SIGTERM va a un padre sólo cuando sus hijos ya recibieron
/// la señal (evita orfandad transitoria que confunda Restart supervisors).
pub(in crate::graph) fn topo_order(&self) -> Vec<Ulid> {
let mut visited = BTreeSet::new();
let mut order = Vec::new();
self.dfs_post(self.seed.id, &mut visited, &mut order);
// Entes encarnados sin lineage hacia el seed (no debería pasar pero
// protege contra grafos huérfanos): añadirlos al final.
for id in self.incarnated.keys() {
if !visited.contains(id) {
self.dfs_post(*id, &mut visited, &mut order);
}
}
order
}
fn dfs_post(&self, node: Ulid, visited: &mut BTreeSet<Ulid>, order: &mut Vec<Ulid>) {
if !visited.insert(node) { return; }
if let Some(children) = self.children.get(&node) {
for c in children.clone() {
self.dfs_post(c, visited, order);
}
}
order.push(node);
}
}