feat(sandokan-local): B1.2 — LocalEngine

Primera implementación del trait Engine: orquestación in-process.

- LocalEngine encarna Cards vía arje-incarnate, mantiene un registro
  HashMap<Ulid, Entity> de entidades activas.
- run()       — incarnate + registro; mergea env del contexto.
- stop()      — SIGTERM + período de gracia + SIGKILL + reap.
- list()      — reaping perezoso (waitpid WNOHANG) + handles activos.
- status()    — reaping perezoso + LifecycleState.
- telemetry() — lee /proc/<pid>/status (VmRSS + Threads), sin invocar
                binarios externos.
- Reaping sin task de fondo: cada consulta hace waitpid WNOHANG.

proc.rs: lectura directa de procfs (mem_bytes, thread_count, proc_exists).

4 tests verdes (2 proc + 2 engine: empty list, NotFound paths).
cargo check --workspace verde.

v1: IsolationLevel es advisory (Sealed reservado para cuando el Intent
transporte rootfs spec). CPU% pendiente (requiere 2 samples).

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
sergio
2026-05-20 13:57:26 +00:00
parent af5d4a1f22
commit cba3a9dd6e
5 changed files with 325 additions and 0 deletions
Generated
+14
View File
@@ -10146,6 +10146,20 @@ dependencies = [
"serde",
]
[[package]]
name = "sandokan-local"
version = "0.1.0"
dependencies = [
"arje-incarnate",
"async-trait",
"brahman-card",
"nix 0.29.0",
"sandokan-core",
"sandokan-lifecycle",
"tokio",
"ulid",
]
[[package]]
name = "saphyr-parser"
version = "0.0.6"
+1
View File
@@ -36,6 +36,7 @@ members = [
"crates/runtime/arje-echo",
"crates/runtime/sandokan-lifecycle",
"crates/runtime/sandokan-core",
"crates/runtime/sandokan-local",
# ============================================================
# compat/ — Shims D-Bus para correr software systemd-aware
+18
View File
@@ -0,0 +1,18 @@
[package]
name = "sandokan-local"
description = "LocalEngine: implementación in-process del orquestador sandokan. Encarna Cards vía arje-incarnate, trackea lifecycle, reaping."
version.workspace = true
edition.workspace = true
license.workspace = true
authors.workspace = true
publish.workspace = true
[dependencies]
sandokan-core = { path = "../sandokan-core" }
sandokan-lifecycle = { path = "../sandokan-lifecycle" }
arje-incarnate = { path = "../../init/arje-incarnate" }
brahman-card = { path = "../../protocol/brahman-card" }
async-trait = { workspace = true }
tokio = { workspace = true }
nix = { workspace = true }
ulid = { workspace = true }
+229
View File
@@ -0,0 +1,229 @@
//! sandokan-local — `LocalEngine`, orquestación in-process.
//!
//! Primera implementación del trait [`Engine`]: encarna Cards en el
//! mismo host vía `arje-incarnate`, mantiene un registro de las
//! entidades activas y hace reaping perezoso (waitpid WNOHANG en cada
//! consulta) para detectar salidas sin un task de fondo dedicado.
//!
//! `DaemonEngine` y `RemoteEngine` (transportes) se construirán sobre
//! este mismo contrato en crates separados.
mod proc;
use arje_incarnate::{Incarnator, IncarnatorConfig};
use async_trait::async_trait;
use nix::sys::signal::{kill, Signal};
use nix::sys::wait::{waitpid, WaitPidFlag, WaitStatus};
use nix::unistd::Pid;
use sandokan_core::{Engine, EngineError, ExecHandle, Intent, TelemetryFrame};
use sandokan_lifecycle::LifecycleState;
use std::collections::HashMap;
use std::sync::Mutex;
use std::time::{Duration, Instant, SystemTime};
use ulid::Ulid;
/// Una entidad encarnada y su estado conocido.
struct Entity {
handle: ExecHandle,
pid: i32,
state: LifecycleState,
}
/// Orquestador in-process. Encarna Cards localmente y trackea su lifecycle.
pub struct LocalEngine {
base_cfg: IncarnatorConfig,
registry: Mutex<HashMap<Ulid, Entity>>,
}
impl LocalEngine {
/// Crea un engine con configuración de incarnación por defecto.
pub fn new() -> Self {
Self::with_config(IncarnatorConfig::default())
}
/// Crea un engine con una `IncarnatorConfig` explícita (bus socket,
/// env extra, strict_caps).
pub fn with_config(cfg: IncarnatorConfig) -> Self {
Self {
base_cfg: cfg,
registry: Mutex::new(HashMap::new()),
}
}
/// Marca el estado de una entidad (best-effort; ignora lock envenenado).
fn mark(&self, card_id: Ulid, state: LifecycleState) {
if let Ok(mut reg) = self.registry.lock() {
if let Some(ent) = reg.get_mut(&card_id) {
ent.state = state;
}
}
}
}
impl Default for LocalEngine {
fn default() -> Self {
Self::new()
}
}
/// Reaping no-bloqueante de un pid. `Some(estado terminal)` si la entidad
/// transicionó; `None` si sigue viva.
fn reap(pid: i32) -> Option<LifecycleState> {
match waitpid(Pid::from_raw(pid), Some(WaitPidFlag::WNOHANG)) {
Ok(WaitStatus::Exited(_, code)) => Some(LifecycleState::Exited { code }),
Ok(WaitStatus::Signaled(_, _, _)) => Some(LifecycleState::Killed),
Ok(WaitStatus::StillAlive) => None,
// Stopped / Continued / PtraceEvent: la entidad sigue presente.
Ok(_) => None,
Err(nix::errno::Errno::ECHILD) => {
// No es (ya) hijo reapable. Si procfs no lo tiene, terminó.
if proc::proc_exists(pid) {
None
} else {
Some(LifecycleState::Exited { code: -1 })
}
}
Err(_) => None,
}
}
#[async_trait]
impl Engine for LocalEngine {
async fn run(&self, intent: Intent) -> Result<ExecHandle, EngineError> {
let card_id = intent.card_id();
let label = intent.card.label.clone();
// El env del contexto se mergea sobre el del engine base.
let mut cfg = self.base_cfg.clone();
cfg.extra_env.extend(intent.context.env.clone());
// NOTA v1: `IsolationLevel` es advisory. None/Standard encarnan
// según `Card.soma`; Sealed (rootfs aislado vía pivot_root +
// OverlayFS) queda reservado para cuando el Intent transporte una
// spec de rootfs — `arje-incarnate` ya expone los builders.
let incarnator = Incarnator::new(cfg);
let outcome = incarnator
.incarnate(&intent.card)
.map_err(|e| EngineError::Incarnate(e.to_string()))?;
let handle = ExecHandle {
card_id,
label,
started_at: SystemTime::now(),
};
let mut reg = self.registry.lock().expect("registry lock");
reg.insert(
card_id,
Entity {
handle: handle.clone(),
pid: outcome.pid.as_raw(),
state: LifecycleState::Running,
},
);
Ok(handle)
}
async fn stop(&self, card_id: Ulid, grace: Duration) -> Result<(), EngineError> {
let pid = {
let reg = self.registry.lock().expect("registry lock");
reg.get(&card_id)
.map(|e| e.pid)
.ok_or(EngineError::NotFound(card_id))?
};
let npid = Pid::from_raw(pid);
// SIGTERM + período de gracia: damos chance a un cierre ordenado.
if grace > Duration::ZERO {
let _ = kill(npid, Signal::SIGTERM);
let deadline = Instant::now() + grace;
loop {
if reap(pid).is_some() {
self.mark(card_id, LifecycleState::Killed);
return Ok(());
}
if Instant::now() >= deadline {
break;
}
tokio::time::sleep(Duration::from_millis(20)).await;
}
}
// SIGKILL + reap bloqueante de lo que haya quedado.
let _ = kill(npid, Signal::SIGKILL);
let _ = waitpid(npid, None);
self.mark(card_id, LifecycleState::Killed);
Ok(())
}
async fn list(&self) -> Result<Vec<ExecHandle>, EngineError> {
let mut reg = self.registry.lock().expect("registry lock");
let mut out = Vec::new();
for ent in reg.values_mut() {
if !ent.state.is_terminal() {
if let Some(new_state) = reap(ent.pid) {
ent.state = new_state;
}
}
if !ent.state.is_terminal() {
out.push(ent.handle.clone());
}
}
Ok(out)
}
async fn status(&self, card_id: Ulid) -> Result<LifecycleState, EngineError> {
let mut reg = self.registry.lock().expect("registry lock");
let ent = reg
.get_mut(&card_id)
.ok_or(EngineError::NotFound(card_id))?;
if !ent.state.is_terminal() {
if let Some(new_state) = reap(ent.pid) {
ent.state = new_state;
}
}
Ok(ent.state.clone())
}
async fn telemetry(&self, card_id: Ulid) -> Result<TelemetryFrame, EngineError> {
let pid = {
let reg = self.registry.lock().expect("registry lock");
reg.get(&card_id)
.map(|e| e.pid)
.ok_or(EngineError::NotFound(card_id))?
};
Ok(TelemetryFrame {
card_id,
at: SystemTime::now(),
mem_bytes: proc::read_mem_bytes(pid),
nproc: proc::read_thread_count(pid),
// v1: CPU% requiere dos samples espaciados — pendiente.
cpu_pct: 0.0,
})
}
}
#[cfg(test)]
mod tests {
use super::*;
#[tokio::test]
async fn empty_engine_lists_nothing() {
let e = LocalEngine::new();
assert!(e.list().await.unwrap().is_empty());
}
#[tokio::test]
async fn unknown_card_is_not_found() {
let e = LocalEngine::new();
let id = Ulid::new();
assert!(matches!(e.status(id).await, Err(EngineError::NotFound(_))));
assert!(matches!(
e.stop(id, Duration::ZERO).await,
Err(EngineError::NotFound(_))
));
assert!(matches!(
e.telemetry(id).await,
Err(EngineError::NotFound(_))
));
}
}
+63
View File
@@ -0,0 +1,63 @@
//! Lectura de `/proc/<pid>/` para telemetría. Sin invocar binarios
//! externos (`ps`, `free`): syscalls + lectura directa de procfs.
/// RSS en bytes desde `/proc/<pid>/status` (línea `VmRSS:`).
/// Devuelve 0 si el proceso ya no existe o la línea falta.
pub fn read_mem_bytes(pid: i32) -> u64 {
let path = format!("/proc/{pid}/status");
let Ok(content) = std::fs::read_to_string(&path) else {
return 0;
};
for line in content.lines() {
if let Some(rest) = line.strip_prefix("VmRSS:") {
// formato: "VmRSS:\t 1234 kB"
let kb: u64 = rest
.split_whitespace()
.next()
.and_then(|s| s.parse().ok())
.unwrap_or(0);
return kb * 1024;
}
}
0
}
/// Número de threads desde `/proc/<pid>/status` (línea `Threads:`).
/// Devuelve 0 si el proceso ya no existe.
pub fn read_thread_count(pid: i32) -> u32 {
let path = format!("/proc/{pid}/status");
let Ok(content) = std::fs::read_to_string(&path) else {
return 0;
};
for line in content.lines() {
if let Some(rest) = line.strip_prefix("Threads:") {
return rest.trim().parse().unwrap_or(0);
}
}
0
}
/// `true` si `/proc/<pid>` existe (el proceso, vivo o zombie, está presente).
pub fn proc_exists(pid: i32) -> bool {
std::path::Path::new(&format!("/proc/{pid}")).exists()
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn self_has_positive_rss_and_threads() {
let me = std::process::id() as i32;
assert!(read_mem_bytes(me) > 0, "el propio proceso debe tener RSS");
assert!(read_thread_count(me) >= 1);
assert!(proc_exists(me));
}
#[test]
fn nonexistent_pid_is_zero() {
// PID improbable de existir.
assert_eq!(read_mem_bytes(2_000_000_000), 0);
assert!(!proc_exists(2_000_000_000));
}
}