diff --git a/crates/apps/shipote-cli/src/main.rs b/crates/apps/shipote-cli/src/main.rs index fff25f4..9740f3f 100644 --- a/crates/apps/shipote-cli/src/main.rs +++ b/crates/apps/shipote-cli/src/main.rs @@ -103,6 +103,10 @@ enum PipeCmd { /// Variables `KEY=VALUE` para sustitución `${KEY}` en el spec. #[arg(long = "var", value_parser = parse_kv)] vars: Vec<(String, String)>, + /// Tras lanzar, suscribir al primer flow socket y volcar bytes + /// a stdout hasta EOF. Implica `--tap`. + #[arg(long)] + tail: bool, }, /// Guardar un pipeline bajo un nombre (persiste con el snapshot). Save { @@ -130,6 +134,8 @@ enum PipeCmd { tap: bool, #[arg(long = "var", value_parser = parse_kv)] vars: Vec<(String, String)>, + #[arg(long)] + tail: bool, }, } @@ -243,9 +249,14 @@ async fn main() -> Result<()> { .cpu_usec .map(|u| format!("{:.3} s", u as f64 / 1_000_000.0)) .unwrap_or_else(|| "—".into()); + let cpu_pct = info + .cpu_percent + .map(|p| format!("{p:.1} %")) + .unwrap_or_else(|| "— (esperando 2do sample)".into()); println!("rss: {rss}"); println!("rss_peak: {peak}"); println!("cpu: {cpu}"); + println!("cpu_pct: {cpu_pct}"); println!("source: {}", info.source); println!("uptime: {} ms", info.uptime_ms); } @@ -287,18 +298,28 @@ async fn main() -> Result<()> { } } - Cmd::Pipeline(PipeCmd::Run { spec, tap, vars }) => { + Cmd::Pipeline(PipeCmd::Run { spec, tap, vars, tail }) => { let p = load_pipeline_spec(&spec).with_context(|| format!("load {}", spec.display()))?; + // --tail implica --tap (no hay flow socket sin tap). + let effective_tap = tap || tail; let resp = round_trip( &mut stream, Request::PipelineRun { spec: p, - tap, + tap: effective_tap, vars: vars.into_iter().collect(), }, ) .await?; - print_pipeline_started(resp)?; + let socket = print_pipeline_started_returning_socket(resp)?; + if tail { + if let Some(sock) = socket { + eprintln!("--- tailing {} ---", sock.display()); + tail_socket(&sock).await?; + } else { + eprintln!("--tail: no hay flow socket disponible"); + } + } } Cmd::Pipeline(PipeCmd::Save { name, spec }) => { @@ -351,17 +372,26 @@ async fn main() -> Result<()> { } } - Cmd::Pipeline(PipeCmd::RunSaved { name, tap, vars }) => { + Cmd::Pipeline(PipeCmd::RunSaved { name, tap, vars, tail }) => { + let effective_tap = tap || tail; let resp = round_trip( &mut stream, Request::PipelineRunSaved { name, - tap, + tap: effective_tap, vars: vars.into_iter().collect(), }, ) .await?; - print_pipeline_started(resp)?; + let socket = print_pipeline_started_returning_socket(resp)?; + if tail { + if let Some(sock) = socket { + eprintln!("--- tailing {} ---", sock.display()); + tail_socket(&sock).await?; + } else { + eprintln!("--tail: no hay flow socket disponible"); + } + } } Cmd::Commands { workspace } => { @@ -511,29 +541,55 @@ fn print_unexpected(r: &Response) { eprintln!("unexpected response: {r:?}"); } -fn print_pipeline_started(resp: Response) -> Result<()> { +/// Imprime el resultado del launch del pipeline y retorna el path del +/// primer flow socket (si hay), útil para `--tail`. +fn print_pipeline_started_returning_socket(resp: Response) -> Result> { match resp { Response::PipelineStarted { pipeline, command_pids, edges } => { println!("pipeline {pipeline}"); for (label, pid) in command_pids { println!(" {:<20} pid={pid}", label); } + let mut first_socket: Option = None; if !edges.is_empty() { println!("edges:"); - for e in edges { + for e in &edges { println!( " {}.{} → {}.{} ty={:?} mime={:?} conf={:.2}", e.from_label, e.from_output, e.to_label, e.to_input, e.ty, e.mime, e.confidence, ); + if first_socket.is_none() { + first_socket = e.flow_socket.clone(); + } } } - Ok(()) + Ok(first_socket) } Response::Error { message } => Err(anyhow!(message)), other => { print_unexpected(&other); - Ok(()) + Ok(None) } } } + +async fn tail_socket(socket: &std::path::Path) -> Result<()> { + use tokio::io::AsyncReadExt; + // Pequeña ventana de retry — el daemon retiene el flow channel + // antes de retornar, así que en la práctica ya está bindeado. + let mut s = UnixStream::connect(socket) + .await + .with_context(|| format!("connect {}", socket.display()))?; + let mut buf = [0u8; 4096]; + loop { + let n = s.read(&mut buf).await?; + if n == 0 { + break; + } + use std::io::Write; + let _ = std::io::stdout().write_all(&buf[..n]); + let _ = std::io::stdout().flush(); + } + Ok(()) +} diff --git a/crates/apps/shipote-daemon/src/main.rs b/crates/apps/shipote-daemon/src/main.rs index c1407a9..d065c10 100644 --- a/crates/apps/shipote-daemon/src/main.rs +++ b/crates/apps/shipote-daemon/src/main.rs @@ -344,6 +344,7 @@ async fn dispatch( rss_bytes: s.rss_bytes, rss_peak_bytes: s.rss_peak_bytes, cpu_usec: s.cpu_usec, + cpu_percent: s.cpu_percent, source: s.source, uptime_ms: s.uptime_ms, }, diff --git a/crates/apps/shipote-shell/src/main.rs b/crates/apps/shipote-shell/src/main.rs index 952b4cf..d8bf01a 100644 --- a/crates/apps/shipote-shell/src/main.rs +++ b/crates/apps/shipote-shell/src/main.rs @@ -389,19 +389,24 @@ impl Render for Shell { .map(|h| h.iter().map(|s| s.rss_bytes.unwrap_or(0)).collect()) .unwrap_or_default(); let spark = sparkline(&rss_series, STATS_HISTORY_LEN); - let (rss_now, peak) = history - .and_then(|h| h.back()) - .map(|s| (s.rss_bytes.unwrap_or(0), s.rss_peak_bytes.unwrap_or(0))) - .unwrap_or((0, 0)); + let latest = history.and_then(|h| h.back()); + let (rss_now, peak, cpu_pct) = latest + .map(|s| ( + s.rss_bytes.unwrap_or(0), + s.rss_peak_bytes.unwrap_or(0), + s.cpu_percent.unwrap_or(0.0), + )) + .unwrap_or((0, 0, 0.0)); let rss_mb = rss_now as f64 / 1024.0 / 1024.0; let peak_mb = peak as f64 / 1024.0 / 1024.0; format!( - "{:<14} {:<14} {} {:>6.1}M peak {:>6.1}M", + "{:<14} {:<14} {} {:>6.1}M peak {:>6.1}M {:>5.1}%cpu", &w.id.to_string()[20..], w.label, spark, rss_mb, peak_mb, + cpu_pct, ) }) .collect(); diff --git a/crates/modules/shipote/shipote-card/src/lib.rs b/crates/modules/shipote/shipote-card/src/lib.rs index 06f936d..b1fc374 100644 --- a/crates/modules/shipote/shipote-card/src/lib.rs +++ b/crates/modules/shipote/shipote-card/src/lib.rs @@ -216,6 +216,12 @@ pub struct DiscernPolicy { /// querés que los consumidores tardíos vean toda la salida. #[serde(default = "default_replay_chunks")] pub replay_chunks: usize, + /// Tope adicional por **bytes** acumulados en el replay buffer. Lo + /// que se exceda primero (chunks o bytes) drop-ea el chunk más viejo. + /// `0` = sin tope por bytes (sólo aplica `replay_chunks`). Útil para + /// productores con chunks de tamaño variable. + #[serde(default)] + pub replay_bytes: usize, } impl Default for DiscernPolicy { @@ -224,6 +230,7 @@ impl Default for DiscernPolicy { sample_bytes: default_sample_bytes(), enrich_producer: default_true(), replay_chunks: default_replay_chunks(), + replay_bytes: 0, } } } diff --git a/crates/modules/shipote/shipote-core/src/flow_channel.rs b/crates/modules/shipote/shipote-core/src/flow_channel.rs index a170675..be0932c 100644 --- a/crates/modules/shipote/shipote-core/src/flow_channel.rs +++ b/crates/modules/shipote/shipote-core/src/flow_channel.rs @@ -45,16 +45,33 @@ pub const DEFAULT_REPLAY_CHUNKS: usize = 32; pub struct FlowChannel { sender: broadcast::Sender>>, replay: Arc>>>>, - replay_cap: usize, + replay_caps: ReplayCaps, socket_path: PathBuf, _accept_handle: AbortOnDrop, } +#[derive(Debug, Clone, Copy)] +pub struct ReplayCaps { + /// Máximo de chunks retenidos. + pub chunks: usize, + /// Máximo de bytes (sumando len de chunks). `0` = sin tope. + pub bytes: usize, +} + +impl ReplayCaps { + pub fn chunks_only(chunks: usize) -> Self { + Self { chunks: chunks.max(1), bytes: 0 } + } + pub fn new(chunks: usize, bytes: usize) -> Self { + Self { chunks: chunks.max(1), bytes } + } +} + #[derive(Clone)] pub struct FlowSender { sender: broadcast::Sender>>, replay: Arc>>>>, - replay_cap: usize, + replay_caps: ReplayCaps, } impl FlowSender { @@ -62,17 +79,37 @@ impl FlowSender { /// el broadcast::send retorna Err pero igual guardamos en replay /// (subscribers tarde verán los chunks pasados). pub fn send(&self, data: Arc>) { - let cap = self.replay_cap; + let incoming = data.len(); + let caps = self.replay_caps; if let Ok(mut g) = self.replay.lock() { - if g.len() >= cap { - g.pop_front(); - } + evict_for_incoming(&mut g, caps, incoming); g.push_back(data.clone()); } let _ = self.sender.send(data); } } +/// Evict los chunks más viejos para hacer espacio a un chunk entrante de +/// `incoming` bytes — el buffer post-push queda dentro de los caps. +fn evict_for_incoming(buf: &mut VecDeque>>, caps: ReplayCaps, incoming: usize) { + // 1) chunks: dejar lugar para 1 más. + while buf.len() + 1 > caps.chunks { + if buf.pop_front().is_none() { + break; + } + } + // 2) bytes (si está activado). + if caps.bytes > 0 { + let mut current: usize = buf.iter().map(|a| a.len()).sum(); + while current + incoming > caps.bytes { + match buf.pop_front() { + Some(c) => current = current.saturating_sub(c.len()), + None => break, + } + } + } +} + impl std::fmt::Debug for FlowChannel { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("FlowChannel") @@ -86,11 +123,14 @@ impl FlowChannel { /// Crea un FlowChannel atado al path `socket_path`. Si el path ya /// existe, lo borra antes de bind (asume restart limpio). pub fn new(socket_path: PathBuf) -> std::io::Result { - Self::with_replay_cap(socket_path, DEFAULT_REPLAY_CHUNKS) + Self::with_replay_caps(socket_path, ReplayCaps::chunks_only(DEFAULT_REPLAY_CHUNKS)) } - pub fn with_replay_cap(socket_path: PathBuf, replay_cap: usize) -> std::io::Result { - let cap = replay_cap.max(1); + pub fn with_replay_cap(socket_path: PathBuf, chunks: usize) -> std::io::Result { + Self::with_replay_caps(socket_path, ReplayCaps::chunks_only(chunks)) + } + + pub fn with_replay_caps(socket_path: PathBuf, caps: ReplayCaps) -> std::io::Result { if socket_path.exists() { let _ = std::fs::remove_file(&socket_path); } @@ -100,7 +140,7 @@ impl FlowChannel { let listener = UnixListener::bind(&socket_path)?; let (tx, _rx_unused) = broadcast::channel::>>(BROADCAST_CAP); let replay: Arc>>>> = - Arc::new(Mutex::new(VecDeque::with_capacity(cap))); + Arc::new(Mutex::new(VecDeque::with_capacity(caps.chunks))); let tx_for_accept = tx.clone(); let replay_for_accept = replay.clone(); let path_for_log = socket_path.clone(); @@ -153,21 +193,21 @@ impl FlowChannel { Ok(Self { sender: tx, replay, - replay_cap: cap, + replay_caps: caps, socket_path, _accept_handle: AbortOnDrop(join.abort_handle()), }) } /// Push un chunk al channel. Si no hay subscribers, drop silencioso. - /// Siempre se guarda en el replay buffer (con cap rotation). + /// Siempre se guarda en el replay buffer (con cap rotation por chunks + /// y opcionalmente por bytes). pub fn send(&self, data: Vec) { + let incoming = data.len(); let arc = Arc::new(data); - let cap = self.replay_cap; + let caps = self.replay_caps; if let Ok(mut g) = self.replay.lock() { - if g.len() >= cap { - g.pop_front(); - } + evict_for_incoming(&mut g, caps, incoming); g.push_back(arc.clone()); } let _ = self.sender.send(arc); @@ -184,7 +224,7 @@ impl FlowChannel { FlowSender { sender: self.sender.clone(), replay: self.replay.clone(), - replay_cap: self.replay_cap, + replay_caps: self.replay_caps, } } @@ -303,6 +343,47 @@ mod tests { assert!(s.contains("chunk-3"), "got: {s:?}"); } + #[tokio::test] + async fn replay_evicts_by_bytes_cap() { + let tmp = tempfile::tempdir().unwrap(); + let path = tmp.path().join("flow.sock"); + // chunks=100 (no limita), bytes=20: deberíamos retener sólo los + // últimos chunks cuyos bytes sumen ≤ 20. + let ch = FlowChannel::with_replay_caps(path.clone(), ReplayCaps::new(100, 20)).unwrap(); + ch.send(b"AAAAAAAA".to_vec()); // 8 bytes + ch.send(b"BBBBBBBB".to_vec()); // 8 → total 16 + ch.send(b"CCCCCCCC".to_vec()); // 8 → total 24 > 20, evict A → 16 + ch.send(b"DDDDDDDD".to_vec()); // 8 → total 24 > 20, evict B → 16 + + let path_clone = path.clone(); + let task = tokio::spawn(async move { + let mut stream = UnixStream::connect(&path_clone).await.unwrap(); + let mut buf = vec![0u8; 64]; + let mut total = Vec::new(); + for _ in 0..20 { + let n = stream.read(&mut buf).await.unwrap(); + if n == 0 { + break; + } + total.extend_from_slice(&buf[..n]); + if total.len() >= 16 { + break; + } + } + total + }); + let got = tokio::time::timeout(std::time::Duration::from_secs(2), task) + .await + .expect("timeout") + .unwrap(); + let s = String::from_utf8_lossy(&got); + // Sólo C y D (los más viejos A y B fueron evicted). + assert!(!s.contains("AAAA"), "should have evicted A: {s:?}"); + assert!(!s.contains("BBBB"), "should have evicted B: {s:?}"); + assert!(s.contains("CCCC"), "should keep C: {s:?}"); + assert!(s.contains("DDDD"), "should keep D: {s:?}"); + } + #[tokio::test] async fn drop_removes_socket() { let tmp = tempfile::tempdir().unwrap(); diff --git a/crates/modules/shipote/shipote-core/src/lib.rs b/crates/modules/shipote/shipote-core/src/lib.rs index 9b1af6c..a72e74c 100644 --- a/crates/modules/shipote/shipote-core/src/lib.rs +++ b/crates/modules/shipote/shipote-core/src/lib.rs @@ -48,6 +48,9 @@ pub struct WorkspaceState { pub root_card: Card, pub commands: HashMap, pub started: Instant, + /// Última muestra de `(wall_instant, cpu_usec)` usada para calcular + /// `cpu_percent` en la próxima medición. None hasta el primer measure. + pub last_cpu_sample: Option<(Instant, u64)>, } #[derive(Debug, Clone)] @@ -354,9 +357,12 @@ impl WorkspaceManager { /// comandos vivos. Lee `/proc//` directamente; si el spec declara /// `soma.cgroup.path`, también intenta el cgroup (más preciso, incluye /// descendants). + /// + /// `cpu_percent` se calcula entre samples consecutivos. Necesita ≥2 + /// llamadas para tener un valor (la primera siempre retorna `None`). pub async fn workspace_stats(&self, id: WorkspaceId) -> Option { - let g = self.inner.lock().await; - let ws = g.workspaces.get(&id)?; + let mut g = self.inner.lock().await; + let ws = g.workspaces.get_mut(&id)?; let alive: Vec = ws .commands .values() @@ -367,8 +373,6 @@ impl WorkspaceManager { let cgroup_path = if ws.spec.soma.cgroup.path.is_empty() { None } else { - // resolve_cgroup_path está en ente_incarnate, pero acá basta - // con el path absoluto bajo /sys/fs/cgroup. Resolución gruesa. Some(std::path::PathBuf::from(format!( "/sys/fs/cgroup{}", ws.spec.soma.cgroup.path @@ -376,6 +380,20 @@ impl WorkspaceManager { }; let mut s = stats::measure(&alive, cgroup_path.as_deref(), ws.started); s.commands_total = total; + + // CPU%: diff entre el sample actual y el previo, dividido por + // wall time. 100% = 1 core saturado. >100% = varios cores. + let now = Instant::now(); + if let Some(cpu_now) = s.cpu_usec { + if let Some((prev_t, prev_cpu)) = ws.last_cpu_sample { + let dt_us = now.duration_since(prev_t).as_micros() as u64; + let d_cpu = cpu_now.saturating_sub(prev_cpu); + if dt_us > 0 { + s.cpu_percent = Some(100.0 * d_cpu as f32 / dt_us as f32); + } + } + ws.last_cpu_sample = Some((now, cpu_now)); + } Some(s) } @@ -403,6 +421,7 @@ impl WorkspaceManager { root_card: card, commands: HashMap::new(), started: Instant::now(), + last_cpu_sample: None, }; self.inner.lock().await.workspaces.insert(id, state); info!(%id, ?ttl, "workspace created"); diff --git a/crates/modules/shipote/shipote-core/src/pipeline.rs b/crates/modules/shipote/shipote-core/src/pipeline.rs index 90c84de..ece10e6 100644 --- a/crates/modules/shipote/shipote-core/src/pipeline.rs +++ b/crates/modules/shipote/shipote-core/src/pipeline.rs @@ -211,7 +211,10 @@ pub async fn run_pipeline( i ); let socket = crate::flow_channel::default_flow_socket_path(&id); - match crate::flow_channel::FlowChannel::with_replay_cap(socket.clone(), spec.discern.replay_chunks) { + match crate::flow_channel::FlowChannel::with_replay_caps( + socket.clone(), + crate::flow_channel::ReplayCaps::new(spec.discern.replay_chunks, spec.discern.replay_bytes), + ) { Ok(fc) => { senders_per_edge.push(Some(fc.sender_handle())); paths_per_edge.push(Some(socket)); @@ -693,6 +696,7 @@ mod tests { sample_bytes: 4096, enrich_producer: true, replay_chunks: 32, + replay_bytes: 0, }, }; let disc = Arc::new(DiscernPipeline::default_pipeline()); diff --git a/crates/modules/shipote/shipote-core/src/stats.rs b/crates/modules/shipote/shipote-core/src/stats.rs index 8ae470d..cd199ec 100644 --- a/crates/modules/shipote/shipote-core/src/stats.rs +++ b/crates/modules/shipote/shipote-core/src/stats.rs @@ -23,6 +23,9 @@ pub struct WorkspaceStats { pub rss_peak_bytes: Option, /// Tiempo CPU acumulado en microsegundos. `None` si no se pudo medir. pub cpu_usec: Option, + /// %CPU instantáneo derivado entre dos samples consecutivos. `None` + /// en el primer sample (no hay baseline). `100.0` = 1 core saturado. + pub cpu_percent: Option, /// Fuente del dato: "proc" | "cgroup" | "mixed". pub source: String, /// Wall-clock uptime del workspace en milisegundos. @@ -67,6 +70,7 @@ pub fn measure( rss_bytes: rss, rss_peak_bytes: rss_peak, cpu_usec: cpu, + cpu_percent: None, // El caller lo rellena con el diff vs prev sample. source, uptime_ms: workspace_started.elapsed().as_millis() as u64, } diff --git a/crates/modules/shipote/shipote-protocol/src/lib.rs b/crates/modules/shipote/shipote-protocol/src/lib.rs index 0c77c2e..a5c1e7e 100644 --- a/crates/modules/shipote/shipote-protocol/src/lib.rs +++ b/crates/modules/shipote/shipote-protocol/src/lib.rs @@ -216,6 +216,8 @@ pub struct WorkspaceStatsInfo { #[serde(default)] pub rss_peak_bytes: Option, pub cpu_usec: Option, + #[serde(default)] + pub cpu_percent: Option, pub source: String, pub uptime_ms: u64, }