From 36dac00c8dd8d39d0306a4804c5432b9ef0c5935 Mon Sep 17 00:00:00 2001 From: sergio Date: Mon, 11 May 2026 00:29:46 +0000 Subject: [PATCH] feat(shipote): data plane + DAG fan-in/out + stats + lifecycle (fases F-I) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Pipeline runtime: - Fan-out 1→N (splitter task replica al N consumers) y fan-in N→1 (merger task con mpsc + reader-per-input). DAGs no lineales soportados. - Flow channels: Unix socket + tokio broadcast con replay buffer configurable por pipeline (DiscernPolicy.replay_chunks). Subscribers externos vía `shipote flow tail `. - Templating en specs con `${KEY}` (CLI `--var KEY=VALUE`). Walk recursivo sobre serde_json::Value, soporta todos los strings del schema. - Pipelines guardados (`pipeline save/saved-list/drop/run-saved`) persisten con el snapshot. Lifecycle de comandos: - Log capture per-stream (stdout/stderr separados) via pipe O_CLOEXEC + AsyncFd. CLI `shipote logs --stream {stdout,stderr,both}`. - Stop graceful con tiempo configurable: SIGTERM → grace → SIGKILL. Tanto a nivel workspace como pipeline individual. - TTL auto-stop ya existente (Fase C) sigue funcionando. ente-incarnate: - ChildStdio declarativo (Fase C) + ChildPreExec declarativo nuevo: NoNewPrivs, ParentDeathSig, Dumpable, NewSession, Chdir, Umask. - Aplicación pre-execve async-signal-safe en ambos paths (plain via Command::pre_exec, namespaced via callback del clone(2)). Observabilidad: - WorkspaceStats: RSS + RSS peak (VmHWM o memory.peak cgroup) + CPU usec + uptime. Fuente per-proc o cgroup según delegation. - shipote-shell con sparkline ASCII por workspace (history cap 24), card de flow channels activos, vista de comandos + saved pipelines. - Tap → broker: cada edge enriquecido con TypeRef se anuncia como Card efímera vía SidecarPool (graceful si broker no corre). Discern: - Integrado en yahweh-provider-fs (mime_type en EntityNode). - Integrado en nouser-core::cluster::pick_lens como fallback cuando la extensión cae a Lens::Grid. 79 tests pasan: ente-incarnate (16), nouser-core (27), shipote-card (8), shipote-core (20), shipote-discern (5), yahweh-provider-fs (3). Co-Authored-By: Claude Opus 4.7 --- crates/apps/shipote-cli/src/main.rs | 164 ++++- crates/apps/shipote-daemon/src/main.rs | 231 +++++-- crates/apps/shipote-shell/src/main.rs | 223 ++++++- .../modules/shipote/shipote-card/src/lib.rs | 130 +++- .../shipote/shipote-core/src/flow_channel.rs | 320 ++++++++++ .../modules/shipote/shipote-core/src/lib.rs | 365 +++++++++-- .../shipote/shipote-core/src/pipeline.rs | 566 +++++++++++++----- .../modules/shipote/shipote-core/src/stats.rs | 168 ++++++ .../shipote/shipote-protocol/src/lib.rs | 82 ++- crates/shared/ente-incarnate/src/lib.rs | 58 +- .../shared/ente-incarnate/src/namespaced.rs | 11 + crates/shared/ente-incarnate/src/plain.rs | 19 + crates/shared/ente-incarnate/src/pre_exec.rs | 103 ++++ 13 files changed, 2187 insertions(+), 253 deletions(-) create mode 100644 crates/modules/shipote/shipote-core/src/flow_channel.rs create mode 100644 crates/modules/shipote/shipote-core/src/stats.rs create mode 100644 crates/shared/ente-incarnate/src/pre_exec.rs diff --git a/crates/apps/shipote-cli/src/main.rs b/crates/apps/shipote-cli/src/main.rs index eceb3d1..fff25f4 100644 --- a/crates/apps/shipote-cli/src/main.rs +++ b/crates/apps/shipote-cli/src/main.rs @@ -63,11 +63,31 @@ enum Cmd { /// Bytes desde el final (0 = todo). #[arg(long, default_value_t = 0)] tail: usize, + /// Stream a leer: stdout | stderr | both. + #[arg(long, default_value = "both")] + stream: String, }, /// Pipeline DAG con flujo tipado. #[command(subcommand)] Pipeline(PipeCmd), + + /// Flow data plane (subscribirse a streams enriquecidos). + #[command(subcommand)] + Flow(FlowCmd), +} + +#[derive(Subcommand, Debug)] +enum FlowCmd { + /// Listar pipelines activos con sus sockets de flow. + List, + /// Cerrar el data plane de un pipeline (drop de todos sus sockets). + Drop { pipeline: String }, + /// Suscribirse a un flow socket y volcar bytes a stdout. + Tail { + /// Path al Unix socket del flow. + socket: PathBuf, + }, } #[derive(Subcommand, Debug)] @@ -80,6 +100,9 @@ enum PipeCmd { /// discernir el TypeRef del flujo. #[arg(long)] tap: bool, + /// Variables `KEY=VALUE` para sustitución `${KEY}` en el spec. + #[arg(long = "var", value_parser = parse_kv)] + vars: Vec<(String, String)>, }, /// Guardar un pipeline bajo un nombre (persiste con el snapshot). Save { @@ -90,16 +113,31 @@ enum PipeCmd { }, /// Listar nombres de pipelines guardados. SavedList, - /// Eliminar un pipeline guardado. + /// Eliminar un pipeline guardado (no afecta runs en curso). Drop { name: String }, + /// Detener un pipeline en curso por ID (SIGTERM → grace → SIGKILL + /// sólo a sus comandos). + Stop { + /// ULID del pipeline (devuelto por `pipeline run`). + pipeline: String, + #[arg(long, default_value_t = 1000)] + grace_ms: u64, + }, /// Ejecutar un pipeline guardado por nombre. RunSaved { name: String, #[arg(long)] tap: bool, + #[arg(long = "var", value_parser = parse_kv)] + vars: Vec<(String, String)>, }, } +fn parse_kv(s: &str) -> Result<(String, String), String> { + let (k, v) = s.split_once('=').ok_or_else(|| format!("expected KEY=VALUE, got `{s}`"))?; + Ok((k.to_string(), v.to_string())) +} + #[derive(Subcommand, Debug)] enum WsCmd { /// Crear un workspace desde un spec TOML/JSON. @@ -112,6 +150,13 @@ enum WsCmd { /// Detener un workspace por ID. Stop { id: String, + /// Milisegundos de gracia tras SIGTERM antes de SIGKILL. + #[arg(long, default_value_t = 1000)] + grace_ms: u64, + }, + /// Resource accounting (RSS, CPU, comandos vivos). + Stats { + id: String, }, } @@ -185,9 +230,33 @@ async fn main() -> Result<()> { } } - Cmd::Workspace(WsCmd::Stop { id }) => { + Cmd::Workspace(WsCmd::Stats { id }) => { let id = parse_ws_id(&id)?; - let resp = round_trip(&mut stream, Request::WorkspaceStop { id }).await?; + let resp = round_trip(&mut stream, Request::WorkspaceStats { workspace: id }).await?; + match resp { + Response::WorkspaceStats { info } => { + println!("commands: {} alive / {} total", info.commands_alive, info.commands_total); + let fmt_mib = |b: u64| format!("{:.2} MiB", b as f64 / 1024.0 / 1024.0); + let rss = info.rss_bytes.map(fmt_mib).unwrap_or_else(|| "—".into()); + let peak = info.rss_peak_bytes.map(fmt_mib).unwrap_or_else(|| "—".into()); + let cpu = info + .cpu_usec + .map(|u| format!("{:.3} s", u as f64 / 1_000_000.0)) + .unwrap_or_else(|| "—".into()); + println!("rss: {rss}"); + println!("rss_peak: {peak}"); + println!("cpu: {cpu}"); + println!("source: {}", info.source); + println!("uptime: {} ms", info.uptime_ms); + } + Response::Error { message } => return Err(anyhow!(message)), + other => print_unexpected(&other), + } + } + + Cmd::Workspace(WsCmd::Stop { id, grace_ms }) => { + let id = parse_ws_id(&id)?; + let resp = round_trip(&mut stream, Request::WorkspaceStop { id, grace_ms }).await?; match resp { Response::WorkspaceStopped { id, reaped } => { println!("stopped {id} (reaped {reaped})"); @@ -218,9 +287,17 @@ async fn main() -> Result<()> { } } - Cmd::Pipeline(PipeCmd::Run { spec, tap }) => { + Cmd::Pipeline(PipeCmd::Run { spec, tap, vars }) => { let p = load_pipeline_spec(&spec).with_context(|| format!("load {}", spec.display()))?; - let resp = round_trip(&mut stream, Request::PipelineRun { spec: p, tap }).await?; + let resp = round_trip( + &mut stream, + Request::PipelineRun { + spec: p, + tap, + vars: vars.into_iter().collect(), + }, + ) + .await?; print_pipeline_started(resp)?; } @@ -249,6 +326,17 @@ async fn main() -> Result<()> { } } + Cmd::Pipeline(PipeCmd::Stop { pipeline, grace_ms }) => { + let pid = Ulid::from_string(&pipeline).map_err(|e| anyhow!("invalid pipeline id: {e}"))?; + let resp = round_trip(&mut stream, Request::PipelineStop { pipeline: pid, grace_ms }).await?; + match resp { + Response::PipelineStopped { pipeline, reaped } => { + println!("stopped pipeline {pipeline} (reaped {reaped})"); + } + other => print_unexpected(&other), + } + } + Cmd::Pipeline(PipeCmd::Drop { name }) => { let resp = round_trip(&mut stream, Request::PipelineDrop { name }).await?; match resp { @@ -263,8 +351,16 @@ async fn main() -> Result<()> { } } - Cmd::Pipeline(PipeCmd::RunSaved { name, tap }) => { - let resp = round_trip(&mut stream, Request::PipelineRunSaved { name, tap }).await?; + Cmd::Pipeline(PipeCmd::RunSaved { name, tap, vars }) => { + let resp = round_trip( + &mut stream, + Request::PipelineRunSaved { + name, + tap, + vars: vars.into_iter().collect(), + }, + ) + .await?; print_pipeline_started(resp)?; } @@ -292,7 +388,7 @@ async fn main() -> Result<()> { } } - Cmd::Logs { workspace, command, tail } => { + Cmd::Logs { workspace, command, tail, stream: which_stream } => { let ws = parse_ws_id(&workspace)?; let cmd_id = Ulid::from_string(&command).map_err(|e| anyhow!("invalid command id: {e}"))?; let resp = round_trip( @@ -301,6 +397,7 @@ async fn main() -> Result<()> { workspace: ws, command: cmd_id, tail_bytes: tail, + stream: which_stream, }, ) .await?; @@ -316,6 +413,57 @@ async fn main() -> Result<()> { } } + Cmd::Flow(FlowCmd::List) => { + let resp = round_trip(&mut stream, Request::FlowList).await?; + match resp { + Response::FlowList { items } => { + if items.is_empty() { + println!("(no active flows)"); + } + for it in items { + println!("{}", it.pipeline); + for s in it.sockets { + println!(" {}", s.display()); + } + } + } + other => print_unexpected(&other), + } + } + + Cmd::Flow(FlowCmd::Drop { pipeline }) => { + let pid = Ulid::from_string(&pipeline).map_err(|e| anyhow!("invalid pipeline id: {e}"))?; + let resp = round_trip(&mut stream, Request::FlowDrop { pipeline: pid }).await?; + match resp { + Response::FlowDropped { pipeline, existed } => { + if existed { + println!("dropped {pipeline}"); + } else { + eprintln!("no existía: {pipeline}"); + } + } + other => print_unexpected(&other), + } + } + + Cmd::Flow(FlowCmd::Tail { socket }) => { + // Subscribirse directo al socket — no pasamos por el daemon. + use tokio::io::AsyncReadExt; + let mut s = UnixStream::connect(&socket) + .await + .with_context(|| format!("connect {}", socket.display()))?; + let mut buf = [0u8; 4096]; + loop { + let n = s.read(&mut buf).await?; + if n == 0 { + break; + } + use std::io::Write; + let _ = std::io::stdout().write_all(&buf[..n]); + let _ = std::io::stdout().flush(); + } + } + Cmd::Discern { path } => { let bytes = std::fs::read(&path).with_context(|| format!("read {}", path.display()))?; // Sample: hasta 4 KiB. diff --git a/crates/apps/shipote-daemon/src/main.rs b/crates/apps/shipote-daemon/src/main.rs index 76512b0..c1407a9 100644 --- a/crates/apps/shipote-daemon/src/main.rs +++ b/crates/apps/shipote-daemon/src/main.rs @@ -17,7 +17,7 @@ use shipote_core::WorkspaceManager; use shipote_discern::{DiscernPipeline, Hint}; use shipote_protocol::{ default_socket_path, read_frame, write_frame, CommandInfo as ProtoCommandInfo, - EdgeDiscernmentInfo, Request, Response, WorkspaceSummary, + EdgeDiscernmentInfo, FlowInfo, Request, Response, WorkspaceStatsInfo, WorkspaceSummary, }; use std::sync::Arc; use tokio::net::{UnixListener, UnixStream}; @@ -38,10 +38,18 @@ async fn main() -> anyhow::Result<()> { let listener = UnixListener::bind(&sock).with_context(|| format!("bind {}", sock.display()))?; info!(socket = %sock.display(), "shipote-daemon listening"); - // Sidecar al broker: shipote se anuncia como sesión. Si el Init no - // está corriendo, el sidecar loguea y termina; el daemon sigue - // standalone (UX de v1: ningún feature requiere broker). - brahman_sidecar::spawn(build_daemon_card(&sock)); + // Sidecar pool: una sesión global del daemon + N sesiones efímeras + // por edge enriquecido tras cada pipeline tap. + let sidecar_pool = match brahman_sidecar::SidecarPool::new() { + Ok(p) => Some(Arc::new(p)), + Err(e) => { + warn!(?e, "SidecarPool falló — broker integration disabled"); + None + } + }; + if let Some(pool) = &sidecar_pool { + pool.spawn(build_daemon_card(&sock)); + } let mgr = Arc::new(WorkspaceManager::new(IncarnatorConfig { // El daemon aún no se conecta al broker; cuando lo haga, este path @@ -101,8 +109,9 @@ async fn main() -> anyhow::Result<()> { Ok((stream, _)) => { let mgr = mgr.clone(); let disc = discerner.clone(); + let pool = sidecar_pool.clone(); tokio::spawn(async move { - if let Err(e) = handle_client(stream, mgr, disc).await { + if let Err(e) = handle_client(stream, mgr, disc, pool).await { warn!(?e, "client handler error"); } }); @@ -119,6 +128,7 @@ async fn handle_client( mut stream: UnixStream, mgr: Arc, disc: Arc, + pool: Option>, ) -> anyhow::Result<()> { loop { let req: Request = match read_frame(&mut stream).await { @@ -126,12 +136,17 @@ async fn handle_client( Err(shipote_protocol::ProtocolError::Closed) => return Ok(()), Err(e) => return Err(e.into()), }; - let resp = dispatch(&mgr, &disc, req).await; + let resp = dispatch(&mgr, &disc, &pool, req).await; write_frame(&mut stream, &resp).await?; } } -async fn dispatch(mgr: &Arc, disc: &DiscernPipeline, req: Request) -> Response { +async fn dispatch( + mgr: &Arc, + disc: &DiscernPipeline, + pool: &Option>, + req: Request, +) -> Response { match req { Request::Ping => Response::Pong, @@ -155,10 +170,15 @@ async fn dispatch(mgr: &Arc, disc: &DiscernPipeline, req: Requ Response::WorkspaceList { items } } - Request::WorkspaceStop { id } => match mgr.stop(id).await { - Ok(reaped) => Response::WorkspaceStopped { id, reaped }, - Err(e) => Response::Error { message: format!("{e}") }, - }, + Request::WorkspaceStop { id, grace_ms } => { + match mgr + .stop_with_grace(id, std::time::Duration::from_millis(grace_ms)) + .await + { + Ok(reaped) => Response::WorkspaceStopped { id, reaped }, + Err(e) => Response::Error { message: format!("{e}") }, + } + } Request::Run { workspace, exec, argv, envp } => { match mgr.run(workspace, exec, argv, envp).await { @@ -171,7 +191,12 @@ async fn dispatch(mgr: &Arc, disc: &DiscernPipeline, req: Requ } } - Request::PipelineRun { spec, tap } => { + Request::PipelineRun { spec, tap, vars } => { + let vars_map: std::collections::HashMap = vars.into_iter().collect(); + let spec = match shipote_card::substitute_vars(&spec, &vars_map) { + Ok(s) => s, + Err(e) => return Response::Error { message: format!("template: {e}") }, + }; let disc = DiscernPipeline::default_pipeline(); let inc = mgr.incarnator_handle(); let ws_label = mgr.workspace_label(spec.workspace).await.unwrap_or_default(); @@ -181,27 +206,22 @@ async fn dispatch(mgr: &Arc, disc: &DiscernPipeline, req: Requ tap, std::sync::Arc::new(disc), inc, + Some(mgr.clone()), ) .await { - Ok(launch) => Response::PipelineStarted { - pipeline: launch.pipeline, - command_pids: launch.command_pids, - edges: launch - .edge_discernments - .into_iter() - .map(|e| EdgeDiscernmentInfo { - from_label: e.from_label, - from_output: e.from_output, - to_label: e.to_label, - to_input: e.to_input, - ty: e.discernment.as_ref().map(|d| format!("{:?}", d.ty)), - mime: e.discernment.as_ref().and_then(|d| d.mime.clone()), - lens: e.discernment.as_ref().and_then(|d| d.lens.clone()), - confidence: e.discernment.as_ref().map(|d| d.confidence).unwrap_or(0.0), - }) - .collect(), - }, + Ok(launch) => { + let pipeline_id = launch.pipeline; + announce_edges_to_broker(pool.as_deref(), &pipeline_id, &launch.edge_discernments); + let cmds = launch.command_pids; + mgr.register_pipeline_commands(spec.workspace, pipeline_id, cmds.clone()).await; + let edges = launch.edge_discernments.into_iter().map(map_edge_to_info).collect(); + Response::PipelineStarted { + pipeline: pipeline_id, + command_pids: cmds, + edges, + } + } Err(e) => Response::Error { message: format!("{e}") }, } } @@ -240,8 +260,13 @@ async fn dispatch(mgr: &Arc, disc: &DiscernPipeline, req: Requ Response::CommandList { items } } - Request::CommandLogs { workspace, command, tail_bytes } => { - match mgr.get_command_logs(workspace, command, tail_bytes).await { + Request::CommandLogs { workspace, command, tail_bytes, stream } => { + let s = match stream.as_str() { + "stdout" => shipote_core::LogStream::Stdout, + "stderr" => shipote_core::LogStream::Stderr, + _ => shipote_core::LogStream::Both, + }; + match mgr.get_command_logs(workspace, command, tail_bytes, s).await { Some(bytes) => Response::CommandLogs { bytes }, None => Response::Error { message: format!("no logs for command {command} in workspace {workspace}"), @@ -264,8 +289,13 @@ async fn dispatch(mgr: &Arc, disc: &DiscernPipeline, req: Requ Response::PipelineDropped { name, existed } } - Request::PipelineRunSaved { name, tap } => match mgr.get_saved_pipeline(&name).await { + Request::PipelineRunSaved { name, tap, vars } => match mgr.get_saved_pipeline(&name).await { Some(spec) => { + let vars_map: std::collections::HashMap = vars.into_iter().collect(); + let spec = match shipote_card::substitute_vars(&spec, &vars_map) { + Ok(s) => s, + Err(e) => return Response::Error { message: format!("template: {e}") }, + }; let disc = DiscernPipeline::default_pipeline(); let inc = mgr.incarnator_handle(); let ws_label = mgr.workspace_label(spec.workspace).await.unwrap_or_default(); @@ -275,27 +305,22 @@ async fn dispatch(mgr: &Arc, disc: &DiscernPipeline, req: Requ tap, std::sync::Arc::new(disc), inc, + Some(mgr.clone()), ) .await { - Ok(launch) => Response::PipelineStarted { - pipeline: launch.pipeline, - command_pids: launch.command_pids, - edges: launch - .edge_discernments - .into_iter() - .map(|e| EdgeDiscernmentInfo { - from_label: e.from_label, - from_output: e.from_output, - to_label: e.to_label, - to_input: e.to_input, - ty: e.discernment.as_ref().map(|d| format!("{:?}", d.ty)), - mime: e.discernment.as_ref().and_then(|d| d.mime.clone()), - lens: e.discernment.as_ref().and_then(|d| d.lens.clone()), - confidence: e.discernment.as_ref().map(|d| d.confidence).unwrap_or(0.0), - }) - .collect(), - }, + Ok(launch) => { + let pipeline_id = launch.pipeline; + announce_edges_to_broker(pool.as_deref(), &pipeline_id, &launch.edge_discernments); + let cmds = launch.command_pids; + mgr.register_pipeline_commands(spec.workspace, pipeline_id, cmds.clone()).await; + let edges = launch.edge_discernments.into_iter().map(map_edge_to_info).collect(); + Response::PipelineStarted { + pipeline: pipeline_id, + command_pids: cmds, + edges, + } + } Err(e) => Response::Error { message: format!("{e}") }, } } @@ -304,6 +329,45 @@ async fn dispatch(mgr: &Arc, disc: &DiscernPipeline, req: Requ }, }, + Request::PipelineStop { pipeline, grace_ms } => { + let reaped = mgr + .stop_pipeline(pipeline, std::time::Duration::from_millis(grace_ms)) + .await; + Response::PipelineStopped { pipeline, reaped } + } + + Request::WorkspaceStats { workspace } => match mgr.workspace_stats(workspace).await { + Some(s) => Response::WorkspaceStats { + info: WorkspaceStatsInfo { + commands_alive: s.commands_alive, + commands_total: s.commands_total, + rss_bytes: s.rss_bytes, + rss_peak_bytes: s.rss_peak_bytes, + cpu_usec: s.cpu_usec, + source: s.source, + uptime_ms: s.uptime_ms, + }, + }, + None => Response::Error { + message: format!("workspace {workspace} not found"), + }, + }, + + Request::FlowList => { + let items = mgr + .list_flow_pipelines() + .await + .into_iter() + .map(|(pipeline, sockets)| FlowInfo { pipeline, sockets }) + .collect(); + Response::FlowList { items } + } + + Request::FlowDrop { pipeline } => { + let existed = mgr.drop_pipeline_flows(pipeline).await; + Response::FlowDropped { pipeline, existed } + } + Request::Capabilities => { let c = mgr.incarnator().capabilities(); Response::Capabilities { @@ -317,6 +381,69 @@ async fn dispatch(mgr: &Arc, disc: &DiscernPipeline, req: Requ } } +fn map_edge_to_info(e: shipote_core::pipeline::EdgeDiscernment) -> EdgeDiscernmentInfo { + EdgeDiscernmentInfo { + from_label: e.from_label, + from_output: e.from_output, + to_label: e.to_label, + to_input: e.to_input, + ty: e.discernment.as_ref().map(|d| format!("{:?}", d.ty)), + mime: e.discernment.as_ref().and_then(|d| d.mime.clone()), + lens: e.discernment.as_ref().and_then(|d| d.lens.clone()), + confidence: e.discernment.as_ref().map(|d| d.confidence).unwrap_or(0.0), + flow_socket: e.flow_socket, + } +} + +/// Por cada edge con TypeRef detectado, spawneamos una Card efímera en el +/// SidecarPool que se anuncia al broker como producer del TypeRef +/// enriquecido. Esto permite a otros explorers (broker-explorer, etc.) +/// ver que shipote vio JSON/text/wasm/etc. saliendo de un pipeline. +fn announce_edges_to_broker( + pool: Option<&brahman_sidecar::SidecarPool>, + pipeline: &ulid::Ulid, + edges: &[shipote_core::pipeline::EdgeDiscernment], +) { + let Some(pool) = pool else { return }; + for e in edges { + let Some(d) = &e.discernment else { continue }; + let label = format!( + "shipote.flow.{}.{}.{}.{}", + short_ulid(pipeline), + e.from_label, + e.from_output, + type_label(&d.ty) + ); + let mut card = Card::new(label); + card.kind = CardKind::Data; + card.lifecycle = Lifecycle::Oneshot; + card.payload = Payload::Virtual; + card.supervision = Supervision::OneShot; + card.flow = Flows { + input: Vec::new(), + output: vec![Flow { + name: e.from_output.clone(), + ty: d.ty.clone(), + pin_to: None, + }], + }; + pool.spawn(card); + info!(pipeline = %pipeline, from = %e.from_label, ty = ?d.ty, "edge announced to broker"); + } +} + +fn short_ulid(u: &ulid::Ulid) -> String { + let s = u.to_string(); + s[s.len() - 6..].to_string() +} + +fn type_label(t: &TypeRef) -> String { + match t { + TypeRef::Primitive { name } => name.clone(), + TypeRef::Wit { package, name, .. } => format!("{package}.{name}"), + } +} + /// Card del daemon. La presentamos al broker así otras sesiones pueden /// descubrir que shipote está corriendo y, eventualmente, conectarse /// como consumidoras del flow `workspaces` (futuro: que la GUI o el diff --git a/crates/apps/shipote-shell/src/main.rs b/crates/apps/shipote-shell/src/main.rs index acd4706..952b4cf 100644 --- a/crates/apps/shipote-shell/src/main.rs +++ b/crates/apps/shipote-shell/src/main.rs @@ -6,7 +6,8 @@ use gpui::{div, prelude::*, px, Context, IntoElement, Render, SharedString, Window}; use shipote_protocol::{ - default_socket_path, read_frame, write_frame, CommandInfo, Request, Response, WorkspaceSummary, + default_socket_path, read_frame, write_frame, CommandInfo, FlowInfo, Request, Response, + WorkspaceStatsInfo, WorkspaceSummary, }; use std::path::PathBuf; use std::time::Duration; @@ -42,10 +43,16 @@ struct Shell { /// Comandos por workspace, indexados por workspace id.toString(). commands: std::collections::BTreeMap>, saved_pipelines: Vec, + flows: Vec, + /// History de RSS por workspace (últimas N samples). + stats_history: std::collections::BTreeMap>, caps: Option, last_probe_ms: u64, + recent_log: Option<(String, String)>, } +const STATS_HISTORY_LEN: usize = 24; + fn main() { launch_app("Shipote — Shell", (820., 560.), Shell::new); } @@ -71,14 +78,36 @@ impl Shell { me.workspaces = snap.workspaces; me.commands = snap.commands; me.saved_pipelines = snap.saved_pipelines; + me.flows = snap.flows; + // Append a la history por workspace. + for (ws_id, fresh) in &snap.fresh_stats { + let h = me + .stats_history + .entry(ws_id.clone()) + .or_default(); + if h.len() >= STATS_HISTORY_LEN { + h.pop_front(); + } + h.push_back(fresh.clone()); + } + // Limpiar history de workspaces que ya no existen. + let alive: std::collections::HashSet = me + .workspaces + .iter() + .map(|w| w.id.to_string()) + .collect(); + me.stats_history.retain(|k, _| alive.contains(k)); me.caps = Some(snap.caps); + me.recent_log = snap.recent_log; } Err(reason) => { me.state = DaemonState::Down { reason }; me.workspaces.clear(); me.commands.clear(); me.saved_pipelines.clear(); + me.flows.clear(); me.caps = None; + me.recent_log = None; } } me.last_probe_ms = elapsed; @@ -95,8 +124,11 @@ impl Shell { workspaces: Vec::new(), commands: std::collections::BTreeMap::new(), saved_pipelines: Vec::new(), + flows: Vec::new(), + stats_history: std::collections::BTreeMap::new(), caps: None, last_probe_ms: 0, + recent_log: None, } } } @@ -106,7 +138,12 @@ struct Snapshot { workspaces: Vec, commands: std::collections::BTreeMap>, saved_pipelines: Vec, + flows: Vec, + /// Stats fresco por workspace (id.toString → stats). + fresh_stats: std::collections::BTreeMap, caps: CapsSummary, + /// tail del log del comando más reciente (label + bytes). None si no hay. + recent_log: Option<(String, String)>, } fn probe_blocking(path: &std::path::Path) -> Result { @@ -132,6 +169,7 @@ fn probe_blocking(path: &std::path::Path) -> Result { // Commands por workspace. let mut commands_map = std::collections::BTreeMap::new(); + let mut fresh_stats = std::collections::BTreeMap::new(); for w in &workspaces { write_frame(&mut stream, &Request::CommandList { workspace: w.id }) .await @@ -144,6 +182,16 @@ fn probe_blocking(path: &std::path::Path) -> Result { commands_map.insert(w.id.to_string(), items); } } + // Stats por workspace. + write_frame(&mut stream, &Request::WorkspaceStats { workspace: w.id }) + .await + .map_err(|e| format!("write stats: {e}"))?; + let resp: Response = read_frame(&mut stream) + .await + .map_err(|e| format!("read stats: {e}"))?; + if let Response::WorkspaceStats { info } = resp { + fresh_stats.insert(w.id.to_string(), info); + } } // Saved pipelines. @@ -158,6 +206,68 @@ fn probe_blocking(path: &std::path::Path) -> Result { _ => Vec::new(), }; + // Flow channels activos (data plane). + write_frame(&mut stream, &Request::FlowList) + .await + .map_err(|e| format!("write flows: {e}"))?; + let resp: Response = read_frame(&mut stream) + .await + .map_err(|e| format!("read flows: {e}"))?; + let flows = match resp { + Response::FlowList { items } => items, + _ => Vec::new(), + }; + + // Live tail: log del comando más reciente con bytes>0. + let recent_log = { + // Pick: comando con id más alto que tiene log_bytes>0, en cualquier workspace. + let mut best: Option<(&str, &CommandInfo)> = None; + for (ws, cmds) in &commands_map { + for c in cmds { + if c.log_bytes == 0 { + continue; + } + let take = match &best { + Some((_, prev)) => c.id > prev.id, + None => true, + }; + if take { + best = Some((ws.as_str(), c)); + } + } + } + match best { + Some((ws_str, cmd)) => { + let ws_id: shipote_card::WorkspaceId = ws_str + .parse::() + .map(shipote_card::WorkspaceId) + .map_err(|e| format!("ulid parse: {e}"))?; + write_frame( + &mut stream, + &Request::CommandLogs { + workspace: ws_id, + command: cmd.id, + tail_bytes: 512, + stream: "both".into(), + }, + ) + .await + .map_err(|e| format!("write logs: {e}"))?; + let resp: Response = read_frame(&mut stream) + .await + .map_err(|e| format!("read logs: {e}"))?; + match resp { + Response::CommandLogs { bytes } => { + let text = String::from_utf8_lossy(&bytes).to_string(); + Some((cmd.label.clone(), text)) + } + _ => None, + } + } + None => None, + } + }; + write_frame(&mut stream, &Request::Capabilities) .await .map_err(|e| format!("write caps: {e}"))?; @@ -182,11 +292,36 @@ fn probe_blocking(path: &std::path::Path) -> Result { workspaces, commands: commands_map, saved_pipelines, + flows, + fresh_stats, caps, + recent_log, }) }) } +/// Render ASCII de sparkline para una serie de valores. `chars` define los +/// glifos (orden ascendente). Auto-scales al máximo de la serie. +fn sparkline(values: &[u64], width: usize) -> String { + if values.is_empty() { + return String::new(); + } + const CHARS: &[char] = &['▁', '▂', '▃', '▄', '▅', '▆', '▇', '█']; + let take = values.len().min(width); + let series = &values[values.len() - take..]; + let max = *series.iter().max().unwrap_or(&1); + if max == 0 { + return "▁".repeat(take); + } + series + .iter() + .map(|v| { + let idx = ((*v as f64 / max as f64) * (CHARS.len() as f64 - 1.0)).round() as usize; + CHARS[idx.min(CHARS.len() - 1)] + }) + .collect() +} + impl Render for Shell { fn render(&mut self, _w: &mut Window, cx: &mut Context) -> impl IntoElement { let theme = Theme::global(cx).clone(); @@ -247,7 +382,28 @@ impl Render for Shell { let ws_items: Vec = self .workspaces .iter() - .map(|w| format!("{} {:<20} cmds={} uptime={}ms", w.id, w.label, w.commands, w.uptime_ms)) + .map(|w| { + let key = w.id.to_string(); + let history = self.stats_history.get(&key); + let rss_series: Vec = history + .map(|h| h.iter().map(|s| s.rss_bytes.unwrap_or(0)).collect()) + .unwrap_or_default(); + let spark = sparkline(&rss_series, STATS_HISTORY_LEN); + let (rss_now, peak) = history + .and_then(|h| h.back()) + .map(|s| (s.rss_bytes.unwrap_or(0), s.rss_peak_bytes.unwrap_or(0))) + .unwrap_or((0, 0)); + let rss_mb = rss_now as f64 / 1024.0 / 1024.0; + let peak_mb = peak as f64 / 1024.0 / 1024.0; + format!( + "{:<14} {:<14} {} {:>6.1}M peak {:>6.1}M", + &w.id.to_string()[20..], + w.label, + spark, + rss_mb, + peak_mb, + ) + }) .collect(); let ws_count = self.workspaces.len().to_string(); let ws_descr = if self.workspaces.is_empty() { @@ -294,6 +450,33 @@ impl Render for Shell { "definiciones reusables vía run-saved".to_string() }; + // Flow channels (data plane). + let flow_count: usize = self.flows.iter().map(|f| f.sockets.len()).sum(); + let flow_items: Vec = self + .flows + .iter() + .flat_map(|f| { + let pipe = f.pipeline.to_string(); + let short = &pipe[pipe.len() - 6..]; + f.sockets + .iter() + .map(move |s| { + format!( + "{short} {}", + s.file_name() + .map(|n| n.to_string_lossy().to_string()) + .unwrap_or_else(|| s.display().to_string()) + ) + }) + .collect::>() + }) + .collect(); + let flow_descr = if flow_count == 0 { + "pipelines con --tap exponen sockets aquí".to_string() + } else { + "shipote flow tail para suscribirse".to_string() + }; + let body = div() .flex() .flex_col() @@ -349,8 +532,44 @@ impl Render for Shell { text, text_dim, &saved_items, + )) + .child(stat_card( + cx, + "Flow channels", + flow_count.to_string(), + &flow_descr, + accent_up, + text, + text_dim, + &flow_items, )); + // Live tail del comando más reciente con output. + let live_card = self.recent_log.as_ref().map(|(label, content)| { + // Cortamos a las últimas ~12 líneas para no inflar el panel. + let lines: Vec = content + .lines() + .rev() + .take(12) + .map(|l| l.to_string()) + .collect::>() + .into_iter() + .rev() + .collect(); + stat_card( + cx, + "Live tail", + label.clone(), + "últimas líneas del comando más reciente", + accent_up, + text, + text_dim, + &lines, + ) + }); + + let body = body.when_some(live_card, |d, c| d.child(c)); + div() .flex() .flex_col() diff --git a/crates/modules/shipote/shipote-card/src/lib.rs b/crates/modules/shipote/shipote-card/src/lib.rs index 110c38f..06f936d 100644 --- a/crates/modules/shipote/shipote-card/src/lib.rs +++ b/crates/modules/shipote/shipote-card/src/lib.rs @@ -203,7 +203,7 @@ pub struct FlowEdge { pub to_input: String, } -#[derive(Debug, Clone, Default, Serialize, Deserialize)] +#[derive(Debug, Clone, Serialize, Deserialize)] pub struct DiscernPolicy { /// Bytes a samplear por flow para el discernidor. Default 4 KiB. #[serde(default = "default_sample_bytes")] @@ -211,6 +211,21 @@ pub struct DiscernPolicy { /// Si `true`, enriquece la Card del producer con el TypeRef detectado. #[serde(default = "default_true")] pub enrich_producer: bool, + /// Chunks que el FlowChannel guarda en replay buffer para subscribers + /// tarde. Default 32. Subir si los productores escriben en ráfagas y + /// querés que los consumidores tardíos vean toda la salida. + #[serde(default = "default_replay_chunks")] + pub replay_chunks: usize, +} + +impl Default for DiscernPolicy { + fn default() -> Self { + Self { + sample_bytes: default_sample_bytes(), + enrich_producer: default_true(), + replay_chunks: default_replay_chunks(), + } + } } fn default_sample_bytes() -> usize { @@ -219,6 +234,9 @@ fn default_sample_bytes() -> usize { fn default_true() -> bool { true } +fn default_replay_chunks() -> usize { + 32 +} // ===================================================================== // Compilación a Card @@ -358,6 +376,116 @@ pub fn load_pipeline_spec(path: &std::path::Path) -> Result, +) -> Result { + if vars.is_empty() { + return Ok(spec.clone()); + } + let mut v = serde_json::to_value(spec)?; + walk_subst(&mut v, vars); + serde_json::from_value(v) +} + +fn walk_subst(v: &mut serde_json::Value, vars: &std::collections::HashMap) { + match v { + serde_json::Value::String(s) => { + *s = subst_str(s, vars); + } + serde_json::Value::Array(arr) => { + for item in arr { + walk_subst(item, vars); + } + } + serde_json::Value::Object(obj) => { + for (_, val) in obj.iter_mut() { + walk_subst(val, vars); + } + } + _ => {} + } +} + +fn subst_str(s: &str, vars: &std::collections::HashMap) -> String { + let mut out = String::with_capacity(s.len()); + let bytes = s.as_bytes(); + let mut i = 0; + while i < bytes.len() { + if i + 1 < bytes.len() && bytes[i] == b'$' && bytes[i + 1] == b'{' { + // Buscar el cierre `}`. + if let Some(close) = bytes[i + 2..].iter().position(|&b| b == b'}') { + let key = std::str::from_utf8(&bytes[i + 2..i + 2 + close]).unwrap_or(""); + if let Some(val) = vars.get(key) { + out.push_str(val); + i += 2 + close + 1; + continue; + } + } + } + out.push(bytes[i] as char); + i += 1; + } + out +} + +#[cfg(test)] +mod subst_tests { + use super::*; + use std::collections::HashMap; + + #[test] + fn substitute_in_argv_and_label() { + let mut vars = HashMap::new(); + vars.insert("MSG".into(), "hola-mundo".into()); + vars.insert("LABEL".into(), "renamed".into()); + let spec = PipelineSpec { + label: "p-${LABEL}".into(), + workspace: WorkspaceId::new(), + nodes: vec![CommandRef { + label: "node-${LABEL}".into(), + payload: Payload::Native { + exec: "/bin/echo".into(), + argv: vec!["${MSG}".into()], + envp: vec![], + }, + soma: Default::default(), + flows: Default::default(), + supervision: Supervision::OneShot, + }], + edges: vec![], + discern: DiscernPolicy::default(), + }; + let out = substitute_vars(&spec, &vars).unwrap(); + assert_eq!(out.label, "p-renamed"); + assert_eq!(out.nodes[0].label, "node-renamed"); + match &out.nodes[0].payload { + Payload::Native { argv, .. } => assert_eq!(argv[0], "hola-mundo"), + _ => panic!("wrong payload"), + } + } + + #[test] + fn unknown_var_left_intact() { + let vars = HashMap::new(); + let spec = PipelineSpec { + label: "p-${UNDEFINED}".into(), + workspace: WorkspaceId::new(), + nodes: vec![], + edges: vec![], + discern: DiscernPolicy::default(), + }; + let out = substitute_vars(&spec, &vars).unwrap(); + assert_eq!(out.label, "p-${UNDEFINED}"); + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/crates/modules/shipote/shipote-core/src/flow_channel.rs b/crates/modules/shipote/shipote-core/src/flow_channel.rs new file mode 100644 index 0000000..a170675 --- /dev/null +++ b/crates/modules/shipote/shipote-core/src/flow_channel.rs @@ -0,0 +1,320 @@ +//! Flow channels: data plane sobre Unix socket por edge enriquecido. +//! +//! Cuando un splitter detecta el TypeRef de un edge, además de replicar a +//! los consumers internos del pipeline, se levanta un FlowChannel que +//! expone los bytes a subscribers externos (otros módulos del fractal). +//! +//! ## Diseño +//! +//! - `tokio::sync::broadcast::channel` para fan-out lock-less entre el +//! splitter (sender) y los N subscribers conectados. +//! - `UnixListener` accept-loop: por cada cliente nuevo, spawn una task +//! que drena el receiver y escribe al socket. +//! - Subscribers lentos pueden perder mensajes (broadcast::Receiver::Lagged) +//! — se loguea warn y se sigue. Esto es deliberado para no bloquear el +//! splitter en consumers lentos. +//! +//! ## Lifetime +//! +//! `FlowChannel` se construye con `new(path)`. Cuando se drop: +//! - El `accept_task` se cancela (vía drop del `tokio::task::JoinHandle` +//! que tenemos abort-on-drop). +//! - El socket file se borra del FS (`Drop` impl). +//! +//! Sender clones son baratos; los subscribers conectados se enteran del +//! cierre cuando todos los senders se dropean. + +use std::collections::VecDeque; +use std::path::{Path, PathBuf}; +use std::sync::{Arc, Mutex}; +use tokio::io::AsyncWriteExt; +use tokio::net::UnixListener; +use tokio::sync::broadcast; +use tokio::task::AbortHandle; +use tracing::{debug, warn}; + +/// Capacidad del broadcast channel. Si un subscriber está más de N chunks +/// atrasado, queda `Lagged` y empieza a perder mensajes. +const BROADCAST_CAP: usize = 64; + +/// Chunks default del replay buffer. Cuando un cliente nuevo se conecta, +/// recibe hasta estos N chunks antes de iniciar el broadcast live. +/// Override via `FlowChannel::with_replay_cap`. +pub const DEFAULT_REPLAY_CHUNKS: usize = 32; + +pub struct FlowChannel { + sender: broadcast::Sender>>, + replay: Arc>>>>, + replay_cap: usize, + socket_path: PathBuf, + _accept_handle: AbortOnDrop, +} + +#[derive(Clone)] +pub struct FlowSender { + sender: broadcast::Sender>>, + replay: Arc>>>>, + replay_cap: usize, +} + +impl FlowSender { + /// Pushea al broadcast y al replay buffer. Si no hay subscribers, + /// el broadcast::send retorna Err pero igual guardamos en replay + /// (subscribers tarde verán los chunks pasados). + pub fn send(&self, data: Arc>) { + let cap = self.replay_cap; + if let Ok(mut g) = self.replay.lock() { + if g.len() >= cap { + g.pop_front(); + } + g.push_back(data.clone()); + } + let _ = self.sender.send(data); + } +} + +impl std::fmt::Debug for FlowChannel { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("FlowChannel") + .field("socket_path", &self.socket_path) + .field("subscribers", &self.sender.receiver_count()) + .finish() + } +} + +impl FlowChannel { + /// Crea un FlowChannel atado al path `socket_path`. Si el path ya + /// existe, lo borra antes de bind (asume restart limpio). + pub fn new(socket_path: PathBuf) -> std::io::Result { + Self::with_replay_cap(socket_path, DEFAULT_REPLAY_CHUNKS) + } + + pub fn with_replay_cap(socket_path: PathBuf, replay_cap: usize) -> std::io::Result { + let cap = replay_cap.max(1); + if socket_path.exists() { + let _ = std::fs::remove_file(&socket_path); + } + if let Some(parent) = socket_path.parent() { + let _ = std::fs::create_dir_all(parent); + } + let listener = UnixListener::bind(&socket_path)?; + let (tx, _rx_unused) = broadcast::channel::>>(BROADCAST_CAP); + let replay: Arc>>>> = + Arc::new(Mutex::new(VecDeque::with_capacity(cap))); + let tx_for_accept = tx.clone(); + let replay_for_accept = replay.clone(); + let path_for_log = socket_path.clone(); + + let join = tokio::spawn(async move { + debug!(path = %path_for_log.display(), "flow channel listening"); + loop { + let (mut stream, _addr) = match listener.accept().await { + Ok(p) => p, + Err(e) => { + warn!(?e, "flow channel accept failed"); + return; + } + }; + // Snapshot del replay buffer Y subscribe al broadcast. + // El orden es crítico: subscribe ANTES de drenar el replay + // para no perder chunks que llegan justo en el medio. + let mut rx = tx_for_accept.subscribe(); + let snapshot: Vec>> = { + let g = replay_for_accept.lock().expect("replay lock"); + g.iter().cloned().collect() + }; + tokio::spawn(async move { + // Fase 1: drenar replay snapshot al subscriber. + for chunk in &snapshot { + if let Err(e) = stream.write_all(chunk).await { + debug!(?e, "flow subscriber dropped during replay"); + return; + } + } + // Fase 2: live broadcast. + loop { + match rx.recv().await { + Ok(chunk) => { + if let Err(e) = stream.write_all(&chunk).await { + debug!(?e, "flow subscriber dropped"); + return; + } + } + Err(broadcast::error::RecvError::Closed) => return, + Err(broadcast::error::RecvError::Lagged(n)) => { + warn!(skipped = n, "flow subscriber lagged"); + } + } + } + }); + } + }); + + Ok(Self { + sender: tx, + replay, + replay_cap: cap, + socket_path, + _accept_handle: AbortOnDrop(join.abort_handle()), + }) + } + + /// Push un chunk al channel. Si no hay subscribers, drop silencioso. + /// Siempre se guarda en el replay buffer (con cap rotation). + pub fn send(&self, data: Vec) { + let arc = Arc::new(data); + let cap = self.replay_cap; + if let Ok(mut g) = self.replay.lock() { + if g.len() >= cap { + g.pop_front(); + } + g.push_back(arc.clone()); + } + let _ = self.sender.send(arc); + } + + pub fn socket_path(&self) -> &Path { + &self.socket_path + } + + /// Handle clone-able para que tasks externas (splitter) pushen al + /// channel sin tener ownership del FlowChannel. Cada push se guarda + /// también en el replay buffer. + pub fn sender_handle(&self) -> FlowSender { + FlowSender { + sender: self.sender.clone(), + replay: self.replay.clone(), + replay_cap: self.replay_cap, + } + } + + pub fn subscriber_count(&self) -> usize { + self.sender.receiver_count() + } +} + +impl Drop for FlowChannel { + fn drop(&mut self) { + // El AbortOnDrop cancela el accept loop; sólo nos queda limpiar el + // socket file. + let _ = std::fs::remove_file(&self.socket_path); + } +} + +struct AbortOnDrop(AbortHandle); +impl Drop for AbortOnDrop { + fn drop(&mut self) { + self.0.abort(); + } +} + +/// Path canónico para un flow channel: `$XDG_RUNTIME_DIR/shipote-flow-.sock`. +pub fn default_flow_socket_path(id: &str) -> PathBuf { + let base = std::env::var("XDG_RUNTIME_DIR").unwrap_or_else(|_| { + let uid = nix::unistd::getuid().as_raw(); + let p = format!("/run/user/{uid}"); + if std::path::Path::new(&p).exists() { + p + } else { + "/tmp".into() + } + }); + PathBuf::from(base).join(format!("shipote-flow-{id}.sock")) +} + +#[cfg(test)] +mod tests { + use super::*; + use tokio::io::AsyncReadExt; + use tokio::net::UnixStream; + + #[tokio::test] + async fn channel_delivers_to_subscriber() { + let tmp = tempfile::tempdir().unwrap(); + let path = tmp.path().join("flow.sock"); + let ch = FlowChannel::new(path.clone()).unwrap(); + + // Subscriber se conecta. + let path_clone = path.clone(); + let task = tokio::spawn(async move { + let mut stream = UnixStream::connect(&path_clone).await.unwrap(); + let mut buf = vec![0u8; 64]; + let n = stream.read(&mut buf).await.unwrap(); + buf.truncate(n); + buf + }); + + // Damos tiempo al accept. + tokio::time::sleep(std::time::Duration::from_millis(50)).await; + // Hasta que haya 1 receiver_count, el send no llega. + for _ in 0..50 { + if ch.subscriber_count() >= 1 { + break; + } + tokio::time::sleep(std::time::Duration::from_millis(20)).await; + } + ch.send(b"hello-flow".to_vec()); + + let received = tokio::time::timeout(std::time::Duration::from_secs(2), task) + .await + .expect("timeout") + .unwrap(); + assert_eq!(received, b"hello-flow"); + } + + #[tokio::test] + async fn replay_buffer_serves_late_subscriber() { + let tmp = tempfile::tempdir().unwrap(); + let path = tmp.path().join("flow.sock"); + let ch = FlowChannel::new(path.clone()).unwrap(); + + // Pushes ANTES de cualquier subscriber: van solo al replay. + ch.send(b"chunk-1".to_vec()); + ch.send(b"chunk-2".to_vec()); + ch.send(b"chunk-3".to_vec()); + + // Subscriber LATE — debe recibir los 3 chunks del replay. + let path_clone = path.clone(); + let task = tokio::spawn(async move { + let mut stream = UnixStream::connect(&path_clone).await.unwrap(); + let mut buf = vec![0u8; 256]; + // Leemos hasta recibir los 3 chunks (21 bytes esperados). + let mut total = Vec::new(); + for _ in 0..20 { + let n = stream.read(&mut buf).await.unwrap(); + if n == 0 { + break; + } + total.extend_from_slice(&buf[..n]); + if total.len() >= 21 { + break; + } + } + total + }); + + let received = tokio::time::timeout(std::time::Duration::from_secs(2), task) + .await + .expect("timeout") + .unwrap(); + let s = String::from_utf8_lossy(&received); + assert!(s.contains("chunk-1"), "got: {s:?}"); + assert!(s.contains("chunk-2"), "got: {s:?}"); + assert!(s.contains("chunk-3"), "got: {s:?}"); + } + + #[tokio::test] + async fn drop_removes_socket() { + let tmp = tempfile::tempdir().unwrap(); + let path = tmp.path().join("flow.sock"); + { + let _ch = FlowChannel::new(path.clone()).unwrap(); + assert!(path.exists()); + } + // Después del drop, el socket file no debe quedar. + // Damos un pelín de tiempo al runtime para que el drop corra + // mientras estamos en task. + tokio::time::sleep(std::time::Duration::from_millis(10)).await; + assert!(!path.exists()); + } +} diff --git a/crates/modules/shipote/shipote-core/src/lib.rs b/crates/modules/shipote/shipote-core/src/lib.rs index 080ac64..9b1af6c 100644 --- a/crates/modules/shipote/shipote-core/src/lib.rs +++ b/crates/modules/shipote/shipote-core/src/lib.rs @@ -11,9 +11,11 @@ // el módulo concreto. #![deny(unsafe_op_in_unsafe_fn)] +pub mod flow_channel; pub mod logbuf; pub mod persist; pub mod pipeline; +pub mod stats; use brahman_card::{Card, Payload, Supervision}; use ente_incarnate::{Incarnator, IncarnatorConfig}; @@ -55,10 +57,22 @@ pub struct CommandState { pub pid: Pid, pub alive: bool, pub exit_status: Option, - /// Ring buffer compartido con la tokio task que drena stdout+stderr - /// del comando. `None` para comandos que no capturan output (futuro: - /// comandos con stdout=inherit). - pub logs: Option, + /// Ring buffer del stdout. `None` para comandos sin captura. + pub stdout: Option, + /// Ring buffer del stderr. Separado de `stdout` para que el CLI + /// pueda filtrarlos. `None` para comandos sin captura. + pub stderr: Option, + /// Si el comando fue lanzado como parte de un Pipeline, su ULID. + pub pipeline_id: Option, +} + +/// Stream a leer en `get_command_logs`. `Both` concatena stderr-después-stdout +/// para una vista combinada (orden temporal aproximado). +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum LogStream { + Stdout, + Stderr, + Both, } pub struct WorkspaceManager { @@ -72,6 +86,11 @@ struct Inner { /// que "pipelines vivos" — son specs guardados para reusar con /// `run-saved`. Sobreviven restart vía snapshot. saved_pipelines: HashMap, + /// Flow channels vivos por pipeline. Se retienen hasta que el + /// pipeline termine — cuando todos los hijos del pipeline murieron, + /// el reaper los borra (futuro). v1: viven hasta `stop_pipeline_flows` + /// explícito o hasta shutdown. + pipeline_flows: HashMap>, } #[derive(Debug, Clone)] @@ -156,11 +175,134 @@ impl WorkspaceManager { inner: Arc::new(Mutex::new(Inner { workspaces: HashMap::new(), saved_pipelines: HashMap::new(), + pipeline_flows: HashMap::new(), })), incarnator: Arc::new(Incarnator::new(cfg)), } } + /// Registra los comandos lanzados por un pipeline en el workspace. + /// Esto permite `pipeline_stop` (matar selectivamente sólo los pids + /// de un pipeline). `pipeline_id` se setea en cada CommandState. + pub async fn register_pipeline_commands( + &self, + workspace: WorkspaceId, + pipeline_id: Ulid, + commands: Vec<(String, i32)>, + ) { + let mut g = self.inner.lock().await; + let Some(ws) = g.workspaces.get_mut(&workspace) else { return }; + for (label, pid) in commands { + let cmd_id = Ulid::new(); + ws.commands.insert( + cmd_id, + CommandState { + id: cmd_id, + label, + pid: Pid::from_raw(pid), + alive: true, + exit_status: None, + stdout: None, + stderr: None, + pipeline_id: Some(pipeline_id), + }, + ); + } + } + + /// Detiene selectivamente los comandos de un pipeline. SIGTERM → + /// `grace` → SIGKILL. Devuelve cantidad reapeada. Si no hay comandos + /// del pipeline en ningún workspace, retorna 0. + pub async fn stop_pipeline( + &self, + pipeline_id: Ulid, + grace: std::time::Duration, + ) -> u32 { + // 1) Recolectamos pids de ese pipeline en todos los workspaces. + let mut targets: Vec = Vec::new(); + { + let g = self.inner.lock().await; + for ws in g.workspaces.values() { + for cmd in ws.commands.values() { + if cmd.alive && cmd.pipeline_id == Some(pipeline_id) { + targets.push(cmd.pid); + } + } + } + } + if targets.is_empty() { + return 0; + } + let initial = if grace.is_zero() { Signal::SIGKILL } else { Signal::SIGTERM }; + for pid in &targets { + let _ = kill(*pid, initial); + } + let mut reaped = 0u32; + let mut still = targets.clone(); + let deadline = std::time::Instant::now() + grace; + let poll = std::time::Duration::from_millis(20); + while !still.is_empty() && std::time::Instant::now() < deadline { + still.retain(|pid| match waitpid(*pid, Some(WaitPidFlag::WNOHANG)) { + Ok(WaitStatus::StillAlive) => true, + Ok(_) => { + reaped += 1; + false + } + Err(_) => false, + }); + if !still.is_empty() { + tokio::time::sleep(poll).await; + } + } + for pid in &still { + let _ = kill(*pid, Signal::SIGKILL); + let _ = waitpid(*pid, None); + reaped += 1; + } + // Marcar como dead en estado in-memory. + let mut g = self.inner.lock().await; + for ws in g.workspaces.values_mut() { + for cmd in ws.commands.values_mut() { + if cmd.pipeline_id == Some(pipeline_id) && cmd.alive { + cmd.alive = false; + } + } + } + // Drop flows del pipeline. + g.pipeline_flows.remove(&pipeline_id); + info!(%pipeline_id, reaped, "pipeline stopped"); + reaped + } + + /// Retiene los FlowChannels de un pipeline para que sobrevivan al + /// fin del request. Drop = cierre del data plane. + pub async fn retain_pipeline_flows( + &self, + pipeline: Ulid, + flows: Vec, + ) { + self.inner.lock().await.pipeline_flows.insert(pipeline, flows); + } + + /// Lista pipelines vivos con sus sockets activos. + pub async fn list_flow_pipelines(&self) -> Vec<(Ulid, Vec)> { + let g = self.inner.lock().await; + g.pipeline_flows + .iter() + .map(|(id, flows)| { + ( + *id, + flows.iter().map(|f| f.socket_path().to_path_buf()).collect(), + ) + }) + .collect() + } + + /// Cierra el data plane de un pipeline (drop = remove_file de sockets). + pub async fn drop_pipeline_flows(&self, pipeline: Ulid) -> bool { + self.inner.lock().await.pipeline_flows.remove(&pipeline).is_some() + } + pub fn incarnator(&self) -> &Incarnator { &self.incarnator } @@ -208,6 +350,35 @@ impl WorkspaceManager { .map(|w| w.spec.label.clone()) } + /// Estadísticas de recursos del workspace: RSS + CPU agregado de sus + /// comandos vivos. Lee `/proc//` directamente; si el spec declara + /// `soma.cgroup.path`, también intenta el cgroup (más preciso, incluye + /// descendants). + pub async fn workspace_stats(&self, id: WorkspaceId) -> Option { + let g = self.inner.lock().await; + let ws = g.workspaces.get(&id)?; + let alive: Vec = ws + .commands + .values() + .filter(|c| c.alive) + .map(|c| c.pid.as_raw()) + .collect(); + let total = ws.commands.len() as u32; + let cgroup_path = if ws.spec.soma.cgroup.path.is_empty() { + None + } else { + // resolve_cgroup_path está en ente_incarnate, pero acá basta + // con el path absoluto bajo /sys/fs/cgroup. Resolución gruesa. + Some(std::path::PathBuf::from(format!( + "/sys/fs/cgroup{}", + ws.spec.soma.cgroup.path + ))) + }; + let mut s = stats::measure(&alive, cgroup_path.as_deref(), ws.started); + s.commands_total = total; + Some(s) + } + pub async fn create( self: &Arc, spec: WorkspaceSpec, @@ -269,31 +440,66 @@ impl WorkspaceManager { } pub async fn stop(&self, id: WorkspaceId) -> Result { + self.stop_with_grace(id, std::time::Duration::from_millis(1000)).await + } + + /// Variante con tiempo de gracia configurable. SIGTERM → espera `grace` + /// → SIGKILL si quedan vivos. `grace=0` = SIGKILL inmediato. + pub async fn stop_with_grace( + &self, + id: WorkspaceId, + grace: std::time::Duration, + ) -> Result { let mut g = self.inner.lock().await; let ws = g.workspaces.remove(&id).ok_or(CoreError::WorkspaceNotFound(id))?; + // También limpiamos flow_channels del workspace si los hubiera — + // por workspace lo retenemos por pipeline, no por workspace. + drop(g); + + // 1) SIGTERM (o SIGKILL si grace=0) a todos vivos. + let initial_signal = if grace.is_zero() { Signal::SIGKILL } else { Signal::SIGTERM }; + let alive_pids: Vec = ws + .commands + .values() + .filter(|c| c.alive) + .map(|c| c.pid) + .collect(); + for pid in &alive_pids { + let _ = kill(*pid, initial_signal); + } + + // 2) Esperar hasta `grace` haciendo polling WNOHANG. let mut reaped = 0u32; - for (_cid, cmd) in ws.commands { - if cmd.alive { - let _ = kill(cmd.pid, Signal::SIGTERM); - // Cosecha sin bloquear infinito: WNOHANG en loop con un par de intentos. - for _ in 0..50 { - match waitpid(cmd.pid, Some(WaitPidFlag::WNOHANG)) { - Ok(WaitStatus::StillAlive) => { - std::thread::sleep(std::time::Duration::from_millis(20)); - } - Ok(_) => { - reaped += 1; - break; - } - Err(_) => break, - } + let mut still_alive: Vec = alive_pids.clone(); + let deadline = std::time::Instant::now() + grace; + let poll_interval = std::time::Duration::from_millis(20); + while !still_alive.is_empty() && std::time::Instant::now() < deadline { + still_alive.retain(|pid| match waitpid(*pid, Some(WaitPidFlag::WNOHANG)) { + Ok(WaitStatus::StillAlive) => true, + Ok(_) => { + reaped += 1; + false } - // Último recurso: SIGKILL. - let _ = kill(cmd.pid, Signal::SIGKILL); - let _ = waitpid(cmd.pid, None); + Err(_) => false, + }); + if !still_alive.is_empty() { + tokio::time::sleep(poll_interval).await; } } - info!(%id, reaped, "workspace stopped"); + + // 3) SIGKILL forzoso a los que quedan, y wait blocking. + for pid in &still_alive { + let _ = kill(*pid, Signal::SIGKILL); + let _ = waitpid(*pid, None); + reaped += 1; + } + info!( + %id, + reaped, + grace_ms = grace.as_millis() as u64, + sigkilled = still_alive.len(), + "workspace stopped" + ); Ok(reaped) } @@ -321,31 +527,36 @@ impl WorkspaceManager { }; let card = cmd_ref.to_card(0, &workspace_label)?; - // Pipe para capturar stdout. O_CLOEXEC para que hijos del hijo - // no hereden la copia. v1: stderr=inherit (simplicidad; tail útil - // para stdout solo). Futuro: stderr separado en el ring. - let (capture_r, capture_w) = + // Dos pipes O_CLOEXEC: uno para stdout, otro para stderr. + use std::os::fd::IntoRawFd; + let (sout_r, sout_w) = nix::unistd::pipe2(nix::fcntl::OFlag::O_CLOEXEC).map_err(|e| { CoreError::Incarnate(ente_incarnate::IncarnateError::Pipe(e)) })?; - use std::os::fd::IntoRawFd; - let capture_r_fd = capture_r.into_raw_fd(); - let capture_w_fd = capture_w.into_raw_fd(); + let (serr_r, serr_w) = + nix::unistd::pipe2(nix::fcntl::OFlag::O_CLOEXEC).map_err(|e| { + CoreError::Incarnate(ente_incarnate::IncarnateError::Pipe(e)) + })?; + let sout_r_fd = sout_r.into_raw_fd(); + let sout_w_fd = sout_w.into_raw_fd(); + let serr_r_fd = serr_r.into_raw_fd(); + let serr_w_fd = serr_w.into_raw_fd(); - let logs = logbuf::LogBuf::new(); + let stdout_buf = logbuf::LogBuf::new(); + let stderr_buf = logbuf::LogBuf::new(); let stdio = ente_incarnate::ChildStdio { stdin_fd: None, - stdout_fd: Some(capture_w_fd), - stderr_fd: None, + stdout_fd: Some(sout_w_fd), + stderr_fd: Some(serr_w_fd), }; let out = self.incarnator.incarnate_with(&card, stdio)?; let cmd_id = card.id; let cmd_label = cmd_ref.label.clone(); let pid = out.pid; - // Drainer: tokio task que lee capture_r_fd y appendea al ring. - spawn_log_drainer(capture_r_fd, logs.clone()); + spawn_log_drainer(sout_r_fd, stdout_buf.clone()); + spawn_log_drainer(serr_r_fd, stderr_buf.clone()); let mut g = self.inner.lock().await; if let Some(ws) = g.workspaces.get_mut(&id) { @@ -357,7 +568,9 @@ impl WorkspaceManager { pid, alive: true, exit_status: None, - logs: Some(logs), + stdout: Some(stdout_buf), + stderr: Some(stderr_buf), + pipeline_id: None, }, ); } @@ -372,16 +585,28 @@ impl WorkspaceManager { } /// Devuelve el tail del log capturado para `(workspace, command)`. + /// `stream` selecciona stdout/stderr/both. pub async fn get_command_logs( &self, workspace: WorkspaceId, command: Ulid, tail_bytes: usize, + stream: LogStream, ) -> Option> { let g = self.inner.lock().await; let ws = g.workspaces.get(&workspace)?; let cmd = ws.commands.get(&command)?; - cmd.logs.as_ref().map(|lb| lb.tail(tail_bytes)) + match stream { + LogStream::Stdout => cmd.stdout.as_ref().map(|lb| lb.tail(tail_bytes)), + LogStream::Stderr => cmd.stderr.as_ref().map(|lb| lb.tail(tail_bytes)), + LogStream::Both => { + let so = cmd.stdout.as_ref().map(|lb| lb.tail(tail_bytes)).unwrap_or_default(); + let se = cmd.stderr.as_ref().map(|lb| lb.tail(tail_bytes)).unwrap_or_default(); + let mut out = so; + out.extend_from_slice(&se); + Some(out) + } + } } /// Lista comandos de un workspace. @@ -397,7 +622,8 @@ impl WorkspaceManager { pid: c.pid.as_raw(), alive: c.alive, exit_status: c.exit_status, - log_bytes: c.logs.as_ref().map(|l| l.written_total()).unwrap_or(0), + log_bytes: c.stdout.as_ref().map(|l| l.written_total()).unwrap_or(0) + + c.stderr.as_ref().map(|l| l.written_total()).unwrap_or(0), }) .collect(); // Orden estable por ULID (temporal). @@ -435,7 +661,9 @@ impl WorkspaceManager { pid: out.pid, alive: true, exit_status: None, - logs: None, // run_pipeline NO captura logs (los conecta por pipes). + stdout: None, // run_pipeline NO captura (conecta por pipes). + stderr: None, + pipeline_id: None, }, ); } @@ -538,19 +766,16 @@ mod tests { }; let (id, _) = mgr.create(spec).await.unwrap(); let summary = mgr - .run( - id, - "/bin/echo".into(), - vec!["captured-output".into()], - vec![], - ) + .run(id, "/bin/echo".into(), vec!["captured-output".into()], vec![]) .await .unwrap(); - // Esperamos a que el comando termine y el drainer drene. for _ in 0..50 { tokio::time::sleep(std::time::Duration::from_millis(20)).await; mgr.reap_dead().await; - let logs = mgr.get_command_logs(id, summary.id, 0).await.unwrap_or_default(); + let logs = mgr + .get_command_logs(id, summary.id, 0, LogStream::Stdout) + .await + .unwrap_or_default(); if !logs.is_empty() { let s = String::from_utf8_lossy(&logs); assert!(s.contains("captured-output"), "got: {s:?}"); @@ -560,6 +785,52 @@ mod tests { panic!("logs never captured"); } + #[tokio::test] + async fn run_captures_stderr_separately() { + let mgr = Arc::new(WorkspaceManager::new(IncarnatorConfig::default())); + let spec = WorkspaceSpec { + label: "stderr".into(), + soma: Default::default(), + permissions: Default::default(), + ttl: None, + flow_dirs: vec![], + on_exit: shipote_card::ExitPolicy::Reap, + }; + let (id, _) = mgr.create(spec).await.unwrap(); + // sh -c "echo OUT; echo ERR >&2" + let summary = mgr + .run( + id, + "/bin/sh".into(), + vec!["-c".into(), "echo OUT; echo ERR >&2".into()], + vec![], + ) + .await + .unwrap(); + for _ in 0..50 { + tokio::time::sleep(std::time::Duration::from_millis(20)).await; + mgr.reap_dead().await; + let so = mgr + .get_command_logs(id, summary.id, 0, LogStream::Stdout) + .await + .unwrap_or_default(); + let se = mgr + .get_command_logs(id, summary.id, 0, LogStream::Stderr) + .await + .unwrap_or_default(); + if !so.is_empty() && !se.is_empty() { + let so_s = String::from_utf8_lossy(&so); + let se_s = String::from_utf8_lossy(&se); + assert!(so_s.contains("OUT"), "stdout: {so_s:?}"); + assert!(se_s.contains("ERR"), "stderr: {se_s:?}"); + assert!(!so_s.contains("ERR"), "stdout no debería tener ERR"); + assert!(!se_s.contains("OUT"), "stderr no debería tener OUT"); + return; + } + } + panic!("logs never captured on both streams"); + } + #[tokio::test] async fn run_true_in_workspace() { let mgr = Arc::new(WorkspaceManager::new(IncarnatorConfig::default())); diff --git a/crates/modules/shipote/shipote-core/src/pipeline.rs b/crates/modules/shipote/shipote-core/src/pipeline.rs index 9554cd1..90c84de 100644 --- a/crates/modules/shipote/shipote-core/src/pipeline.rs +++ b/crates/modules/shipote/shipote-core/src/pipeline.rs @@ -12,7 +12,7 @@ use brahman_card::Payload; use ente_incarnate::{ChildStdio, Incarnator}; use nix::fcntl::OFlag; use nix::unistd::pipe2; -use shipote_card::{FlowEdge, PipelineSpec}; +use shipote_card::PipelineSpec; use shipote_discern::{DiscernPipeline, Discernment, Hint}; use std::os::fd::{AsRawFd, IntoRawFd, RawFd}; use std::sync::Arc; @@ -22,7 +22,7 @@ use tracing::{debug, info, warn}; use ulid::Ulid; /// Resultado de lanzar un pipeline. -#[derive(Debug, Clone)] +#[derive(Debug)] pub struct PipelineLaunch { pub pipeline: Ulid, pub command_pids: Vec<(String, i32)>, @@ -37,19 +37,29 @@ pub struct EdgeDiscernment { pub to_label: String, pub to_input: String, pub discernment: Option, + /// Path del Unix socket donde otros módulos pueden suscribirse al + /// stream replicado por este edge. `None` cuando tap=false (no hay + /// data plane porque no hay sampling). + pub flow_socket: Option, } /// Lanza un pipeline conectando nodos por stdin/stdout. Cada nodo se /// encarna via `Incarnator` (con o sin namespacing según su SomaSpec). /// -/// v1: pipeline lineal (un edge entrante por nodo). Múltiples edges -/// entrantes generan warning y sólo el primero se honra. +/// Soporta: +/// - Pipeline lineal (1 producer → 1 consumer). +/// - **Fan-out** (1 producer → N consumers): shipote interpone un +/// splitter que duplica bytes a cada destino. Cuando `tap=true`, el +/// splitter además samplea para discernir. +/// - Múltiples predecessors por nodo NO se soporta aún (fan-in): sólo se +/// honra el primer edge entrante. pub async fn run_pipeline( spec: &PipelineSpec, workspace_label: &str, tap: bool, discerner: Arc, incarnator: Arc, + manager: Option>, ) -> Result { spec.validate()?; let n = spec.nodes.len(); @@ -60,30 +70,100 @@ pub async fn run_pipeline( "launching pipeline (incarnated)" ); - // Predecessor: para cada nodo, su edge entrante (si tiene). - let mut predecessor: Vec> = vec![None; n]; - for e in &spec.edges { - if predecessor[e.to].is_some() { - warn!(node = e.to, "v1 pipeline: nodo con múltiples predecessors — sólo se honra el primero"); - continue; - } - predecessor[e.to] = Some(e); + // Pre-compute grafo: + // - `consumers[i]` = índices de edges salientes de `i`. + // - `predecessors[j]` = índices de edges entrantes a `j`. + let mut consumers: Vec> = vec![Vec::new(); n]; + let mut predecessors: Vec> = vec![Vec::new(); n]; + for (idx, e) in spec.edges.iter().enumerate() { + consumers[e.from].push(idx); + predecessors[e.to].push(idx); } - let mut pids = Vec::with_capacity(n); - let mut taps: Vec = Vec::new(); - // Para cada nodo i que produce, guardamos el FD de read del pipe - // del productor → al armar el consumidor lo consume. - // Pero como puede haber tap intermedio, llevamos un esquema: - // - Sin tap: read FD del pipe productor → stdin del consumidor. - // - Con tap: read FD del pipe productor → tokio proxy → write FD - // del pipe consumidor → stdin del consumidor. - // Para simplicidad lineal, `pending_stdin_for_next` guarda el FD que - // el siguiente consumidor debe usar como stdin. - let mut pending_stdin_for_next: Option = None; + // Por cada edge: par (r_to_consumer, w_from_producer_side). + // El consumer recibe r_to_consumer; el producer escribe a w_from_producer_side + // (directa o vía splitter). + let mut edge_r: Vec = vec![-1; spec.edges.len()]; + let mut edge_w: Vec = vec![-1; spec.edges.len()]; + for i in 0..spec.edges.len() { + let (r, w) = pipe2(OFlag::O_CLOEXEC).map_err(|e| { + CoreError::Incarnate(ente_incarnate::IncarnateError::Pipe(e)) + })?; + edge_r[i] = r.into_raw_fd(); + edge_w[i] = w.into_raw_fd(); + } + let mut consumer_stdin_fd: Vec> = vec![None; n]; + let mut producer_stdout_fd: Vec> = vec![None; n]; + let mut splitter_specs: Vec = Vec::new(); + let mut merger_specs: Vec = Vec::new(); + + // Stdout del producer: directo a edge_w[único] si tiene 1 consumer y NO tap; + // sino, pipe propio que va al splitter task. + for i in 0..n { + if consumers[i].is_empty() { + continue; + } + if consumers[i].len() == 1 && !tap { + producer_stdout_fd[i] = Some(edge_w[consumers[i][0]]); + continue; + } + // Splitter: pipe propio para el productor → splitter lee y replica a edge_w[*]. + let (prod_r, prod_w) = pipe2(OFlag::O_CLOEXEC).map_err(|e| { + CoreError::Incarnate(ente_incarnate::IncarnateError::Pipe(e)) + })?; + producer_stdout_fd[i] = Some(prod_w.into_raw_fd()); + let prod_r_fd = prod_r.into_raw_fd(); + let mut consumer_writes: Vec = Vec::with_capacity(consumers[i].len()); + let mut edge_meta: Vec = Vec::with_capacity(consumers[i].len()); + for edge_idx in &consumers[i] { + let edge = &spec.edges[*edge_idx]; + consumer_writes.push(edge_w[*edge_idx]); + edge_meta.push(EdgeMeta { + from_label: spec.nodes[edge.from].label.clone(), + from_output: edge.from_output.clone(), + to_label: spec.nodes[edge.to].label.clone(), + to_input: edge.to_input.clone(), + }); + } + splitter_specs.push(SplitterSpec { + producer_r_fd: prod_r_fd, + consumer_w_fds: consumer_writes, + edges: edge_meta, + tap, + sample_bytes: spec.discern.sample_bytes, + }); + } + + // Stdin del consumer: edge_r[único] si tiene 1 predecessor; sino, merger. + for j in 0..n { + match predecessors[j].len() { + 0 => {} + 1 => { + consumer_stdin_fd[j] = Some(edge_r[predecessors[j][0]]); + } + _ => { + // Merger: lee de N edge_r y escribe a un nuevo pipe cuyo + // read end es el stdin del consumer. + let (cons_r, cons_w) = pipe2(OFlag::O_CLOEXEC).map_err(|e| { + CoreError::Incarnate(ente_incarnate::IncarnateError::Pipe(e)) + })?; + consumer_stdin_fd[j] = Some(cons_r.into_raw_fd()); + let inputs: Vec = predecessors[j] + .iter() + .map(|eidx| edge_r[*eidx]) + .collect(); + merger_specs.push(MergerSpec { + producer_r_fds: inputs, + consumer_w_fd: cons_w.into_raw_fd(), + }); + } + } + } + + // Encarnamos cada nodo con su stdin/stdout fd asignado. + let mut pids = Vec::with_capacity(n); for (i, node) in spec.nodes.iter().enumerate() { - // Validar payload ejecutable. match &node.payload { Payload::Native { .. } | Payload::Legacy { .. } => {} _ => { @@ -92,91 +172,98 @@ pub async fn run_pipeline( )) } } - - // Compilamos a Card. let card = node.to_card(i, workspace_label)?; - - // ¿Soy productor? Necesito stdout_fd hacia un pipe nuevo. - let i_is_producer = spec.edges.iter().any(|e| e.from == i); - let stdin_fd: Option = pending_stdin_for_next.take(); - let mut stdout_fd: Option = None; - let mut next_pending: Option = None; - - // FDs que el PADRE debe cerrar tras spawn (son nuestra copia del - // extremo que pasamos al hijo). - let mut parent_closes: Vec = Vec::new(); - - if i_is_producer { - let (r, w) = pipe2(OFlag::O_CLOEXEC).map_err(|e| { - CoreError::Incarnate(ente_incarnate::IncarnateError::Pipe(e)) - })?; - let r_raw = r.into_raw_fd(); - let w_raw = w.into_raw_fd(); - stdout_fd = Some(w_raw); - parent_closes.push(w_raw); - - if tap { - // Necesitamos un segundo pipe entre tap y consumidor. - let (r2, w2) = pipe2(OFlag::O_CLOEXEC).map_err(|e| { - CoreError::Incarnate(ente_incarnate::IncarnateError::Pipe(e)) - })?; - let r2_raw = r2.into_raw_fd(); - let w2_raw = w2.into_raw_fd(); - next_pending = Some(r2_raw); - // El tap lee de r_raw y escribe a w2_raw. - let edge = predecessor - .iter() - .find_map(|p| *p) - .and_then(|e| if e.from == i { Some(e) } else { None }) - // Edge donde i es from: - .or_else(|| spec.edges.iter().find(|e| e.from == i)); - let from_label = node.label.clone(); - let to_label = edge - .map(|e| spec.nodes[e.to].label.clone()) - .unwrap_or_default(); - let from_output = edge.map(|e| e.from_output.clone()).unwrap_or_default(); - let to_input = edge.map(|e| e.to_input.clone()).unwrap_or_default(); - let sample_bytes = spec.discern.sample_bytes; - let disc = discerner.clone(); - let h = spawn_tap( - r_raw, w2_raw, sample_bytes, disc, from_label, from_output, to_label, to_input, - ); - taps.push(h); - // r_raw y w2_raw pasaron a manos del tokio task. No los - // cerramos en el padre. - } else { - // Sin tap, el read del productor va directo al stdin del - // siguiente consumidor. - next_pending = Some(r_raw); - } - } - let stdio = ChildStdio { - stdin_fd, - stdout_fd, + stdin_fd: consumer_stdin_fd[i], + stdout_fd: producer_stdout_fd[i], stderr_fd: None, }; - - // Incarnator absorbe los fds de `stdio` — no los cerramos acá. - // `parent_closes` queda obsoleto. - let _ = parent_closes; let outcome = incarnator .incarnate_with(&card, stdio) .map_err(CoreError::Incarnate)?; let pid = outcome.pid; pids.push((node.label.clone(), pid.as_raw())); debug!(label = %node.label, pid = pid.as_raw(), "node incarnated"); - - pending_stdin_for_next = next_pending; } - let pipeline_id = Ulid::new(); + let pipeline_id_for_flows = Ulid::new(); + // Si tap=true, creamos un FlowChannel por edge para el data plane. + // Cada splitter pushea al sender del channel correspondiente. + let pipeline_id = pipeline_id_for_flows; + let mut flow_channels: Vec = Vec::new(); + let mut splitter_channels: Vec>> = + Vec::with_capacity(splitter_specs.len()); + let mut edge_socket_for_splitter: Vec>> = Vec::new(); + for s in &splitter_specs { + let mut senders_per_edge = Vec::with_capacity(s.edges.len()); + let mut paths_per_edge = Vec::with_capacity(s.edges.len()); + for (i, em) in s.edges.iter().enumerate() { + if !s.tap { + senders_per_edge.push(None); + paths_per_edge.push(None); + continue; + } + let id = format!( + "{}-{}-{}-{}", + short_ulid(&pipeline_id), + em.from_label, + em.from_output, + i + ); + let socket = crate::flow_channel::default_flow_socket_path(&id); + match crate::flow_channel::FlowChannel::with_replay_cap(socket.clone(), spec.discern.replay_chunks) { + Ok(fc) => { + senders_per_edge.push(Some(fc.sender_handle())); + paths_per_edge.push(Some(socket)); + flow_channels.push(fc); + } + Err(e) => { + warn!(?e, "flow channel new failed"); + senders_per_edge.push(None); + paths_per_edge.push(None); + } + } + } + splitter_channels.push(senders_per_edge); + edge_socket_for_splitter.push(paths_per_edge); + } - let mut edge_discernments = Vec::with_capacity(taps.len()); - for t in taps { - match t.handle.await { - Ok(d) => edge_discernments.push(d), - Err(e) => warn!(?e, "tap handle joined with error"), + // Registramos los flow_channels en el manager AHORA, antes de await + // las tasks. Esto permite que clientes externos hagan `flow list` y + // se suscriban mientras el pipeline aún produce data. + if let Some(mgr) = &manager { + if !flow_channels.is_empty() { + let drained: Vec = flow_channels.drain(..).collect(); + mgr.retain_pipeline_flows(pipeline_id, drained).await; + } + } + + // Spawn mergers + splitters después del incarnate. Cada task posee + // sus fds y los cierra al terminar (via Drop de OwnedFd). + let mut merger_handles: Vec> = Vec::new(); + for m in merger_specs { + merger_handles.push(spawn_merger(m)); + } + let mut tap_handles: Vec = Vec::new(); + for (s, senders) in splitter_specs.into_iter().zip(splitter_channels.into_iter()) { + tap_handles.push(spawn_splitter(s, discerner.clone(), senders)); + } + + let mut edge_discernments = Vec::new(); + for (h, paths) in tap_handles.into_iter().zip(edge_socket_for_splitter.into_iter()) { + match h.handle.await { + Ok(eds) => { + for (mut ed, path) in eds.into_iter().zip(paths.into_iter()) { + ed.flow_socket = path; + edge_discernments.push(ed); + } + } + Err(e) => warn!(?e, "splitter handle joined with error"), + } + } + for h in merger_handles { + if let Err(e) = h.await { + warn!(?e, "merger handle joined with error"); } } @@ -187,57 +274,156 @@ pub async fn run_pipeline( }) } -struct TapHandle { - handle: tokio::task::JoinHandle, +fn short_ulid(u: &Ulid) -> String { + let s = u.to_string(); + s[s.len() - 6..].to_string() } -#[allow(clippy::too_many_arguments)] -fn spawn_tap( - producer_r_fd: RawFd, - consumer_w_fd: RawFd, - sample_bytes: usize, - discerner: Arc, +#[derive(Debug, Clone)] +struct EdgeMeta { from_label: String, from_output: String, to_label: String, to_input: String, -) -> TapHandle { - // Marcar non-blocking ANTES de envolverlos en AsyncFd. Sino tokio - // bloquea el reactor en operaciones lentas. - set_nonblocking(producer_r_fd); - set_nonblocking(consumer_w_fd); +} + +struct SplitterSpec { + producer_r_fd: RawFd, + consumer_w_fds: Vec, + edges: Vec, + tap: bool, + sample_bytes: usize, +} + +struct SplitterHandle { + handle: tokio::task::JoinHandle>, +} + +struct MergerSpec { + producer_r_fds: Vec, + consumer_w_fd: RawFd, +} + +fn spawn_merger(spec: MergerSpec) -> tokio::task::JoinHandle<()> { + for fd in &spec.producer_r_fds { + set_nonblocking(*fd); + } + set_nonblocking(spec.consumer_w_fd); + // Patrón: una task lectora por cada producer reenvía bytes a un mpsc. + // El merger principal consume del mpsc y escribe al consumer. + // Esto evita el "block en reader idle" del enfoque round-robin sobre + // AsyncFd::ready() (los readers idle nunca dejan turno). + tokio::spawn(async move { + let (tx, mut rx) = tokio::sync::mpsc::channel::>(32); + let nr = spec.producer_r_fds.len(); + for fd in spec.producer_r_fds { + let tx = tx.clone(); + tokio::spawn(async move { + // SAFETY: ownership transferida. + let owned = unsafe { std::os::fd::OwnedFd::from_raw_fd_compat(fd) }; + let r = match AsyncFd::with_interest(owned, Interest::READABLE) { + Ok(a) => a, + Err(e) => { + warn!(?e, "merger reader AsyncFd"); + return; + } + }; + let mut buf = [0u8; 4096]; + loop { + match async_read(&r, &mut buf).await { + Ok(0) => break, + Ok(n) => { + if tx.send(buf[..n].to_vec()).await.is_err() { + break; + } + } + Err(_) => break, + } + } + // Drop de tx → cuando todos los readers cerraron, el rx + // recibe None y el merger termina. + }); + } + drop(tx); // sólo los reader tasks tienen sus clones ahora. + + // SAFETY: ownership transferida al task. + let w_owned = unsafe { std::os::fd::OwnedFd::from_raw_fd_compat(spec.consumer_w_fd) }; + let w = match AsyncFd::with_interest(w_owned, Interest::WRITABLE) { + Ok(a) => a, + Err(e) => { + warn!(?e, "merger AsyncFd w"); + return; + } + }; + + let mut total: u64 = 0; + while let Some(chunk) = rx.recv().await { + if async_write_all(&w, &chunk).await.is_err() { + return; + } + total += chunk.len() as u64; + } + debug!(bytes = total, readers = nr, "merger finished"); + }) +} + +fn spawn_splitter( + spec: SplitterSpec, + discerner: Arc, + edge_senders: Vec>, +) -> SplitterHandle { + set_nonblocking(spec.producer_r_fd); + for fd in &spec.consumer_w_fds { + set_nonblocking(*fd); + } let handle = tokio::spawn(async move { - // SAFETY: el caller transfiere ownership de los fds al task. - let r_std = unsafe { std::os::fd::OwnedFd::from_raw_fd_compat(producer_r_fd) }; - let w_std = unsafe { std::os::fd::OwnedFd::from_raw_fd_compat(consumer_w_fd) }; - let r = AsyncFd::with_interest(r_std, Interest::READABLE).expect("AsyncFd r"); - let w = AsyncFd::with_interest(w_std, Interest::WRITABLE).expect("AsyncFd w"); + // SAFETY: ownership transferida al task. + let r_owned = unsafe { std::os::fd::OwnedFd::from_raw_fd_compat(spec.producer_r_fd) }; + let r = match AsyncFd::with_interest(r_owned, Interest::READABLE) { + Ok(a) => a, + Err(e) => { + warn!(?e, "splitter AsyncFd r"); + return Vec::new(); + } + }; + let mut writers: Vec> = Vec::with_capacity(spec.consumer_w_fds.len()); + for fd in spec.consumer_w_fds { + let owned = unsafe { std::os::fd::OwnedFd::from_raw_fd_compat(fd) }; + match AsyncFd::with_interest(owned, Interest::WRITABLE) { + Ok(a) => writers.push(a), + Err(e) => warn!(?e, "splitter AsyncFd w"), + } + } - let mut sample: Vec = Vec::with_capacity(sample_bytes); + let mut sample: Vec = Vec::with_capacity(spec.sample_bytes); let mut buf = [0u8; 4096]; let mut total: u64 = 0; - - // Fase 1: sampling + pump. let mut eof = false; - while !eof && sample.len() < sample_bytes { + + // Fase 1: sampling (sólo si tap=true) + replicación. + while !eof && (spec.tap && sample.len() < spec.sample_bytes) { let n = match async_read(&r, &mut buf).await { Ok(0) => { eof = true; 0 } Ok(n) => n, - Err(e) => { warn!(?e, "tap producer read failed"); break; } + Err(e) => { warn!(?e, "splitter read"); break; } }; if n == 0 { break; } - let take = n.min(sample_bytes - sample.len()); - sample.extend_from_slice(&buf[..take]); - if let Err(e) = async_write_all(&w, &buf[..n]).await { - warn!(?e, "tap consumer write failed"); - break; + if spec.tap { + let take = n.min(spec.sample_bytes - sample.len()); + sample.extend_from_slice(&buf[..take]); } + broadcast_chunk(&writers, &edge_senders, &buf[..n]).await; total += n as u64; } - let d = discerner.discern(&sample, &Hint { path: None, size_total: None }); - // Fase 2: pump-only hasta EOF. + let d = if spec.tap { + discerner.discern(&sample, &Hint { path: None, size_total: None }) + } else { + None + }; + + // Fase 2: replicación pura. while !eof { let n = match async_read(&r, &mut buf).await { Ok(0) => { eof = true; 0 } @@ -245,19 +431,50 @@ fn spawn_tap( Err(_) => break, }; if n == 0 { break; } - if async_write_all(&w, &buf[..n]).await.is_err() { break; } + broadcast_chunk(&writers, &edge_senders, &buf[..n]).await; total += n as u64; } - debug!(bytes = total, "tap finished"); - EdgeDiscernment { - from_label, - from_output, - to_label, - to_input, - discernment: d, - } + debug!(bytes = total, consumers = writers.len(), "splitter finished"); + + // Mismo discernment para todos los edges del splitter (es el mismo + // stream replicado). Devolvemos N entries (una por edge) para que + // la UI/CLI los liste todos. flow_socket lo rellena el caller. + spec.edges + .into_iter() + .map(|em| EdgeDiscernment { + from_label: em.from_label, + from_output: em.from_output, + to_label: em.to_label, + to_input: em.to_input, + discernment: d.clone(), + flow_socket: None, + }) + .collect() }); - TapHandle { handle } + SplitterHandle { handle } +} + +async fn broadcast_chunk( + writers: &[AsyncFd], + edge_senders: &[Option], + data: &[u8], +) { + // Internal pipes a los consumers del pipeline. + for w in writers { + let _ = async_write_all(w, data).await; + } + // Externos: broadcast a subscribers vía FlowChannel. + // Cada edge tiene su propio sender (mismo data — el sample/discernment + // viaja por broadcast separados para que un subscriber por edge vea su + // stream específico). + if edge_senders.iter().any(|s| s.is_some()) { + let shared = std::sync::Arc::new(data.to_vec()); + for s in edge_senders { + if let Some(s) = s { + let _ = s.send(shared.clone()); + } + } + } } async fn async_read( @@ -377,7 +594,7 @@ mod tests { }; let disc = Arc::new(DiscernPipeline::default_pipeline()); let inc = Arc::new(Incarnator::new(IncarnatorConfig::default())); - let launch = run_pipeline(&spec, "ws", false, disc, inc).await.unwrap(); + let launch = run_pipeline(&spec, "ws", false, disc, inc, None).await.unwrap(); assert_eq!(launch.command_pids.len(), 2); // Cosecha. for (_, pid) in &launch.command_pids { @@ -385,6 +602,78 @@ mod tests { } } + #[tokio::test] + async fn pipeline_fanin_two_to_one() { + // 2 productores → 1 consumer (cat). El merger multiplexa. + let spec = PipelineSpec { + label: "fanin".into(), + workspace: WorkspaceId::new(), + nodes: vec![ + cmd("p1", "/bin/echo", &["from-p1"]), + cmd("p2", "/bin/echo", &["from-p2"]), + cmd("c", "/bin/cat", &[]), + ], + edges: vec![ + FlowEdge { + from: 0, + from_output: "stdout".into(), + to: 2, + to_input: "stdin".into(), + }, + FlowEdge { + from: 1, + from_output: "stdout".into(), + to: 2, + to_input: "stdin".into(), + }, + ], + discern: DiscernPolicy::default(), + }; + let disc = Arc::new(DiscernPipeline::default_pipeline()); + let inc = Arc::new(Incarnator::new(IncarnatorConfig::default())); + let launch = run_pipeline(&spec, "ws", false, disc, inc, None).await.unwrap(); + assert_eq!(launch.command_pids.len(), 3); + for (_, pid) in &launch.command_pids { + let _ = nix::sys::wait::waitpid(nix::unistd::Pid::from_raw(*pid), None); + } + } + + #[tokio::test] + async fn pipeline_fanout_one_to_two() { + // 1 productor (echo) → 2 consumers (wc -c). Splitter replica. + let spec = PipelineSpec { + label: "fanout".into(), + workspace: WorkspaceId::new(), + nodes: vec![ + cmd("p", "/bin/echo", &["fanout-test"]), + cmd("c1", "/bin/cat", &[]), + cmd("c2", "/bin/cat", &[]), + ], + edges: vec![ + FlowEdge { + from: 0, + from_output: "stdout".into(), + to: 1, + to_input: "stdin".into(), + }, + FlowEdge { + from: 0, + from_output: "stdout".into(), + to: 2, + to_input: "stdin".into(), + }, + ], + discern: DiscernPolicy::default(), + }; + let disc = Arc::new(DiscernPipeline::default_pipeline()); + let inc = Arc::new(Incarnator::new(IncarnatorConfig::default())); + let launch = run_pipeline(&spec, "ws", false, disc, inc, None).await.unwrap(); + assert_eq!(launch.command_pids.len(), 3); + for (_, pid) in &launch.command_pids { + let _ = nix::sys::wait::waitpid(nix::unistd::Pid::from_raw(*pid), None); + } + } + #[tokio::test] async fn pipeline_isolated_with_tap_captures_discernment() { let spec = PipelineSpec { @@ -403,11 +692,12 @@ mod tests { discern: DiscernPolicy { sample_bytes: 4096, enrich_producer: true, + replay_chunks: 32, }, }; let disc = Arc::new(DiscernPipeline::default_pipeline()); let inc = Arc::new(Incarnator::new(IncarnatorConfig::default())); - let launch = run_pipeline(&spec, "ws", true, disc, inc).await.unwrap(); + let launch = run_pipeline(&spec, "ws", true, disc, inc, None).await.unwrap(); assert_eq!(launch.edge_discernments.len(), 1); let d = &launch.edge_discernments[0]; let dis = d.discernment.as_ref().expect("discernment present"); diff --git a/crates/modules/shipote/shipote-core/src/stats.rs b/crates/modules/shipote/shipote-core/src/stats.rs new file mode 100644 index 0000000..8ae470d --- /dev/null +++ b/crates/modules/shipote/shipote-core/src/stats.rs @@ -0,0 +1,168 @@ +//! Resource accounting por workspace. +//! +//! Dos fuentes: +//! - **Per-proc** (`/proc//status` + `stat`): suma RSS y CPU ticks de +//! los comandos vivos del workspace. Siempre disponible. Costo: O(N pids). +//! - **Cgroup v2** (`memory.current`, `cpu.stat`): un read por workspace si +//! `SomaSpec.cgroup.path` está y es leíble. Más preciso (incluye descendants). +//! +//! Si ambos están disponibles, devolvemos el cgroup (más preciso) y dejamos +//! el per-proc como `sample_via_proc`. + +use std::path::Path; +use std::time::Instant; + +#[derive(Debug, Clone, Default)] +pub struct WorkspaceStats { + pub commands_alive: u32, + pub commands_total: u32, + /// RSS sumado en bytes. `None` si no se pudo medir. + pub rss_bytes: Option, + /// High-water mark de RSS (peak alguna vez observado). Cgroup v2: + /// `memory.peak` (≥6.5). Per-proc: suma de `VmHWM` de cada pid. + pub rss_peak_bytes: Option, + /// Tiempo CPU acumulado en microsegundos. `None` si no se pudo medir. + pub cpu_usec: Option, + /// Fuente del dato: "proc" | "cgroup" | "mixed". + pub source: String, + /// Wall-clock uptime del workspace en milisegundos. + pub uptime_ms: u64, +} + +/// Mide stats para un set de PIDs vivos + un path de cgroup opcional. +pub fn measure( + alive_pids: &[i32], + cgroup_path: Option<&Path>, + workspace_started: Instant, +) -> WorkspaceStats { + let mut rss_proc: u64 = 0; + let mut rss_peak_proc: u64 = 0; + let mut cpu_proc: u64 = 0; + let mut proc_ok = false; + for &pid in alive_pids { + if let Some((rss, peak, cpu)) = read_proc_pid(pid) { + rss_proc += rss; + rss_peak_proc += peak; + cpu_proc += cpu; + proc_ok = true; + } + } + + let cgroup = cgroup_path.and_then(read_cgroup_stats); + + let (rss, rss_peak, cpu, source) = match (cgroup, proc_ok) { + (Some(cg), _) => (Some(cg.rss), cg.rss_peak, Some(cg.cpu_usec), "cgroup".to_string()), + (None, true) => ( + Some(rss_proc), + Some(rss_peak_proc), + Some(cpu_proc), + "proc".to_string(), + ), + (None, false) => (None, None, None, "none".to_string()), + }; + + WorkspaceStats { + commands_alive: alive_pids.len() as u32, + commands_total: 0, + rss_bytes: rss, + rss_peak_bytes: rss_peak, + cpu_usec: cpu, + source, + uptime_ms: workspace_started.elapsed().as_millis() as u64, + } +} + +struct CgroupStats { + rss: u64, + rss_peak: Option, + cpu_usec: u64, +} + +/// Lee `(rss_bytes, rss_peak_bytes, cpu_usec)` de `/proc//`. None si el proc desapareció. +fn read_proc_pid(pid: i32) -> Option<(u64, u64, u64)> { + let (rss_kb, hwm_kb) = { + let status = std::fs::read_to_string(format!("/proc/{pid}/status")).ok()?; + let mut rss = 0u64; + let mut hwm = 0u64; + for l in status.lines() { + if let Some(rest) = l.strip_prefix("VmRSS:") { + rss = rest + .trim() + .split_whitespace() + .next() + .and_then(|s| s.parse().ok()) + .unwrap_or(0); + } else if let Some(rest) = l.strip_prefix("VmHWM:") { + hwm = rest + .trim() + .split_whitespace() + .next() + .and_then(|s| s.parse().ok()) + .unwrap_or(0); + } + } + (rss, hwm) + }; + let cpu_usec = { + let stat = std::fs::read_to_string(format!("/proc/{pid}/stat")).ok()?; + // formato: pid (comm) state ppid pgrp ... utime stime cutime cstime + // Cuidado: comm puede tener espacios y paréntesis. Buscamos la última `)`. + let end_comm = stat.rfind(')')?; + let after = &stat[end_comm + 1..]; + let fields: Vec<&str> = after.split_whitespace().collect(); + // Tras `)`, índice 0 = state, índice 11 = utime, 12 = stime. + let utime = fields.get(11).and_then(|s| s.parse::().ok()).unwrap_or(0); + let stime = fields.get(12).and_then(|s| s.parse::().ok()).unwrap_or(0); + let ticks = utime + stime; + // Convertimos ticks → microsegundos. SC_CLK_TCK típicamente 100. + let clk_tck = unsafe { libc::sysconf(libc::_SC_CLK_TCK) }.max(1) as u64; + ticks * 1_000_000 / clk_tck + }; + Some((rss_kb * 1024, hwm_kb * 1024, cpu_usec)) +} + +/// Lee `CgroupStats` del cgroup. None si no existe o no es leíble. +/// `memory.peak` requiere kernel ≥6.5; si falta, `rss_peak` queda None. +fn read_cgroup_stats(cgroup_path: &Path) -> Option { + let mem = std::fs::read_to_string(cgroup_path.join("memory.current")) + .ok() + .and_then(|s| s.trim().parse::().ok())?; + let cpu_stat = std::fs::read_to_string(cgroup_path.join("cpu.stat")).ok()?; + let cpu_usec = cpu_stat + .lines() + .find_map(|l| l.strip_prefix("usage_usec")) + .and_then(|s| s.split_whitespace().next()) + .and_then(|s| s.parse::().ok()) + .unwrap_or(0); + let peak = std::fs::read_to_string(cgroup_path.join("memory.peak")) + .ok() + .and_then(|s| s.trim().parse::().ok()); + Some(CgroupStats { + rss: mem, + rss_peak: peak, + cpu_usec, + }) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn measure_with_no_pids_returns_zero() { + let stats = measure(&[], None, Instant::now()); + assert_eq!(stats.commands_alive, 0); + assert_eq!(stats.rss_bytes, None); + assert_eq!(stats.source, "none"); + } + + #[test] + fn measure_self_pid_returns_data() { + let me = std::process::id() as i32; + let stats = measure(&[me], None, Instant::now()); + assert_eq!(stats.commands_alive, 1); + // Nuestro propio RSS debería ser > 0. + assert!(stats.rss_bytes.unwrap_or(0) > 0); + assert_eq!(stats.source, "proc"); + } +} diff --git a/crates/modules/shipote/shipote-protocol/src/lib.rs b/crates/modules/shipote/shipote-protocol/src/lib.rs index 6e18842..0c77c2e 100644 --- a/crates/modules/shipote/shipote-protocol/src/lib.rs +++ b/crates/modules/shipote/shipote-protocol/src/lib.rs @@ -17,6 +17,10 @@ use ulid::Ulid; pub const DEFAULT_SOCK_NAME: &str = "shipote.sock"; pub const MAX_FRAME: usize = 1 << 20; +fn default_grace_ms() -> u64 { + 1000 +} + // ===================================================================== // Mensajes // ===================================================================== @@ -32,8 +36,13 @@ pub enum Request { /// Listar todos los workspaces vivos. WorkspaceList, - /// Detener un workspace y reapear sus comandos. - WorkspaceStop { id: WorkspaceId }, + /// Detener un workspace y reapear sus comandos. `grace_ms`: tiempo + /// que se espera tras SIGTERM antes de SIGKILL. 0 = SIGKILL inmediato. + WorkspaceStop { + id: WorkspaceId, + #[serde(default = "default_grace_ms")] + grace_ms: u64, + }, /// Ejecutar un comando one-shot dentro de un workspace existente. Run { @@ -50,6 +59,10 @@ pub enum Request { /// consumidor de cada FlowEdge, sampleando los primeros bytes /// y discerniendo el TypeRef. tap: bool, + /// Variables para sustitución `${KEY}` en strings del spec + /// antes de spawn (templating). + #[serde(default)] + vars: std::collections::BTreeMap, }, /// Discernir un buffer ad-hoc (sin workspace). Útil para `shipote discern `. @@ -66,6 +79,8 @@ pub enum Request { workspace: shipote_card::WorkspaceId, command: Ulid, tail_bytes: usize, + /// "stdout" | "stderr" | "both" (default "both" si vacío). + stream: String, }, /// Guardar (o reemplazar) un PipelineSpec bajo un nombre. @@ -78,7 +93,29 @@ pub enum Request { PipelineDrop { name: String }, /// Ejecutar un pipeline guardado. - PipelineRunSaved { name: String, tap: bool }, + PipelineRunSaved { + name: String, + tap: bool, + #[serde(default)] + vars: std::collections::BTreeMap, + }, + + /// Resource accounting de un workspace. + WorkspaceStats { workspace: shipote_card::WorkspaceId }, + + /// Detener selectivamente los comandos de un pipeline (no el workspace + /// entero). `grace_ms`: SIGTERM → wait → SIGKILL. + PipelineStop { + pipeline: Ulid, + #[serde(default = "default_grace_ms")] + grace_ms: u64, + }, + + /// Listar pipelines activos con sus flow channels (data plane). + FlowList, + + /// Cerrar el data plane de un pipeline (drop sockets + canales). + FlowDrop { pipeline: Ulid }, } #[derive(Debug, Clone, Serialize, Deserialize)] @@ -148,11 +185,47 @@ pub enum Response { existed: bool, }, + PipelineStopped { + pipeline: Ulid, + reaped: u32, + }, + + WorkspaceStats { + info: WorkspaceStatsInfo, + }, + + FlowList { + items: Vec, + }, + + FlowDropped { + pipeline: Ulid, + existed: bool, + }, + Error { message: String, }, } +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct WorkspaceStatsInfo { + pub commands_alive: u32, + pub commands_total: u32, + pub rss_bytes: Option, + #[serde(default)] + pub rss_peak_bytes: Option, + pub cpu_usec: Option, + pub source: String, + pub uptime_ms: u64, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct FlowInfo { + pub pipeline: Ulid, + pub sockets: Vec, +} + #[derive(Debug, Clone, Serialize, Deserialize)] pub struct CommandInfo { pub id: Ulid, @@ -175,6 +248,9 @@ pub struct EdgeDiscernmentInfo { pub mime: Option, pub lens: Option, pub confidence: f32, + /// Path del Unix socket donde otros módulos pueden suscribirse a los + /// bytes replicados de este edge (data plane). `None` si tap=false. + pub flow_socket: Option, } #[derive(Debug, Clone, Serialize, Deserialize)] diff --git a/crates/shared/ente-incarnate/src/lib.rs b/crates/shared/ente-incarnate/src/lib.rs index 4f4b66b..bcdb8b7 100644 --- a/crates/shared/ente-incarnate/src/lib.rs +++ b/crates/shared/ente-incarnate/src/lib.rs @@ -29,11 +29,13 @@ pub mod env; pub mod error; pub mod namespaced; pub mod plain; +pub mod pre_exec; pub use brahman_card::Card; pub use caps::{CapabilitySet, CgroupStatus, NsKind, UserNsStatus}; pub use env::{EnvSpec, ENV_BUS_SOCK, ENV_ENTE_ID}; pub use error::{Degradation, IncarnateError}; +pub use pre_exec::{ChildPreExec, ChildSetup}; use std::os::fd::RawFd; @@ -201,6 +203,16 @@ impl Incarnator { &self, card: &Card, stdio: ChildStdio, + ) -> Result { + self.incarnate_full(card, stdio, ChildSetup::default()) + } + + /// Variante full: stdio + setup pre-execve. + pub fn incarnate_full( + &self, + card: &Card, + stdio: ChildStdio, + setup: ChildSetup, ) -> Result { if self.cfg.strict_caps { let v = self.dry_run(card); @@ -231,9 +243,9 @@ impl Incarnator { let mut degradations = Vec::new(); let pid = if namespaced::needs_namespacing(&card.soma.namespaces) { - namespaced::incarnate_namespaced(card, &env_spec, &stdio, &mut degradations)? + namespaced::incarnate_namespaced(card, &env_spec, &stdio, &setup, &mut degradations)? } else { - plain::incarnate_plain(card, &env_spec, &stdio)? + plain::incarnate_plain(card, &env_spec, &stdio, &setup)? }; Ok(IncarnateOutcome { pid, degradations }) } @@ -345,6 +357,48 @@ mod tests { // r se cierra al drop del OwnedFd. } + /// child_pre_exec aplica chdir + NoNewPrivs en path plain. + #[test] + fn child_pre_exec_chdir_changes_pwd() { + use crate::{ChildPreExec, ChildSetup}; + use nix::fcntl::OFlag; + use nix::unistd::{pipe2, read}; + use std::ffi::CString; + use std::os::fd::{AsRawFd, IntoRawFd}; + + let inc = Incarnator::new(IncarnatorConfig::default()); + // Comando: /bin/pwd. Si chdir funciona, output = /tmp. + let card = make_card( + Payload::Native { + exec: "/bin/pwd".into(), + argv: vec![], + envp: vec![], + }, + NamespaceSet::default(), + ); + + let (r, w) = pipe2(OFlag::empty()).expect("pipe"); + let w_raw = w.into_raw_fd(); + let r_raw = r.as_raw_fd(); + + let stdio = ChildStdio { + stdin_fd: None, + stdout_fd: Some(w_raw), + stderr_fd: None, + }; + let setup = ChildSetup::new() + .with(ChildPreExec::Chdir(CString::new("/tmp").unwrap())) + .with(ChildPreExec::NoNewPrivs); + let out = inc.incarnate_full(&card, stdio, setup).expect("incarnate"); + + let _ = nix::sys::wait::waitpid(out.pid, None); + + let mut buf = [0u8; 64]; + let n = read(r_raw, &mut buf).expect("read"); + let s = std::str::from_utf8(&buf[..n]).unwrap(); + assert!(s.starts_with("/tmp"), "pwd output was: {s:?}"); + } + /// Smoke: encarnar /bin/true sin ns. No requiere root. #[test] fn incarnate_plain_true_succeeds() { diff --git a/crates/shared/ente-incarnate/src/namespaced.rs b/crates/shared/ente-incarnate/src/namespaced.rs index 8a22609..f33e637 100644 --- a/crates/shared/ente-incarnate/src/namespaced.rs +++ b/crates/shared/ente-incarnate/src/namespaced.rs @@ -24,6 +24,7 @@ use crate::child::{apply_rlimits, make_root_private}; use crate::cgroup::{ensure_cgroup, move_to_cgroup}; use crate::env::{build_env, EnvSpec}; use crate::error::{Degradation, IncarnateError}; +use crate::pre_exec::{apply_unchecked, ChildSetup}; use crate::ChildStdio; use brahman_card::{Card, NamespaceSet, Payload}; use nix::fcntl::OFlag; @@ -53,6 +54,7 @@ pub fn incarnate_namespaced( card: &Card, env_spec: &EnvSpec, stdio: &ChildStdio, + setup: &ChildSetup, degradations: &mut Vec, ) -> Result { let flags = build_clone_flags(&card.soma.namespaces); @@ -96,6 +98,7 @@ pub fn incarnate_namespaced( let stdin_fd = stdio.stdin_fd; let stdout_fd = stdio.stdout_fd; let stderr_fd = stdio.stderr_fd; + let setup_ops = setup.ops.clone(); // SAFETY: la clausura corre en stack nuevo dentro de un proceso recién // clonado, COW del padre. Sólo syscalls async-signal-safe; sin allocator, @@ -142,6 +145,14 @@ pub fn incarnate_namespaced( } } + // Aplica las ops declarativas pre-execve (NoNewPrivs, chdir, etc.). + if !setup_ops.is_empty() { + let r = unsafe { apply_unchecked(&setup_ops) }; + if r != 0 { + unsafe { libc::_exit(r) }; + } + } + unsafe { libc::execve(exec_c.as_ptr(), argv_ptrs.as_ptr(), envp_ptrs.as_ptr()); libc::_exit(102); diff --git a/crates/shared/ente-incarnate/src/plain.rs b/crates/shared/ente-incarnate/src/plain.rs index 63ab835..b8e5b60 100644 --- a/crates/shared/ente-incarnate/src/plain.rs +++ b/crates/shared/ente-incarnate/src/plain.rs @@ -2,16 +2,19 @@ use crate::env::{build_env, EnvSpec}; use crate::error::IncarnateError; +use crate::pre_exec::{apply_unchecked, ChildSetup}; use crate::ChildStdio; use brahman_card::{Card, Payload}; use nix::unistd::Pid; use std::os::fd::FromRawFd; +use std::os::unix::process::CommandExt; use std::process::{Command, Stdio}; pub fn incarnate_plain( card: &Card, env_spec: &EnvSpec, stdio: &ChildStdio, + setup: &ChildSetup, ) -> Result { let (exec, argv, base_envp) = match &card.payload { Payload::Native { exec, argv, envp } => (exec.clone(), argv.clone(), envp.clone()), @@ -36,6 +39,22 @@ pub fn incarnate_plain( if let Some(fd) = stdio.stderr_fd { cmd.stderr(unsafe { Stdio::from_raw_fd(fd) }); } + if !setup.is_empty() { + // Clone para que la closure sea 'static (Command::pre_exec lo exige). + let ops = setup.ops.clone(); + // SAFETY: pre_exec corre post-fork pre-exec. apply_unchecked sólo + // hace syscalls async-signal-safe. + unsafe { + cmd.pre_exec(move || { + let r = apply_unchecked(&ops); + if r != 0 { + Err(std::io::Error::from_raw_os_error(libc::EINVAL)) + } else { + Ok(()) + } + }); + } + } let child = cmd.spawn()?; Ok(Pid::from_raw(child.id() as i32)) } diff --git a/crates/shared/ente-incarnate/src/pre_exec.rs b/crates/shared/ente-incarnate/src/pre_exec.rs new file mode 100644 index 0000000..2b07277 --- /dev/null +++ b/crates/shared/ente-incarnate/src/pre_exec.rs @@ -0,0 +1,103 @@ +//! Hook declarativo pre-execve para el hijo. +//! +//! Las ops corren EN EL HIJO, post-fork/clone, pre-execve. Reglas: +//! - sólo syscalls async-signal-safe. +//! - sin allocator (los CStrings ya están construidos por el padre). +//! - sin Drop con efectos. + +use std::ffi::CString; + +/// Operaciones declarativas aplicables pre-execve. +#[derive(Debug, Clone)] +pub enum ChildPreExec { + /// `PR_SET_NO_NEW_PRIVS = 1` — bloquea escaladas futuras + /// (suid bits, file caps, AT_SECURE). Recomendado en sandboxes. + NoNewPrivs, + /// `PR_SET_PDEATHSIG = sig` — el child recibe esta señal cuando su + /// padre (PID 1 del namespace, o el que sea) muere. Útil para + /// auto-cleanup de procesos huérfanos. + ParentDeathSig(i32), + /// `PR_SET_DUMPABLE` — controla si el proceso permite core dump. + Dumpable(bool), + /// `setsid()` — nuevo session/group leader (desconecta del controlling tty). + NewSession, + /// `chdir(path)` — cambiar working dir. Path pre-allocado. + Chdir(CString), + /// `umask(mode)` — fijar umask (octal, e.g. 0o022). + Umask(libc::mode_t), +} + +/// Setup completo del hijo. Default = sin ops. +#[derive(Debug, Clone, Default)] +pub struct ChildSetup { + pub ops: Vec, +} + +impl ChildSetup { + pub fn new() -> Self { + Self::default() + } + + pub fn push(&mut self, op: ChildPreExec) -> &mut Self { + self.ops.push(op); + self + } + + pub fn with(mut self, op: ChildPreExec) -> Self { + self.ops.push(op); + self + } + + pub fn is_empty(&self) -> bool { + self.ops.is_empty() + } +} + +/// Aplica las ops en orden. SAFETY: ejecuta en el hijo, post-fork, +/// pre-execve. Sólo libc, sin allocator, sin Drop. +/// +/// En caso de error, retorna el código de exit que el caller usará para +/// abortar el child (igual semántica que el resto de la closure de clone). +/// 0 = todo OK. +pub unsafe fn apply_unchecked(ops: &[ChildPreExec]) -> i32 { + for op in ops { + match op { + ChildPreExec::NoNewPrivs => { + // PR_SET_NO_NEW_PRIVS = 38 en Linux. + let r = unsafe { libc::prctl(libc::PR_SET_NO_NEW_PRIVS, 1u64, 0u64, 0u64, 0u64) }; + if r != 0 { + return 110; + } + } + ChildPreExec::ParentDeathSig(sig) => { + let r = unsafe { libc::prctl(libc::PR_SET_PDEATHSIG, *sig as u64, 0u64, 0u64, 0u64) }; + if r != 0 { + return 111; + } + } + ChildPreExec::Dumpable(yes) => { + let v: u64 = if *yes { 1 } else { 0 }; + let r = unsafe { libc::prctl(libc::PR_SET_DUMPABLE, v, 0u64, 0u64, 0u64) }; + if r != 0 { + return 112; + } + } + ChildPreExec::NewSession => { + let r = unsafe { libc::setsid() }; + if r < 0 { + return 113; + } + } + ChildPreExec::Chdir(path) => { + let r = unsafe { libc::chdir(path.as_ptr()) }; + if r != 0 { + return 114; + } + } + ChildPreExec::Umask(mode) => { + unsafe { libc::umask(*mode) }; + } + } + } + 0 +}