From 3486949d24f846d552e9ff0fbff73324bd15d00f Mon Sep 17 00:00:00 2001 From: sergio Date: Mon, 11 May 2026 13:58:41 +0000 Subject: [PATCH] feat(shipote): throughput + stats persistente + auth peer (fase P) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - FlowMeter (atomic u64 + rolling window 32 samples) en cada FlowChannel. flow_throughput() → (socket, bytes_total, bytes_per_sec). CLI: shipote flow throughput. Idle threshold 5s = rate 0.0. - Snapshot v4 con stats_history persistente por workspace (cap 16). PersistedStats separado para evitar Instant. Restore hidrata el VecDeque con source="persisted". - Auth SO_PEERCRED: daemon rechaza peers con uid distinto al propio. SHIPOTE_TRUST_ANYONE=1 = escape hatch documentado. 84 tests pasan (ente-incarnate 16, nouser-core 27, shipote-card 8, shipote-core 25, shipote-discern 5, yahweh-provider-fs 3). Co-Authored-By: Claude Opus 4.7 --- Cargo.lock | 1 + crates/apps/shipote-cli/src/main.rs | 22 +++++ crates/apps/shipote-daemon/Cargo.toml | 1 + crates/apps/shipote-daemon/src/main.rs | 64 +++++++++++++- .../shipote/shipote-core/src/flow_channel.rs | 79 ++++++++++++++++- .../modules/shipote/shipote-core/src/lib.rs | 16 ++++ .../shipote/shipote-core/src/persist.rs | 88 +++++++++++++++++-- .../shipote/shipote-protocol/src/lib.rs | 14 +++ 8 files changed, 273 insertions(+), 12 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 9185e17..1d597cc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -9639,6 +9639,7 @@ dependencies = [ "brahman-card", "brahman-sidecar", "ente-incarnate", + "libc", "nix 0.29.0", "shipote-card", "shipote-core", diff --git a/crates/apps/shipote-cli/src/main.rs b/crates/apps/shipote-cli/src/main.rs index 94a8865..244485a 100644 --- a/crates/apps/shipote-cli/src/main.rs +++ b/crates/apps/shipote-cli/src/main.rs @@ -87,6 +87,8 @@ enum Cmd { enum FlowCmd { /// Listar pipelines activos con sus sockets de flow. List, + /// Throughput por flow socket (bytes_total + bytes/s). + Throughput, /// Cerrar el data plane de un pipeline (drop de todos sus sockets). Drop { pipeline: String }, /// Suscribirse a un flow socket y volcar bytes a stdout. @@ -557,6 +559,26 @@ async fn main() -> Result<()> { } } + Cmd::Flow(FlowCmd::Throughput) => { + let resp = round_trip(&mut stream, Request::FlowThroughput).await?; + match resp { + Response::FlowThroughput { items } => { + if items.is_empty() { + println!("(no active flows)"); + } + for it in items { + let name = it.socket.file_name() + .map(|n| n.to_string_lossy().to_string()) + .unwrap_or_else(|| it.socket.display().to_string()); + let kib = it.bytes_total as f64 / 1024.0; + let kbs = it.bytes_per_sec / 1024.0; + println!("{:<60} {:>8.1} KiB total {:>8.2} KiB/s", name, kib, kbs); + } + } + other => print_unexpected(&other), + } + } + Cmd::Flow(FlowCmd::Drop { pipeline }) => { let pid = Ulid::from_string(&pipeline).map_err(|e| anyhow!("invalid pipeline id: {e}"))?; let resp = round_trip(&mut stream, Request::FlowDrop { pipeline: pid }).await?; diff --git a/crates/apps/shipote-daemon/Cargo.toml b/crates/apps/shipote-daemon/Cargo.toml index 1f08310..5302210 100644 --- a/crates/apps/shipote-daemon/Cargo.toml +++ b/crates/apps/shipote-daemon/Cargo.toml @@ -26,3 +26,4 @@ tracing = { workspace = true } tracing-subscriber = { workspace = true } ulid = { workspace = true } nix = { workspace = true } +libc = { workspace = true } diff --git a/crates/apps/shipote-daemon/src/main.rs b/crates/apps/shipote-daemon/src/main.rs index e6c6db7..f4d45a6 100644 --- a/crates/apps/shipote-daemon/src/main.rs +++ b/crates/apps/shipote-daemon/src/main.rs @@ -17,8 +17,8 @@ use shipote_core::WorkspaceManager; use shipote_discern::{DiscernPipeline, Hint}; use shipote_protocol::{ default_socket_path, read_frame, write_frame, CommandInfo as ProtoCommandInfo, - EdgeDiscernmentInfo, FlowInfo, QuotaReportInfo, Request, Response, WorkspaceStatsInfo, - WorkspaceSummary, + EdgeDiscernmentInfo, FlowInfo, FlowThroughputInfo, QuotaReportInfo, Request, Response, + WorkspaceStatsInfo, WorkspaceSummary, }; use std::sync::Arc; use tokio::net::{UnixListener, UnixStream}; @@ -209,9 +209,34 @@ async fn main() -> anyhow::Result<()> { }); } + // UID propio (para auth). SHIPOTE_TRUST_ANYONE=1 deshabilita. + let own_uid = nix::unistd::getuid().as_raw(); + let trust_anyone = std::env::var("SHIPOTE_TRUST_ANYONE").as_deref() == Ok("1"); + if trust_anyone { + warn!("SHIPOTE_TRUST_ANYONE=1 — accepting any peer uid"); + } + loop { match listener.accept().await { Ok((stream, _)) => { + // Auth: SO_PEERCRED es automático en Unix sockets. Si + // el uid del peer no coincide con el nuestro, rechazo + // antes de procesar nada (a menos que esté permitido). + if !trust_anyone { + match peer_uid(&stream) { + Ok(peer) if peer == own_uid => {} + Ok(peer) => { + warn!(peer, own = own_uid, "rejecting peer with different uid"); + drop(stream); + continue; + } + Err(e) => { + warn!(?e, "could not read peer uid — rejecting"); + drop(stream); + continue; + } + } + } let mgr = mgr.clone(); let disc = discerner.clone(); let pool = sidecar_pool.clone(); @@ -229,6 +254,27 @@ async fn main() -> anyhow::Result<()> { } } +/// Lee SO_PEERCRED del Unix socket conectado. Devuelve el uid del peer. +fn peer_uid(stream: &tokio::net::UnixStream) -> std::io::Result { + use std::os::fd::AsRawFd; + let fd = stream.as_raw_fd(); + let mut ucred: libc::ucred = unsafe { std::mem::zeroed() }; + let mut len = std::mem::size_of::() as libc::socklen_t; + let r = unsafe { + libc::getsockopt( + fd, + libc::SOL_SOCKET, + libc::SO_PEERCRED, + &mut ucred as *mut _ as *mut _, + &mut len, + ) + }; + if r != 0 { + return Err(std::io::Error::last_os_error()); + } + Ok(ucred.uid) +} + async fn handle_client( mut stream: UnixStream, mgr: Arc, @@ -559,6 +605,20 @@ async fn dispatch( Response::FlowList { items } } + Request::FlowThroughput => { + let items = mgr + .flow_throughput() + .await + .into_iter() + .map(|(socket, bytes_total, bytes_per_sec)| FlowThroughputInfo { + socket, + bytes_total, + bytes_per_sec, + }) + .collect(); + Response::FlowThroughput { items } + } + Request::FlowDrop { pipeline } => { let existed = mgr.drop_pipeline_flows(pipeline).await; Response::FlowDropped { pipeline, existed } diff --git a/crates/modules/shipote/shipote-core/src/flow_channel.rs b/crates/modules/shipote/shipote-core/src/flow_channel.rs index be0932c..be3740d 100644 --- a/crates/modules/shipote/shipote-core/src/flow_channel.rs +++ b/crates/modules/shipote/shipote-core/src/flow_channel.rs @@ -47,9 +47,77 @@ pub struct FlowChannel { replay: Arc>>>>, replay_caps: ReplayCaps, socket_path: PathBuf, + meter: Arc, _accept_handle: AbortOnDrop, } +/// Contador de bytes y rate (bytes/s ventana 1s). +#[derive(Debug)] +pub struct FlowMeter { + /// Bytes acumulados desde la creación del FlowChannel. + total_bytes: std::sync::atomic::AtomicU64, + /// Ring buffer de (timestamp_ms, bytes_acumulados) para calcular + /// el rate sobre los últimos N samples. + rate_window: Mutex>, +} + +const RATE_WINDOW_SAMPLES: usize = 32; + +impl FlowMeter { + fn new() -> Self { + Self { + total_bytes: std::sync::atomic::AtomicU64::new(0), + rate_window: Mutex::new(VecDeque::with_capacity(RATE_WINDOW_SAMPLES)), + } + } + + fn record(&self, delta: u64) { + let now = self.total_bytes + .fetch_add(delta, std::sync::atomic::Ordering::Relaxed) + + delta; + let ts = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .map(|d| d.as_millis() as u64) + .unwrap_or(0); + if let Ok(mut w) = self.rate_window.lock() { + if w.len() >= RATE_WINDOW_SAMPLES { + w.pop_front(); + } + w.push_back((ts, now)); + } + } + + /// Bytes totales acumulados desde la creación. + pub fn total_bytes(&self) -> u64 { + self.total_bytes.load(std::sync::atomic::Ordering::Relaxed) + } + + /// Bytes por segundo (rolling sobre la ventana). 0 si no hay + /// historia suficiente o si el último sample es muy viejo (>5s). + pub fn bytes_per_sec(&self) -> f64 { + let now_ms = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .map(|d| d.as_millis() as u64) + .unwrap_or(0); + let w = match self.rate_window.lock() { + Ok(w) => w, + Err(_) => return 0.0, + }; + if w.len() < 2 { + return 0.0; + } + let last = w.back().copied().unwrap(); + // Si el último sample tiene >5s, asumimos idle. + if now_ms.saturating_sub(last.0) > 5000 { + return 0.0; + } + let first = w.front().copied().unwrap(); + let dt_ms = last.0.saturating_sub(first.0).max(1); + let d_bytes = last.1.saturating_sub(first.1); + (d_bytes as f64 * 1000.0) / dt_ms as f64 + } +} + #[derive(Debug, Clone, Copy)] pub struct ReplayCaps { /// Máximo de chunks retenidos. @@ -72,6 +140,7 @@ pub struct FlowSender { sender: broadcast::Sender>>, replay: Arc>>>>, replay_caps: ReplayCaps, + meter: Arc, } impl FlowSender { @@ -85,6 +154,7 @@ impl FlowSender { evict_for_incoming(&mut g, caps, incoming); g.push_back(data.clone()); } + self.meter.record(incoming as u64); let _ = self.sender.send(data); } } @@ -195,10 +265,15 @@ impl FlowChannel { replay, replay_caps: caps, socket_path, + meter: Arc::new(FlowMeter::new()), _accept_handle: AbortOnDrop(join.abort_handle()), }) } + pub fn meter(&self) -> &FlowMeter { + &self.meter + } + /// Push un chunk al channel. Si no hay subscribers, drop silencioso. /// Siempre se guarda en el replay buffer (con cap rotation por chunks /// y opcionalmente por bytes). @@ -210,6 +285,7 @@ impl FlowChannel { evict_for_incoming(&mut g, caps, incoming); g.push_back(arc.clone()); } + self.meter.record(incoming as u64); let _ = self.sender.send(arc); } @@ -219,12 +295,13 @@ impl FlowChannel { /// Handle clone-able para que tasks externas (splitter) pushen al /// channel sin tener ownership del FlowChannel. Cada push se guarda - /// también en el replay buffer. + /// también en el replay buffer y se contabiliza en el meter. pub fn sender_handle(&self) -> FlowSender { FlowSender { sender: self.sender.clone(), replay: self.replay.clone(), replay_caps: self.replay_caps, + meter: self.meter.clone(), } } diff --git a/crates/modules/shipote/shipote-core/src/lib.rs b/crates/modules/shipote/shipote-core/src/lib.rs index 1b21c6e..9ee59ec 100644 --- a/crates/modules/shipote/shipote-core/src/lib.rs +++ b/crates/modules/shipote/shipote-core/src/lib.rs @@ -442,6 +442,22 @@ impl WorkspaceManager { .collect() } + /// Throughput per-socket: bytes_total + bytes_per_sec por flow socket. + pub async fn flow_throughput(&self) -> Vec<(std::path::PathBuf, u64, f64)> { + let g = self.inner.lock().await; + let mut out = Vec::new(); + for flows in g.pipeline_flows.values() { + for fc in flows { + out.push(( + fc.socket_path().to_path_buf(), + fc.meter().total_bytes(), + fc.meter().bytes_per_sec(), + )); + } + } + out + } + /// Cierra el data plane de un pipeline (drop = remove_file de sockets). pub async fn drop_pipeline_flows(&self, pipeline: Ulid) -> bool { self.inner.lock().await.pipeline_flows.remove(&pipeline).is_some() diff --git a/crates/modules/shipote/shipote-core/src/persist.rs b/crates/modules/shipote/shipote-core/src/persist.rs index 456f470..32c78bd 100644 --- a/crates/modules/shipote/shipote-core/src/persist.rs +++ b/crates/modules/shipote/shipote-core/src/persist.rs @@ -10,10 +10,10 @@ use shipote_card::{PipelineSpec, WorkspaceId, WorkspaceSpec}; use std::path::{Path, PathBuf}; use tracing::{info, warn}; -/// v2 agregó `saved_pipelines`. v3 agrega `live_pipelines` (pipelines -/// con supervisor vivo al momento del snapshot — el daemon los relanza -/// al restore). Versiones inferiores leen campos ausentes como vacío. -pub const SNAPSHOT_VERSION: u16 = 3; +/// v2 agregó `saved_pipelines`. v3 agrega `live_pipelines`. v4 agrega +/// `stats_history` por workspace (sparkline survives daemon restart). +/// Versiones inferiores leen campos ausentes como vacío. +pub const SNAPSHOT_VERSION: u16 = 4; #[derive(Debug, Clone, Serialize, Deserialize)] pub struct ShipoteSnapshot { @@ -32,6 +32,37 @@ pub struct ShipoteSnapshot { pub struct WorkspaceEntry { pub id: WorkspaceId, pub spec: WorkspaceSpec, + /// Stats history persistida — cap reasonable para no inflar el JSON. + /// Sólo se guardan campos serializables (no Instant). + #[serde(default)] + pub stats_history: Vec, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct PersistedStats { + pub commands_alive: u32, + pub commands_total: u32, + pub rss_bytes: Option, + pub rss_peak_bytes: Option, + pub cpu_usec: Option, + pub cpu_percent: Option, + pub cpu_cores: u32, + pub uptime_ms: u64, +} + +impl From<&crate::stats::WorkspaceStats> for PersistedStats { + fn from(s: &crate::stats::WorkspaceStats) -> Self { + Self { + commands_alive: s.commands_alive, + commands_total: s.commands_total, + rss_bytes: s.rss_bytes, + rss_peak_bytes: s.rss_peak_bytes, + cpu_usec: s.cpu_usec, + cpu_percent: s.cpu_percent, + cpu_cores: s.cpu_cores, + uptime_ms: s.uptime_ms, + } + } } #[derive(Debug, Clone, Serialize, Deserialize)] @@ -98,13 +129,27 @@ fn now_ms() -> u64 { impl WorkspaceManager { /// Toma snapshot del estado actual. pub async fn snapshot(&self) -> ShipoteSnapshot { + const PERSIST_STATS_CAP: usize = 16; let g = self.inner.lock().await; let workspaces = g .workspaces .iter() - .map(|(id, ws)| WorkspaceEntry { - id: *id, - spec: ws.spec.clone(), + .map(|(id, ws)| { + // Persist sólo los últimos N samples — el resto crece + // y el JSON se infla. + let take = ws.stats_history.len().min(PERSIST_STATS_CAP); + let skip = ws.stats_history.len() - take; + let stats_history: Vec = ws + .stats_history + .iter() + .skip(skip) + .map(PersistedStats::from) + .collect(); + WorkspaceEntry { + id: *id, + spec: ws.spec.clone(), + stats_history, + } }) .collect(); let saved_pipelines = g @@ -165,8 +210,33 @@ impl WorkspaceManager { // v2+: reusamos el id original así clients que tracking // workspace_id no se rompen al restart. let label = entry.spec.label.clone(); - match self.create_with_id(entry.id, entry.spec).await { - Ok(_) => out.workspaces_restored += 1, + let id = entry.id; + let history = entry.stats_history; + match self.create_with_id(id, entry.spec).await { + Ok(_) => { + out.workspaces_restored += 1; + // Hidratar history persistida. Convertimos + // PersistedStats → WorkspaceStats (perdemos + // los campos no serializables como `source`). + if !history.is_empty() { + let mut g = self.inner.lock().await; + if let Some(ws) = g.workspaces.get_mut(&id) { + for ps in history { + ws.stats_history.push_back(crate::stats::WorkspaceStats { + commands_alive: ps.commands_alive, + commands_total: ps.commands_total, + rss_bytes: ps.rss_bytes, + rss_peak_bytes: ps.rss_peak_bytes, + cpu_usec: ps.cpu_usec, + cpu_percent: ps.cpu_percent, + cpu_cores: ps.cpu_cores, + source: "persisted".into(), + uptime_ms: ps.uptime_ms, + }); + } + } + } + } Err(e) => warn!(?e, %label, "skipped workspace en restore"), } } diff --git a/crates/modules/shipote/shipote-protocol/src/lib.rs b/crates/modules/shipote/shipote-protocol/src/lib.rs index 0eae4ab..cc8154d 100644 --- a/crates/modules/shipote/shipote-protocol/src/lib.rs +++ b/crates/modules/shipote/shipote-protocol/src/lib.rs @@ -134,6 +134,9 @@ pub enum Request { /// Listar pipelines activos con sus flow channels (data plane). FlowList, + /// Throughput por flow socket: bytes_total + bytes_per_sec. + FlowThroughput, + /// Cerrar el data plane de un pipeline (drop sockets + canales). FlowDrop { pipeline: Ulid }, } @@ -233,6 +236,10 @@ pub enum Response { items: Vec, }, + FlowThroughput { + items: Vec, + }, + FlowDropped { pipeline: Ulid, existed: bool, @@ -270,6 +277,13 @@ fn default_cpu_cores() -> u32 { 1 } +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct FlowThroughputInfo { + pub socket: PathBuf, + pub bytes_total: u64, + pub bytes_per_sec: f64, +} + #[derive(Debug, Clone, Serialize, Deserialize)] pub struct FlowInfo { pub pipeline: Ulid,