feat(shipote): multi-core CPU% + quota report + restart-on-failure (fase K)

- WorkspaceStats.cpu_cores via sysconf cacheado. CLI muestra
  `cpu_pct: 98.7 % (24.7% total / 4 cores)`.
- workspace_quota compara SomaSpec.rlimits contra accounting actual.
  Reporta breaches humanos. NO enforcement automático en v1.
- run_with_options(.., restart_on_failure): si exit != 0, reaper
  relaunch con backoff exponencial 200ms → 30s cap. Inner.restart_specs
  persiste el spec entre intentos.

81 tests pasan (ente-incarnate 16, nouser-core 27, shipote-card 8,
shipote-core 22, shipote-discern 5, yahweh-provider-fs 3).

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
sergio
2026-05-11 01:32:39 +00:00
parent d8727a3038
commit 324a0c2d5d
5 changed files with 331 additions and 20 deletions
+41 -2
View File
@@ -36,6 +36,9 @@ enum Cmd {
/// ULID del workspace destino.
#[arg(short = 'w', long)]
workspace: String,
/// Si exit != 0, relanzar con backoff exponencial.
#[arg(long)]
restart_on_failure: bool,
/// Path del ejecutable.
exec: String,
/// Argumentos del comando.
@@ -164,6 +167,10 @@ enum WsCmd {
Stats {
id: String,
},
/// Quota report: rlimits declarados vs uso actual.
Quota {
id: String,
},
}
#[tokio::main]
@@ -251,7 +258,9 @@ async fn main() -> Result<()> {
.unwrap_or_else(|| "".into());
let cpu_pct = info
.cpu_percent
.map(|p| format!("{p:.1} %"))
.map(|p| format!("{p:.1} % ({:.1}% total / {} cores)",
if info.cpu_cores > 0 { p / info.cpu_cores as f32 } else { p },
info.cpu_cores))
.unwrap_or_else(|| "— (esperando 2do sample)".into());
println!("rss: {rss}");
println!("rss_peak: {peak}");
@@ -265,6 +274,35 @@ async fn main() -> Result<()> {
}
}
Cmd::Workspace(WsCmd::Quota { id }) => {
let id = parse_ws_id(&id)?;
let resp = round_trip(&mut stream, Request::WorkspaceQuota { workspace: id }).await?;
match resp {
Response::WorkspaceQuota { info } => {
let mem = info
.mem_limit
.map(|b| format!("{:.2} MiB", b as f64 / 1024.0 / 1024.0))
.unwrap_or_else(|| "".into());
let nproc = info
.nproc_limit
.map(|n| n.to_string())
.unwrap_or_else(|| "".into());
println!("mem_limit: {mem}");
println!("nproc_limit: {nproc}");
if info.breaches.is_empty() {
println!("breaches: (none — dentro de quota)");
} else {
println!("breaches:");
for b in info.breaches {
println!(" - {b}");
}
}
}
Response::Error { message } => return Err(anyhow!(message)),
other => print_unexpected(&other),
}
}
Cmd::Workspace(WsCmd::Stop { id, grace_ms }) => {
let id = parse_ws_id(&id)?;
let resp = round_trip(&mut stream, Request::WorkspaceStop { id, grace_ms }).await?;
@@ -277,7 +315,7 @@ async fn main() -> Result<()> {
}
}
Cmd::Run { workspace, exec, argv } => {
Cmd::Run { workspace, exec, argv, restart_on_failure } => {
let id = parse_ws_id(&workspace)?;
let resp = round_trip(
&mut stream,
@@ -286,6 +324,7 @@ async fn main() -> Result<()> {
exec,
argv,
envp: vec![],
restart_on_failure,
},
)
.await?;
+21 -3
View File
@@ -17,7 +17,8 @@ use shipote_core::WorkspaceManager;
use shipote_discern::{DiscernPipeline, Hint};
use shipote_protocol::{
default_socket_path, read_frame, write_frame, CommandInfo as ProtoCommandInfo,
EdgeDiscernmentInfo, FlowInfo, Request, Response, WorkspaceStatsInfo, WorkspaceSummary,
EdgeDiscernmentInfo, FlowInfo, QuotaReportInfo, Request, Response, WorkspaceStatsInfo,
WorkspaceSummary,
};
use std::sync::Arc;
use tokio::net::{UnixListener, UnixStream};
@@ -180,8 +181,11 @@ async fn dispatch(
}
}
Request::Run { workspace, exec, argv, envp } => {
match mgr.run(workspace, exec, argv, envp).await {
Request::Run { workspace, exec, argv, envp, restart_on_failure } => {
match mgr
.run_with_options(workspace, exec, argv, envp, restart_on_failure)
.await
{
Ok(s) => Response::RunStarted {
workspace,
command_id: s.id,
@@ -345,6 +349,7 @@ async fn dispatch(
rss_peak_bytes: s.rss_peak_bytes,
cpu_usec: s.cpu_usec,
cpu_percent: s.cpu_percent,
cpu_cores: s.cpu_cores,
source: s.source,
uptime_ms: s.uptime_ms,
},
@@ -354,6 +359,19 @@ async fn dispatch(
},
},
Request::WorkspaceQuota { workspace } => match mgr.workspace_quota(workspace).await {
Some(q) => Response::WorkspaceQuota {
info: QuotaReportInfo {
mem_limit: q.mem_limit,
nproc_limit: q.nproc_limit,
breaches: q.breaches,
},
},
None => Response::Error {
message: format!("workspace {workspace} not found"),
},
},
Request::FlowList => {
let items = mgr
.list_flow_pipelines()
+193 -1
View File
@@ -94,6 +94,24 @@ struct Inner {
/// el reaper los borra (futuro). v1: viven hasta `stop_pipeline_flows`
/// explícito o hasta shutdown.
pipeline_flows: HashMap<Ulid, Vec<crate::flow_channel::FlowChannel>>,
/// Specs de comandos `run()` con `restart_on_failure=true`. Indexed
/// por command_id. Cuando `reap_dead` detecta exit!=0, se relauncha
/// con la misma spec (nuevo pid y nuevo command_id se asigna por
/// el nuevo state pero el restart_spec sigue ligado al original).
restart_specs: HashMap<Ulid, RestartSpec>,
}
#[derive(Debug, Clone)]
struct RestartSpec {
workspace: WorkspaceId,
exec: String,
argv: Vec<String>,
envp: Vec<(String, String)>,
/// Backoff inicial (ms). Crece exponencialmente hasta max_backoff_ms.
backoff_ms: u64,
max_backoff_ms: u64,
/// Cantidad de restarts ya ejecutados (para tracking).
restart_count: u32,
}
#[derive(Debug, Clone)]
@@ -179,6 +197,7 @@ impl WorkspaceManager {
workspaces: HashMap::new(),
saved_pipelines: HashMap::new(),
pipeline_flows: HashMap::new(),
restart_specs: HashMap::new(),
})),
incarnator: Arc::new(Incarnator::new(cfg)),
}
@@ -353,6 +372,39 @@ impl WorkspaceManager {
.map(|w| w.spec.label.clone())
}
/// Compara accounting real (RSS, commands_alive) contra los rlimits
/// declarados en `SomaSpec`. Devuelve violaciones humanizadas. NO
/// hace enforcement automático.
pub async fn workspace_quota(&self, id: WorkspaceId) -> Option<stats::QuotaReport> {
let stats_now = self.workspace_stats(id).await?;
let g = self.inner.lock().await;
let ws = g.workspaces.get(&id)?;
let rl = &ws.spec.soma.rlimits;
let mut report = stats::QuotaReport {
mem_limit: rl.mem_bytes,
nproc_limit: rl.nproc,
breaches: Vec::new(),
};
if let (Some(limit), Some(used)) = (rl.mem_bytes, stats_now.rss_bytes) {
if used > limit {
report.breaches.push(format!(
"memory: {:.2} MiB > {:.2} MiB limit",
used as f64 / 1024.0 / 1024.0,
limit as f64 / 1024.0 / 1024.0,
));
}
}
if let Some(limit) = rl.nproc {
if stats_now.commands_alive > limit {
report.breaches.push(format!(
"nproc: {} alive > {} limit",
stats_now.commands_alive, limit
));
}
}
Some(report)
}
/// Estadísticas de recursos del workspace: RSS + CPU agregado de sus
/// comandos vivos. Lee `/proc/<pid>/` directamente; si el spec declara
/// `soma.cgroup.path`, también intenta el cgroup (más preciso, incluye
@@ -531,6 +583,20 @@ impl WorkspaceManager {
exec: String,
argv: Vec<String>,
envp: Vec<(String, String)>,
) -> Result<CommandSummary, CoreError> {
self.run_with_options(id, exec, argv, envp, false).await
}
/// Variante con `restart_on_failure`: si el comando muere con
/// exit_status != 0, el reaper lo relauncha con backoff exponencial
/// (200ms → 400 → 800 → … cap 30s).
pub async fn run_with_options(
&self,
id: WorkspaceId,
exec: String,
argv: Vec<String>,
envp: Vec<(String, String)>,
restart_on_failure: bool,
) -> Result<CommandSummary, CoreError> {
let workspace_label = {
let g = self.inner.lock().await;
@@ -593,6 +659,23 @@ impl WorkspaceManager {
},
);
}
if restart_on_failure {
// Reextract exec/argv/envp del payload del CommandRef.
if let Payload::Native { exec, argv, envp } = &cmd_ref.payload {
g.restart_specs.insert(
cmd_id,
RestartSpec {
workspace: id,
exec: exec.clone(),
argv: argv.clone(),
envp: envp.clone(),
backoff_ms: 200,
max_backoff_ms: 30_000,
restart_count: 0,
},
);
}
}
for d in &out.degradations {
warn!(?d, %id, "command incarnation degradation");
}
@@ -693,7 +776,9 @@ impl WorkspaceManager {
/// Cosecha hijos terminados (no-bloqueante). Llamar periódicamente desde
/// el daemon o ante SIGCHLD. Marca `alive=false` y guarda exit_status.
pub async fn reap_dead(&self) {
pub async fn reap_dead(self: &Arc<Self>) {
let mut to_restart: Vec<RestartSpec> = Vec::new();
{
let mut g = self.inner.lock().await;
for ws in g.workspaces.values_mut() {
for cmd in ws.commands.values_mut() {
@@ -713,6 +798,78 @@ impl WorkspaceManager {
}
}
}
// Detectar restart_specs cuyo command_id ya está dead con exit!=0.
let mut to_remove: Vec<Ulid> = Vec::new();
for (cmd_id, spec) in g.restart_specs.iter() {
let mut should_restart = false;
let mut should_drop = false;
'outer: for ws in g.workspaces.values() {
if let Some(cmd) = ws.commands.get(cmd_id) {
if !cmd.alive {
match cmd.exit_status {
Some(0) => should_drop = true,
Some(_) => should_restart = true,
None => {}
}
break 'outer;
}
}
}
if should_drop {
to_remove.push(*cmd_id);
} else if should_restart {
to_restart.push(spec.clone());
to_remove.push(*cmd_id);
}
}
for id in to_remove {
g.restart_specs.remove(&id);
}
}
// Schedule restart fuera del lock.
for mut spec in to_restart {
let mgr = self.clone();
let backoff = std::time::Duration::from_millis(spec.backoff_ms);
// Subir el backoff para la PRÓXIMA falla, no esta.
spec.backoff_ms = (spec.backoff_ms * 2).min(spec.max_backoff_ms);
spec.restart_count += 1;
let restart_n = spec.restart_count;
tokio::spawn(async move {
tokio::time::sleep(backoff).await;
info!(
backoff_ms = backoff.as_millis() as u64,
restart = restart_n,
"restarting failed command"
);
let workspace = spec.workspace;
if let Err(e) = mgr
.run_with_options(workspace, spec.exec.clone(), spec.argv.clone(), spec.envp.clone(), true)
.await
{
warn!(?e, "restart failed to launch");
return;
}
// Preservar backoff acumulado: localizar el nuevo command_id
// (el más reciente vivo en el workspace) y sobreescribir.
let new_cmd_id = {
let g = mgr.inner.lock().await;
g.workspaces.get(&workspace).and_then(|ws| {
ws.commands
.values()
.filter(|c| c.alive)
.max_by_key(|c| c.id)
.map(|c| c.id)
})
};
if let Some(new_id) = new_cmd_id {
let mut g = mgr.inner.lock().await;
if let Some(existing) = g.restart_specs.get_mut(&new_id) {
existing.backoff_ms = spec.backoff_ms;
existing.restart_count = spec.restart_count;
}
}
});
}
}
}
@@ -850,6 +1007,41 @@ mod tests {
panic!("logs never captured on both streams");
}
#[tokio::test]
async fn restart_on_failure_relaunches_failing_command() {
let mgr = Arc::new(WorkspaceManager::new(IncarnatorConfig::default()));
let spec = WorkspaceSpec {
label: "restart".into(),
soma: Default::default(),
permissions: Default::default(),
ttl: None,
flow_dirs: vec![],
on_exit: shipote_card::ExitPolicy::Reap,
};
let (id, _) = mgr.create(spec).await.unwrap();
// /bin/false sale con exit=1. Con restart_on_failure=true debería
// relanzarse al menos 1 vez (tras el backoff inicial de 200ms).
let summary = mgr
.run_with_options(id, "/bin/false".into(), vec![], vec![], true)
.await
.unwrap();
let original_id = summary.id;
// Esperamos ~500ms para que termine + reap + restart corra.
for _ in 0..30 {
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
mgr.reap_dead().await;
let g = mgr.inner.lock().await;
if let Some(ws) = g.workspaces.get(&id) {
let new_cmds: Vec<_> = ws.commands.keys().filter(|k| **k != original_id).collect();
if !new_cmds.is_empty() {
// Hay un nuevo command_id → restart funcionó.
return;
}
}
}
panic!("restart never launched a new command");
}
#[tokio::test]
async fn run_true_in_workspace() {
let mgr = Arc::new(WorkspaceManager::new(IncarnatorConfig::default()));
@@ -25,13 +25,50 @@ pub struct WorkspaceStats {
pub cpu_usec: Option<u64>,
/// %CPU instantáneo derivado entre dos samples consecutivos. `None`
/// en el primer sample (no hay baseline). `100.0` = 1 core saturado.
/// `400.0` con 4 cores activos = la máquina al 100%.
pub cpu_percent: Option<f32>,
/// Cores online detectados (sysconf `_SC_NPROCESSORS_ONLN`). Útil
/// para normalizar `cpu_percent / cpu_cores` → 0..100 absoluto.
pub cpu_cores: u32,
/// Fuente del dato: "proc" | "cgroup" | "mixed".
pub source: String,
/// Wall-clock uptime del workspace en milisegundos.
pub uptime_ms: u64,
}
impl WorkspaceStats {
/// CPU% normalizado al 100% total de la máquina (no por core).
/// Útil para comparar workspaces independiente del paralelismo.
pub fn cpu_percent_total(&self) -> Option<f32> {
self.cpu_percent
.map(|p| if self.cpu_cores == 0 { p } else { p / self.cpu_cores as f32 })
}
}
/// Reporte de quotas: comparación entre el accounting real y los
/// `rlimits` declarados en `SomaSpec`. NO hace enforcement automático
/// en v1 — sólo accounting + reporting. El caller decide qué hacer.
#[derive(Debug, Clone, Default)]
pub struct QuotaReport {
/// Límite de memoria declarado (bytes). None = sin límite.
pub mem_limit: Option<u64>,
/// Límite de procesos declarado.
pub nproc_limit: Option<u32>,
/// Lista de violaciones detectadas (strings humano-legibles).
/// Empty = todo dentro de quota.
pub breaches: Vec<String>,
}
/// Detecta cores online runtime. Cacheado vía OnceLock — el valor no
/// cambia salvo hotplug, que es raro y aceptamos sample stale.
fn online_cores() -> u32 {
static CACHED: std::sync::OnceLock<u32> = std::sync::OnceLock::new();
*CACHED.get_or_init(|| {
let n = unsafe { libc::sysconf(libc::_SC_NPROCESSORS_ONLN) };
if n > 0 { n as u32 } else { 1 }
})
}
/// Mide stats para un set de PIDs vivos + un path de cgroup opcional.
pub fn measure(
alive_pids: &[i32],
@@ -71,6 +108,7 @@ pub fn measure(
rss_peak_bytes: rss_peak,
cpu_usec: cpu,
cpu_percent: None, // El caller lo rellena con el diff vs prev sample.
cpu_cores: online_cores(),
source,
uptime_ms: workspace_started.elapsed().as_millis() as u64,
}
@@ -50,6 +50,10 @@ pub enum Request {
exec: String,
argv: Vec<String>,
envp: Vec<(String, String)>,
/// Si `true` y el comando muere con exit_status != 0, el reaper
/// lo relaunch con backoff exponencial.
#[serde(default)]
restart_on_failure: bool,
},
/// Lanzar un Pipeline completo dentro de un workspace.
@@ -103,6 +107,9 @@ pub enum Request {
/// Resource accounting de un workspace.
WorkspaceStats { workspace: shipote_card::WorkspaceId },
/// Reporte de quotas (rlimits declarados vs uso actual).
WorkspaceQuota { workspace: shipote_card::WorkspaceId },
/// Detener selectivamente los comandos de un pipeline (no el workspace
/// entero). `grace_ms`: SIGTERM → wait → SIGKILL.
PipelineStop {
@@ -194,6 +201,10 @@ pub enum Response {
info: WorkspaceStatsInfo,
},
WorkspaceQuota {
info: QuotaReportInfo,
},
FlowList {
items: Vec<FlowInfo>,
},
@@ -208,6 +219,13 @@ pub enum Response {
},
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct QuotaReportInfo {
pub mem_limit: Option<u64>,
pub nproc_limit: Option<u32>,
pub breaches: Vec<String>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WorkspaceStatsInfo {
pub commands_alive: u32,
@@ -218,10 +236,16 @@ pub struct WorkspaceStatsInfo {
pub cpu_usec: Option<u64>,
#[serde(default)]
pub cpu_percent: Option<f32>,
#[serde(default = "default_cpu_cores")]
pub cpu_cores: u32,
pub source: String,
pub uptime_ms: u64,
}
fn default_cpu_cores() -> u32 {
1
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FlowInfo {
pub pipeline: Ulid,