feat(shipote): health endpoint + audit log + token-bucket real (fase R)

- Request::Health → Response::Health { version, uptime_ms, alive_*,
  active_flows, dirty }. CLI: shipote health.
- handle_client lee peer_uid una vez al accept. audit_request emite
  info!(target: "audit", uid, action, detail) por mutación (create/stop/
  run/pipeline.*/flow.drop). Reads omitidos. Filtrable con SHIPOTE_LOG=
  warn,audit=info.
- TokenBucket real reemplaza rate_limit_sleep: refill por wall time,
  capacity = 1s de rate, debt negativo dispara sleep proporcional.
  Permite burst real, no chunk-by-chunk uniforme.

85 tests pasan (ente-incarnate 16, nouser-core 27, shipote-card 8,
shipote-core 26, shipote-discern 5, yahweh-provider-fs 3).

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
sergio
2026-05-11 16:58:10 +00:00
parent 18c0344a52
commit a9124449f9
5 changed files with 180 additions and 13 deletions
+27
View File
@@ -24,6 +24,9 @@ enum Cmd {
/// Health-check del daemon. /// Health-check del daemon.
Ping, Ping,
/// Health endpoint estructurado.
Health,
/// Capacidades runtime detectadas por el daemon. /// Capacidades runtime detectadas por el daemon.
Caps, Caps,
@@ -195,6 +198,30 @@ async fn main() -> Result<()> {
} }
} }
Cmd::Health => {
let resp = round_trip(&mut stream, Request::Health).await?;
match resp {
Response::Health {
version,
uptime_ms,
alive_workspaces,
alive_commands,
alive_pipelines,
active_flows,
dirty,
} => {
println!("version: {version}");
println!("uptime: {} ms", uptime_ms);
println!("alive_workspaces: {alive_workspaces}");
println!("alive_commands: {alive_commands}");
println!("alive_pipelines: {alive_pipelines}");
println!("active_flows: {active_flows}");
println!("dirty: {dirty}");
}
other => print_unexpected(&other),
}
}
Cmd::Caps => { Cmd::Caps => {
let resp = round_trip(&mut stream, Request::Capabilities).await?; let resp = round_trip(&mut stream, Request::Capabilities).await?;
match resp { match resp {
+56 -2
View File
@@ -38,6 +38,7 @@ async fn main() -> anyhow::Result<()> {
} }
let listener = UnixListener::bind(&sock).with_context(|| format!("bind {}", sock.display()))?; let listener = UnixListener::bind(&sock).with_context(|| format!("bind {}", sock.display()))?;
info!(socket = %sock.display(), "shipote-daemon listening"); info!(socket = %sock.display(), "shipote-daemon listening");
let daemon_started = std::time::Instant::now();
// Sidecar pool: una sesión global del daemon + N sesiones efímeras // Sidecar pool: una sesión global del daemon + N sesiones efímeras
// por edge enriquecido tras cada pipeline tap. // por edge enriquecido tras cada pipeline tap.
@@ -241,7 +242,7 @@ async fn main() -> anyhow::Result<()> {
let disc = discerner.clone(); let disc = discerner.clone();
let pool = sidecar_pool.clone(); let pool = sidecar_pool.clone();
tokio::spawn(async move { tokio::spawn(async move {
if let Err(e) = handle_client(stream, mgr, disc, pool).await { if let Err(e) = handle_client(stream, mgr, disc, pool, daemon_started).await {
warn!(?e, "client handler error"); warn!(?e, "client handler error");
} }
}); });
@@ -280,27 +281,80 @@ async fn handle_client(
mgr: Arc<WorkspaceManager>, mgr: Arc<WorkspaceManager>,
disc: Arc<DiscernPipeline>, disc: Arc<DiscernPipeline>,
pool: Option<Arc<brahman_sidecar::SidecarPool>>, pool: Option<Arc<brahman_sidecar::SidecarPool>>,
daemon_started: std::time::Instant,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
// Audit: peer uid lo leemos una vez aquí (no cambia durante la conexión).
let peer = peer_uid(&stream).unwrap_or(u32::MAX);
loop { loop {
let req: Request = match read_frame(&mut stream).await { let req: Request = match read_frame(&mut stream).await {
Ok(r) => r, Ok(r) => r,
Err(shipote_protocol::ProtocolError::Closed) => return Ok(()), Err(shipote_protocol::ProtocolError::Closed) => return Ok(()),
Err(e) => return Err(e.into()), Err(e) => return Err(e.into()),
}; };
let resp = dispatch(&mgr, &disc, &pool, req).await; audit_request(peer, &req);
let resp = dispatch(&mgr, &disc, &pool, daemon_started, req).await;
write_frame(&mut stream, &resp).await?; write_frame(&mut stream, &resp).await?;
} }
} }
/// Loguea cada mutación con target="audit" y el peer uid. Reads (ping,
/// list, stats) se omiten para no inundar el log.
fn audit_request(peer_uid: u32, req: &Request) {
let (action, detail) = match req {
Request::WorkspaceCreate { spec } => ("workspace.create", format!("label={}", spec.label)),
Request::WorkspaceStop { id, grace_ms } => ("workspace.stop", format!("id={id} grace_ms={grace_ms}")),
Request::Run { workspace, exec, restart_on_failure, .. } => (
"run",
format!("ws={workspace} exec={exec} restart={restart_on_failure}"),
),
Request::PipelineRun { spec, tap, .. } => ("pipeline.run", format!("label={} tap={tap}", spec.label)),
Request::PipelineRunSaved { name, tap, .. } => ("pipeline.run-saved", format!("name={name} tap={tap}")),
Request::PipelineStop { pipeline, grace_ms } => ("pipeline.stop", format!("id={pipeline} grace_ms={grace_ms}")),
Request::PipelineSave { name, .. } => ("pipeline.save", format!("name={name}")),
Request::PipelineDrop { name } => ("pipeline.drop", format!("name={name}")),
Request::FlowDrop { pipeline } => ("flow.drop", format!("pipeline={pipeline}")),
// Reads (no audit):
Request::Ping
| Request::Health
| Request::WorkspaceList
| Request::WorkspaceStats { .. }
| Request::WorkspaceQuota { .. }
| Request::WorkspaceStatsHistory { .. }
| Request::WorkspaceFullSummary { .. }
| Request::CommandList { .. }
| Request::CommandLogs { .. }
| Request::PipelineSavedList
| Request::FlowList
| Request::FlowThroughput
| Request::Discern { .. }
| Request::Capabilities => return,
};
info!(target: "audit", uid = peer_uid, action, detail = %detail, "audit");
}
async fn dispatch( async fn dispatch(
mgr: &Arc<WorkspaceManager>, mgr: &Arc<WorkspaceManager>,
disc: &DiscernPipeline, disc: &DiscernPipeline,
pool: &Option<Arc<brahman_sidecar::SidecarPool>>, pool: &Option<Arc<brahman_sidecar::SidecarPool>>,
daemon_started: std::time::Instant,
req: Request, req: Request,
) -> Response { ) -> Response {
match req { match req {
Request::Ping => Response::Pong, Request::Ping => Response::Pong,
Request::Health => {
let counts = mgr.health_counts().await;
Response::Health {
version: env!("CARGO_PKG_VERSION").to_string(),
uptime_ms: daemon_started.elapsed().as_millis() as u64,
alive_workspaces: counts.alive_workspaces,
alive_commands: counts.alive_commands,
alive_pipelines: counts.alive_pipelines,
active_flows: counts.active_flows,
dirty: mgr.is_dirty(),
}
}
Request::WorkspaceCreate { spec } => match mgr.create(spec).await { Request::WorkspaceCreate { spec } => match mgr.create(spec).await {
Ok((id, warnings)) => Response::WorkspaceCreated { id, warnings }, Ok((id, warnings)) => Response::WorkspaceCreated { id, warnings },
Err(e) => Response::Error { message: format!("{e}") }, Err(e) => Response::Error { message: format!("{e}") },
@@ -148,6 +148,14 @@ pub struct CommandSummary {
pub pid: i32, pub pid: i32,
} }
#[derive(Debug, Clone, Default)]
pub struct HealthCounts {
pub alive_workspaces: u32,
pub alive_commands: u32,
pub alive_pipelines: u32,
pub active_flows: u32,
}
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct CommandInfo { pub struct CommandInfo {
pub id: Ulid, pub id: Ulid,
@@ -448,6 +456,25 @@ impl WorkspaceManager {
self.inner.lock().await.pipeline_flows.insert(pipeline, flows); self.inner.lock().await.pipeline_flows.insert(pipeline, flows);
} }
/// Snapshot de counts agregados para health endpoint.
pub async fn health_counts(&self) -> HealthCounts {
let g = self.inner.lock().await;
let alive_workspaces = g.workspaces.len() as u32;
let alive_commands: u32 = g
.workspaces
.values()
.map(|ws| ws.commands.values().filter(|c| c.alive).count() as u32)
.sum();
let alive_pipelines = g.pipeline_supervisors.len() as u32;
let active_flows: u32 = g.pipeline_flows.values().map(|v| v.len() as u32).sum();
HealthCounts {
alive_workspaces,
alive_commands,
alive_pipelines,
active_flows,
}
}
/// Lista pipelines vivos con sus sockets activos. /// Lista pipelines vivos con sus sockets activos.
pub async fn list_flow_pipelines(&self) -> Vec<(Ulid, Vec<std::path::PathBuf>)> { pub async fn list_flow_pipelines(&self) -> Vec<(Ulid, Vec<std::path::PathBuf>)> {
let g = self.inner.lock().await; let g = self.inner.lock().await;
@@ -419,6 +419,11 @@ fn spawn_splitter(
let mut buf = [0u8; 4096]; let mut buf = [0u8; 4096];
let mut total: u64 = 0; let mut total: u64 = 0;
let mut eof = false; let mut eof = false;
let mut bucket = if spec.max_bytes_per_sec > 0 {
Some(TokenBucket::new(spec.max_bytes_per_sec))
} else {
None
};
// Fase 1: sampling (sólo si tap=true) + replicación. // Fase 1: sampling (sólo si tap=true) + replicación.
while !eof && (spec.tap && sample.len() < spec.sample_bytes) { while !eof && (spec.tap && sample.len() < spec.sample_bytes) {
@@ -432,9 +437,16 @@ fn spawn_splitter(
let take = n.min(spec.sample_bytes - sample.len()); let take = n.min(spec.sample_bytes - sample.len());
sample.extend_from_slice(&buf[..take]); sample.extend_from_slice(&buf[..take]);
} }
// Token bucket: reserva ANTES de broadcast — si hay debt,
// sleep antes de mandar al subscriber.
if let Some(b) = bucket.as_mut() {
let wait = b.reserve(n as u64);
if !wait.is_zero() {
tokio::time::sleep(wait).await;
}
}
broadcast_chunk(&writers, &edge_senders, &buf[..n]).await; broadcast_chunk(&writers, &edge_senders, &buf[..n]).await;
total += n as u64; total += n as u64;
rate_limit_sleep(spec.max_bytes_per_sec, n).await;
} }
let d = if spec.tap { let d = if spec.tap {
@@ -451,9 +463,14 @@ fn spawn_splitter(
Err(_) => break, Err(_) => break,
}; };
if n == 0 { break; } if n == 0 { break; }
if let Some(b) = bucket.as_mut() {
let wait = b.reserve(n as u64);
if !wait.is_zero() {
tokio::time::sleep(wait).await;
}
}
broadcast_chunk(&writers, &edge_senders, &buf[..n]).await; broadcast_chunk(&writers, &edge_senders, &buf[..n]).await;
total += n as u64; total += n as u64;
rate_limit_sleep(spec.max_bytes_per_sec, n).await;
} }
debug!(bytes = total, consumers = writers.len(), "splitter finished"); debug!(bytes = total, consumers = writers.len(), "splitter finished");
@@ -475,16 +492,45 @@ fn spawn_splitter(
SplitterHandle { handle } SplitterHandle { handle }
} }
/// Token-bucket simple: si `max_bps > 0`, sleep `chunk_size / max_bps` /// Token-bucket real con capacidad de burst.
/// segundos. Implementación crude pero suficiente para v1. /// - `rate_bps`: tokens (bytes) por segundo de refill.
async fn rate_limit_sleep(max_bps: u64, chunk_bytes: usize) { /// - `capacity`: máx tokens acumulables. Default = 1 segundo de rate.
if max_bps == 0 { /// - `tokens`: tokens disponibles (puede negativos para "debt").
return; /// - `last_refill`: para calcular cuántos refill desde la última call.
struct TokenBucket {
rate_bps: u64,
capacity: u64,
tokens: f64,
last_refill: std::time::Instant,
}
impl TokenBucket {
fn new(rate_bps: u64) -> Self {
Self {
rate_bps,
capacity: rate_bps, // 1 second worth of burst.
tokens: rate_bps as f64,
last_refill: std::time::Instant::now(),
}
}
/// Refill desde la última call según wall time. Reserva `cost`
/// tokens; si no alcanza, retorna el sleep necesario.
fn reserve(&mut self, cost: u64) -> std::time::Duration {
let now = std::time::Instant::now();
let elapsed_secs = now.duration_since(self.last_refill).as_secs_f64();
self.tokens = (self.tokens + elapsed_secs * self.rate_bps as f64)
.min(self.capacity as f64);
self.last_refill = now;
self.tokens -= cost as f64;
if self.tokens >= 0.0 {
std::time::Duration::ZERO
} else {
// Debt: tiempo para recuperar a 0 tokens.
let secs_needed = -self.tokens / self.rate_bps as f64;
std::time::Duration::from_secs_f64(secs_needed)
} }
let secs = chunk_bytes as f64 / max_bps as f64;
let ms = (secs * 1000.0) as u64;
if ms > 0 {
tokio::time::sleep(std::time::Duration::from_millis(ms)).await;
} }
} }
@@ -30,6 +30,9 @@ pub enum Request {
/// Health-check. /// Health-check.
Ping, Ping,
/// Health endpoint estructurado: versión + uptime + counts.
Health,
/// Crear un workspace nuevo. /// Crear un workspace nuevo.
WorkspaceCreate { spec: WorkspaceSpec }, WorkspaceCreate { spec: WorkspaceSpec },
@@ -145,6 +148,16 @@ pub enum Request {
pub enum Response { pub enum Response {
Pong, Pong,
Health {
version: String,
uptime_ms: u64,
alive_workspaces: u32,
alive_commands: u32,
alive_pipelines: u32,
active_flows: u32,
dirty: bool,
},
WorkspaceCreated { WorkspaceCreated {
id: WorkspaceId, id: WorkspaceId,
warnings: Vec<String>, warnings: Vec<String>,