feat(shipote): quota enforce + cgroup memory.max + pipeline restart (fase L)

- WorkspaceSpec.quota_enforce: QuotaAction (None|Log|Kill) por recurso
  (mem, nproc). reap_dead aplica policy; Kill usa stop_with_grace(ZERO).
- ente_incarnate::cgroup::apply_rlimits_to_cgroup escribe memory.max y
  pids.max. WorkspaceManager::create_with_id lo invoca si soma.cgroup.path
  y delegation. Kernel hace OOM kill al exceder; falla silenciosa si no
  hay delegation.
- PipelineSpec.restart_on_failure: bool. register_pipeline_supervisor
  retiene spec; reap_dead detecta all-dead + any-failed → push a queue;
  daemon reaper drena y relanza pipeline ENTERO (los pipes intermedios
  no permiten restart parcial).

82 tests pasan (ente-incarnate 16, nouser-core 27, shipote-card 8,
shipote-core 24, shipote-discern 5, yahweh-provider-fs 3).

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
sergio
2026-05-11 10:22:46 +00:00
parent 324a0c2d5d
commit 4c9d1b4c1d
7 changed files with 401 additions and 5 deletions
+276 -1
View File
@@ -99,6 +99,21 @@ struct Inner {
/// con la misma spec (nuevo pid y nuevo command_id se asigna por
/// el nuevo state pero el restart_spec sigue ligado al original).
restart_specs: HashMap<Ulid, RestartSpec>,
/// Supervisores de pipelines con `restart_on_failure`. Indexed por
/// pipeline_id. Cuando `reap_dead` detecta que el pipeline tuvo
/// algún command failure, agrega un entry a `pending_pipeline_restarts`.
pipeline_supervisors: HashMap<Ulid, PipelineSupervisor>,
/// Cola de pipelines pendientes de restart. El daemon la drena en
/// cada loop del reaper, hace stop + run_pipeline.
pending_pipeline_restarts: Vec<Ulid>,
}
#[derive(Debug, Clone)]
pub struct PipelineSupervisor {
pub workspace: WorkspaceId,
pub spec: PipelineSpec,
pub tap: bool,
pub restart_count: u32,
}
#[derive(Debug, Clone)]
@@ -131,6 +146,19 @@ pub struct CommandInfo {
pub log_bytes: u64,
}
/// Lee VmRSS (bytes) de `/proc/<pid>/status`. Helper local para
/// reap_dead que no necesita el full stats. Devuelve 0 si el proc no
/// existe o el campo no aparece.
fn read_proc_rss(pid: i32) -> Option<u64> {
let status = std::fs::read_to_string(format!("/proc/{pid}/status")).ok()?;
status
.lines()
.find_map(|l| l.strip_prefix("VmRSS:").map(str::trim))
.and_then(|s| s.split_whitespace().next())
.and_then(|s| s.parse::<u64>().ok())
.map(|kb| kb * 1024)
}
fn spawn_log_drainer(read_fd: std::os::fd::RawFd, logs: logbuf::LogBuf) {
// Marcar non-blocking + envolver en AsyncFd; igual patrón que el tap.
// SAFETY: F_SETFL sobre fd válido.
@@ -198,11 +226,54 @@ impl WorkspaceManager {
saved_pipelines: HashMap::new(),
pipeline_flows: HashMap::new(),
restart_specs: HashMap::new(),
pipeline_supervisors: HashMap::new(),
pending_pipeline_restarts: Vec::new(),
})),
incarnator: Arc::new(Incarnator::new(cfg)),
}
}
/// Registra un supervisor para un pipeline con `restart_on_failure=true`.
/// El daemon llama esto tras `run_pipeline` para que `reap_dead` agregue
/// el pipeline a la cola de restart cuando algún command falle.
pub async fn register_pipeline_supervisor(
&self,
pipeline_id: Ulid,
workspace: WorkspaceId,
spec: PipelineSpec,
tap: bool,
) {
if !spec.restart_on_failure {
return;
}
tracing::debug!(%pipeline_id, label = %spec.label, "pipeline supervisor registered");
let mut g = self.inner.lock().await;
g.pipeline_supervisors.insert(
pipeline_id,
PipelineSupervisor {
workspace,
spec,
tap,
restart_count: 0,
},
);
}
/// Drena la cola de pipelines pendientes de restart y retorna las
/// specs a relaunch. El daemon lo llama tras cada `reap_dead`.
pub async fn take_pending_restarts(&self) -> Vec<PipelineSupervisor> {
let mut g = self.inner.lock().await;
let pending = std::mem::take(&mut g.pending_pipeline_restarts);
let mut out = Vec::with_capacity(pending.len());
for old_id in pending {
if let Some(mut sup) = g.pipeline_supervisors.remove(&old_id) {
sup.restart_count += 1;
out.push(sup);
}
}
out
}
/// Registra los comandos lanzados por un pipeline en el workspace.
/// Esto permite `pipeline_stop` (matar selectivamente sólo los pids
/// de un pipeline). `pipeline_id` se setea en cada CommandState.
@@ -465,8 +536,22 @@ impl WorkspaceManager {
spec: WorkspaceSpec,
) -> Result<(WorkspaceId, Vec<String>), CoreError> {
let card = spec.to_card(id)?;
let warnings = self.incarnator.dry_run(&card).warnings;
let mut warnings = self.incarnator.dry_run(&card).warnings;
let ttl = spec.ttl;
// Si el workspace declara cgroup path Y rlimits, intentamos
// crear el cgroup y escribir memory.max/pids.max. El kernel
// hace OOM kill al exceder memory.max — enforcement automático
// sin policy adicional. Falla silenciosa si no hay delegation.
if !spec.soma.cgroup.path.is_empty() {
if let Ok(abs) = ente_incarnate::cgroup::ensure_cgroup(&spec.soma.cgroup) {
let applied =
ente_incarnate::cgroup::apply_rlimits_to_cgroup(&abs, &spec.soma.rlimits);
if !applied.is_empty() {
warnings.push(format!("cgroup limits applied: {}", applied.join(", ")));
}
}
}
let state = WorkspaceState {
id,
spec,
@@ -778,6 +863,7 @@ impl WorkspaceManager {
/// el daemon o ante SIGCHLD. Marca `alive=false` y guarda exit_status.
pub async fn reap_dead(self: &Arc<Self>) {
let mut to_restart: Vec<RestartSpec> = Vec::new();
let mut to_enforce_kill: Vec<WorkspaceId> = Vec::new();
{
let mut g = self.inner.lock().await;
for ws in g.workspaces.values_mut() {
@@ -798,6 +884,93 @@ impl WorkspaceManager {
}
}
}
// Quota enforcement: chequear breach por workspace y aplicar policy.
// Lo hacemos dentro del mismo lock para tener una lectura
// consistente; el kill real va fuera del lock.
for (ws_id, ws) in g.workspaces.iter() {
let rl = &ws.spec.soma.rlimits;
let qe = &ws.spec.quota_enforce;
// Sólo aplicamos si hay al menos una action != None.
if qe.mem == shipote_card::QuotaAction::None
&& qe.nproc == shipote_card::QuotaAction::None
{
continue;
}
// Medir RSS y nproc vivos sin pasar por workspace_stats
// (que tomaría el lock recursivo). Hacemos un read directo.
let alive: Vec<i32> = ws
.commands
.values()
.filter(|c| c.alive)
.map(|c| c.pid.as_raw())
.collect();
let nproc_alive = alive.len() as u32;
let mem_used: u64 = alive
.iter()
.filter_map(|pid| read_proc_rss(*pid))
.sum();
let mem_breach = matches!(rl.mem_bytes, Some(limit) if mem_used > limit);
let nproc_breach = matches!(rl.nproc, Some(limit) if nproc_alive > limit);
let mut kill_needed = false;
if mem_breach {
match qe.mem {
shipote_card::QuotaAction::Log => {
warn!(%ws_id, used = mem_used, limit = ?rl.mem_bytes, "quota breach: memory");
}
shipote_card::QuotaAction::Kill => {
warn!(%ws_id, used = mem_used, limit = ?rl.mem_bytes, "quota breach: KILLING");
kill_needed = true;
}
_ => {}
}
}
if nproc_breach {
match qe.nproc {
shipote_card::QuotaAction::Log => {
warn!(%ws_id, alive = nproc_alive, limit = ?rl.nproc, "quota breach: nproc");
}
shipote_card::QuotaAction::Kill => {
warn!(%ws_id, alive = nproc_alive, limit = ?rl.nproc, "quota breach: KILLING");
kill_needed = true;
}
_ => {}
}
}
if kill_needed {
to_enforce_kill.push(*ws_id);
}
}
// Pipeline supervisor: detectar pipelines cuyos comandos tienen
// failure. Marca para restart si tiene supervisor.
// Esto se hace cuando TODOS los comandos del pipeline están
// dead Y al menos uno tiene exit!=0 (sino podría disparar
// restart mientras otros comandos aún corren — incorrecto).
let supervisor_ids: Vec<Ulid> = g.pipeline_supervisors.keys().copied().collect();
for pipe_id in supervisor_ids {
// ¿Hay algún comando vivo de este pipeline?
let mut all_dead = true;
let mut any_failed = false;
for ws in g.workspaces.values() {
for cmd in ws.commands.values() {
if cmd.pipeline_id != Some(pipe_id) {
continue;
}
if cmd.alive {
all_dead = false;
} else if cmd.exit_status.map_or(false, |s| s != 0) {
any_failed = true;
}
}
}
if all_dead && any_failed {
// Push a queue si no estaba ya.
if !g.pending_pipeline_restarts.contains(&pipe_id) {
g.pending_pipeline_restarts.push(pipe_id);
}
}
}
// Detectar restart_specs cuyo command_id ya está dead con exit!=0.
let mut to_remove: Vec<Ulid> = Vec::new();
for (cmd_id, spec) in g.restart_specs.iter() {
@@ -826,6 +999,10 @@ impl WorkspaceManager {
g.restart_specs.remove(&id);
}
}
// Quota enforcement: kill workspaces fuera del lock.
for ws_id in to_enforce_kill {
let _ = self.stop_with_grace(ws_id, std::time::Duration::ZERO).await;
}
// Schedule restart fuera del lock.
for mut spec in to_restart {
let mgr = self.clone();
@@ -900,6 +1077,7 @@ mod tests {
ttl: Some(std::time::Duration::from_millis(120)),
flow_dirs: vec![],
on_exit: shipote_card::ExitPolicy::Reap,
quota_enforce: Default::default(),
};
let (id, _) = mgr.create(spec).await.unwrap();
assert_eq!(mgr.list().await.len(), 1);
@@ -922,6 +1100,7 @@ mod tests {
ttl: None,
flow_dirs: vec![],
on_exit: shipote_card::ExitPolicy::Reap,
quota_enforce: Default::default(),
};
let (id, _w) = mgr.create(spec).await.unwrap();
let list = mgr.list().await;
@@ -939,6 +1118,7 @@ mod tests {
ttl: None,
flow_dirs: vec![],
on_exit: shipote_card::ExitPolicy::Reap,
quota_enforce: Default::default(),
};
let (id, _) = mgr.create(spec).await.unwrap();
let summary = mgr
@@ -971,6 +1151,7 @@ mod tests {
ttl: None,
flow_dirs: vec![],
on_exit: shipote_card::ExitPolicy::Reap,
quota_enforce: Default::default(),
};
let (id, _) = mgr.create(spec).await.unwrap();
// sh -c "echo OUT; echo ERR >&2"
@@ -1017,6 +1198,7 @@ mod tests {
ttl: None,
flow_dirs: vec![],
on_exit: shipote_card::ExitPolicy::Reap,
quota_enforce: Default::default(),
};
let (id, _) = mgr.create(spec).await.unwrap();
// /bin/false sale con exit=1. Con restart_on_failure=true debería
@@ -1042,6 +1224,98 @@ mod tests {
panic!("restart never launched a new command");
}
#[tokio::test]
async fn pipeline_supervisor_queues_restart_on_failure() {
use shipote_card::{CommandRef, DiscernPolicy, PipelineSpec};
let mgr = Arc::new(WorkspaceManager::new(IncarnatorConfig::default()));
let (ws_id, _) = mgr.create(WorkspaceSpec {
label: "psup".into(),
soma: Default::default(),
permissions: Default::default(),
ttl: None,
flow_dirs: vec![],
on_exit: shipote_card::ExitPolicy::Reap,
quota_enforce: Default::default(),
}).await.unwrap();
let spec = PipelineSpec {
label: "fail-pipeline".into(),
workspace: ws_id,
nodes: vec![CommandRef {
label: "boom".into(),
payload: brahman_card::Payload::Native {
exec: "/bin/false".into(),
argv: vec![],
envp: vec![],
},
soma: Default::default(),
flows: Default::default(),
supervision: brahman_card::Supervision::OneShot,
}],
edges: vec![],
discern: DiscernPolicy::default(),
restart_on_failure: true,
};
let pipeline_id = ulid::Ulid::new();
// Simulamos lo que haría el daemon: registramos un comando como
// si fuera de pipeline. Usamos `register_pipeline_commands` con
// un pid fake — pero como reaper hace waitpid, mejor lanzar de verdad.
// Hack: usar /bin/false via run() y manualmente marcar pipeline_id.
let summary = mgr.run(ws_id, "/bin/false".into(), vec![], vec![]).await.unwrap();
// Marcar el comando con pipeline_id manualmente.
{
let mut g = mgr.inner.lock().await;
if let Some(ws) = g.workspaces.get_mut(&ws_id) {
if let Some(cmd) = ws.commands.get_mut(&summary.id) {
cmd.pipeline_id = Some(pipeline_id);
}
}
}
mgr.register_pipeline_supervisor(pipeline_id, ws_id, spec, true).await;
// Esperamos que reap detecte la falla y push a pending.
for _ in 0..40 {
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
mgr.reap_dead().await;
let pending = mgr.take_pending_restarts().await;
if !pending.is_empty() {
assert_eq!(pending[0].spec.label, "fail-pipeline");
return;
}
}
panic!("supervisor never queued a restart");
}
#[tokio::test]
async fn quota_enforce_nproc_kill_terminates_commands() {
let mgr = Arc::new(WorkspaceManager::new(IncarnatorConfig::default()));
let mut spec = WorkspaceSpec {
label: "qenforce".into(),
soma: Default::default(),
permissions: Default::default(),
ttl: None,
flow_dirs: vec![],
on_exit: shipote_card::ExitPolicy::Reap,
quota_enforce: shipote_card::QuotaEnforcement {
mem: shipote_card::QuotaAction::None,
nproc: shipote_card::QuotaAction::Kill,
},
};
spec.soma.rlimits.nproc = Some(1);
let (id, _) = mgr.create(spec).await.unwrap();
// Lanzo 2 procesos (cada uno sleep). nproc_limit=1 → breach inmediato.
let _ = mgr.run(id, "/bin/sleep".into(), vec!["5".into()], vec![]).await.unwrap();
let _ = mgr.run(id, "/bin/sleep".into(), vec!["5".into()], vec![]).await.unwrap();
// Reaper detecta breach y mata workspace.
for _ in 0..30 {
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
mgr.reap_dead().await;
let alive = mgr.list().await;
if alive.is_empty() {
return; // workspace removido por stop()
}
}
panic!("quota enforce kill never triggered");
}
#[tokio::test]
async fn run_true_in_workspace() {
let mgr = Arc::new(WorkspaceManager::new(IncarnatorConfig::default()));
@@ -1052,6 +1326,7 @@ mod tests {
ttl: None,
flow_dirs: vec![],
on_exit: shipote_card::ExitPolicy::Reap,
quota_enforce: Default::default(),
};
let (id, _) = mgr.create(spec).await.unwrap();
let summary = mgr