feat(shipote): collision detection + stats history server-side (fase O)

- Flow socket names usan pipeline_id full (ULID 26 chars) + edge_idx.
  Cero colisiones entre pipelines (ULID es único global). Fallback con
  suffix -N si el path existe (cap 1000 retries).
- WorkspaceState.stats_history (VecDeque cap 64) — workspace_stats
  appendea cada call. API workspace_stats_history(id, tail). Protocol
  WorkspaceStatsHistory. Shell pide history al primer probe → sparkline
  hidratada al boot, sobrevive restart del shell.

84 tests pasan (ente-incarnate 16, nouser-core 27, shipote-card 8,
shipote-core 25, shipote-discern 5, yahweh-provider-fs 3).

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
sergio
2026-05-11 10:55:21 +00:00
parent a823c40fe1
commit 1cce50b290
5 changed files with 154 additions and 10 deletions
+25
View File
@@ -465,6 +465,31 @@ async fn dispatch(
}, },
}, },
Request::WorkspaceStatsHistory { workspace, tail } => {
match mgr.workspace_stats_history(workspace, tail).await {
Some(samples) => {
let mapped: Vec<WorkspaceStatsInfo> = samples
.into_iter()
.map(|s| WorkspaceStatsInfo {
commands_alive: s.commands_alive,
commands_total: s.commands_total,
rss_bytes: s.rss_bytes,
rss_peak_bytes: s.rss_peak_bytes,
cpu_usec: s.cpu_usec,
cpu_percent: s.cpu_percent,
cpu_cores: s.cpu_cores,
source: s.source,
uptime_ms: s.uptime_ms,
})
.collect();
Response::WorkspaceStatsHistory { samples: mapped }
}
None => Response::Error {
message: format!("workspace {workspace} not found"),
},
}
}
Request::WorkspaceFullSummary { workspace } => { Request::WorkspaceFullSummary { workspace } => {
let stats = match mgr.workspace_stats(workspace).await { let stats = match mgr.workspace_stats(workspace).await {
Some(s) => WorkspaceStatsInfo { Some(s) => WorkspaceStatsInfo {
+38 -1
View File
@@ -82,7 +82,20 @@ impl Shell {
me.saved_pipelines = snap.saved_pipelines; me.saved_pipelines = snap.saved_pipelines;
me.flows = snap.flows; me.flows = snap.flows;
me.quotas = snap.quotas; me.quotas = snap.quotas;
// Append a la history por workspace. // Hidratar history server-side para workspaces
// que no tenían history local (primer probe).
for ws in &me.workspaces {
let key = ws.id.to_string();
if !me.stats_history.contains_key(&key) {
if let Some(hydrated) = snap.hydrate_history.get(&key) {
me.stats_history.insert(
key.clone(),
hydrated.iter().cloned().collect(),
);
}
}
}
// Append fresh sample a la history por workspace.
for (ws_id, fresh) in &snap.fresh_stats { for (ws_id, fresh) in &snap.fresh_stats {
let h = me let h = me
.stats_history .stats_history
@@ -148,6 +161,9 @@ struct Snapshot {
fresh_stats: std::collections::BTreeMap<String, WorkspaceStatsInfo>, fresh_stats: std::collections::BTreeMap<String, WorkspaceStatsInfo>,
/// Quota report fresco por workspace. /// Quota report fresco por workspace.
quotas: std::collections::BTreeMap<String, QuotaReportInfo>, quotas: std::collections::BTreeMap<String, QuotaReportInfo>,
/// Workspaces nuevos (no en history local): hidratamos history
/// server-side al primer probe que los vea.
hydrate_history: std::collections::BTreeMap<String, Vec<WorkspaceStatsInfo>>,
caps: CapsSummary, caps: CapsSummary,
/// tail del log del comando más reciente (label + bytes). None si no hay. /// tail del log del comando más reciente (label + bytes). None si no hay.
recent_log: Option<(String, String)>, recent_log: Option<(String, String)>,
@@ -175,9 +191,11 @@ fn probe_blocking(path: &std::path::Path) -> Result<Snapshot, String> {
}; };
// Batched: stats+quota+commands+flow_sockets en 1 roundtrip por ws. // Batched: stats+quota+commands+flow_sockets en 1 roundtrip por ws.
// Para workspaces nuevos, también pedimos history server-side.
let mut commands_map = std::collections::BTreeMap::new(); let mut commands_map = std::collections::BTreeMap::new();
let mut fresh_stats = std::collections::BTreeMap::new(); let mut fresh_stats = std::collections::BTreeMap::new();
let mut quotas = std::collections::BTreeMap::new(); let mut quotas = std::collections::BTreeMap::new();
let mut hydrate_history = std::collections::BTreeMap::new();
for w in &workspaces { for w in &workspaces {
write_frame(&mut stream, &Request::WorkspaceFullSummary { workspace: w.id }) write_frame(&mut stream, &Request::WorkspaceFullSummary { workspace: w.id })
.await .await
@@ -193,6 +211,24 @@ fn probe_blocking(path: &std::path::Path) -> Result<Snapshot, String> {
commands_map.insert(key, commands); commands_map.insert(key, commands);
} }
} }
// History server-side (para hidratar si el shell es nuevo).
write_frame(
&mut stream,
&Request::WorkspaceStatsHistory {
workspace: w.id,
tail: 24, // mismo cap que STATS_HISTORY_LEN
},
)
.await
.map_err(|e| format!("write history: {e}"))?;
let resp: Response = read_frame(&mut stream)
.await
.map_err(|e| format!("read history: {e}"))?;
if let Response::WorkspaceStatsHistory { samples } = resp {
if !samples.is_empty() {
hydrate_history.insert(w.id.to_string(), samples);
}
}
} }
// Saved pipelines. // Saved pipelines.
@@ -296,6 +332,7 @@ fn probe_blocking(path: &std::path::Path) -> Result<Snapshot, String> {
flows, flows,
fresh_stats, fresh_stats,
quotas, quotas,
hydrate_history,
caps, caps,
recent_log, recent_log,
}) })
@@ -51,8 +51,14 @@ pub struct WorkspaceState {
/// Última muestra de `(wall_instant, cpu_usec)` usada para calcular /// Última muestra de `(wall_instant, cpu_usec)` usada para calcular
/// `cpu_percent` en la próxima medición. None hasta el primer measure. /// `cpu_percent` en la próxima medición. None hasta el primer measure.
pub last_cpu_sample: Option<(Instant, u64)>, pub last_cpu_sample: Option<(Instant, u64)>,
/// Ring buffer de samples recientes para sparklines. Se popula cada
/// vez que `workspace_stats` se llama (típicamente desde el shell).
/// Cap 64 samples = ~2 minutos a 2s/sample.
pub stats_history: std::collections::VecDeque<stats::WorkspaceStats>,
} }
const STATS_HISTORY_CAP: usize = 64;
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct CommandState { pub struct CommandState {
pub id: Ulid, pub id: Ulid,
@@ -562,9 +568,28 @@ impl WorkspaceManager {
} }
ws.last_cpu_sample = Some((now, cpu_now)); ws.last_cpu_sample = Some((now, cpu_now));
} }
// Append a history (ring buffer cap).
if ws.stats_history.len() >= STATS_HISTORY_CAP {
ws.stats_history.pop_front();
}
ws.stats_history.push_back(s.clone());
Some(s) Some(s)
} }
/// Retorna las últimas N samples de stats (servidas desde el ring
/// buffer interno). Sobrevive restart del shell.
pub async fn workspace_stats_history(
&self,
id: WorkspaceId,
tail: usize,
) -> Option<Vec<stats::WorkspaceStats>> {
let g = self.inner.lock().await;
let ws = g.workspaces.get(&id)?;
let take = if tail == 0 { ws.stats_history.len() } else { tail };
let skip = ws.stats_history.len().saturating_sub(take);
Some(ws.stats_history.iter().skip(skip).cloned().collect())
}
pub async fn create( pub async fn create(
self: &Arc<Self>, self: &Arc<Self>,
spec: WorkspaceSpec, spec: WorkspaceSpec,
@@ -604,6 +629,7 @@ impl WorkspaceManager {
commands: HashMap::new(), commands: HashMap::new(),
started: Instant::now(), started: Instant::now(),
last_cpu_sample: None, last_cpu_sample: None,
stats_history: std::collections::VecDeque::with_capacity(STATS_HISTORY_CAP),
}; };
self.inner.lock().await.workspaces.insert(id, state); self.inner.lock().await.workspaces.insert(id, state);
info!(%id, ?ttl, "workspace created"); info!(%id, ?ttl, "workspace created");
@@ -1364,6 +1390,38 @@ mod tests {
panic!("quota enforce kill never triggered"); panic!("quota enforce kill never triggered");
} }
#[tokio::test]
async fn workspace_stats_history_accumulates() {
let mgr = Arc::new(WorkspaceManager::new(IncarnatorConfig::default()));
let spec = WorkspaceSpec {
label: "history".into(),
soma: Default::default(),
permissions: Default::default(),
ttl: None,
flow_dirs: vec![],
on_exit: shipote_card::ExitPolicy::Reap,
quota_enforce: Default::default(),
};
let (id, _) = mgr.create(spec).await.unwrap();
// Necesitamos al menos un comando vivo para que `measure` no
// retorne source=none (que igual se appendea, pero con stats vacíos).
let _ = mgr
.run(id, "/bin/sleep".into(), vec!["5".into()], vec![])
.await
.unwrap();
// Llamar stats 5 veces.
for _ in 0..5 {
let _ = mgr.workspace_stats(id).await;
}
let history = mgr.workspace_stats_history(id, 0).await.unwrap();
assert_eq!(history.len(), 5, "history debería tener 5 samples");
// tail=3 retorna los últimos 3.
let tail3 = mgr.workspace_stats_history(id, 3).await.unwrap();
assert_eq!(tail3.len(), 3);
// Cleanup.
let _ = mgr.stop_with_grace(id, std::time::Duration::ZERO).await;
}
#[tokio::test] #[tokio::test]
async fn run_true_in_workspace() { async fn run_true_in_workspace() {
let mgr = Arc::new(WorkspaceManager::new(IncarnatorConfig::default())); let mgr = Arc::new(WorkspaceManager::new(IncarnatorConfig::default()));
@@ -197,20 +197,31 @@ pub async fn run_pipeline(
for s in &splitter_specs { for s in &splitter_specs {
let mut senders_per_edge = Vec::with_capacity(s.edges.len()); let mut senders_per_edge = Vec::with_capacity(s.edges.len());
let mut paths_per_edge = Vec::with_capacity(s.edges.len()); let mut paths_per_edge = Vec::with_capacity(s.edges.len());
for (i, em) in s.edges.iter().enumerate() { for (i, _em) in s.edges.iter().enumerate() {
if !s.tap { if !s.tap {
senders_per_edge.push(None); senders_per_edge.push(None);
paths_per_edge.push(None); paths_per_edge.push(None);
continue; continue;
} }
let id = format!( // Socket name = pipeline_id full (26 chars ULID) + edge_idx.
"{}-{}-{}-{}", // ULID es único globalmente → cero colisiones entre runs.
short_ulid(&pipeline_id), // Edge_idx desambigua múltiples sockets del mismo pipeline.
em.from_label, // No incluimos from_label en el name (puede tener chars que
em.from_output, // no van en paths Unix — los hints van en `EdgeDiscernment`).
i let id = format!("{}-{}", pipeline_id, i);
); let mut socket = crate::flow_channel::default_flow_socket_path(&id);
let socket = crate::flow_channel::default_flow_socket_path(&id); // Fallback: si el path existe (raro — daemon crashed sin
// cleanup), agregar suffix numérico hasta encontrar libre.
let mut suffix = 1u32;
while socket.exists() {
let alt = format!("{id}-{suffix}");
socket = crate::flow_channel::default_flow_socket_path(&alt);
suffix += 1;
if suffix > 1000 {
warn!(orig = id, "flow socket collision: 1000 retries — using as-is");
break;
}
}
match crate::flow_channel::FlowChannel::with_replay_caps( match crate::flow_channel::FlowChannel::with_replay_caps(
socket.clone(), socket.clone(),
crate::flow_channel::ReplayCaps::new(spec.discern.replay_chunks, spec.discern.replay_bytes), crate::flow_channel::ReplayCaps::new(spec.discern.replay_chunks, spec.discern.replay_bytes),
@@ -277,6 +288,7 @@ pub async fn run_pipeline(
}) })
} }
#[allow(dead_code)]
fn short_ulid(u: &Ulid) -> String { fn short_ulid(u: &Ulid) -> String {
let s = u.to_string(); let s = u.to_string();
s[s.len() - 6..].to_string() s[s.len() - 6..].to_string()
@@ -110,6 +110,14 @@ pub enum Request {
/// Reporte de quotas (rlimits declarados vs uso actual). /// Reporte de quotas (rlimits declarados vs uso actual).
WorkspaceQuota { workspace: shipote_card::WorkspaceId }, WorkspaceQuota { workspace: shipote_card::WorkspaceId },
/// History de samples del workspace (server-side). Sobrevive
/// restart del shell. `tail`: cantidad de samples desde el final
/// (0 = todo).
WorkspaceStatsHistory {
workspace: shipote_card::WorkspaceId,
tail: usize,
},
/// Resumen completo de un workspace: stats + quota + commands + /// Resumen completo de un workspace: stats + quota + commands +
/// flow sockets en una sola roundtrip. Reduce N×4 requests del /// flow sockets en una sola roundtrip. Reduce N×4 requests del
/// shell a N×1. /// shell a N×1.
@@ -210,6 +218,10 @@ pub enum Response {
info: QuotaReportInfo, info: QuotaReportInfo,
}, },
WorkspaceStatsHistory {
samples: Vec<WorkspaceStatsInfo>,
},
WorkspaceFullSummary { WorkspaceFullSummary {
stats: WorkspaceStatsInfo, stats: WorkspaceStatsInfo,
quota: QuotaReportInfo, quota: QuotaReportInfo,