diff --git a/Cargo.lock b/Cargo.lock index ebba677..fc4eb8f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -10146,6 +10146,20 @@ dependencies = [ "serde", ] +[[package]] +name = "sandokan-local" +version = "0.1.0" +dependencies = [ + "arje-incarnate", + "async-trait", + "brahman-card", + "nix 0.29.0", + "sandokan-core", + "sandokan-lifecycle", + "tokio", + "ulid", +] + [[package]] name = "saphyr-parser" version = "0.0.6" diff --git a/Cargo.toml b/Cargo.toml index ac5cb00..f8c916e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -36,6 +36,7 @@ members = [ "crates/runtime/arje-echo", "crates/runtime/sandokan-lifecycle", "crates/runtime/sandokan-core", + "crates/runtime/sandokan-local", # ============================================================ # compat/ — Shims D-Bus para correr software systemd-aware diff --git a/crates/runtime/sandokan-local/Cargo.toml b/crates/runtime/sandokan-local/Cargo.toml new file mode 100644 index 0000000..eee6b27 --- /dev/null +++ b/crates/runtime/sandokan-local/Cargo.toml @@ -0,0 +1,18 @@ +[package] +name = "sandokan-local" +description = "LocalEngine: implementación in-process del orquestador sandokan. Encarna Cards vía arje-incarnate, trackea lifecycle, reaping." +version.workspace = true +edition.workspace = true +license.workspace = true +authors.workspace = true +publish.workspace = true + +[dependencies] +sandokan-core = { path = "../sandokan-core" } +sandokan-lifecycle = { path = "../sandokan-lifecycle" } +arje-incarnate = { path = "../../init/arje-incarnate" } +brahman-card = { path = "../../protocol/brahman-card" } +async-trait = { workspace = true } +tokio = { workspace = true } +nix = { workspace = true } +ulid = { workspace = true } diff --git a/crates/runtime/sandokan-local/src/lib.rs b/crates/runtime/sandokan-local/src/lib.rs new file mode 100644 index 0000000..8c36df5 --- /dev/null +++ b/crates/runtime/sandokan-local/src/lib.rs @@ -0,0 +1,229 @@ +//! sandokan-local — `LocalEngine`, orquestación in-process. +//! +//! Primera implementación del trait [`Engine`]: encarna Cards en el +//! mismo host vía `arje-incarnate`, mantiene un registro de las +//! entidades activas y hace reaping perezoso (waitpid WNOHANG en cada +//! consulta) para detectar salidas sin un task de fondo dedicado. +//! +//! `DaemonEngine` y `RemoteEngine` (transportes) se construirán sobre +//! este mismo contrato en crates separados. + +mod proc; + +use arje_incarnate::{Incarnator, IncarnatorConfig}; +use async_trait::async_trait; +use nix::sys::signal::{kill, Signal}; +use nix::sys::wait::{waitpid, WaitPidFlag, WaitStatus}; +use nix::unistd::Pid; +use sandokan_core::{Engine, EngineError, ExecHandle, Intent, TelemetryFrame}; +use sandokan_lifecycle::LifecycleState; +use std::collections::HashMap; +use std::sync::Mutex; +use std::time::{Duration, Instant, SystemTime}; +use ulid::Ulid; + +/// Una entidad encarnada y su estado conocido. +struct Entity { + handle: ExecHandle, + pid: i32, + state: LifecycleState, +} + +/// Orquestador in-process. Encarna Cards localmente y trackea su lifecycle. +pub struct LocalEngine { + base_cfg: IncarnatorConfig, + registry: Mutex>, +} + +impl LocalEngine { + /// Crea un engine con configuración de incarnación por defecto. + pub fn new() -> Self { + Self::with_config(IncarnatorConfig::default()) + } + + /// Crea un engine con una `IncarnatorConfig` explícita (bus socket, + /// env extra, strict_caps). + pub fn with_config(cfg: IncarnatorConfig) -> Self { + Self { + base_cfg: cfg, + registry: Mutex::new(HashMap::new()), + } + } + + /// Marca el estado de una entidad (best-effort; ignora lock envenenado). + fn mark(&self, card_id: Ulid, state: LifecycleState) { + if let Ok(mut reg) = self.registry.lock() { + if let Some(ent) = reg.get_mut(&card_id) { + ent.state = state; + } + } + } +} + +impl Default for LocalEngine { + fn default() -> Self { + Self::new() + } +} + +/// Reaping no-bloqueante de un pid. `Some(estado terminal)` si la entidad +/// transicionó; `None` si sigue viva. +fn reap(pid: i32) -> Option { + match waitpid(Pid::from_raw(pid), Some(WaitPidFlag::WNOHANG)) { + Ok(WaitStatus::Exited(_, code)) => Some(LifecycleState::Exited { code }), + Ok(WaitStatus::Signaled(_, _, _)) => Some(LifecycleState::Killed), + Ok(WaitStatus::StillAlive) => None, + // Stopped / Continued / PtraceEvent: la entidad sigue presente. + Ok(_) => None, + Err(nix::errno::Errno::ECHILD) => { + // No es (ya) hijo reapable. Si procfs no lo tiene, terminó. + if proc::proc_exists(pid) { + None + } else { + Some(LifecycleState::Exited { code: -1 }) + } + } + Err(_) => None, + } +} + +#[async_trait] +impl Engine for LocalEngine { + async fn run(&self, intent: Intent) -> Result { + let card_id = intent.card_id(); + let label = intent.card.label.clone(); + + // El env del contexto se mergea sobre el del engine base. + let mut cfg = self.base_cfg.clone(); + cfg.extra_env.extend(intent.context.env.clone()); + + // NOTA v1: `IsolationLevel` es advisory. None/Standard encarnan + // según `Card.soma`; Sealed (rootfs aislado vía pivot_root + + // OverlayFS) queda reservado para cuando el Intent transporte una + // spec de rootfs — `arje-incarnate` ya expone los builders. + let incarnator = Incarnator::new(cfg); + let outcome = incarnator + .incarnate(&intent.card) + .map_err(|e| EngineError::Incarnate(e.to_string()))?; + + let handle = ExecHandle { + card_id, + label, + started_at: SystemTime::now(), + }; + let mut reg = self.registry.lock().expect("registry lock"); + reg.insert( + card_id, + Entity { + handle: handle.clone(), + pid: outcome.pid.as_raw(), + state: LifecycleState::Running, + }, + ); + Ok(handle) + } + + async fn stop(&self, card_id: Ulid, grace: Duration) -> Result<(), EngineError> { + let pid = { + let reg = self.registry.lock().expect("registry lock"); + reg.get(&card_id) + .map(|e| e.pid) + .ok_or(EngineError::NotFound(card_id))? + }; + let npid = Pid::from_raw(pid); + + // SIGTERM + período de gracia: damos chance a un cierre ordenado. + if grace > Duration::ZERO { + let _ = kill(npid, Signal::SIGTERM); + let deadline = Instant::now() + grace; + loop { + if reap(pid).is_some() { + self.mark(card_id, LifecycleState::Killed); + return Ok(()); + } + if Instant::now() >= deadline { + break; + } + tokio::time::sleep(Duration::from_millis(20)).await; + } + } + + // SIGKILL + reap bloqueante de lo que haya quedado. + let _ = kill(npid, Signal::SIGKILL); + let _ = waitpid(npid, None); + self.mark(card_id, LifecycleState::Killed); + Ok(()) + } + + async fn list(&self) -> Result, EngineError> { + let mut reg = self.registry.lock().expect("registry lock"); + let mut out = Vec::new(); + for ent in reg.values_mut() { + if !ent.state.is_terminal() { + if let Some(new_state) = reap(ent.pid) { + ent.state = new_state; + } + } + if !ent.state.is_terminal() { + out.push(ent.handle.clone()); + } + } + Ok(out) + } + + async fn status(&self, card_id: Ulid) -> Result { + let mut reg = self.registry.lock().expect("registry lock"); + let ent = reg + .get_mut(&card_id) + .ok_or(EngineError::NotFound(card_id))?; + if !ent.state.is_terminal() { + if let Some(new_state) = reap(ent.pid) { + ent.state = new_state; + } + } + Ok(ent.state.clone()) + } + + async fn telemetry(&self, card_id: Ulid) -> Result { + let pid = { + let reg = self.registry.lock().expect("registry lock"); + reg.get(&card_id) + .map(|e| e.pid) + .ok_or(EngineError::NotFound(card_id))? + }; + Ok(TelemetryFrame { + card_id, + at: SystemTime::now(), + mem_bytes: proc::read_mem_bytes(pid), + nproc: proc::read_thread_count(pid), + // v1: CPU% requiere dos samples espaciados — pendiente. + cpu_pct: 0.0, + }) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[tokio::test] + async fn empty_engine_lists_nothing() { + let e = LocalEngine::new(); + assert!(e.list().await.unwrap().is_empty()); + } + + #[tokio::test] + async fn unknown_card_is_not_found() { + let e = LocalEngine::new(); + let id = Ulid::new(); + assert!(matches!(e.status(id).await, Err(EngineError::NotFound(_)))); + assert!(matches!( + e.stop(id, Duration::ZERO).await, + Err(EngineError::NotFound(_)) + )); + assert!(matches!( + e.telemetry(id).await, + Err(EngineError::NotFound(_)) + )); + } +} diff --git a/crates/runtime/sandokan-local/src/proc.rs b/crates/runtime/sandokan-local/src/proc.rs new file mode 100644 index 0000000..4eeb324 --- /dev/null +++ b/crates/runtime/sandokan-local/src/proc.rs @@ -0,0 +1,63 @@ +//! Lectura de `/proc//` para telemetría. Sin invocar binarios +//! externos (`ps`, `free`): syscalls + lectura directa de procfs. + +/// RSS en bytes desde `/proc//status` (línea `VmRSS:`). +/// Devuelve 0 si el proceso ya no existe o la línea falta. +pub fn read_mem_bytes(pid: i32) -> u64 { + let path = format!("/proc/{pid}/status"); + let Ok(content) = std::fs::read_to_string(&path) else { + return 0; + }; + for line in content.lines() { + if let Some(rest) = line.strip_prefix("VmRSS:") { + // formato: "VmRSS:\t 1234 kB" + let kb: u64 = rest + .split_whitespace() + .next() + .and_then(|s| s.parse().ok()) + .unwrap_or(0); + return kb * 1024; + } + } + 0 +} + +/// Número de threads desde `/proc//status` (línea `Threads:`). +/// Devuelve 0 si el proceso ya no existe. +pub fn read_thread_count(pid: i32) -> u32 { + let path = format!("/proc/{pid}/status"); + let Ok(content) = std::fs::read_to_string(&path) else { + return 0; + }; + for line in content.lines() { + if let Some(rest) = line.strip_prefix("Threads:") { + return rest.trim().parse().unwrap_or(0); + } + } + 0 +} + +/// `true` si `/proc/` existe (el proceso, vivo o zombie, está presente). +pub fn proc_exists(pid: i32) -> bool { + std::path::Path::new(&format!("/proc/{pid}")).exists() +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn self_has_positive_rss_and_threads() { + let me = std::process::id() as i32; + assert!(read_mem_bytes(me) > 0, "el propio proceso debe tener RSS"); + assert!(read_thread_count(me) >= 1); + assert!(proc_exists(me)); + } + + #[test] + fn nonexistent_pid_is_zero() { + // PID improbable de existir. + assert_eq!(read_mem_bytes(2_000_000_000), 0); + assert!(!proc_exists(2_000_000_000)); + } +}