feat(shipote): pipeline backoff + quota card + logs follow (fase M)

- PipelineSpec.restart_backoff_ms + restart_max_backoff_ms + restart_max:
  backoff exponencial entre relaunches (anti-thrash). take_pending_restarts
  aplica restart_max (0 = infinito); si se excede, el supervisor se descarta
  con un warning. El daemon hace tokio::time::sleep(backoff) antes del
  relaunch y duplica current_backoff_ms hasta el cap restart_max_backoff_ms.
- shipote-shell card "Quota breaches": el probe se extiende con una consulta
  WorkspaceQuota por workspace. Color rojo si hay breaches, verde si no.
- shipote logs --follow (-f): hace poll al daemon cada 200ms e imprime el
  sufijo nuevo hasta que el comando termine. Sin cambios al protocolo.
  Best-effort: si el ring rota más rápido que el poll, se pierden bytes.

83 tests pasan (ente-incarnate 16, nouser-core 27, shipote-card 8,
shipote-core 24, shipote-discern 5, yahweh-provider-fs 3).

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
sergio
2026-05-11 10:34:27 +00:00
parent 4c9d1b4c1d
commit c3f9c9e36a
7 changed files with 236 additions and 38 deletions
+75 -18
View File
@@ -69,6 +69,9 @@ enum Cmd {
/// Stream a leer: stdout | stderr | both.
#[arg(long, default_value = "both")]
stream: String,
/// Seguir el log en vivo (poll cada 200ms hasta que el comando termine).
#[arg(short = 'f', long)]
follow: bool,
},
/// Pipeline DAG con flujo tipado.
@@ -457,28 +460,82 @@ async fn main() -> Result<()> {
}
}
Cmd::Logs { workspace, command, tail, stream: which_stream } => {
Cmd::Logs { workspace, command, tail, stream: which_stream, follow } => {
let ws = parse_ws_id(&workspace)?;
let cmd_id = Ulid::from_string(&command).map_err(|e| anyhow!("invalid command id: {e}"))?;
let resp = round_trip(
&mut stream,
Request::CommandLogs {
workspace: ws,
command: cmd_id,
tail_bytes: tail,
stream: which_stream,
},
)
.await?;
match resp {
Response::CommandLogs { bytes } => {
// stdout raw, sin decoding — el log puede tener bytes binarios.
use std::io::Write;
let _ = std::io::stdout().write_all(&bytes);
if !follow {
let resp = round_trip(
&mut stream,
Request::CommandLogs {
workspace: ws,
command: cmd_id,
tail_bytes: tail,
stream: which_stream,
},
)
.await?;
match resp {
Response::CommandLogs { bytes } => {
use std::io::Write;
let _ = std::io::stdout().write_all(&bytes);
let _ = std::io::stdout().flush();
}
Response::Error { message } => return Err(anyhow!(message)),
other => print_unexpected(&other),
}
} else {
// Follow mode: poll cada 200ms. Mantenemos el último buffer
// visto; cada round imprimimos el delta (suffix nuevo).
// Limitación: si el ring rota más rápido que el poll, perdemos
// bytes — pero el comportamiento es "best effort".
use std::io::Write;
let mut prev: Vec<u8> = Vec::new();
loop {
let resp = round_trip(
&mut stream,
Request::CommandLogs {
workspace: ws,
command: cmd_id,
tail_bytes: 0,
stream: which_stream.clone(),
},
)
.await?;
let bytes = match resp {
Response::CommandLogs { bytes } => bytes,
Response::Error { message } => return Err(anyhow!(message)),
other => {
print_unexpected(&other);
break;
}
};
// Imprimir suffix nuevo si bytes es extension de prev.
if bytes.len() >= prev.len() && bytes[..prev.len()] == prev[..] {
let _ = std::io::stdout().write_all(&bytes[prev.len()..]);
} else {
// Ring rotó — reset y print todo.
let _ = std::io::stdout().write_all(&bytes);
}
let _ = std::io::stdout().flush();
prev = bytes;
// Si el comando terminó, salir tras un último read.
let list_resp = round_trip(
&mut stream,
Request::CommandList { workspace: ws },
)
.await?;
let mut still_alive = false;
if let Response::CommandList { items } = list_resp {
if let Some(c) = items.iter().find(|c| c.id == cmd_id) {
still_alive = c.alive;
}
}
if !still_alive {
break;
}
tokio::time::sleep(std::time::Duration::from_millis(200)).await;
}
Response::Error { message } => return Err(anyhow!(message)),
other => print_unexpected(&other),
}
}
+24 -18
View File
@@ -104,23 +104,25 @@ async fn main() -> anyhow::Result<()> {
mgr.reap_dead().await;
let pending = mgr.take_pending_restarts().await;
for sup in pending {
let backoff = std::time::Duration::from_millis(sup.current_backoff_ms);
info!(
label = %sup.spec.label,
restart_count = sup.restart_count,
"pipeline restart: relaunching"
backoff_ms = sup.current_backoff_ms,
"pipeline restart: relaunching after backoff"
);
// Backoff antes del relaunch — anti-thrash.
tokio::time::sleep(backoff).await;
let inc = mgr.incarnator_handle();
let disc = std::sync::Arc::new(DiscernPipeline::default_pipeline());
let ws_label = mgr
.workspace_label(sup.spec.workspace)
.await
.unwrap_or_default();
let restart_count = sup.restart_count;
let workspace = sup.spec.workspace;
let ws_label = mgr.workspace_label(workspace).await.unwrap_or_default();
let tap = sup.tap;
let mut new_spec = sup.spec.clone();
// Mantener restart_on_failure para futuras fallas.
new_spec.restart_on_failure = true;
// Escalar el backoff para la PRÓXIMA falla.
let next_backoff = (sup.current_backoff_ms * 2)
.min(new_spec.restart_max_backoff_ms);
match shipote_core::pipeline::run_pipeline(
&new_spec,
&ws_label,
@@ -132,19 +134,23 @@ async fn main() -> anyhow::Result<()> {
.await
{
Ok(launch) => {
mgr.register_pipeline_commands(workspace, launch.pipeline, launch.command_pids.clone())
.await;
// Re-registrar supervisor con el nuevo pipeline_id,
// preservando restart_count.
let mut s = shipote_core::PipelineSupervisor {
mgr.register_pipeline_commands(
workspace,
spec: new_spec,
launch.pipeline,
launch.command_pids.clone(),
)
.await;
// Re-registrar supervisor con backoff escalado +
// restart_count preservado.
mgr.register_pipeline_supervisor_with_state(
launch.pipeline,
workspace,
new_spec,
tap,
restart_count,
};
s.restart_count = restart_count;
mgr.register_pipeline_supervisor(launch.pipeline, workspace, s.spec.clone(), tap)
.await;
sup.restart_count,
next_backoff,
)
.await;
}
Err(e) => {
warn!(?e, "pipeline restart failed");
+46 -2
View File
@@ -6,8 +6,8 @@
use gpui::{div, prelude::*, px, Context, IntoElement, Render, SharedString, Window};
use shipote_protocol::{
default_socket_path, read_frame, write_frame, CommandInfo, FlowInfo, Request, Response,
WorkspaceStatsInfo, WorkspaceSummary,
default_socket_path, read_frame, write_frame, CommandInfo, FlowInfo, QuotaReportInfo, Request,
Response, WorkspaceStatsInfo, WorkspaceSummary,
};
use std::path::PathBuf;
use std::time::Duration;
@@ -46,6 +46,8 @@ struct Shell {
flows: Vec<FlowInfo>,
/// History de RSS por workspace (últimas N samples).
stats_history: std::collections::BTreeMap<String, std::collections::VecDeque<WorkspaceStatsInfo>>,
/// Quota report fresco por workspace.
quotas: std::collections::BTreeMap<String, QuotaReportInfo>,
caps: Option<CapsSummary>,
last_probe_ms: u64,
recent_log: Option<(String, String)>,
@@ -79,6 +81,7 @@ impl Shell {
me.commands = snap.commands;
me.saved_pipelines = snap.saved_pipelines;
me.flows = snap.flows;
me.quotas = snap.quotas;
// Append a la history por workspace.
for (ws_id, fresh) in &snap.fresh_stats {
let h = me
@@ -106,6 +109,7 @@ impl Shell {
me.commands.clear();
me.saved_pipelines.clear();
me.flows.clear();
me.quotas.clear();
me.caps = None;
me.recent_log = None;
}
@@ -126,6 +130,7 @@ impl Shell {
saved_pipelines: Vec::new(),
flows: Vec::new(),
stats_history: std::collections::BTreeMap::new(),
quotas: std::collections::BTreeMap::new(),
caps: None,
last_probe_ms: 0,
recent_log: None,
@@ -141,6 +146,8 @@ struct Snapshot {
flows: Vec<FlowInfo>,
/// Stats fresco por workspace (id.toString → stats).
fresh_stats: std::collections::BTreeMap<String, WorkspaceStatsInfo>,
/// Quota report fresco por workspace.
quotas: std::collections::BTreeMap<String, QuotaReportInfo>,
caps: CapsSummary,
/// tail del log del comando más reciente (label + bytes). None si no hay.
recent_log: Option<(String, String)>,
@@ -170,6 +177,7 @@ fn probe_blocking(path: &std::path::Path) -> Result<Snapshot, String> {
// Commands por workspace.
let mut commands_map = std::collections::BTreeMap::new();
let mut fresh_stats = std::collections::BTreeMap::new();
let mut quotas = std::collections::BTreeMap::new();
for w in &workspaces {
write_frame(&mut stream, &Request::CommandList { workspace: w.id })
.await
@@ -192,6 +200,16 @@ fn probe_blocking(path: &std::path::Path) -> Result<Snapshot, String> {
if let Response::WorkspaceStats { info } = resp {
fresh_stats.insert(w.id.to_string(), info);
}
// Quota por workspace.
write_frame(&mut stream, &Request::WorkspaceQuota { workspace: w.id })
.await
.map_err(|e| format!("write quota: {e}"))?;
let resp: Response = read_frame(&mut stream)
.await
.map_err(|e| format!("read quota: {e}"))?;
if let Response::WorkspaceQuota { info } = resp {
quotas.insert(w.id.to_string(), info);
}
}
// Saved pipelines.
@@ -294,6 +312,7 @@ fn probe_blocking(path: &std::path::Path) -> Result<Snapshot, String> {
saved_pipelines,
flows,
fresh_stats,
quotas,
caps,
recent_log,
})
@@ -455,6 +474,21 @@ impl Render for Shell {
"definiciones reusables vía run-saved".to_string()
};
// Quota breaches por workspace.
let mut breach_items: Vec<String> = Vec::new();
for (ws_id, q) in &self.quotas {
for b in &q.breaches {
let short = &ws_id[20..];
breach_items.push(format!("{short} {b}"));
}
}
let breach_count = breach_items.len().to_string();
let breach_descr = if breach_items.is_empty() {
"todos los workspaces dentro de quota".to_string()
} else {
"ws_suffix · recurso · uso > limit".to_string()
};
// Flow channels (data plane).
let flow_count: usize = self.flows.iter().map(|f| f.sockets.len()).sum();
let flow_items: Vec<String> = self
@@ -547,6 +581,16 @@ impl Render for Shell {
text,
text_dim,
&flow_items,
))
.child(stat_card(
cx,
"Quota breaches",
breach_count,
&breach_descr,
if breach_items.is_empty() { accent_up } else { accent_down },
text,
text_dim,
&breach_items,
));
// Live tail del comando más reciente con output.