diff --git a/crates/apps/shipote-daemon/src/main.rs b/crates/apps/shipote-daemon/src/main.rs index 14f76f0..bbb64c4 100644 --- a/crates/apps/shipote-daemon/src/main.rs +++ b/crates/apps/shipote-daemon/src/main.rs @@ -64,14 +64,43 @@ async fn main() -> anyhow::Result<()> { })); // Restaurar snapshot previo si existe. Workspaces se recrean; los - // pids de comandos viejos NO se recuperan (kernel los mató). + // pids de comandos viejos NO se recuperan (kernel los mató). Los + // pipelines vivos (con supervisor) se relanzan desde cero. let snapshot_path = shipote_core::persist::default_snapshot_path(); - if let Err(e) = mgr.restore_snapshot(&snapshot_path).await { - warn!(?e, "restore_snapshot falló — start fresh"); + let restore = match mgr.restore_snapshot(&snapshot_path).await { + Ok(r) => r, + Err(e) => { + warn!(?e, "restore_snapshot falló — start fresh"); + shipote_core::persist::RestoreOutcome::default() + } + }; + // Relauncher de live_pipelines: como necesita inc+disc del daemon, + // lo hacemos acá tras el restore. Cada uno mismo flujo que un run + // normal — register_pipeline_commands + register_pipeline_supervisor. + for entry in restore.live_pipelines { + let inc = mgr.incarnator_handle(); + let disc = Arc::new(DiscernPipeline::default_pipeline()); + let workspace = entry.workspace; + let ws_label = mgr.workspace_label(workspace).await.unwrap_or_default(); + let tap = entry.tap; + let spec = entry.spec; + match shipote_core::pipeline::run_pipeline( + &spec, &ws_label, tap, disc, inc, Some(mgr.clone()), + ) + .await + { + Ok(launch) => { + mgr.register_pipeline_commands(workspace, launch.pipeline, launch.command_pids.clone()).await; + mgr.register_pipeline_supervisor(launch.pipeline, workspace, spec, tap).await; + info!(label = %launch.pipeline, "live pipeline relaunched from snapshot"); + } + Err(e) => warn!(?e, "live pipeline relaunch failed"), + } } - // Save-on-shutdown via SIGTERM/SIGINT handler. tokio::signal soporta - // ambos en Linux. + // Shutdown handler: SIGTERM/SIGINT → drain (stop_with_grace de todos + // los workspaces) → snapshot → exit. El drain usa grace=1s para dar + // chance a los comandos a terminar limpio antes del SIGKILL. { let mgr = mgr.clone(); let path = snapshot_path.clone(); @@ -80,13 +109,32 @@ async fn main() -> anyhow::Result<()> { .expect("SIGTERM handler"); let mut int = tokio::signal::unix::signal(tokio::signal::unix::SignalKind::interrupt()) .expect("SIGINT handler"); - tokio::select! { - _ = term.recv() => info!("SIGTERM — saving snapshot"), - _ = int.recv() => info!("SIGINT — saving snapshot"), - } + let sig_name = tokio::select! { + _ = term.recv() => "SIGTERM", + _ = int.recv() => "SIGINT", + }; + info!(signal = sig_name, "shipote-daemon shutdown: draining workspaces"); + + // 1) Snapshot ANTES del drain — preserva intención declarada + // (los workspace specs siguen vivos en el snapshot aunque + // matemos los procesos hijos). if let Err(e) = mgr.save_snapshot(&path).await { warn!(?e, "save_snapshot falló"); } + + // 2) Drain: stop_with_grace de todos los workspaces vivos. + // Grace 1s da chance a apps Type=notify de hacer cleanup. + let workspaces = mgr.list().await; + let n = workspaces.len(); + for ws in workspaces { + if let Err(e) = mgr + .stop_with_grace(ws.id, std::time::Duration::from_millis(1000)) + .await + { + warn!(?e, %ws.id, "stop_with_grace falló en drain"); + } + } + info!(drained = n, "drain complete"); std::process::exit(0); }); } @@ -417,6 +465,52 @@ async fn dispatch( }, }, + Request::WorkspaceFullSummary { workspace } => { + let stats = match mgr.workspace_stats(workspace).await { + Some(s) => WorkspaceStatsInfo { + commands_alive: s.commands_alive, + commands_total: s.commands_total, + rss_bytes: s.rss_bytes, + rss_peak_bytes: s.rss_peak_bytes, + cpu_usec: s.cpu_usec, + cpu_percent: s.cpu_percent, + cpu_cores: s.cpu_cores, + source: s.source, + uptime_ms: s.uptime_ms, + }, + None => return Response::Error { message: format!("workspace {workspace} not found") }, + }; + let quota = match mgr.workspace_quota(workspace).await { + Some(q) => QuotaReportInfo { + mem_limit: q.mem_limit, + nproc_limit: q.nproc_limit, + breaches: q.breaches, + }, + None => QuotaReportInfo { mem_limit: None, nproc_limit: None, breaches: Vec::new() }, + }; + let commands = mgr + .list_commands(workspace) + .await + .into_iter() + .map(|c| ProtoCommandInfo { + id: c.id, + label: c.label, + pid: c.pid, + alive: c.alive, + exit_status: c.exit_status, + log_bytes: c.log_bytes, + }) + .collect(); + // Flow sockets de pipelines whose workspace == este. + let flow_sockets = mgr + .list_flow_pipelines() + .await + .into_iter() + .flat_map(|(_, sockets)| sockets) + .collect(); + Response::WorkspaceFullSummary { stats, quota, commands, flow_sockets } + } + Request::WorkspaceQuota { workspace } => match mgr.workspace_quota(workspace).await { Some(q) => Response::WorkspaceQuota { info: QuotaReportInfo { diff --git a/crates/apps/shipote-shell/src/main.rs b/crates/apps/shipote-shell/src/main.rs index 0bf35bc..91b1e03 100644 --- a/crates/apps/shipote-shell/src/main.rs +++ b/crates/apps/shipote-shell/src/main.rs @@ -174,42 +174,25 @@ fn probe_blocking(path: &std::path::Path) -> Result { other => return Err(format!("unexpected list resp: {other:?}")), }; - // Commands por workspace. + // Batched: stats+quota+commands+flow_sockets en 1 roundtrip por ws. let mut commands_map = std::collections::BTreeMap::new(); let mut fresh_stats = std::collections::BTreeMap::new(); let mut quotas = std::collections::BTreeMap::new(); for w in &workspaces { - write_frame(&mut stream, &Request::CommandList { workspace: w.id }) + write_frame(&mut stream, &Request::WorkspaceFullSummary { workspace: w.id }) .await - .map_err(|e| format!("write commands: {e}"))?; + .map_err(|e| format!("write summary: {e}"))?; let resp: Response = read_frame(&mut stream) .await - .map_err(|e| format!("read commands: {e}"))?; - if let Response::CommandList { items } = resp { - if !items.is_empty() { - commands_map.insert(w.id.to_string(), items); + .map_err(|e| format!("read summary: {e}"))?; + if let Response::WorkspaceFullSummary { stats, quota, commands, .. } = resp { + let key = w.id.to_string(); + fresh_stats.insert(key.clone(), stats); + quotas.insert(key.clone(), quota); + if !commands.is_empty() { + commands_map.insert(key, commands); } } - // Stats por workspace. - write_frame(&mut stream, &Request::WorkspaceStats { workspace: w.id }) - .await - .map_err(|e| format!("write stats: {e}"))?; - let resp: Response = read_frame(&mut stream) - .await - .map_err(|e| format!("read stats: {e}"))?; - if let Response::WorkspaceStats { info } = resp { - fresh_stats.insert(w.id.to_string(), info); - } - // Quota por workspace. - write_frame(&mut stream, &Request::WorkspaceQuota { workspace: w.id }) - .await - .map_err(|e| format!("write quota: {e}"))?; - let resp: Response = read_frame(&mut stream) - .await - .map_err(|e| format!("read quota: {e}"))?; - if let Response::WorkspaceQuota { info } = resp { - quotas.insert(w.id.to_string(), info); - } } // Saved pipelines. diff --git a/crates/modules/shipote/shipote-core/src/persist.rs b/crates/modules/shipote/shipote-core/src/persist.rs index 86eb14c..456f470 100644 --- a/crates/modules/shipote/shipote-core/src/persist.rs +++ b/crates/modules/shipote/shipote-core/src/persist.rs @@ -10,8 +10,10 @@ use shipote_card::{PipelineSpec, WorkspaceId, WorkspaceSpec}; use std::path::{Path, PathBuf}; use tracing::{info, warn}; -/// v2 agregó `saved_pipelines`. v1 lee con campo ausente como vacío. -pub const SNAPSHOT_VERSION: u16 = 2; +/// v2 agregó `saved_pipelines`. v3 agrega `live_pipelines` (pipelines +/// con supervisor vivo al momento del snapshot — el daemon los relanza +/// al restore). Versiones inferiores leen campos ausentes como vacío. +pub const SNAPSHOT_VERSION: u16 = 3; #[derive(Debug, Clone, Serialize, Deserialize)] pub struct ShipoteSnapshot { @@ -20,6 +22,10 @@ pub struct ShipoteSnapshot { pub workspaces: Vec, #[serde(default)] pub saved_pipelines: Vec, + /// Pipelines vivos con supervisor (`restart_on_failure=true`) al + /// momento del snapshot. El daemon los relanza al restore. + #[serde(default)] + pub live_pipelines: Vec, } #[derive(Debug, Clone, Serialize, Deserialize)] @@ -34,6 +40,13 @@ pub struct PipelineEntry { pub spec: PipelineSpec, } +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct LivePipelineEntry { + pub workspace: WorkspaceId, + pub spec: PipelineSpec, + pub tap: bool, +} + impl ShipoteSnapshot { pub fn write(&self, path: &Path) -> anyhow::Result<()> { let bytes = serde_json::to_vec_pretty(self)?; @@ -102,11 +115,24 @@ impl WorkspaceManager { spec: spec.clone(), }) .collect(); + // Pipelines vivos con supervisor — preserva la intención. Los + // pids/sockets/discernments son ephemeral y se regeneran al + // restore (relaunch desde cero). + let live_pipelines = g + .pipeline_supervisors + .values() + .map(|sup| LivePipelineEntry { + workspace: sup.workspace, + spec: sup.spec.clone(), + tap: sup.tap, + }) + .collect(); ShipoteSnapshot { version: SNAPSHOT_VERSION, timestamp_ms: now_ms(), workspaces, saved_pipelines, + live_pipelines, } } @@ -118,34 +144,57 @@ impl WorkspaceManager { Ok(()) } - /// Carga snapshot desde disco y restaura los Workspaces. + /// Carga snapshot desde disco y restaura los Workspaces + saved + /// pipelines. Devuelve los `live_pipelines` para que el caller + /// (daemon) los relance — no podemos relanzarlos desde acá porque + /// `run_pipeline` necesita `Incarnator` + `DiscernPipeline`. /// Errores no-fatales (workspaces inválidos) se loguean y se saltan. - pub async fn restore_snapshot(self: &std::sync::Arc, path: &Path) -> anyhow::Result { + pub async fn restore_snapshot( + self: &std::sync::Arc, + path: &Path, + ) -> anyhow::Result { let snap = match ShipoteSnapshot::read(path) { Ok(s) => s, Err(e) => { warn!(?e, path = %path.display(), "no snapshot — start fresh"); - return Ok(0); + return Ok(RestoreOutcome::default()); } }; - let mut restored = 0usize; + let mut out = RestoreOutcome::default(); for entry in snap.workspaces { // v2+: reusamos el id original así clients que tracking // workspace_id no se rompen al restart. let label = entry.spec.label.clone(); match self.create_with_id(entry.id, entry.spec).await { - Ok(_) => restored += 1, + Ok(_) => out.workspaces_restored += 1, Err(e) => warn!(?e, %label, "skipped workspace en restore"), } } for entry in snap.saved_pipelines { self.save_pipeline(entry.name, entry.spec).await; + out.saved_pipelines_restored += 1; } - info!(restored, "snapshot restored"); - Ok(restored) + out.live_pipelines = snap.live_pipelines; + info!( + workspaces = out.workspaces_restored, + saved_pipelines = out.saved_pipelines_restored, + live_pipelines = out.live_pipelines.len(), + "snapshot restored" + ); + Ok(out) } } +/// Lo que el caller del restore obtiene. Las `live_pipelines` requieren +/// `Incarnator + DiscernPipeline` para relanzarlas → el caller las +/// procesa (típicamente el daemon). +#[derive(Debug, Default)] +pub struct RestoreOutcome { + pub workspaces_restored: usize, + pub saved_pipelines_restored: usize, + pub live_pipelines: Vec, +} + #[cfg(test)] mod tests { use super::*; @@ -177,8 +226,8 @@ mod tests { mgr1.save_snapshot(&path).await.unwrap(); let mgr2 = Arc::new(WorkspaceManager::new(IncarnatorConfig::default())); - let n = mgr2.restore_snapshot(&path).await.unwrap(); - assert_eq!(n, 2); + let out = mgr2.restore_snapshot(&path).await.unwrap(); + assert_eq!(out.workspaces_restored, 2); let listed = mgr2.list().await; let restored_ids: std::collections::HashSet<_> = listed.iter().map(|s| s.id).collect(); assert!(restored_ids.contains(&id1)); diff --git a/crates/modules/shipote/shipote-protocol/src/lib.rs b/crates/modules/shipote/shipote-protocol/src/lib.rs index 598c17e..64169d6 100644 --- a/crates/modules/shipote/shipote-protocol/src/lib.rs +++ b/crates/modules/shipote/shipote-protocol/src/lib.rs @@ -110,6 +110,11 @@ pub enum Request { /// Reporte de quotas (rlimits declarados vs uso actual). WorkspaceQuota { workspace: shipote_card::WorkspaceId }, + /// Resumen completo de un workspace: stats + quota + commands + + /// flow sockets en una sola roundtrip. Reduce N×4 requests del + /// shell a N×1. + WorkspaceFullSummary { workspace: shipote_card::WorkspaceId }, + /// Detener selectivamente los comandos de un pipeline (no el workspace /// entero). `grace_ms`: SIGTERM → wait → SIGKILL. PipelineStop { @@ -205,6 +210,13 @@ pub enum Response { info: QuotaReportInfo, }, + WorkspaceFullSummary { + stats: WorkspaceStatsInfo, + quota: QuotaReportInfo, + commands: Vec, + flow_sockets: Vec, + }, + FlowList { items: Vec, },