diff --git a/crates/apps/shipote-cli/src/main.rs b/crates/apps/shipote-cli/src/main.rs index 244485a..ee4d41c 100644 --- a/crates/apps/shipote-cli/src/main.rs +++ b/crates/apps/shipote-cli/src/main.rs @@ -24,6 +24,9 @@ enum Cmd { /// Health-check del daemon. Ping, + /// Health endpoint estructurado. + Health, + /// Capacidades runtime detectadas por el daemon. Caps, @@ -195,6 +198,30 @@ async fn main() -> Result<()> { } } + Cmd::Health => { + let resp = round_trip(&mut stream, Request::Health).await?; + match resp { + Response::Health { + version, + uptime_ms, + alive_workspaces, + alive_commands, + alive_pipelines, + active_flows, + dirty, + } => { + println!("version: {version}"); + println!("uptime: {} ms", uptime_ms); + println!("alive_workspaces: {alive_workspaces}"); + println!("alive_commands: {alive_commands}"); + println!("alive_pipelines: {alive_pipelines}"); + println!("active_flows: {active_flows}"); + println!("dirty: {dirty}"); + } + other => print_unexpected(&other), + } + } + Cmd::Caps => { let resp = round_trip(&mut stream, Request::Capabilities).await?; match resp { diff --git a/crates/apps/shipote-daemon/src/main.rs b/crates/apps/shipote-daemon/src/main.rs index f4d45a6..5709b64 100644 --- a/crates/apps/shipote-daemon/src/main.rs +++ b/crates/apps/shipote-daemon/src/main.rs @@ -38,6 +38,7 @@ async fn main() -> anyhow::Result<()> { } let listener = UnixListener::bind(&sock).with_context(|| format!("bind {}", sock.display()))?; info!(socket = %sock.display(), "shipote-daemon listening"); + let daemon_started = std::time::Instant::now(); // Sidecar pool: una sesión global del daemon + N sesiones efímeras // por edge enriquecido tras cada pipeline tap. @@ -241,7 +242,7 @@ async fn main() -> anyhow::Result<()> { let disc = discerner.clone(); let pool = sidecar_pool.clone(); tokio::spawn(async move { - if let Err(e) = handle_client(stream, mgr, disc, pool).await { + if let Err(e) = handle_client(stream, mgr, disc, pool, daemon_started).await { warn!(?e, "client handler error"); } }); @@ -280,27 +281,80 @@ async fn handle_client( mgr: Arc, disc: Arc, pool: Option>, + daemon_started: std::time::Instant, ) -> anyhow::Result<()> { + // Audit: peer uid lo leemos una vez aquí (no cambia durante la conexión). + let peer = peer_uid(&stream).unwrap_or(u32::MAX); loop { let req: Request = match read_frame(&mut stream).await { Ok(r) => r, Err(shipote_protocol::ProtocolError::Closed) => return Ok(()), Err(e) => return Err(e.into()), }; - let resp = dispatch(&mgr, &disc, &pool, req).await; + audit_request(peer, &req); + let resp = dispatch(&mgr, &disc, &pool, daemon_started, req).await; write_frame(&mut stream, &resp).await?; } } +/// Loguea cada mutación con target="audit" y el peer uid. Reads (ping, +/// list, stats) se omiten para no inundar el log. +fn audit_request(peer_uid: u32, req: &Request) { + let (action, detail) = match req { + Request::WorkspaceCreate { spec } => ("workspace.create", format!("label={}", spec.label)), + Request::WorkspaceStop { id, grace_ms } => ("workspace.stop", format!("id={id} grace_ms={grace_ms}")), + Request::Run { workspace, exec, restart_on_failure, .. } => ( + "run", + format!("ws={workspace} exec={exec} restart={restart_on_failure}"), + ), + Request::PipelineRun { spec, tap, .. } => ("pipeline.run", format!("label={} tap={tap}", spec.label)), + Request::PipelineRunSaved { name, tap, .. } => ("pipeline.run-saved", format!("name={name} tap={tap}")), + Request::PipelineStop { pipeline, grace_ms } => ("pipeline.stop", format!("id={pipeline} grace_ms={grace_ms}")), + Request::PipelineSave { name, .. } => ("pipeline.save", format!("name={name}")), + Request::PipelineDrop { name } => ("pipeline.drop", format!("name={name}")), + Request::FlowDrop { pipeline } => ("flow.drop", format!("pipeline={pipeline}")), + // Reads (no audit): + Request::Ping + | Request::Health + | Request::WorkspaceList + | Request::WorkspaceStats { .. } + | Request::WorkspaceQuota { .. } + | Request::WorkspaceStatsHistory { .. } + | Request::WorkspaceFullSummary { .. } + | Request::CommandList { .. } + | Request::CommandLogs { .. } + | Request::PipelineSavedList + | Request::FlowList + | Request::FlowThroughput + | Request::Discern { .. } + | Request::Capabilities => return, + }; + info!(target: "audit", uid = peer_uid, action, detail = %detail, "audit"); +} + async fn dispatch( mgr: &Arc, disc: &DiscernPipeline, pool: &Option>, + daemon_started: std::time::Instant, req: Request, ) -> Response { match req { Request::Ping => Response::Pong, + Request::Health => { + let counts = mgr.health_counts().await; + Response::Health { + version: env!("CARGO_PKG_VERSION").to_string(), + uptime_ms: daemon_started.elapsed().as_millis() as u64, + alive_workspaces: counts.alive_workspaces, + alive_commands: counts.alive_commands, + alive_pipelines: counts.alive_pipelines, + active_flows: counts.active_flows, + dirty: mgr.is_dirty(), + } + } + Request::WorkspaceCreate { spec } => match mgr.create(spec).await { Ok((id, warnings)) => Response::WorkspaceCreated { id, warnings }, Err(e) => Response::Error { message: format!("{e}") }, diff --git a/crates/modules/shipote/shipote-core/src/lib.rs b/crates/modules/shipote/shipote-core/src/lib.rs index 0e3147e..c3b717b 100644 --- a/crates/modules/shipote/shipote-core/src/lib.rs +++ b/crates/modules/shipote/shipote-core/src/lib.rs @@ -148,6 +148,14 @@ pub struct CommandSummary { pub pid: i32, } +#[derive(Debug, Clone, Default)] +pub struct HealthCounts { + pub alive_workspaces: u32, + pub alive_commands: u32, + pub alive_pipelines: u32, + pub active_flows: u32, +} + #[derive(Debug, Clone)] pub struct CommandInfo { pub id: Ulid, @@ -448,6 +456,25 @@ impl WorkspaceManager { self.inner.lock().await.pipeline_flows.insert(pipeline, flows); } + /// Snapshot de counts agregados para health endpoint. + pub async fn health_counts(&self) -> HealthCounts { + let g = self.inner.lock().await; + let alive_workspaces = g.workspaces.len() as u32; + let alive_commands: u32 = g + .workspaces + .values() + .map(|ws| ws.commands.values().filter(|c| c.alive).count() as u32) + .sum(); + let alive_pipelines = g.pipeline_supervisors.len() as u32; + let active_flows: u32 = g.pipeline_flows.values().map(|v| v.len() as u32).sum(); + HealthCounts { + alive_workspaces, + alive_commands, + alive_pipelines, + active_flows, + } + } + /// Lista pipelines vivos con sus sockets activos. pub async fn list_flow_pipelines(&self) -> Vec<(Ulid, Vec)> { let g = self.inner.lock().await; diff --git a/crates/modules/shipote/shipote-core/src/pipeline.rs b/crates/modules/shipote/shipote-core/src/pipeline.rs index 3a2b708..817eeb9 100644 --- a/crates/modules/shipote/shipote-core/src/pipeline.rs +++ b/crates/modules/shipote/shipote-core/src/pipeline.rs @@ -419,6 +419,11 @@ fn spawn_splitter( let mut buf = [0u8; 4096]; let mut total: u64 = 0; let mut eof = false; + let mut bucket = if spec.max_bytes_per_sec > 0 { + Some(TokenBucket::new(spec.max_bytes_per_sec)) + } else { + None + }; // Fase 1: sampling (sólo si tap=true) + replicación. while !eof && (spec.tap && sample.len() < spec.sample_bytes) { @@ -432,9 +437,16 @@ fn spawn_splitter( let take = n.min(spec.sample_bytes - sample.len()); sample.extend_from_slice(&buf[..take]); } + // Token bucket: reserva ANTES de broadcast — si hay debt, + // sleep antes de mandar al subscriber. + if let Some(b) = bucket.as_mut() { + let wait = b.reserve(n as u64); + if !wait.is_zero() { + tokio::time::sleep(wait).await; + } + } broadcast_chunk(&writers, &edge_senders, &buf[..n]).await; total += n as u64; - rate_limit_sleep(spec.max_bytes_per_sec, n).await; } let d = if spec.tap { @@ -451,9 +463,14 @@ fn spawn_splitter( Err(_) => break, }; if n == 0 { break; } + if let Some(b) = bucket.as_mut() { + let wait = b.reserve(n as u64); + if !wait.is_zero() { + tokio::time::sleep(wait).await; + } + } broadcast_chunk(&writers, &edge_senders, &buf[..n]).await; total += n as u64; - rate_limit_sleep(spec.max_bytes_per_sec, n).await; } debug!(bytes = total, consumers = writers.len(), "splitter finished"); @@ -475,16 +492,45 @@ fn spawn_splitter( SplitterHandle { handle } } -/// Token-bucket simple: si `max_bps > 0`, sleep `chunk_size / max_bps` -/// segundos. Implementación crude pero suficiente para v1. -async fn rate_limit_sleep(max_bps: u64, chunk_bytes: usize) { - if max_bps == 0 { - return; +/// Token-bucket real con capacidad de burst. +/// - `rate_bps`: tokens (bytes) por segundo de refill. +/// - `capacity`: máx tokens acumulables. Default = 1 segundo de rate. +/// - `tokens`: tokens disponibles (puede negativos para "debt"). +/// - `last_refill`: para calcular cuántos refill desde la última call. +struct TokenBucket { + rate_bps: u64, + capacity: u64, + tokens: f64, + last_refill: std::time::Instant, +} + +impl TokenBucket { + fn new(rate_bps: u64) -> Self { + Self { + rate_bps, + capacity: rate_bps, // 1 second worth of burst. + tokens: rate_bps as f64, + last_refill: std::time::Instant::now(), + } } - let secs = chunk_bytes as f64 / max_bps as f64; - let ms = (secs * 1000.0) as u64; - if ms > 0 { - tokio::time::sleep(std::time::Duration::from_millis(ms)).await; + + /// Refill desde la última call según wall time. Reserva `cost` + /// tokens; si no alcanza, retorna el sleep necesario. + fn reserve(&mut self, cost: u64) -> std::time::Duration { + let now = std::time::Instant::now(); + let elapsed_secs = now.duration_since(self.last_refill).as_secs_f64(); + self.tokens = (self.tokens + elapsed_secs * self.rate_bps as f64) + .min(self.capacity as f64); + self.last_refill = now; + + self.tokens -= cost as f64; + if self.tokens >= 0.0 { + std::time::Duration::ZERO + } else { + // Debt: tiempo para recuperar a 0 tokens. + let secs_needed = -self.tokens / self.rate_bps as f64; + std::time::Duration::from_secs_f64(secs_needed) + } } } diff --git a/crates/modules/shipote/shipote-protocol/src/lib.rs b/crates/modules/shipote/shipote-protocol/src/lib.rs index cc8154d..cc72bb0 100644 --- a/crates/modules/shipote/shipote-protocol/src/lib.rs +++ b/crates/modules/shipote/shipote-protocol/src/lib.rs @@ -30,6 +30,9 @@ pub enum Request { /// Health-check. Ping, + /// Health endpoint estructurado: versión + uptime + counts. + Health, + /// Crear un workspace nuevo. WorkspaceCreate { spec: WorkspaceSpec }, @@ -145,6 +148,16 @@ pub enum Request { pub enum Response { Pong, + Health { + version: String, + uptime_ms: u64, + alive_workspaces: u32, + alive_commands: u32, + alive_pipelines: u32, + active_flows: u32, + dirty: bool, + }, + WorkspaceCreated { id: WorkspaceId, warnings: Vec,