feat(shipote): throughput card + rate-limit + snapshot incremental (fase Q)

- shipote-shell Flow channels card extiende con bytes_total + bytes/s
  por socket. Lookup helper evita borrows en closures.
- DiscernPolicy.max_bytes_per_sec: splitter task hace sleep proporcional
  al tamaño de chunk tras cada broadcast. Token-bucket simple v1.
- WorkspaceManager.dirty: AtomicBool. mark_dirty() en mutaciones que
  afectan al snapshot. save_snapshot skip si clean y path existe.
  restore_snapshot resetea dirty=false (hidratación no es mutation).

85 tests pasan (ente-incarnate 16, nouser-core 27, shipote-card 8,
shipote-core 26, shipote-discern 5, yahweh-provider-fs 3).

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
sergio
2026-05-11 16:20:50 +00:00
parent 3486949d24
commit 18c0344a52
5 changed files with 134 additions and 25 deletions
@@ -274,6 +274,11 @@ pub struct DiscernPolicy {
/// productores con chunks de tamaño variable.
#[serde(default)]
pub replay_bytes: usize,
/// Rate-limit del flow channel (bytes/s). `0` = sin límite. Si está
/// definido, el splitter sleeps proporcional al tamaño del chunk
/// antes de re-broadcastear. Protege subscribers lentos.
#[serde(default)]
pub max_bytes_per_sec: u64,
}
impl Default for DiscernPolicy {
@@ -283,6 +288,7 @@ impl Default for DiscernPolicy {
enrich_producer: default_true(),
replay_chunks: default_replay_chunks(),
replay_bytes: 0,
max_bytes_per_sec: 0,
}
}
}
+28 -1
View File
@@ -87,6 +87,10 @@ pub enum LogStream {
pub struct WorkspaceManager {
inner: Arc<Mutex<Inner>>,
incarnator: Arc<Incarnator>,
/// True si hubo alguna mutación desde el último `save_snapshot`.
/// `save_snapshot` skip si false (snapshot incremental — evita
/// re-serialize cuando nada cambió, ej. SIGTERM tras un período idle).
dirty: std::sync::atomic::AtomicBool,
}
struct Inner {
@@ -238,9 +242,23 @@ impl WorkspaceManager {
pending_pipeline_restarts: Vec::new(),
})),
incarnator: Arc::new(Incarnator::new(cfg)),
dirty: std::sync::atomic::AtomicBool::new(false),
}
}
/// Marca el manager como dirty. Cualquier mutación que afecta al
/// snapshot debería llamar esto.
#[inline]
fn mark_dirty(&self) {
self.dirty.store(true, std::sync::atomic::Ordering::Relaxed);
}
/// True si hubo cambios desde el último `save_snapshot`. Útil para
/// chequeos cooperativos (ej. monitoring que pollea cada N).
pub fn is_dirty(&self) -> bool {
self.dirty.load(std::sync::atomic::Ordering::Relaxed)
}
/// Registra un supervisor para un pipeline con `restart_on_failure=true`.
/// El daemon llama esto tras `run_pipeline` para que `reap_dead` agregue
/// el pipeline a la cola de restart cuando algún command falle.
@@ -267,6 +285,8 @@ impl WorkspaceManager {
current_backoff_ms: initial_backoff,
},
);
drop(g);
self.mark_dirty();
}
/// Variante que preserva backoff/count del supervisor anterior (para
@@ -480,6 +500,7 @@ impl WorkspaceManager {
/// Guarda (o reemplaza) un PipelineSpec bajo `name`.
pub async fn save_pipeline(&self, name: String, spec: PipelineSpec) {
self.inner.lock().await.saved_pipelines.insert(name, spec);
self.mark_dirty();
}
/// Devuelve los nombres de los pipelines guardados.
@@ -497,7 +518,11 @@ impl WorkspaceManager {
/// Elimina un saved pipeline.
pub async fn drop_saved_pipeline(&self, name: &str) -> bool {
self.inner.lock().await.saved_pipelines.remove(name).is_some()
let existed = self.inner.lock().await.saved_pipelines.remove(name).is_some();
if existed {
self.mark_dirty();
}
existed
}
/// Label del workspace, si existe.
@@ -648,6 +673,7 @@ impl WorkspaceManager {
stats_history: std::collections::VecDeque::with_capacity(STATS_HISTORY_CAP),
};
self.inner.lock().await.workspaces.insert(id, state);
self.mark_dirty();
info!(%id, ?ttl, "workspace created");
// Si tiene TTL, programar auto-stop. El task captura un weak ref
@@ -698,6 +724,7 @@ impl WorkspaceManager {
// También limpiamos flow_channels del workspace si los hubiera —
// por workspace lo retenemos por pipeline, no por workspace.
drop(g);
self.mark_dirty();
// 1) SIGTERM (o SIGKILL si grace=0) a todos vivos.
let initial_signal = if grace.is_zero() { Signal::SIGKILL } else { Signal::SIGTERM };
@@ -181,10 +181,18 @@ impl WorkspaceManager {
}
}
/// Escribe snapshot a disco.
/// Escribe snapshot a disco. Si `is_dirty()` es false **y** el path
/// existe (snapshot previo válido), skip la escritura.
pub async fn save_snapshot(&self, path: &Path) -> anyhow::Result<()> {
if !self.is_dirty() && path.exists() {
info!(path = %path.display(), "snapshot SKIPPED (clean)");
return Ok(());
}
let snap = self.snapshot().await;
snap.write(path)?;
// Clear dirty: lo que está en disco es el current state.
self.dirty
.store(false, std::sync::atomic::Ordering::Relaxed);
info!(path = %path.display(), workspaces = snap.workspaces.len(), "snapshot saved");
Ok(())
}
@@ -245,6 +253,11 @@ impl WorkspaceManager {
out.saved_pipelines_restored += 1;
}
out.live_pipelines = snap.live_pipelines;
// Restore no cuenta como mutación — lo que está en disco es lo
// que acabamos de cargar. Sin esto, el próximo SIGTERM siempre
// re-escribiría aunque no hubiese cambios reales.
self.dirty
.store(false, std::sync::atomic::Ordering::Relaxed);
info!(
workspaces = out.workspaces_restored,
saved_pipelines = out.saved_pipelines_restored,
@@ -304,6 +317,24 @@ mod tests {
assert!(restored_ids.contains(&id2));
}
#[tokio::test]
async fn save_snapshot_skips_when_clean() {
let tmp = tempfile::tempdir().unwrap();
let path = tmp.path().join("state.json");
let mgr = Arc::new(WorkspaceManager::new(IncarnatorConfig::default()));
let _ = mgr.create(sample_ws("dirty-test")).await.unwrap();
assert!(mgr.is_dirty(), "create debería marcar dirty");
mgr.save_snapshot(&path).await.unwrap();
assert!(!mgr.is_dirty(), "save_snapshot debería limpiar dirty");
let mtime1 = std::fs::metadata(&path).unwrap().modified().unwrap();
// Esperamos un pelín para que mtime cambie si fuera re-escrito.
tokio::time::sleep(std::time::Duration::from_millis(20)).await;
// Segundo save sin mutación → skip.
mgr.save_snapshot(&path).await.unwrap();
let mtime2 = std::fs::metadata(&path).unwrap().modified().unwrap();
assert_eq!(mtime1, mtime2, "skip cuando clean — mtime no cambia");
}
#[tokio::test]
async fn snapshot_includes_saved_pipelines() {
use shipote_card::{CommandRef, DiscernPolicy, PipelineSpec};
@@ -132,6 +132,7 @@ pub async fn run_pipeline(
edges: edge_meta,
tap,
sample_bytes: spec.discern.sample_bytes,
max_bytes_per_sec: spec.discern.max_bytes_per_sec,
});
}
@@ -308,6 +309,9 @@ struct SplitterSpec {
edges: Vec<EdgeMeta>,
tap: bool,
sample_bytes: usize,
/// Rate-limit en bytes/s (0 = sin limit). Tras cada chunk de `n`
/// bytes, splitter sleeps `n / max_bytes_per_sec` segundos.
max_bytes_per_sec: u64,
}
struct SplitterHandle {
@@ -430,6 +434,7 @@ fn spawn_splitter(
}
broadcast_chunk(&writers, &edge_senders, &buf[..n]).await;
total += n as u64;
rate_limit_sleep(spec.max_bytes_per_sec, n).await;
}
let d = if spec.tap {
@@ -448,6 +453,7 @@ fn spawn_splitter(
if n == 0 { break; }
broadcast_chunk(&writers, &edge_senders, &buf[..n]).await;
total += n as u64;
rate_limit_sleep(spec.max_bytes_per_sec, n).await;
}
debug!(bytes = total, consumers = writers.len(), "splitter finished");
@@ -469,6 +475,19 @@ fn spawn_splitter(
SplitterHandle { handle }
}
/// Token-bucket simple: si `max_bps > 0`, sleep `chunk_size / max_bps`
/// segundos. Implementación crude pero suficiente para v1.
async fn rate_limit_sleep(max_bps: u64, chunk_bytes: usize) {
if max_bps == 0 {
return;
}
let secs = chunk_bytes as f64 / max_bps as f64;
let ms = (secs * 1000.0) as u64;
if ms > 0 {
tokio::time::sleep(std::time::Duration::from_millis(ms)).await;
}
}
async fn broadcast_chunk(
writers: &[AsyncFd<std::os::fd::OwnedFd>],
edge_senders: &[Option<crate::flow_channel::FlowSender>],
@@ -721,6 +740,7 @@ mod tests {
enrich_producer: true,
replay_chunks: 32,
replay_bytes: 0,
max_bytes_per_sec: 0,
},
restart_on_failure: false,
restart_backoff_ms: 200,