feat(shipote): throughput + stats persistente + auth peer (fase P)

- FlowMeter (atomic u64 + rolling window 32 samples) en cada FlowChannel.
  flow_throughput() → (socket, bytes_total, bytes_per_sec). CLI:
  shipote flow throughput. Idle threshold 5s = rate 0.0.
- Snapshot v4 con stats_history persistente por workspace (cap 16).
  PersistedStats separado para evitar Instant. Restore hidrata el VecDeque
  con source="persisted".
- Auth SO_PEERCRED: daemon rechaza peers con uid distinto al propio.
  SHIPOTE_TRUST_ANYONE=1 = escape hatch documentado.

84 tests pasan (ente-incarnate 16, nouser-core 27, shipote-card 8,
shipote-core 25, shipote-discern 5, yahweh-provider-fs 3).

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
sergio
2026-05-11 13:58:41 +00:00
parent 1cce50b290
commit 3486949d24
8 changed files with 273 additions and 12 deletions
@@ -47,9 +47,77 @@ pub struct FlowChannel {
replay: Arc<Mutex<VecDeque<Arc<Vec<u8>>>>>,
replay_caps: ReplayCaps,
socket_path: PathBuf,
meter: Arc<FlowMeter>,
_accept_handle: AbortOnDrop,
}
/// Contador de bytes y rate (bytes/s ventana 1s).
#[derive(Debug)]
pub struct FlowMeter {
/// Bytes acumulados desde la creación del FlowChannel.
total_bytes: std::sync::atomic::AtomicU64,
/// Ring buffer de (timestamp_ms, bytes_acumulados) para calcular
/// el rate sobre los últimos N samples.
rate_window: Mutex<VecDeque<(u64, u64)>>,
}
const RATE_WINDOW_SAMPLES: usize = 32;
impl FlowMeter {
fn new() -> Self {
Self {
total_bytes: std::sync::atomic::AtomicU64::new(0),
rate_window: Mutex::new(VecDeque::with_capacity(RATE_WINDOW_SAMPLES)),
}
}
fn record(&self, delta: u64) {
let now = self.total_bytes
.fetch_add(delta, std::sync::atomic::Ordering::Relaxed)
+ delta;
let ts = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.map(|d| d.as_millis() as u64)
.unwrap_or(0);
if let Ok(mut w) = self.rate_window.lock() {
if w.len() >= RATE_WINDOW_SAMPLES {
w.pop_front();
}
w.push_back((ts, now));
}
}
/// Bytes totales acumulados desde la creación.
pub fn total_bytes(&self) -> u64 {
self.total_bytes.load(std::sync::atomic::Ordering::Relaxed)
}
/// Bytes por segundo (rolling sobre la ventana). 0 si no hay
/// historia suficiente o si el último sample es muy viejo (>5s).
pub fn bytes_per_sec(&self) -> f64 {
let now_ms = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.map(|d| d.as_millis() as u64)
.unwrap_or(0);
let w = match self.rate_window.lock() {
Ok(w) => w,
Err(_) => return 0.0,
};
if w.len() < 2 {
return 0.0;
}
let last = w.back().copied().unwrap();
// Si el último sample tiene >5s, asumimos idle.
if now_ms.saturating_sub(last.0) > 5000 {
return 0.0;
}
let first = w.front().copied().unwrap();
let dt_ms = last.0.saturating_sub(first.0).max(1);
let d_bytes = last.1.saturating_sub(first.1);
(d_bytes as f64 * 1000.0) / dt_ms as f64
}
}
#[derive(Debug, Clone, Copy)]
pub struct ReplayCaps {
/// Máximo de chunks retenidos.
@@ -72,6 +140,7 @@ pub struct FlowSender {
sender: broadcast::Sender<Arc<Vec<u8>>>,
replay: Arc<Mutex<VecDeque<Arc<Vec<u8>>>>>,
replay_caps: ReplayCaps,
meter: Arc<FlowMeter>,
}
impl FlowSender {
@@ -85,6 +154,7 @@ impl FlowSender {
evict_for_incoming(&mut g, caps, incoming);
g.push_back(data.clone());
}
self.meter.record(incoming as u64);
let _ = self.sender.send(data);
}
}
@@ -195,10 +265,15 @@ impl FlowChannel {
replay,
replay_caps: caps,
socket_path,
meter: Arc::new(FlowMeter::new()),
_accept_handle: AbortOnDrop(join.abort_handle()),
})
}
pub fn meter(&self) -> &FlowMeter {
&self.meter
}
/// Push un chunk al channel. Si no hay subscribers, drop silencioso.
/// Siempre se guarda en el replay buffer (con cap rotation por chunks
/// y opcionalmente por bytes).
@@ -210,6 +285,7 @@ impl FlowChannel {
evict_for_incoming(&mut g, caps, incoming);
g.push_back(arc.clone());
}
self.meter.record(incoming as u64);
let _ = self.sender.send(arc);
}
@@ -219,12 +295,13 @@ impl FlowChannel {
/// Handle clone-able para que tasks externas (splitter) pushen al
/// channel sin tener ownership del FlowChannel. Cada push se guarda
/// también en el replay buffer.
/// también en el replay buffer y se contabiliza en el meter.
pub fn sender_handle(&self) -> FlowSender {
FlowSender {
sender: self.sender.clone(),
replay: self.replay.clone(),
replay_caps: self.replay_caps,
meter: self.meter.clone(),
}
}
@@ -442,6 +442,22 @@ impl WorkspaceManager {
.collect()
}
/// Throughput per-socket: bytes_total + bytes_per_sec por flow socket.
pub async fn flow_throughput(&self) -> Vec<(std::path::PathBuf, u64, f64)> {
let g = self.inner.lock().await;
let mut out = Vec::new();
for flows in g.pipeline_flows.values() {
for fc in flows {
out.push((
fc.socket_path().to_path_buf(),
fc.meter().total_bytes(),
fc.meter().bytes_per_sec(),
));
}
}
out
}
/// Cierra el data plane de un pipeline (drop = remove_file de sockets).
pub async fn drop_pipeline_flows(&self, pipeline: Ulid) -> bool {
self.inner.lock().await.pipeline_flows.remove(&pipeline).is_some()
@@ -10,10 +10,10 @@ use shipote_card::{PipelineSpec, WorkspaceId, WorkspaceSpec};
use std::path::{Path, PathBuf};
use tracing::{info, warn};
/// v2 agregó `saved_pipelines`. v3 agrega `live_pipelines` (pipelines
/// con supervisor vivo al momento del snapshot — el daemon los relanza
/// al restore). Versiones inferiores leen campos ausentes como vacío.
pub const SNAPSHOT_VERSION: u16 = 3;
/// v2 agregó `saved_pipelines`. v3 agrega `live_pipelines`. v4 agrega
/// `stats_history` por workspace (sparkline survives daemon restart).
/// Versiones inferiores leen campos ausentes como vacío.
pub const SNAPSHOT_VERSION: u16 = 4;
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ShipoteSnapshot {
@@ -32,6 +32,37 @@ pub struct ShipoteSnapshot {
pub struct WorkspaceEntry {
pub id: WorkspaceId,
pub spec: WorkspaceSpec,
/// Stats history persistida — cap reasonable para no inflar el JSON.
/// Sólo se guardan campos serializables (no Instant).
#[serde(default)]
pub stats_history: Vec<PersistedStats>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PersistedStats {
pub commands_alive: u32,
pub commands_total: u32,
pub rss_bytes: Option<u64>,
pub rss_peak_bytes: Option<u64>,
pub cpu_usec: Option<u64>,
pub cpu_percent: Option<f32>,
pub cpu_cores: u32,
pub uptime_ms: u64,
}
impl From<&crate::stats::WorkspaceStats> for PersistedStats {
fn from(s: &crate::stats::WorkspaceStats) -> Self {
Self {
commands_alive: s.commands_alive,
commands_total: s.commands_total,
rss_bytes: s.rss_bytes,
rss_peak_bytes: s.rss_peak_bytes,
cpu_usec: s.cpu_usec,
cpu_percent: s.cpu_percent,
cpu_cores: s.cpu_cores,
uptime_ms: s.uptime_ms,
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
@@ -98,13 +129,27 @@ fn now_ms() -> u64 {
impl WorkspaceManager {
/// Toma snapshot del estado actual.
pub async fn snapshot(&self) -> ShipoteSnapshot {
const PERSIST_STATS_CAP: usize = 16;
let g = self.inner.lock().await;
let workspaces = g
.workspaces
.iter()
.map(|(id, ws)| WorkspaceEntry {
id: *id,
spec: ws.spec.clone(),
.map(|(id, ws)| {
// Persist sólo los últimos N samples — el resto crece
// y el JSON se infla.
let take = ws.stats_history.len().min(PERSIST_STATS_CAP);
let skip = ws.stats_history.len() - take;
let stats_history: Vec<PersistedStats> = ws
.stats_history
.iter()
.skip(skip)
.map(PersistedStats::from)
.collect();
WorkspaceEntry {
id: *id,
spec: ws.spec.clone(),
stats_history,
}
})
.collect();
let saved_pipelines = g
@@ -165,8 +210,33 @@ impl WorkspaceManager {
// v2+: reusamos el id original así clients que tracking
// workspace_id no se rompen al restart.
let label = entry.spec.label.clone();
match self.create_with_id(entry.id, entry.spec).await {
Ok(_) => out.workspaces_restored += 1,
let id = entry.id;
let history = entry.stats_history;
match self.create_with_id(id, entry.spec).await {
Ok(_) => {
out.workspaces_restored += 1;
// Hidratar history persistida. Convertimos
// PersistedStats → WorkspaceStats (perdemos
// los campos no serializables como `source`).
if !history.is_empty() {
let mut g = self.inner.lock().await;
if let Some(ws) = g.workspaces.get_mut(&id) {
for ps in history {
ws.stats_history.push_back(crate::stats::WorkspaceStats {
commands_alive: ps.commands_alive,
commands_total: ps.commands_total,
rss_bytes: ps.rss_bytes,
rss_peak_bytes: ps.rss_peak_bytes,
cpu_usec: ps.cpu_usec,
cpu_percent: ps.cpu_percent,
cpu_cores: ps.cpu_cores,
source: "persisted".into(),
uptime_ms: ps.uptime_ms,
});
}
}
}
}
Err(e) => warn!(?e, %label, "skipped workspace en restore"),
}
}
@@ -134,6 +134,9 @@ pub enum Request {
/// Listar pipelines activos con sus flow channels (data plane).
FlowList,
/// Throughput por flow socket: bytes_total + bytes_per_sec.
FlowThroughput,
/// Cerrar el data plane de un pipeline (drop sockets + canales).
FlowDrop { pipeline: Ulid },
}
@@ -233,6 +236,10 @@ pub enum Response {
items: Vec<FlowInfo>,
},
FlowThroughput {
items: Vec<FlowThroughputInfo>,
},
FlowDropped {
pipeline: Ulid,
existed: bool,
@@ -270,6 +277,13 @@ fn default_cpu_cores() -> u32 {
1
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FlowThroughputInfo {
pub socket: PathBuf,
pub bytes_total: u64,
pub bytes_per_sec: f64,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FlowInfo {
pub pipeline: Ulid,