diff --git a/crates/apps/shipote-cli/src/main.rs b/crates/apps/shipote-cli/src/main.rs index 9740f3f..632e33c 100644 --- a/crates/apps/shipote-cli/src/main.rs +++ b/crates/apps/shipote-cli/src/main.rs @@ -36,6 +36,9 @@ enum Cmd { /// ULID del workspace destino. #[arg(short = 'w', long)] workspace: String, + /// Si exit != 0, relanzar con backoff exponencial. + #[arg(long)] + restart_on_failure: bool, /// Path del ejecutable. exec: String, /// Argumentos del comando. @@ -164,6 +167,10 @@ enum WsCmd { Stats { id: String, }, + /// Quota report: rlimits declarados vs uso actual. + Quota { + id: String, + }, } #[tokio::main] @@ -251,7 +258,9 @@ async fn main() -> Result<()> { .unwrap_or_else(|| "—".into()); let cpu_pct = info .cpu_percent - .map(|p| format!("{p:.1} %")) + .map(|p| format!("{p:.1} % ({:.1}% total / {} cores)", + if info.cpu_cores > 0 { p / info.cpu_cores as f32 } else { p }, + info.cpu_cores)) .unwrap_or_else(|| "— (esperando 2do sample)".into()); println!("rss: {rss}"); println!("rss_peak: {peak}"); @@ -265,6 +274,35 @@ async fn main() -> Result<()> { } } + Cmd::Workspace(WsCmd::Quota { id }) => { + let id = parse_ws_id(&id)?; + let resp = round_trip(&mut stream, Request::WorkspaceQuota { workspace: id }).await?; + match resp { + Response::WorkspaceQuota { info } => { + let mem = info + .mem_limit + .map(|b| format!("{:.2} MiB", b as f64 / 1024.0 / 1024.0)) + .unwrap_or_else(|| "—".into()); + let nproc = info + .nproc_limit + .map(|n| n.to_string()) + .unwrap_or_else(|| "—".into()); + println!("mem_limit: {mem}"); + println!("nproc_limit: {nproc}"); + if info.breaches.is_empty() { + println!("breaches: (none — dentro de quota)"); + } else { + println!("breaches:"); + for b in info.breaches { + println!(" - {b}"); + } + } + } + Response::Error { message } => return Err(anyhow!(message)), + other => print_unexpected(&other), + } + } + Cmd::Workspace(WsCmd::Stop { id, grace_ms }) => { let id = parse_ws_id(&id)?; let resp = round_trip(&mut stream, Request::WorkspaceStop { id, grace_ms }).await?; @@ -277,7 +315,7 @@ async fn main() -> Result<()> { } } - Cmd::Run { workspace, exec, argv } => { + Cmd::Run { workspace, exec, argv, restart_on_failure } => { let id = parse_ws_id(&workspace)?; let resp = round_trip( &mut stream, @@ -286,6 +324,7 @@ async fn main() -> Result<()> { exec, argv, envp: vec![], + restart_on_failure, }, ) .await?; diff --git a/crates/apps/shipote-daemon/src/main.rs b/crates/apps/shipote-daemon/src/main.rs index d065c10..9f024b0 100644 --- a/crates/apps/shipote-daemon/src/main.rs +++ b/crates/apps/shipote-daemon/src/main.rs @@ -17,7 +17,8 @@ use shipote_core::WorkspaceManager; use shipote_discern::{DiscernPipeline, Hint}; use shipote_protocol::{ default_socket_path, read_frame, write_frame, CommandInfo as ProtoCommandInfo, - EdgeDiscernmentInfo, FlowInfo, Request, Response, WorkspaceStatsInfo, WorkspaceSummary, + EdgeDiscernmentInfo, FlowInfo, QuotaReportInfo, Request, Response, WorkspaceStatsInfo, + WorkspaceSummary, }; use std::sync::Arc; use tokio::net::{UnixListener, UnixStream}; @@ -180,8 +181,11 @@ async fn dispatch( } } - Request::Run { workspace, exec, argv, envp } => { - match mgr.run(workspace, exec, argv, envp).await { + Request::Run { workspace, exec, argv, envp, restart_on_failure } => { + match mgr + .run_with_options(workspace, exec, argv, envp, restart_on_failure) + .await + { Ok(s) => Response::RunStarted { workspace, command_id: s.id, @@ -345,6 +349,7 @@ async fn dispatch( rss_peak_bytes: s.rss_peak_bytes, cpu_usec: s.cpu_usec, cpu_percent: s.cpu_percent, + cpu_cores: s.cpu_cores, source: s.source, uptime_ms: s.uptime_ms, }, @@ -354,6 +359,19 @@ async fn dispatch( }, }, + Request::WorkspaceQuota { workspace } => match mgr.workspace_quota(workspace).await { + Some(q) => Response::WorkspaceQuota { + info: QuotaReportInfo { + mem_limit: q.mem_limit, + nproc_limit: q.nproc_limit, + breaches: q.breaches, + }, + }, + None => Response::Error { + message: format!("workspace {workspace} not found"), + }, + }, + Request::FlowList => { let items = mgr .list_flow_pipelines() diff --git a/crates/modules/shipote/shipote-core/src/lib.rs b/crates/modules/shipote/shipote-core/src/lib.rs index a72e74c..239b60f 100644 --- a/crates/modules/shipote/shipote-core/src/lib.rs +++ b/crates/modules/shipote/shipote-core/src/lib.rs @@ -94,6 +94,24 @@ struct Inner { /// el reaper los borra (futuro). v1: viven hasta `stop_pipeline_flows` /// explícito o hasta shutdown. pipeline_flows: HashMap>, + /// Specs de comandos `run()` con `restart_on_failure=true`. Indexed + /// por command_id. Cuando `reap_dead` detecta exit!=0, se relauncha + /// con la misma spec (nuevo pid y nuevo command_id se asigna por + /// el nuevo state pero el restart_spec sigue ligado al original). + restart_specs: HashMap, +} + +#[derive(Debug, Clone)] +struct RestartSpec { + workspace: WorkspaceId, + exec: String, + argv: Vec, + envp: Vec<(String, String)>, + /// Backoff inicial (ms). Crece exponencialmente hasta max_backoff_ms. + backoff_ms: u64, + max_backoff_ms: u64, + /// Cantidad de restarts ya ejecutados (para tracking). + restart_count: u32, } #[derive(Debug, Clone)] @@ -179,6 +197,7 @@ impl WorkspaceManager { workspaces: HashMap::new(), saved_pipelines: HashMap::new(), pipeline_flows: HashMap::new(), + restart_specs: HashMap::new(), })), incarnator: Arc::new(Incarnator::new(cfg)), } @@ -353,6 +372,39 @@ impl WorkspaceManager { .map(|w| w.spec.label.clone()) } + /// Compara accounting real (RSS, commands_alive) contra los rlimits + /// declarados en `SomaSpec`. Devuelve violaciones humanizadas. NO + /// hace enforcement automático. + pub async fn workspace_quota(&self, id: WorkspaceId) -> Option { + let stats_now = self.workspace_stats(id).await?; + let g = self.inner.lock().await; + let ws = g.workspaces.get(&id)?; + let rl = &ws.spec.soma.rlimits; + let mut report = stats::QuotaReport { + mem_limit: rl.mem_bytes, + nproc_limit: rl.nproc, + breaches: Vec::new(), + }; + if let (Some(limit), Some(used)) = (rl.mem_bytes, stats_now.rss_bytes) { + if used > limit { + report.breaches.push(format!( + "memory: {:.2} MiB > {:.2} MiB limit", + used as f64 / 1024.0 / 1024.0, + limit as f64 / 1024.0 / 1024.0, + )); + } + } + if let Some(limit) = rl.nproc { + if stats_now.commands_alive > limit { + report.breaches.push(format!( + "nproc: {} alive > {} limit", + stats_now.commands_alive, limit + )); + } + } + Some(report) + } + /// Estadísticas de recursos del workspace: RSS + CPU agregado de sus /// comandos vivos. Lee `/proc//` directamente; si el spec declara /// `soma.cgroup.path`, también intenta el cgroup (más preciso, incluye @@ -531,6 +583,20 @@ impl WorkspaceManager { exec: String, argv: Vec, envp: Vec<(String, String)>, + ) -> Result { + self.run_with_options(id, exec, argv, envp, false).await + } + + /// Variante con `restart_on_failure`: si el comando muere con + /// exit_status != 0, el reaper lo relauncha con backoff exponencial + /// (200ms → 400 → 800 → … cap 30s). + pub async fn run_with_options( + &self, + id: WorkspaceId, + exec: String, + argv: Vec, + envp: Vec<(String, String)>, + restart_on_failure: bool, ) -> Result { let workspace_label = { let g = self.inner.lock().await; @@ -593,6 +659,23 @@ impl WorkspaceManager { }, ); } + if restart_on_failure { + // Reextract exec/argv/envp del payload del CommandRef. + if let Payload::Native { exec, argv, envp } = &cmd_ref.payload { + g.restart_specs.insert( + cmd_id, + RestartSpec { + workspace: id, + exec: exec.clone(), + argv: argv.clone(), + envp: envp.clone(), + backoff_ms: 200, + max_backoff_ms: 30_000, + restart_count: 0, + }, + ); + } + } for d in &out.degradations { warn!(?d, %id, "command incarnation degradation"); } @@ -693,25 +776,99 @@ impl WorkspaceManager { /// Cosecha hijos terminados (no-bloqueante). Llamar periódicamente desde /// el daemon o ante SIGCHLD. Marca `alive=false` y guarda exit_status. - pub async fn reap_dead(&self) { - let mut g = self.inner.lock().await; - for ws in g.workspaces.values_mut() { - for cmd in ws.commands.values_mut() { - if !cmd.alive { - continue; - } - match waitpid(cmd.pid, Some(WaitPidFlag::WNOHANG)) { - Ok(WaitStatus::Exited(_, code)) => { - cmd.alive = false; - cmd.exit_status = Some(code); + pub async fn reap_dead(self: &Arc) { + let mut to_restart: Vec = Vec::new(); + { + let mut g = self.inner.lock().await; + for ws in g.workspaces.values_mut() { + for cmd in ws.commands.values_mut() { + if !cmd.alive { + continue; } - Ok(WaitStatus::Signaled(_, sig, _)) => { - cmd.alive = false; - cmd.exit_status = Some(128 + (sig as i32)); + match waitpid(cmd.pid, Some(WaitPidFlag::WNOHANG)) { + Ok(WaitStatus::Exited(_, code)) => { + cmd.alive = false; + cmd.exit_status = Some(code); + } + Ok(WaitStatus::Signaled(_, sig, _)) => { + cmd.alive = false; + cmd.exit_status = Some(128 + (sig as i32)); + } + _ => {} } - _ => {} } } + // Detectar restart_specs cuyo command_id ya está dead con exit!=0. + let mut to_remove: Vec = Vec::new(); + for (cmd_id, spec) in g.restart_specs.iter() { + let mut should_restart = false; + let mut should_drop = false; + 'outer: for ws in g.workspaces.values() { + if let Some(cmd) = ws.commands.get(cmd_id) { + if !cmd.alive { + match cmd.exit_status { + Some(0) => should_drop = true, + Some(_) => should_restart = true, + None => {} + } + break 'outer; + } + } + } + if should_drop { + to_remove.push(*cmd_id); + } else if should_restart { + to_restart.push(spec.clone()); + to_remove.push(*cmd_id); + } + } + for id in to_remove { + g.restart_specs.remove(&id); + } + } + // Schedule restart fuera del lock. + for mut spec in to_restart { + let mgr = self.clone(); + let backoff = std::time::Duration::from_millis(spec.backoff_ms); + // Subir el backoff para la PRÓXIMA falla, no esta. + spec.backoff_ms = (spec.backoff_ms * 2).min(spec.max_backoff_ms); + spec.restart_count += 1; + let restart_n = spec.restart_count; + tokio::spawn(async move { + tokio::time::sleep(backoff).await; + info!( + backoff_ms = backoff.as_millis() as u64, + restart = restart_n, + "restarting failed command" + ); + let workspace = spec.workspace; + if let Err(e) = mgr + .run_with_options(workspace, spec.exec.clone(), spec.argv.clone(), spec.envp.clone(), true) + .await + { + warn!(?e, "restart failed to launch"); + return; + } + // Preservar backoff acumulado: localizar el nuevo command_id + // (el más reciente vivo en el workspace) y sobreescribir. + let new_cmd_id = { + let g = mgr.inner.lock().await; + g.workspaces.get(&workspace).and_then(|ws| { + ws.commands + .values() + .filter(|c| c.alive) + .max_by_key(|c| c.id) + .map(|c| c.id) + }) + }; + if let Some(new_id) = new_cmd_id { + let mut g = mgr.inner.lock().await; + if let Some(existing) = g.restart_specs.get_mut(&new_id) { + existing.backoff_ms = spec.backoff_ms; + existing.restart_count = spec.restart_count; + } + } + }); } } } @@ -850,6 +1007,41 @@ mod tests { panic!("logs never captured on both streams"); } + #[tokio::test] + async fn restart_on_failure_relaunches_failing_command() { + let mgr = Arc::new(WorkspaceManager::new(IncarnatorConfig::default())); + let spec = WorkspaceSpec { + label: "restart".into(), + soma: Default::default(), + permissions: Default::default(), + ttl: None, + flow_dirs: vec![], + on_exit: shipote_card::ExitPolicy::Reap, + }; + let (id, _) = mgr.create(spec).await.unwrap(); + // /bin/false sale con exit=1. Con restart_on_failure=true debería + // relanzarse al menos 1 vez (tras el backoff inicial de 200ms). + let summary = mgr + .run_with_options(id, "/bin/false".into(), vec![], vec![], true) + .await + .unwrap(); + let original_id = summary.id; + // Esperamos ~500ms para que termine + reap + restart corra. + for _ in 0..30 { + tokio::time::sleep(std::time::Duration::from_millis(50)).await; + mgr.reap_dead().await; + let g = mgr.inner.lock().await; + if let Some(ws) = g.workspaces.get(&id) { + let new_cmds: Vec<_> = ws.commands.keys().filter(|k| **k != original_id).collect(); + if !new_cmds.is_empty() { + // Hay un nuevo command_id → restart funcionó. + return; + } + } + } + panic!("restart never launched a new command"); + } + #[tokio::test] async fn run_true_in_workspace() { let mgr = Arc::new(WorkspaceManager::new(IncarnatorConfig::default())); diff --git a/crates/modules/shipote/shipote-core/src/stats.rs b/crates/modules/shipote/shipote-core/src/stats.rs index cd199ec..481d7cf 100644 --- a/crates/modules/shipote/shipote-core/src/stats.rs +++ b/crates/modules/shipote/shipote-core/src/stats.rs @@ -25,13 +25,50 @@ pub struct WorkspaceStats { pub cpu_usec: Option, /// %CPU instantáneo derivado entre dos samples consecutivos. `None` /// en el primer sample (no hay baseline). `100.0` = 1 core saturado. + /// `400.0` con 4 cores activos = la máquina al 100%. pub cpu_percent: Option, + /// Cores online detectados (sysconf `_SC_NPROCESSORS_ONLN`). Útil + /// para normalizar `cpu_percent / cpu_cores` → 0..100 absoluto. + pub cpu_cores: u32, /// Fuente del dato: "proc" | "cgroup" | "mixed". pub source: String, /// Wall-clock uptime del workspace en milisegundos. pub uptime_ms: u64, } +impl WorkspaceStats { + /// CPU% normalizado al 100% total de la máquina (no por core). + /// Útil para comparar workspaces independiente del paralelismo. + pub fn cpu_percent_total(&self) -> Option { + self.cpu_percent + .map(|p| if self.cpu_cores == 0 { p } else { p / self.cpu_cores as f32 }) + } +} + +/// Reporte de quotas: comparación entre el accounting real y los +/// `rlimits` declarados en `SomaSpec`. NO hace enforcement automático +/// en v1 — sólo accounting + reporting. El caller decide qué hacer. +#[derive(Debug, Clone, Default)] +pub struct QuotaReport { + /// Límite de memoria declarado (bytes). None = sin límite. + pub mem_limit: Option, + /// Límite de procesos declarado. + pub nproc_limit: Option, + /// Lista de violaciones detectadas (strings humano-legibles). + /// Empty = todo dentro de quota. + pub breaches: Vec, +} + +/// Detecta cores online runtime. Cacheado vía OnceLock — el valor no +/// cambia salvo hotplug, que es raro y aceptamos sample stale. +fn online_cores() -> u32 { + static CACHED: std::sync::OnceLock = std::sync::OnceLock::new(); + *CACHED.get_or_init(|| { + let n = unsafe { libc::sysconf(libc::_SC_NPROCESSORS_ONLN) }; + if n > 0 { n as u32 } else { 1 } + }) +} + /// Mide stats para un set de PIDs vivos + un path de cgroup opcional. pub fn measure( alive_pids: &[i32], @@ -71,6 +108,7 @@ pub fn measure( rss_peak_bytes: rss_peak, cpu_usec: cpu, cpu_percent: None, // El caller lo rellena con el diff vs prev sample. + cpu_cores: online_cores(), source, uptime_ms: workspace_started.elapsed().as_millis() as u64, } diff --git a/crates/modules/shipote/shipote-protocol/src/lib.rs b/crates/modules/shipote/shipote-protocol/src/lib.rs index a5c1e7e..51d130c 100644 --- a/crates/modules/shipote/shipote-protocol/src/lib.rs +++ b/crates/modules/shipote/shipote-protocol/src/lib.rs @@ -50,6 +50,10 @@ pub enum Request { exec: String, argv: Vec, envp: Vec<(String, String)>, + /// Si `true` y el comando muere con exit_status != 0, el reaper + /// lo relaunch con backoff exponencial. + #[serde(default)] + restart_on_failure: bool, }, /// Lanzar un Pipeline completo dentro de un workspace. @@ -103,6 +107,9 @@ pub enum Request { /// Resource accounting de un workspace. WorkspaceStats { workspace: shipote_card::WorkspaceId }, + /// Reporte de quotas (rlimits declarados vs uso actual). + WorkspaceQuota { workspace: shipote_card::WorkspaceId }, + /// Detener selectivamente los comandos de un pipeline (no el workspace /// entero). `grace_ms`: SIGTERM → wait → SIGKILL. PipelineStop { @@ -194,6 +201,10 @@ pub enum Response { info: WorkspaceStatsInfo, }, + WorkspaceQuota { + info: QuotaReportInfo, + }, + FlowList { items: Vec, }, @@ -208,6 +219,13 @@ pub enum Response { }, } +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct QuotaReportInfo { + pub mem_limit: Option, + pub nproc_limit: Option, + pub breaches: Vec, +} + #[derive(Debug, Clone, Serialize, Deserialize)] pub struct WorkspaceStatsInfo { pub commands_alive: u32, @@ -218,10 +236,16 @@ pub struct WorkspaceStatsInfo { pub cpu_usec: Option, #[serde(default)] pub cpu_percent: Option, + #[serde(default = "default_cpu_cores")] + pub cpu_cores: u32, pub source: String, pub uptime_ms: u64, } +fn default_cpu_cores() -> u32 { + 1 +} + #[derive(Debug, Clone, Serialize, Deserialize)] pub struct FlowInfo { pub pipeline: Ulid,