From 1cce50b290043180ddcef8a8f58efda4307b28ba Mon Sep 17 00:00:00 2001 From: sergio Date: Mon, 11 May 2026 10:55:21 +0000 Subject: [PATCH] feat(shipote): collision detection + stats history server-side (fase O) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Flow socket names usan pipeline_id full (ULID 26 chars) + edge_idx. Cero colisiones entre pipelines (ULID es único global). Fallback con suffix -N si el path existe (cap 1000 retries). - WorkspaceState.stats_history (VecDeque cap 64) — workspace_stats appendea cada call. API workspace_stats_history(id, tail). Protocol WorkspaceStatsHistory. Shell pide history al primer probe → sparkline hidratada al boot, sobrevive restart del shell. 84 tests pasan (ente-incarnate 16, nouser-core 27, shipote-card 8, shipote-core 25, shipote-discern 5, yahweh-provider-fs 3). Co-Authored-By: Claude Opus 4.7 --- crates/apps/shipote-daemon/src/main.rs | 25 ++++++++ crates/apps/shipote-shell/src/main.rs | 39 ++++++++++++- .../modules/shipote/shipote-core/src/lib.rs | 58 +++++++++++++++++++ .../shipote/shipote-core/src/pipeline.rs | 30 +++++++--- .../shipote/shipote-protocol/src/lib.rs | 12 ++++ 5 files changed, 154 insertions(+), 10 deletions(-) diff --git a/crates/apps/shipote-daemon/src/main.rs b/crates/apps/shipote-daemon/src/main.rs index bbb64c4..e6c6db7 100644 --- a/crates/apps/shipote-daemon/src/main.rs +++ b/crates/apps/shipote-daemon/src/main.rs @@ -465,6 +465,31 @@ async fn dispatch( }, }, + Request::WorkspaceStatsHistory { workspace, tail } => { + match mgr.workspace_stats_history(workspace, tail).await { + Some(samples) => { + let mapped: Vec = samples + .into_iter() + .map(|s| WorkspaceStatsInfo { + commands_alive: s.commands_alive, + commands_total: s.commands_total, + rss_bytes: s.rss_bytes, + rss_peak_bytes: s.rss_peak_bytes, + cpu_usec: s.cpu_usec, + cpu_percent: s.cpu_percent, + cpu_cores: s.cpu_cores, + source: s.source, + uptime_ms: s.uptime_ms, + }) + .collect(); + Response::WorkspaceStatsHistory { samples: mapped } + } + None => Response::Error { + message: format!("workspace {workspace} not found"), + }, + } + } + Request::WorkspaceFullSummary { workspace } => { let stats = match mgr.workspace_stats(workspace).await { Some(s) => WorkspaceStatsInfo { diff --git a/crates/apps/shipote-shell/src/main.rs b/crates/apps/shipote-shell/src/main.rs index 91b1e03..2802a19 100644 --- a/crates/apps/shipote-shell/src/main.rs +++ b/crates/apps/shipote-shell/src/main.rs @@ -82,7 +82,20 @@ impl Shell { me.saved_pipelines = snap.saved_pipelines; me.flows = snap.flows; me.quotas = snap.quotas; - // Append a la history por workspace. + // Hidratar history server-side para workspaces + // que no tenían history local (primer probe). + for ws in &me.workspaces { + let key = ws.id.to_string(); + if !me.stats_history.contains_key(&key) { + if let Some(hydrated) = snap.hydrate_history.get(&key) { + me.stats_history.insert( + key.clone(), + hydrated.iter().cloned().collect(), + ); + } + } + } + // Append fresh sample a la history por workspace. for (ws_id, fresh) in &snap.fresh_stats { let h = me .stats_history @@ -148,6 +161,9 @@ struct Snapshot { fresh_stats: std::collections::BTreeMap, /// Quota report fresco por workspace. quotas: std::collections::BTreeMap, + /// Workspaces nuevos (no en history local): hidratamos history + /// server-side al primer probe que los vea. + hydrate_history: std::collections::BTreeMap>, caps: CapsSummary, /// tail del log del comando más reciente (label + bytes). None si no hay. recent_log: Option<(String, String)>, @@ -175,9 +191,11 @@ fn probe_blocking(path: &std::path::Path) -> Result { }; // Batched: stats+quota+commands+flow_sockets en 1 roundtrip por ws. + // Para workspaces nuevos, también pedimos history server-side. let mut commands_map = std::collections::BTreeMap::new(); let mut fresh_stats = std::collections::BTreeMap::new(); let mut quotas = std::collections::BTreeMap::new(); + let mut hydrate_history = std::collections::BTreeMap::new(); for w in &workspaces { write_frame(&mut stream, &Request::WorkspaceFullSummary { workspace: w.id }) .await @@ -193,6 +211,24 @@ fn probe_blocking(path: &std::path::Path) -> Result { commands_map.insert(key, commands); } } + // History server-side (para hidratar si el shell es nuevo). + write_frame( + &mut stream, + &Request::WorkspaceStatsHistory { + workspace: w.id, + tail: 24, // mismo cap que STATS_HISTORY_LEN + }, + ) + .await + .map_err(|e| format!("write history: {e}"))?; + let resp: Response = read_frame(&mut stream) + .await + .map_err(|e| format!("read history: {e}"))?; + if let Response::WorkspaceStatsHistory { samples } = resp { + if !samples.is_empty() { + hydrate_history.insert(w.id.to_string(), samples); + } + } } // Saved pipelines. @@ -296,6 +332,7 @@ fn probe_blocking(path: &std::path::Path) -> Result { flows, fresh_stats, quotas, + hydrate_history, caps, recent_log, }) diff --git a/crates/modules/shipote/shipote-core/src/lib.rs b/crates/modules/shipote/shipote-core/src/lib.rs index 15cc29c..1b21c6e 100644 --- a/crates/modules/shipote/shipote-core/src/lib.rs +++ b/crates/modules/shipote/shipote-core/src/lib.rs @@ -51,8 +51,14 @@ pub struct WorkspaceState { /// Última muestra de `(wall_instant, cpu_usec)` usada para calcular /// `cpu_percent` en la próxima medición. None hasta el primer measure. pub last_cpu_sample: Option<(Instant, u64)>, + /// Ring buffer de samples recientes para sparklines. Se popula cada + /// vez que `workspace_stats` se llama (típicamente desde el shell). + /// Cap 64 samples = ~2 minutos a 2s/sample. + pub stats_history: std::collections::VecDeque, } +const STATS_HISTORY_CAP: usize = 64; + #[derive(Debug, Clone)] pub struct CommandState { pub id: Ulid, @@ -562,9 +568,28 @@ impl WorkspaceManager { } ws.last_cpu_sample = Some((now, cpu_now)); } + // Append a history (ring buffer cap). + if ws.stats_history.len() >= STATS_HISTORY_CAP { + ws.stats_history.pop_front(); + } + ws.stats_history.push_back(s.clone()); Some(s) } + /// Retorna las últimas N samples de stats (servidas desde el ring + /// buffer interno). Sobrevive restart del shell. + pub async fn workspace_stats_history( + &self, + id: WorkspaceId, + tail: usize, + ) -> Option> { + let g = self.inner.lock().await; + let ws = g.workspaces.get(&id)?; + let take = if tail == 0 { ws.stats_history.len() } else { tail }; + let skip = ws.stats_history.len().saturating_sub(take); + Some(ws.stats_history.iter().skip(skip).cloned().collect()) + } + pub async fn create( self: &Arc, spec: WorkspaceSpec, @@ -604,6 +629,7 @@ impl WorkspaceManager { commands: HashMap::new(), started: Instant::now(), last_cpu_sample: None, + stats_history: std::collections::VecDeque::with_capacity(STATS_HISTORY_CAP), }; self.inner.lock().await.workspaces.insert(id, state); info!(%id, ?ttl, "workspace created"); @@ -1364,6 +1390,38 @@ mod tests { panic!("quota enforce kill never triggered"); } + #[tokio::test] + async fn workspace_stats_history_accumulates() { + let mgr = Arc::new(WorkspaceManager::new(IncarnatorConfig::default())); + let spec = WorkspaceSpec { + label: "history".into(), + soma: Default::default(), + permissions: Default::default(), + ttl: None, + flow_dirs: vec![], + on_exit: shipote_card::ExitPolicy::Reap, + quota_enforce: Default::default(), + }; + let (id, _) = mgr.create(spec).await.unwrap(); + // Necesitamos al menos un comando vivo para que `measure` no + // retorne source=none (que igual se appendea, pero con stats vacíos). + let _ = mgr + .run(id, "/bin/sleep".into(), vec!["5".into()], vec![]) + .await + .unwrap(); + // Llamar stats 5 veces. + for _ in 0..5 { + let _ = mgr.workspace_stats(id).await; + } + let history = mgr.workspace_stats_history(id, 0).await.unwrap(); + assert_eq!(history.len(), 5, "history debería tener 5 samples"); + // tail=3 retorna los últimos 3. + let tail3 = mgr.workspace_stats_history(id, 3).await.unwrap(); + assert_eq!(tail3.len(), 3); + // Cleanup. + let _ = mgr.stop_with_grace(id, std::time::Duration::ZERO).await; + } + #[tokio::test] async fn run_true_in_workspace() { let mgr = Arc::new(WorkspaceManager::new(IncarnatorConfig::default())); diff --git a/crates/modules/shipote/shipote-core/src/pipeline.rs b/crates/modules/shipote/shipote-core/src/pipeline.rs index e449647..8cf8954 100644 --- a/crates/modules/shipote/shipote-core/src/pipeline.rs +++ b/crates/modules/shipote/shipote-core/src/pipeline.rs @@ -197,20 +197,31 @@ pub async fn run_pipeline( for s in &splitter_specs { let mut senders_per_edge = Vec::with_capacity(s.edges.len()); let mut paths_per_edge = Vec::with_capacity(s.edges.len()); - for (i, em) in s.edges.iter().enumerate() { + for (i, _em) in s.edges.iter().enumerate() { if !s.tap { senders_per_edge.push(None); paths_per_edge.push(None); continue; } - let id = format!( - "{}-{}-{}-{}", - short_ulid(&pipeline_id), - em.from_label, - em.from_output, - i - ); - let socket = crate::flow_channel::default_flow_socket_path(&id); + // Socket name = pipeline_id full (26 chars ULID) + edge_idx. + // ULID es único globalmente → cero colisiones entre runs. + // Edge_idx desambigua múltiples sockets del mismo pipeline. + // No incluimos from_label en el name (puede tener chars que + // no van en paths Unix — los hints van en `EdgeDiscernment`). + let id = format!("{}-{}", pipeline_id, i); + let mut socket = crate::flow_channel::default_flow_socket_path(&id); + // Fallback: si el path existe (raro — daemon crashed sin + // cleanup), agregar suffix numérico hasta encontrar libre. + let mut suffix = 1u32; + while socket.exists() { + let alt = format!("{id}-{suffix}"); + socket = crate::flow_channel::default_flow_socket_path(&alt); + suffix += 1; + if suffix > 1000 { + warn!(orig = id, "flow socket collision: 1000 retries — using as-is"); + break; + } + } match crate::flow_channel::FlowChannel::with_replay_caps( socket.clone(), crate::flow_channel::ReplayCaps::new(spec.discern.replay_chunks, spec.discern.replay_bytes), @@ -277,6 +288,7 @@ pub async fn run_pipeline( }) } +#[allow(dead_code)] fn short_ulid(u: &Ulid) -> String { let s = u.to_string(); s[s.len() - 6..].to_string() diff --git a/crates/modules/shipote/shipote-protocol/src/lib.rs b/crates/modules/shipote/shipote-protocol/src/lib.rs index 64169d6..0eae4ab 100644 --- a/crates/modules/shipote/shipote-protocol/src/lib.rs +++ b/crates/modules/shipote/shipote-protocol/src/lib.rs @@ -110,6 +110,14 @@ pub enum Request { /// Reporte de quotas (rlimits declarados vs uso actual). WorkspaceQuota { workspace: shipote_card::WorkspaceId }, + /// History de samples del workspace (server-side). Sobrevive + /// restart del shell. `tail`: cantidad de samples desde el final + /// (0 = todo). + WorkspaceStatsHistory { + workspace: shipote_card::WorkspaceId, + tail: usize, + }, + /// Resumen completo de un workspace: stats + quota + commands + /// flow sockets en una sola roundtrip. Reduce N×4 requests del /// shell a N×1. @@ -210,6 +218,10 @@ pub enum Response { info: QuotaReportInfo, }, + WorkspaceStatsHistory { + samples: Vec, + }, + WorkspaceFullSummary { stats: WorkspaceStatsInfo, quota: QuotaReportInfo,