feat(shipote): drain shutdown + persist live pipelines + batched query (fase N)

- Daemon SIGTERM/SIGINT: snapshot ANTES, stop_with_grace(1s) de todos
  los workspaces DESPUÉS. Grace permite app-level cleanup.
- Snapshot v3 con live_pipelines: pipeline_supervisors se persisten;
  daemon relanza al restore con sus recursos (Incarnator+DiscernPipeline).
  RestoreOutcome separado para que core no necesite incarnator.
  Forward-compat v1/v2 via #[serde(default)].
- WorkspaceFullSummary: stats+quota+commands+flow_sockets en 1 roundtrip.
  Shell reduce N×4 requests/probe a N×1 + 4 globales.

83 tests pasan (ente-incarnate 16, nouser-core 27, shipote-card 8,
shipote-core 24, shipote-discern 5, yahweh-provider-fs 3).

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
sergio
2026-05-11 10:48:11 +00:00
parent c3f9c9e36a
commit a823c40fe1
4 changed files with 185 additions and 47 deletions
+103 -9
View File
@@ -64,14 +64,43 @@ async fn main() -> anyhow::Result<()> {
})); }));
// Restaurar snapshot previo si existe. Workspaces se recrean; los // Restaurar snapshot previo si existe. Workspaces se recrean; los
// pids de comandos viejos NO se recuperan (kernel los mató). // pids de comandos viejos NO se recuperan (kernel los mató). Los
// pipelines vivos (con supervisor) se relanzan desde cero.
let snapshot_path = shipote_core::persist::default_snapshot_path(); let snapshot_path = shipote_core::persist::default_snapshot_path();
if let Err(e) = mgr.restore_snapshot(&snapshot_path).await { let restore = match mgr.restore_snapshot(&snapshot_path).await {
warn!(?e, "restore_snapshot falló — start fresh"); Ok(r) => r,
Err(e) => {
warn!(?e, "restore_snapshot falló — start fresh");
shipote_core::persist::RestoreOutcome::default()
}
};
// Relauncher de live_pipelines: como necesita inc+disc del daemon,
// lo hacemos acá tras el restore. Cada uno mismo flujo que un run
// normal — register_pipeline_commands + register_pipeline_supervisor.
for entry in restore.live_pipelines {
let inc = mgr.incarnator_handle();
let disc = Arc::new(DiscernPipeline::default_pipeline());
let workspace = entry.workspace;
let ws_label = mgr.workspace_label(workspace).await.unwrap_or_default();
let tap = entry.tap;
let spec = entry.spec;
match shipote_core::pipeline::run_pipeline(
&spec, &ws_label, tap, disc, inc, Some(mgr.clone()),
)
.await
{
Ok(launch) => {
mgr.register_pipeline_commands(workspace, launch.pipeline, launch.command_pids.clone()).await;
mgr.register_pipeline_supervisor(launch.pipeline, workspace, spec, tap).await;
info!(label = %launch.pipeline, "live pipeline relaunched from snapshot");
}
Err(e) => warn!(?e, "live pipeline relaunch failed"),
}
} }
// Save-on-shutdown via SIGTERM/SIGINT handler. tokio::signal soporta // Shutdown handler: SIGTERM/SIGINT → drain (stop_with_grace de todos
// ambos en Linux. // los workspaces) → snapshot → exit. El drain usa grace=1s para dar
// chance a los comandos a terminar limpio antes del SIGKILL.
{ {
let mgr = mgr.clone(); let mgr = mgr.clone();
let path = snapshot_path.clone(); let path = snapshot_path.clone();
@@ -80,13 +109,32 @@ async fn main() -> anyhow::Result<()> {
.expect("SIGTERM handler"); .expect("SIGTERM handler");
let mut int = tokio::signal::unix::signal(tokio::signal::unix::SignalKind::interrupt()) let mut int = tokio::signal::unix::signal(tokio::signal::unix::SignalKind::interrupt())
.expect("SIGINT handler"); .expect("SIGINT handler");
tokio::select! { let sig_name = tokio::select! {
_ = term.recv() => info!("SIGTERM — saving snapshot"), _ = term.recv() => "SIGTERM",
_ = int.recv() => info!("SIGINT — saving snapshot"), _ = int.recv() => "SIGINT",
} };
info!(signal = sig_name, "shipote-daemon shutdown: draining workspaces");
// 1) Snapshot ANTES del drain — preserva intención declarada
// (los workspace specs siguen vivos en el snapshot aunque
// matemos los procesos hijos).
if let Err(e) = mgr.save_snapshot(&path).await { if let Err(e) = mgr.save_snapshot(&path).await {
warn!(?e, "save_snapshot falló"); warn!(?e, "save_snapshot falló");
} }
// 2) Drain: stop_with_grace de todos los workspaces vivos.
// Grace 1s da chance a apps Type=notify de hacer cleanup.
let workspaces = mgr.list().await;
let n = workspaces.len();
for ws in workspaces {
if let Err(e) = mgr
.stop_with_grace(ws.id, std::time::Duration::from_millis(1000))
.await
{
warn!(?e, %ws.id, "stop_with_grace falló en drain");
}
}
info!(drained = n, "drain complete");
std::process::exit(0); std::process::exit(0);
}); });
} }
@@ -417,6 +465,52 @@ async fn dispatch(
}, },
}, },
Request::WorkspaceFullSummary { workspace } => {
let stats = match mgr.workspace_stats(workspace).await {
Some(s) => WorkspaceStatsInfo {
commands_alive: s.commands_alive,
commands_total: s.commands_total,
rss_bytes: s.rss_bytes,
rss_peak_bytes: s.rss_peak_bytes,
cpu_usec: s.cpu_usec,
cpu_percent: s.cpu_percent,
cpu_cores: s.cpu_cores,
source: s.source,
uptime_ms: s.uptime_ms,
},
None => return Response::Error { message: format!("workspace {workspace} not found") },
};
let quota = match mgr.workspace_quota(workspace).await {
Some(q) => QuotaReportInfo {
mem_limit: q.mem_limit,
nproc_limit: q.nproc_limit,
breaches: q.breaches,
},
None => QuotaReportInfo { mem_limit: None, nproc_limit: None, breaches: Vec::new() },
};
let commands = mgr
.list_commands(workspace)
.await
.into_iter()
.map(|c| ProtoCommandInfo {
id: c.id,
label: c.label,
pid: c.pid,
alive: c.alive,
exit_status: c.exit_status,
log_bytes: c.log_bytes,
})
.collect();
// Flow sockets de pipelines whose workspace == este.
let flow_sockets = mgr
.list_flow_pipelines()
.await
.into_iter()
.flat_map(|(_, sockets)| sockets)
.collect();
Response::WorkspaceFullSummary { stats, quota, commands, flow_sockets }
}
Request::WorkspaceQuota { workspace } => match mgr.workspace_quota(workspace).await { Request::WorkspaceQuota { workspace } => match mgr.workspace_quota(workspace).await {
Some(q) => Response::WorkspaceQuota { Some(q) => Response::WorkspaceQuota {
info: QuotaReportInfo { info: QuotaReportInfo {
+10 -27
View File
@@ -174,42 +174,25 @@ fn probe_blocking(path: &std::path::Path) -> Result<Snapshot, String> {
other => return Err(format!("unexpected list resp: {other:?}")), other => return Err(format!("unexpected list resp: {other:?}")),
}; };
// Commands por workspace. // Batched: stats+quota+commands+flow_sockets en 1 roundtrip por ws.
let mut commands_map = std::collections::BTreeMap::new(); let mut commands_map = std::collections::BTreeMap::new();
let mut fresh_stats = std::collections::BTreeMap::new(); let mut fresh_stats = std::collections::BTreeMap::new();
let mut quotas = std::collections::BTreeMap::new(); let mut quotas = std::collections::BTreeMap::new();
for w in &workspaces { for w in &workspaces {
write_frame(&mut stream, &Request::CommandList { workspace: w.id }) write_frame(&mut stream, &Request::WorkspaceFullSummary { workspace: w.id })
.await .await
.map_err(|e| format!("write commands: {e}"))?; .map_err(|e| format!("write summary: {e}"))?;
let resp: Response = read_frame(&mut stream) let resp: Response = read_frame(&mut stream)
.await .await
.map_err(|e| format!("read commands: {e}"))?; .map_err(|e| format!("read summary: {e}"))?;
if let Response::CommandList { items } = resp { if let Response::WorkspaceFullSummary { stats, quota, commands, .. } = resp {
if !items.is_empty() { let key = w.id.to_string();
commands_map.insert(w.id.to_string(), items); fresh_stats.insert(key.clone(), stats);
quotas.insert(key.clone(), quota);
if !commands.is_empty() {
commands_map.insert(key, commands);
} }
} }
// Stats por workspace.
write_frame(&mut stream, &Request::WorkspaceStats { workspace: w.id })
.await
.map_err(|e| format!("write stats: {e}"))?;
let resp: Response = read_frame(&mut stream)
.await
.map_err(|e| format!("read stats: {e}"))?;
if let Response::WorkspaceStats { info } = resp {
fresh_stats.insert(w.id.to_string(), info);
}
// Quota por workspace.
write_frame(&mut stream, &Request::WorkspaceQuota { workspace: w.id })
.await
.map_err(|e| format!("write quota: {e}"))?;
let resp: Response = read_frame(&mut stream)
.await
.map_err(|e| format!("read quota: {e}"))?;
if let Response::WorkspaceQuota { info } = resp {
quotas.insert(w.id.to_string(), info);
}
} }
// Saved pipelines. // Saved pipelines.
@@ -10,8 +10,10 @@ use shipote_card::{PipelineSpec, WorkspaceId, WorkspaceSpec};
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
use tracing::{info, warn}; use tracing::{info, warn};
/// v2 agregó `saved_pipelines`. v1 lee con campo ausente como vacío. /// v2 agregó `saved_pipelines`. v3 agrega `live_pipelines` (pipelines
pub const SNAPSHOT_VERSION: u16 = 2; /// con supervisor vivo al momento del snapshot — el daemon los relanza
/// al restore). Versiones inferiores leen campos ausentes como vacío.
pub const SNAPSHOT_VERSION: u16 = 3;
#[derive(Debug, Clone, Serialize, Deserialize)] #[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ShipoteSnapshot { pub struct ShipoteSnapshot {
@@ -20,6 +22,10 @@ pub struct ShipoteSnapshot {
pub workspaces: Vec<WorkspaceEntry>, pub workspaces: Vec<WorkspaceEntry>,
#[serde(default)] #[serde(default)]
pub saved_pipelines: Vec<PipelineEntry>, pub saved_pipelines: Vec<PipelineEntry>,
/// Pipelines vivos con supervisor (`restart_on_failure=true`) al
/// momento del snapshot. El daemon los relanza al restore.
#[serde(default)]
pub live_pipelines: Vec<LivePipelineEntry>,
} }
#[derive(Debug, Clone, Serialize, Deserialize)] #[derive(Debug, Clone, Serialize, Deserialize)]
@@ -34,6 +40,13 @@ pub struct PipelineEntry {
pub spec: PipelineSpec, pub spec: PipelineSpec,
} }
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LivePipelineEntry {
pub workspace: WorkspaceId,
pub spec: PipelineSpec,
pub tap: bool,
}
impl ShipoteSnapshot { impl ShipoteSnapshot {
pub fn write(&self, path: &Path) -> anyhow::Result<()> { pub fn write(&self, path: &Path) -> anyhow::Result<()> {
let bytes = serde_json::to_vec_pretty(self)?; let bytes = serde_json::to_vec_pretty(self)?;
@@ -102,11 +115,24 @@ impl WorkspaceManager {
spec: spec.clone(), spec: spec.clone(),
}) })
.collect(); .collect();
// Pipelines vivos con supervisor — preserva la intención. Los
// pids/sockets/discernments son ephemeral y se regeneran al
// restore (relaunch desde cero).
let live_pipelines = g
.pipeline_supervisors
.values()
.map(|sup| LivePipelineEntry {
workspace: sup.workspace,
spec: sup.spec.clone(),
tap: sup.tap,
})
.collect();
ShipoteSnapshot { ShipoteSnapshot {
version: SNAPSHOT_VERSION, version: SNAPSHOT_VERSION,
timestamp_ms: now_ms(), timestamp_ms: now_ms(),
workspaces, workspaces,
saved_pipelines, saved_pipelines,
live_pipelines,
} }
} }
@@ -118,34 +144,57 @@ impl WorkspaceManager {
Ok(()) Ok(())
} }
/// Carga snapshot desde disco y restaura los Workspaces. /// Carga snapshot desde disco y restaura los Workspaces + saved
/// pipelines. Devuelve los `live_pipelines` para que el caller
/// (daemon) los relance — no podemos relanzarlos desde acá porque
/// `run_pipeline` necesita `Incarnator` + `DiscernPipeline`.
/// Errores no-fatales (workspaces inválidos) se loguean y se saltan. /// Errores no-fatales (workspaces inválidos) se loguean y se saltan.
pub async fn restore_snapshot(self: &std::sync::Arc<Self>, path: &Path) -> anyhow::Result<usize> { pub async fn restore_snapshot(
self: &std::sync::Arc<Self>,
path: &Path,
) -> anyhow::Result<RestoreOutcome> {
let snap = match ShipoteSnapshot::read(path) { let snap = match ShipoteSnapshot::read(path) {
Ok(s) => s, Ok(s) => s,
Err(e) => { Err(e) => {
warn!(?e, path = %path.display(), "no snapshot — start fresh"); warn!(?e, path = %path.display(), "no snapshot — start fresh");
return Ok(0); return Ok(RestoreOutcome::default());
} }
}; };
let mut restored = 0usize; let mut out = RestoreOutcome::default();
for entry in snap.workspaces { for entry in snap.workspaces {
// v2+: reusamos el id original así clients que tracking // v2+: reusamos el id original así clients que tracking
// workspace_id no se rompen al restart. // workspace_id no se rompen al restart.
let label = entry.spec.label.clone(); let label = entry.spec.label.clone();
match self.create_with_id(entry.id, entry.spec).await { match self.create_with_id(entry.id, entry.spec).await {
Ok(_) => restored += 1, Ok(_) => out.workspaces_restored += 1,
Err(e) => warn!(?e, %label, "skipped workspace en restore"), Err(e) => warn!(?e, %label, "skipped workspace en restore"),
} }
} }
for entry in snap.saved_pipelines { for entry in snap.saved_pipelines {
self.save_pipeline(entry.name, entry.spec).await; self.save_pipeline(entry.name, entry.spec).await;
out.saved_pipelines_restored += 1;
} }
info!(restored, "snapshot restored"); out.live_pipelines = snap.live_pipelines;
Ok(restored) info!(
workspaces = out.workspaces_restored,
saved_pipelines = out.saved_pipelines_restored,
live_pipelines = out.live_pipelines.len(),
"snapshot restored"
);
Ok(out)
} }
} }
/// Lo que el caller del restore obtiene. Las `live_pipelines` requieren
/// `Incarnator + DiscernPipeline` para relanzarlas → el caller las
/// procesa (típicamente el daemon).
#[derive(Debug, Default)]
pub struct RestoreOutcome {
pub workspaces_restored: usize,
pub saved_pipelines_restored: usize,
pub live_pipelines: Vec<LivePipelineEntry>,
}
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::*; use super::*;
@@ -177,8 +226,8 @@ mod tests {
mgr1.save_snapshot(&path).await.unwrap(); mgr1.save_snapshot(&path).await.unwrap();
let mgr2 = Arc::new(WorkspaceManager::new(IncarnatorConfig::default())); let mgr2 = Arc::new(WorkspaceManager::new(IncarnatorConfig::default()));
let n = mgr2.restore_snapshot(&path).await.unwrap(); let out = mgr2.restore_snapshot(&path).await.unwrap();
assert_eq!(n, 2); assert_eq!(out.workspaces_restored, 2);
let listed = mgr2.list().await; let listed = mgr2.list().await;
let restored_ids: std::collections::HashSet<_> = listed.iter().map(|s| s.id).collect(); let restored_ids: std::collections::HashSet<_> = listed.iter().map(|s| s.id).collect();
assert!(restored_ids.contains(&id1)); assert!(restored_ids.contains(&id1));
@@ -110,6 +110,11 @@ pub enum Request {
/// Reporte de quotas (rlimits declarados vs uso actual). /// Reporte de quotas (rlimits declarados vs uso actual).
WorkspaceQuota { workspace: shipote_card::WorkspaceId }, WorkspaceQuota { workspace: shipote_card::WorkspaceId },
/// Resumen completo de un workspace: stats + quota + commands +
/// flow sockets en una sola roundtrip. Reduce N×4 requests del
/// shell a N×1.
WorkspaceFullSummary { workspace: shipote_card::WorkspaceId },
/// Detener selectivamente los comandos de un pipeline (no el workspace /// Detener selectivamente los comandos de un pipeline (no el workspace
/// entero). `grace_ms`: SIGTERM → wait → SIGKILL. /// entero). `grace_ms`: SIGTERM → wait → SIGKILL.
PipelineStop { PipelineStop {
@@ -205,6 +210,13 @@ pub enum Response {
info: QuotaReportInfo, info: QuotaReportInfo,
}, },
WorkspaceFullSummary {
stats: WorkspaceStatsInfo,
quota: QuotaReportInfo,
commands: Vec<CommandInfo>,
flow_sockets: Vec<PathBuf>,
},
FlowList { FlowList {
items: Vec<FlowInfo>, items: Vec<FlowInfo>,
}, },