From c3f9c9e36a758ad07434939dfa9bb214aff4acfb Mon Sep 17 00:00:00 2001 From: sergio Date: Mon, 11 May 2026 10:34:27 +0000 Subject: [PATCH] feat(shipote): pipeline backoff + quota card + logs follow (fase M) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - PipelineSpec.restart_backoff_ms + restart_max_backoff_ms + restart_max: backoff exponencial entre relaunches (anti-thrash). take_pending_restarts aplica restart_max (0 = infinito); excedido = supervisor descartado con warning. Daemon hace tokio::sleep(backoff) antes del relaunch y escala current_backoff x2 hasta el cap. - shipote-shell card "Quota breaches": probe extiende con WorkspaceQuota por workspace. Color rojo si hay breaches, verde si no. - shipote logs --follow: poll cada 200ms al daemon, imprime suffix nuevo hasta que el comando termine. Sin cambios al protocolo. Best-effort: si el ring rota más rápido que el poll, se pierden bytes. 83 tests pasan (ente-incarnate 16, nouser-core 27, shipote-card 8, shipote-core 24, shipote-discern 5, yahweh-provider-fs 3). Co-Authored-By: Claude Opus 4.7 --- crates/apps/shipote-cli/src/main.rs | 93 +++++++++++++++---- crates/apps/shipote-daemon/src/main.rs | 42 +++++---- crates/apps/shipote-shell/src/main.rs | 48 +++++++++- .../modules/shipote/shipote-card/src/lib.rs | 28 ++++++ .../modules/shipote/shipote-core/src/lib.rs | 48 ++++++++++ .../shipote/shipote-core/src/persist.rs | 3 + .../shipote/shipote-core/src/pipeline.rs | 12 +++ 7 files changed, 236 insertions(+), 38 deletions(-) diff --git a/crates/apps/shipote-cli/src/main.rs b/crates/apps/shipote-cli/src/main.rs index 632e33c..94a8865 100644 --- a/crates/apps/shipote-cli/src/main.rs +++ b/crates/apps/shipote-cli/src/main.rs @@ -69,6 +69,9 @@ enum Cmd { /// Stream a leer: stdout | stderr | both. #[arg(long, default_value = "both")] stream: String, + /// Seguir el log en vivo (poll cada 200ms hasta que el comando termine). + #[arg(short = 'f', long)] + follow: bool, }, /// Pipeline DAG con flujo tipado. @@ -457,28 +460,82 @@ async fn main() -> Result<()> { } } - Cmd::Logs { workspace, command, tail, stream: which_stream } => { + Cmd::Logs { workspace, command, tail, stream: which_stream, follow } => { let ws = parse_ws_id(&workspace)?; let cmd_id = Ulid::from_string(&command).map_err(|e| anyhow!("invalid command id: {e}"))?; - let resp = round_trip( - &mut stream, - Request::CommandLogs { - workspace: ws, - command: cmd_id, - tail_bytes: tail, - stream: which_stream, - }, - ) - .await?; - match resp { - Response::CommandLogs { bytes } => { - // stdout raw, sin decoding — el log puede tener bytes binarios. - use std::io::Write; - let _ = std::io::stdout().write_all(&bytes); + if !follow { + let resp = round_trip( + &mut stream, + Request::CommandLogs { + workspace: ws, + command: cmd_id, + tail_bytes: tail, + stream: which_stream, + }, + ) + .await?; + match resp { + Response::CommandLogs { bytes } => { + use std::io::Write; + let _ = std::io::stdout().write_all(&bytes); + let _ = std::io::stdout().flush(); + } + Response::Error { message } => return Err(anyhow!(message)), + other => print_unexpected(&other), + } + } else { + // Follow mode: poll cada 200ms. Mantenemos el último buffer + // visto; cada round imprimimos el delta (suffix nuevo). + // Limitación: si el ring rota más rápido que el poll, perdemos + // bytes — pero el comportamiento es "best effort". + use std::io::Write; + let mut prev: Vec = Vec::new(); + loop { + let resp = round_trip( + &mut stream, + Request::CommandLogs { + workspace: ws, + command: cmd_id, + tail_bytes: 0, + stream: which_stream.clone(), + }, + ) + .await?; + let bytes = match resp { + Response::CommandLogs { bytes } => bytes, + Response::Error { message } => return Err(anyhow!(message)), + other => { + print_unexpected(&other); + break; + } + }; + // Imprimir suffix nuevo si bytes es extension de prev. + if bytes.len() >= prev.len() && bytes[..prev.len()] == prev[..] { + let _ = std::io::stdout().write_all(&bytes[prev.len()..]); + } else { + // Ring rotó — reset y print todo. + let _ = std::io::stdout().write_all(&bytes); + } let _ = std::io::stdout().flush(); + prev = bytes; + + // Si el comando terminó, salir tras un último read. + let list_resp = round_trip( + &mut stream, + Request::CommandList { workspace: ws }, + ) + .await?; + let mut still_alive = false; + if let Response::CommandList { items } = list_resp { + if let Some(c) = items.iter().find(|c| c.id == cmd_id) { + still_alive = c.alive; + } + } + if !still_alive { + break; + } + tokio::time::sleep(std::time::Duration::from_millis(200)).await; } - Response::Error { message } => return Err(anyhow!(message)), - other => print_unexpected(&other), } } diff --git a/crates/apps/shipote-daemon/src/main.rs b/crates/apps/shipote-daemon/src/main.rs index 939fec3..14f76f0 100644 --- a/crates/apps/shipote-daemon/src/main.rs +++ b/crates/apps/shipote-daemon/src/main.rs @@ -104,23 +104,25 @@ async fn main() -> anyhow::Result<()> { mgr.reap_dead().await; let pending = mgr.take_pending_restarts().await; for sup in pending { + let backoff = std::time::Duration::from_millis(sup.current_backoff_ms); info!( label = %sup.spec.label, restart_count = sup.restart_count, - "pipeline restart: relaunching" + backoff_ms = sup.current_backoff_ms, + "pipeline restart: relaunching after backoff" ); + // Backoff antes del relaunch — anti-thrash. + tokio::time::sleep(backoff).await; let inc = mgr.incarnator_handle(); let disc = std::sync::Arc::new(DiscernPipeline::default_pipeline()); - let ws_label = mgr - .workspace_label(sup.spec.workspace) - .await - .unwrap_or_default(); - let restart_count = sup.restart_count; let workspace = sup.spec.workspace; + let ws_label = mgr.workspace_label(workspace).await.unwrap_or_default(); let tap = sup.tap; let mut new_spec = sup.spec.clone(); - // Mantener restart_on_failure para futuras fallas. new_spec.restart_on_failure = true; + // Escalar el backoff para la PRÓXIMA falla. + let next_backoff = (sup.current_backoff_ms * 2) + .min(new_spec.restart_max_backoff_ms); match shipote_core::pipeline::run_pipeline( &new_spec, &ws_label, @@ -132,19 +134,23 @@ async fn main() -> anyhow::Result<()> { .await { Ok(launch) => { - mgr.register_pipeline_commands(workspace, launch.pipeline, launch.command_pids.clone()) - .await; - // Re-registrar supervisor con el nuevo pipeline_id, - // preservando restart_count. - let mut s = shipote_core::PipelineSupervisor { + mgr.register_pipeline_commands( workspace, - spec: new_spec, + launch.pipeline, + launch.command_pids.clone(), + ) + .await; + // Re-registrar supervisor con backoff escalado + + // restart_count preservado. + mgr.register_pipeline_supervisor_with_state( + launch.pipeline, + workspace, + new_spec, tap, - restart_count, - }; - s.restart_count = restart_count; - mgr.register_pipeline_supervisor(launch.pipeline, workspace, s.spec.clone(), tap) - .await; + sup.restart_count, + next_backoff, + ) + .await; } Err(e) => { warn!(?e, "pipeline restart failed"); diff --git a/crates/apps/shipote-shell/src/main.rs b/crates/apps/shipote-shell/src/main.rs index d8bf01a..0bf35bc 100644 --- a/crates/apps/shipote-shell/src/main.rs +++ b/crates/apps/shipote-shell/src/main.rs @@ -6,8 +6,8 @@ use gpui::{div, prelude::*, px, Context, IntoElement, Render, SharedString, Window}; use shipote_protocol::{ - default_socket_path, read_frame, write_frame, CommandInfo, FlowInfo, Request, Response, - WorkspaceStatsInfo, WorkspaceSummary, + default_socket_path, read_frame, write_frame, CommandInfo, FlowInfo, QuotaReportInfo, Request, + Response, WorkspaceStatsInfo, WorkspaceSummary, }; use std::path::PathBuf; use std::time::Duration; @@ -46,6 +46,8 @@ struct Shell { flows: Vec, /// History de RSS por workspace (últimas N samples). stats_history: std::collections::BTreeMap>, + /// Quota report fresco por workspace. + quotas: std::collections::BTreeMap, caps: Option, last_probe_ms: u64, recent_log: Option<(String, String)>, @@ -79,6 +81,7 @@ impl Shell { me.commands = snap.commands; me.saved_pipelines = snap.saved_pipelines; me.flows = snap.flows; + me.quotas = snap.quotas; // Append a la history por workspace. for (ws_id, fresh) in &snap.fresh_stats { let h = me @@ -106,6 +109,7 @@ impl Shell { me.commands.clear(); me.saved_pipelines.clear(); me.flows.clear(); + me.quotas.clear(); me.caps = None; me.recent_log = None; } @@ -126,6 +130,7 @@ impl Shell { saved_pipelines: Vec::new(), flows: Vec::new(), stats_history: std::collections::BTreeMap::new(), + quotas: std::collections::BTreeMap::new(), caps: None, last_probe_ms: 0, recent_log: None, @@ -141,6 +146,8 @@ struct Snapshot { flows: Vec, /// Stats fresco por workspace (id.toString → stats). fresh_stats: std::collections::BTreeMap, + /// Quota report fresco por workspace. + quotas: std::collections::BTreeMap, caps: CapsSummary, /// tail del log del comando más reciente (label + bytes). None si no hay. recent_log: Option<(String, String)>, @@ -170,6 +177,7 @@ fn probe_blocking(path: &std::path::Path) -> Result { // Commands por workspace. let mut commands_map = std::collections::BTreeMap::new(); let mut fresh_stats = std::collections::BTreeMap::new(); + let mut quotas = std::collections::BTreeMap::new(); for w in &workspaces { write_frame(&mut stream, &Request::CommandList { workspace: w.id }) .await @@ -192,6 +200,16 @@ fn probe_blocking(path: &std::path::Path) -> Result { if let Response::WorkspaceStats { info } = resp { fresh_stats.insert(w.id.to_string(), info); } + // Quota por workspace. + write_frame(&mut stream, &Request::WorkspaceQuota { workspace: w.id }) + .await + .map_err(|e| format!("write quota: {e}"))?; + let resp: Response = read_frame(&mut stream) + .await + .map_err(|e| format!("read quota: {e}"))?; + if let Response::WorkspaceQuota { info } = resp { + quotas.insert(w.id.to_string(), info); + } } // Saved pipelines. @@ -294,6 +312,7 @@ fn probe_blocking(path: &std::path::Path) -> Result { saved_pipelines, flows, fresh_stats, + quotas, caps, recent_log, }) @@ -455,6 +474,21 @@ impl Render for Shell { "definiciones reusables vía run-saved".to_string() }; + // Quota breaches por workspace. + let mut breach_items: Vec = Vec::new(); + for (ws_id, q) in &self.quotas { + for b in &q.breaches { + let short = &ws_id[20..]; + breach_items.push(format!("{short} {b}")); + } + } + let breach_count = breach_items.len().to_string(); + let breach_descr = if breach_items.is_empty() { + "todos los workspaces dentro de quota".to_string() + } else { + "ws_suffix · recurso · uso > limit".to_string() + }; + // Flow channels (data plane). let flow_count: usize = self.flows.iter().map(|f| f.sockets.len()).sum(); let flow_items: Vec = self @@ -547,6 +581,16 @@ impl Render for Shell { text, text_dim, &flow_items, + )) + .child(stat_card( + cx, + "Quota breaches", + breach_count, + &breach_descr, + if breach_items.is_empty() { accent_up } else { accent_down }, + text, + text_dim, + &breach_items, )); // Live tail del comando más reciente con output. diff --git a/crates/modules/shipote/shipote-card/src/lib.rs b/crates/modules/shipote/shipote-card/src/lib.rs index 19873e1..64a04f7 100644 --- a/crates/modules/shipote/shipote-card/src/lib.rs +++ b/crates/modules/shipote/shipote-card/src/lib.rs @@ -222,6 +222,25 @@ pub struct PipelineSpec { /// Útil para pipelines de procesamiento continuo. #[serde(default)] pub restart_on_failure: bool, + /// Backoff inicial entre restarts (ms). Crece exponencialmente + /// hasta `restart_max_backoff_ms`. Default 200ms = ~5 restarts/s + /// inicial, escalando rápido. + #[serde(default = "default_restart_backoff")] + pub restart_backoff_ms: u64, + /// Backoff máximo (ms). Default 30s. El backoff no crece más allá. + #[serde(default = "default_restart_max_backoff")] + pub restart_max_backoff_ms: u64, + /// Máximo de restarts antes de dar up. `0` = infinito. Default 0. + /// Útil para fail-loud cuando un pipeline siempre falla. + #[serde(default)] + pub restart_max: u32, +} + +fn default_restart_backoff() -> u64 { + 200 +} +fn default_restart_max_backoff() -> u64 { + 30_000 } #[derive(Debug, Clone, Serialize, Deserialize)] @@ -502,6 +521,9 @@ mod subst_tests { edges: vec![], discern: DiscernPolicy::default(), restart_on_failure: false, + restart_backoff_ms: 200, + restart_max_backoff_ms: 30_000, + restart_max: 0, }; let out = substitute_vars(&spec, &vars).unwrap(); assert_eq!(out.label, "p-renamed"); @@ -522,6 +544,9 @@ mod subst_tests { edges: vec![], discern: DiscernPolicy::default(), restart_on_failure: false, + restart_backoff_ms: 200, + restart_max_backoff_ms: 30_000, + restart_max: 0, }; let out = substitute_vars(&spec, &vars).unwrap(); assert_eq!(out.label, "p-${UNDEFINED}"); @@ -603,6 +628,9 @@ mod tests { }], discern: DiscernPolicy::default(), restart_on_failure: false, + restart_backoff_ms: 200, + restart_max_backoff_ms: 30_000, + restart_max: 0, }; assert!(p.validate().is_err()); } diff --git a/crates/modules/shipote/shipote-core/src/lib.rs b/crates/modules/shipote/shipote-core/src/lib.rs index 9673dc3..15cc29c 100644 --- a/crates/modules/shipote/shipote-core/src/lib.rs +++ b/crates/modules/shipote/shipote-core/src/lib.rs @@ -114,6 +114,8 @@ pub struct PipelineSupervisor { pub spec: PipelineSpec, pub tap: bool, pub restart_count: u32, + /// Backoff actual (ms) — escala exponencialmente con cada restart. + pub current_backoff_ms: u64, } #[derive(Debug, Clone)] @@ -248,6 +250,7 @@ impl WorkspaceManager { } tracing::debug!(%pipeline_id, label = %spec.label, "pipeline supervisor registered"); let mut g = self.inner.lock().await; + let initial_backoff = spec.restart_backoff_ms.max(50); g.pipeline_supervisors.insert( pipeline_id, PipelineSupervisor { @@ -255,18 +258,60 @@ impl WorkspaceManager { spec, tap, restart_count: 0, + current_backoff_ms: initial_backoff, + }, + ); + } + + /// Variante que preserva backoff/count del supervisor anterior (para + /// re-registrar tras un restart sin perder el throttle acumulado). + pub async fn register_pipeline_supervisor_with_state( + &self, + pipeline_id: Ulid, + workspace: WorkspaceId, + spec: PipelineSpec, + tap: bool, + restart_count: u32, + current_backoff_ms: u64, + ) { + if !spec.restart_on_failure { + return; + } + let mut g = self.inner.lock().await; + g.pipeline_supervisors.insert( + pipeline_id, + PipelineSupervisor { + workspace, + spec, + tap, + restart_count, + current_backoff_ms, }, ); } /// Drena la cola de pipelines pendientes de restart y retorna las /// specs a relaunch. El daemon lo llama tras cada `reap_dead`. + /// + /// Aplica `restart_max`: si el supervisor ya pasó el límite, no se + /// retorna y el supervisor se elimina (give-up). El backoff + /// preserva el valor actual; el daemon decide cuándo aplicar el + /// sleep antes del relaunch. pub async fn take_pending_restarts(&self) -> Vec { let mut g = self.inner.lock().await; let pending = std::mem::take(&mut g.pending_pipeline_restarts); let mut out = Vec::with_capacity(pending.len()); for old_id in pending { if let Some(mut sup) = g.pipeline_supervisors.remove(&old_id) { + if sup.spec.restart_max > 0 && sup.restart_count >= sup.spec.restart_max { + tracing::warn!( + label = %sup.spec.label, + restart_count = sup.restart_count, + max = sup.spec.restart_max, + "pipeline restart_max reached — giving up" + ); + continue; // no relaunch, supervisor discarded. + } sup.restart_count += 1; out.push(sup); } @@ -1254,6 +1299,9 @@ mod tests { edges: vec![], discern: DiscernPolicy::default(), restart_on_failure: true, + restart_backoff_ms: 200, + restart_max_backoff_ms: 30_000, + restart_max: 0, }; let pipeline_id = ulid::Ulid::new(); // Simulamos lo que haría el daemon: registramos un comando como diff --git a/crates/modules/shipote/shipote-core/src/persist.rs b/crates/modules/shipote/shipote-core/src/persist.rs index ca8f862..86eb14c 100644 --- a/crates/modules/shipote/shipote-core/src/persist.rs +++ b/crates/modules/shipote/shipote-core/src/persist.rs @@ -210,6 +210,9 @@ mod tests { edges: vec![], discern: DiscernPolicy::default(), restart_on_failure: false, + restart_backoff_ms: 200, + restart_max_backoff_ms: 30_000, + restart_max: 0, }; mgr1.save_pipeline("daily".into(), spec).await; mgr1.save_snapshot(&path).await.unwrap(); diff --git a/crates/modules/shipote/shipote-core/src/pipeline.rs b/crates/modules/shipote/shipote-core/src/pipeline.rs index 150c158..e449647 100644 --- a/crates/modules/shipote/shipote-core/src/pipeline.rs +++ b/crates/modules/shipote/shipote-core/src/pipeline.rs @@ -595,6 +595,9 @@ mod tests { }], discern: DiscernPolicy::default(), restart_on_failure: false, + restart_backoff_ms: 200, + restart_max_backoff_ms: 30_000, + restart_max: 0, }; let disc = Arc::new(DiscernPipeline::default_pipeline()); let inc = Arc::new(Incarnator::new(IncarnatorConfig::default())); @@ -633,6 +636,9 @@ mod tests { ], discern: DiscernPolicy::default(), restart_on_failure: false, + restart_backoff_ms: 200, + restart_max_backoff_ms: 30_000, + restart_max: 0, }; let disc = Arc::new(DiscernPipeline::default_pipeline()); let inc = Arc::new(Incarnator::new(IncarnatorConfig::default())); @@ -670,6 +676,9 @@ mod tests { ], discern: DiscernPolicy::default(), restart_on_failure: false, + restart_backoff_ms: 200, + restart_max_backoff_ms: 30_000, + restart_max: 0, }; let disc = Arc::new(DiscernPipeline::default_pipeline()); let inc = Arc::new(Incarnator::new(IncarnatorConfig::default())); @@ -702,6 +711,9 @@ mod tests { replay_bytes: 0, }, restart_on_failure: false, + restart_backoff_ms: 200, + restart_max_backoff_ms: 30_000, + restart_max: 0, }; let disc = Arc::new(DiscernPipeline::default_pipeline()); let inc = Arc::new(Incarnator::new(IncarnatorConfig::default()));