From 18c0344a524cb188176cbc410c5bd24b7df83b50 Mon Sep 17 00:00:00 2001 From: sergio Date: Mon, 11 May 2026 16:20:50 +0000 Subject: [PATCH] feat(shipote): throughput card + rate-limit + snapshot incremental (fase Q) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - shipote-shell Flow channels card extiende con bytes_total + bytes/s por socket. Lookup helper evita borrows en closures. - DiscernPolicy.max_bytes_per_sec: splitter task hace sleep proporcional al tamaño de chunk tras cada broadcast. Token-bucket simple v1. - WorkspaceManager.dirty: AtomicBool. mark_dirty() en mutaciones que afectan al snapshot. save_snapshot skip si clean y path existe. restore_snapshot resetea dirty=false (hidratación no es mutation). 85 tests pasan (ente-incarnate 16, nouser-core 27, shipote-card 8, shipote-core 26, shipote-discern 5, yahweh-provider-fs 3). Co-Authored-By: Claude Opus 4.7 --- crates/apps/shipote-shell/src/main.rs | 71 +++++++++++++------ .../modules/shipote/shipote-card/src/lib.rs | 6 ++ .../modules/shipote/shipote-core/src/lib.rs | 29 +++++++- .../shipote/shipote-core/src/persist.rs | 33 ++++++++- .../shipote/shipote-core/src/pipeline.rs | 20 ++++++ 5 files changed, 134 insertions(+), 25 deletions(-) diff --git a/crates/apps/shipote-shell/src/main.rs b/crates/apps/shipote-shell/src/main.rs index 2802a19..fae8099 100644 --- a/crates/apps/shipote-shell/src/main.rs +++ b/crates/apps/shipote-shell/src/main.rs @@ -6,8 +6,8 @@ use gpui::{div, prelude::*, px, Context, IntoElement, Render, SharedString, Window}; use shipote_protocol::{ - default_socket_path, read_frame, write_frame, CommandInfo, FlowInfo, QuotaReportInfo, Request, - Response, WorkspaceStatsInfo, WorkspaceSummary, + default_socket_path, read_frame, write_frame, CommandInfo, FlowInfo, FlowThroughputInfo, + QuotaReportInfo, Request, Response, WorkspaceStatsInfo, WorkspaceSummary, }; use std::path::PathBuf; use std::time::Duration; @@ -44,6 +44,8 @@ struct Shell { commands: std::collections::BTreeMap>, saved_pipelines: Vec, flows: Vec, + /// Throughput por flow socket (bytes_total + bytes/s). + flow_throughput: Vec, /// History de RSS por workspace (últimas N samples). stats_history: std::collections::BTreeMap>, /// Quota report fresco por workspace. @@ -81,6 +83,7 @@ impl Shell { me.commands = snap.commands; me.saved_pipelines = snap.saved_pipelines; me.flows = snap.flows; + me.flow_throughput = snap.flow_throughput; me.quotas = snap.quotas; // Hidratar history server-side para workspaces // que no tenían history local (primer probe). @@ -122,6 +125,7 @@ impl Shell { me.commands.clear(); me.saved_pipelines.clear(); me.flows.clear(); + me.flow_throughput.clear(); me.quotas.clear(); me.caps = None; me.recent_log = None; @@ -142,6 +146,7 @@ impl Shell { commands: std::collections::BTreeMap::new(), saved_pipelines: Vec::new(), flows: Vec::new(), + flow_throughput: Vec::new(), stats_history: std::collections::BTreeMap::new(), quotas: std::collections::BTreeMap::new(), caps: None, @@ -157,6 +162,7 @@ struct Snapshot { commands: std::collections::BTreeMap>, saved_pipelines: Vec, flows: Vec, + flow_throughput: Vec, /// Stats fresco por workspace (id.toString → stats). fresh_stats: std::collections::BTreeMap, /// Quota report fresco por workspace. @@ -254,6 +260,17 @@ fn probe_blocking(path: &std::path::Path) -> Result { Response::FlowList { items } => items, _ => Vec::new(), }; + // Throughput per-socket. + write_frame(&mut stream, &Request::FlowThroughput) + .await + .map_err(|e| format!("write throughput: {e}"))?; + let resp: Response = read_frame(&mut stream) + .await + .map_err(|e| format!("read throughput: {e}"))?; + let flow_throughput = match resp { + Response::FlowThroughput { items } => items, + _ => Vec::new(), + }; // Live tail: log del comando más reciente con bytes>0. let recent_log = { @@ -330,6 +347,7 @@ fn probe_blocking(path: &std::path::Path) -> Result { commands: commands_map, saved_pipelines, flows, + flow_throughput, fresh_stats, quotas, hydrate_history, @@ -509,31 +527,38 @@ impl Render for Shell { "ws_suffix · recurso · uso > limit".to_string() }; - // Flow channels (data plane). + // Flow channels (data plane) con throughput. let flow_count: usize = self.flows.iter().map(|f| f.sockets.len()).sum(); - let flow_items: Vec = self - .flows - .iter() - .flat_map(|f| { - let pipe = f.pipeline.to_string(); - let short = &pipe[pipe.len() - 6..]; - f.sockets - .iter() - .map(move |s| { - format!( - "{short} {}", - s.file_name() - .map(|n| n.to_string_lossy().to_string()) - .unwrap_or_else(|| s.display().to_string()) - ) - }) - .collect::>() - }) - .collect(); + // Lookup helper que NO captura por ref (evita issue de borrow + // en el closure de flat_map). + let find_tp = |s: &std::path::PathBuf| -> (f64, f64) { + for t in &self.flow_throughput { + if t.socket == *s { + return (t.bytes_total as f64 / 1024.0, t.bytes_per_sec / 1024.0); + } + } + (0.0, 0.0) + }; + let mut flow_items: Vec = Vec::new(); + for f in &self.flows { + let pipe = f.pipeline.to_string(); + let short_pipe = &pipe[pipe.len() - 6..]; + for s in &f.sockets { + let name = s + .file_name() + .map(|n| n.to_string_lossy().to_string()) + .unwrap_or_else(|| s.display().to_string()); + let (total_kib, rate_kib) = find_tp(s); + flow_items.push(format!( + "{short_pipe} {:<48} {:>7.1} KiB {:>6.2} KiB/s", + name, total_kib, rate_kib + )); + } + } let flow_descr = if flow_count == 0 { "pipelines con --tap exponen sockets aquí".to_string() } else { - "shipote flow tail para suscribirse".to_string() + "pipe6 · socket · total · rate".to_string() }; let body = div() diff --git a/crates/modules/shipote/shipote-card/src/lib.rs b/crates/modules/shipote/shipote-card/src/lib.rs index 64a04f7..bc3780b 100644 --- a/crates/modules/shipote/shipote-card/src/lib.rs +++ b/crates/modules/shipote/shipote-card/src/lib.rs @@ -274,6 +274,11 @@ pub struct DiscernPolicy { /// productores con chunks de tamaño variable. #[serde(default)] pub replay_bytes: usize, + /// Rate-limit del flow channel (bytes/s). `0` = sin límite. Si está + /// definido, el splitter sleeps proporcional al tamaño del chunk + /// antes de re-broadcastear. Protege subscribers lentos. + #[serde(default)] + pub max_bytes_per_sec: u64, } impl Default for DiscernPolicy { @@ -283,6 +288,7 @@ impl Default for DiscernPolicy { enrich_producer: default_true(), replay_chunks: default_replay_chunks(), replay_bytes: 0, + max_bytes_per_sec: 0, } } } diff --git a/crates/modules/shipote/shipote-core/src/lib.rs b/crates/modules/shipote/shipote-core/src/lib.rs index 9ee59ec..0e3147e 100644 --- a/crates/modules/shipote/shipote-core/src/lib.rs +++ b/crates/modules/shipote/shipote-core/src/lib.rs @@ -87,6 +87,10 @@ pub enum LogStream { pub struct WorkspaceManager { inner: Arc>, incarnator: Arc, + /// True si hubo alguna mutación desde el último `save_snapshot`. + /// `save_snapshot` skip si false (snapshot incremental — evita + /// re-serialize cuando nada cambió, ej. SIGTERM tras un período idle). + dirty: std::sync::atomic::AtomicBool, } struct Inner { @@ -238,9 +242,23 @@ impl WorkspaceManager { pending_pipeline_restarts: Vec::new(), })), incarnator: Arc::new(Incarnator::new(cfg)), + dirty: std::sync::atomic::AtomicBool::new(false), } } + /// Marca el manager como dirty. Cualquier mutación que afecta al + /// snapshot debería llamar esto. + #[inline] + fn mark_dirty(&self) { + self.dirty.store(true, std::sync::atomic::Ordering::Relaxed); + } + + /// True si hubo cambios desde el último `save_snapshot`. Útil para + /// chequeos cooperativos (ej. monitoring que pollea cada N). + pub fn is_dirty(&self) -> bool { + self.dirty.load(std::sync::atomic::Ordering::Relaxed) + } + /// Registra un supervisor para un pipeline con `restart_on_failure=true`. /// El daemon llama esto tras `run_pipeline` para que `reap_dead` agregue /// el pipeline a la cola de restart cuando algún command falle. @@ -267,6 +285,8 @@ impl WorkspaceManager { current_backoff_ms: initial_backoff, }, ); + drop(g); + self.mark_dirty(); } /// Variante que preserva backoff/count del supervisor anterior (para @@ -480,6 +500,7 @@ impl WorkspaceManager { /// Guarda (o reemplaza) un PipelineSpec bajo `name`. pub async fn save_pipeline(&self, name: String, spec: PipelineSpec) { self.inner.lock().await.saved_pipelines.insert(name, spec); + self.mark_dirty(); } /// Devuelve los nombres de los pipelines guardados. @@ -497,7 +518,11 @@ impl WorkspaceManager { /// Elimina un saved pipeline. pub async fn drop_saved_pipeline(&self, name: &str) -> bool { - self.inner.lock().await.saved_pipelines.remove(name).is_some() + let existed = self.inner.lock().await.saved_pipelines.remove(name).is_some(); + if existed { + self.mark_dirty(); + } + existed } /// Label del workspace, si existe. @@ -648,6 +673,7 @@ impl WorkspaceManager { stats_history: std::collections::VecDeque::with_capacity(STATS_HISTORY_CAP), }; self.inner.lock().await.workspaces.insert(id, state); + self.mark_dirty(); info!(%id, ?ttl, "workspace created"); // Si tiene TTL, programar auto-stop. El task captura un weak ref @@ -698,6 +724,7 @@ impl WorkspaceManager { // También limpiamos flow_channels del workspace si los hubiera — // por workspace lo retenemos por pipeline, no por workspace. drop(g); + self.mark_dirty(); // 1) SIGTERM (o SIGKILL si grace=0) a todos vivos. let initial_signal = if grace.is_zero() { Signal::SIGKILL } else { Signal::SIGTERM }; diff --git a/crates/modules/shipote/shipote-core/src/persist.rs b/crates/modules/shipote/shipote-core/src/persist.rs index 32c78bd..a88098c 100644 --- a/crates/modules/shipote/shipote-core/src/persist.rs +++ b/crates/modules/shipote/shipote-core/src/persist.rs @@ -181,10 +181,18 @@ impl WorkspaceManager { } } - /// Escribe snapshot a disco. + /// Escribe snapshot a disco. Si `is_dirty()` es false **y** el path + /// existe (snapshot previo válido), skip la escritura. pub async fn save_snapshot(&self, path: &Path) -> anyhow::Result<()> { + if !self.is_dirty() && path.exists() { + info!(path = %path.display(), "snapshot SKIPPED (clean)"); + return Ok(()); + } let snap = self.snapshot().await; snap.write(path)?; + // Clear dirty: lo que está en disco es el current state. + self.dirty + .store(false, std::sync::atomic::Ordering::Relaxed); info!(path = %path.display(), workspaces = snap.workspaces.len(), "snapshot saved"); Ok(()) } @@ -245,6 +253,11 @@ impl WorkspaceManager { out.saved_pipelines_restored += 1; } out.live_pipelines = snap.live_pipelines; + // Restore no cuenta como mutación — lo que está en disco es lo + // que acabamos de cargar. Sin esto, el próximo SIGTERM siempre + // re-escribiría aunque no hubiese cambios reales. + self.dirty + .store(false, std::sync::atomic::Ordering::Relaxed); info!( workspaces = out.workspaces_restored, saved_pipelines = out.saved_pipelines_restored, @@ -304,6 +317,24 @@ mod tests { assert!(restored_ids.contains(&id2)); } + #[tokio::test] + async fn save_snapshot_skips_when_clean() { + let tmp = tempfile::tempdir().unwrap(); + let path = tmp.path().join("state.json"); + let mgr = Arc::new(WorkspaceManager::new(IncarnatorConfig::default())); + let _ = mgr.create(sample_ws("dirty-test")).await.unwrap(); + assert!(mgr.is_dirty(), "create debería marcar dirty"); + mgr.save_snapshot(&path).await.unwrap(); + assert!(!mgr.is_dirty(), "save_snapshot debería limpiar dirty"); + let mtime1 = std::fs::metadata(&path).unwrap().modified().unwrap(); + // Esperamos un pelín para que mtime cambie si fuera re-escrito. + tokio::time::sleep(std::time::Duration::from_millis(20)).await; + // Segundo save sin mutación → skip. + mgr.save_snapshot(&path).await.unwrap(); + let mtime2 = std::fs::metadata(&path).unwrap().modified().unwrap(); + assert_eq!(mtime1, mtime2, "skip cuando clean — mtime no cambia"); + } + #[tokio::test] async fn snapshot_includes_saved_pipelines() { use shipote_card::{CommandRef, DiscernPolicy, PipelineSpec}; diff --git a/crates/modules/shipote/shipote-core/src/pipeline.rs b/crates/modules/shipote/shipote-core/src/pipeline.rs index 8cf8954..3a2b708 100644 --- a/crates/modules/shipote/shipote-core/src/pipeline.rs +++ b/crates/modules/shipote/shipote-core/src/pipeline.rs @@ -132,6 +132,7 @@ pub async fn run_pipeline( edges: edge_meta, tap, sample_bytes: spec.discern.sample_bytes, + max_bytes_per_sec: spec.discern.max_bytes_per_sec, }); } @@ -308,6 +309,9 @@ struct SplitterSpec { edges: Vec, tap: bool, sample_bytes: usize, + /// Rate-limit en bytes/s (0 = sin limit). Tras cada chunk de `n` + /// bytes, splitter sleeps `n / max_bytes_per_sec` segundos. + max_bytes_per_sec: u64, } struct SplitterHandle { @@ -430,6 +434,7 @@ fn spawn_splitter( } broadcast_chunk(&writers, &edge_senders, &buf[..n]).await; total += n as u64; + rate_limit_sleep(spec.max_bytes_per_sec, n).await; } let d = if spec.tap { @@ -448,6 +453,7 @@ fn spawn_splitter( if n == 0 { break; } broadcast_chunk(&writers, &edge_senders, &buf[..n]).await; total += n as u64; + rate_limit_sleep(spec.max_bytes_per_sec, n).await; } debug!(bytes = total, consumers = writers.len(), "splitter finished"); @@ -469,6 +475,19 @@ fn spawn_splitter( SplitterHandle { handle } } +/// Token-bucket simple: si `max_bps > 0`, sleep `chunk_size / max_bps` +/// segundos. Implementación crude pero suficiente para v1. +async fn rate_limit_sleep(max_bps: u64, chunk_bytes: usize) { + if max_bps == 0 { + return; + } + let secs = chunk_bytes as f64 / max_bps as f64; + let ms = (secs * 1000.0) as u64; + if ms > 0 { + tokio::time::sleep(std::time::Duration::from_millis(ms)).await; + } +} + async fn broadcast_chunk( writers: &[AsyncFd], edge_senders: &[Option], @@ -721,6 +740,7 @@ mod tests { enrich_producer: true, replay_chunks: 32, replay_bytes: 0, + max_bytes_per_sec: 0, }, restart_on_failure: false, restart_backoff_ms: 200,