From 4c9d1b4c1dbc62cf7d9cfde211524eb06ba21144 Mon Sep 17 00:00:00 2001 From: sergio Date: Mon, 11 May 2026 10:22:46 +0000 Subject: [PATCH] feat(shipote): quota enforce + cgroup memory.max + pipeline restart (fase L) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - WorkspaceSpec.quota_enforce: QuotaAction (None|Log|Kill) por recurso (mem, nproc). reap_dead aplica policy; Kill usa stop_with_grace(ZERO). - ente_incarnate::cgroup::apply_rlimits_to_cgroup escribe memory.max y pids.max. WorkspaceManager::create_with_id lo invoca si soma.cgroup.path y delegation. Kernel hace OOM kill al exceder; falla silenciosa si no hay delegation. - PipelineSpec.restart_on_failure: bool. register_pipeline_supervisor retiene spec; reap_dead detecta all-dead + any-failed → push a queue; daemon reaper drena y relanza pipeline ENTERO (los pipes intermedios no permiten restart parcial). 82 tests pasan (ente-incarnate 16, nouser-core 27, shipote-card 8, shipote-core 24, shipote-discern 5, yahweh-provider-fs 3). Co-Authored-By: Claude Opus 4.7 --- crates/apps/shipote-daemon/src/main.rs | 54 +++- .../modules/shipote/shipote-card/src/lib.rs | 37 +++ .../modules/shipote/shipote-core/src/lib.rs | 277 +++++++++++++++++- .../shipote/shipote-core/src/persist.rs | 2 + .../shipote/shipote-core/src/pipeline.rs | 4 + .../shipote/shipote-protocol/src/lib.rs | 1 + crates/shared/ente-incarnate/src/cgroup.rs | 31 +- 7 files changed, 401 insertions(+), 5 deletions(-) diff --git a/crates/apps/shipote-daemon/src/main.rs b/crates/apps/shipote-daemon/src/main.rs index 9f024b0..939fec3 100644 --- a/crates/apps/shipote-daemon/src/main.rs +++ b/crates/apps/shipote-daemon/src/main.rs @@ -93,7 +93,8 @@ async fn main() -> anyhow::Result<()> { let discerner = Arc::new(DiscernPipeline::default_pipeline()); - // Reaper periódico cada 500 ms. + // Reaper periódico cada 500 ms. Además drena pipelines pendientes + // de restart (supervisión a nivel pipeline). { let mgr = mgr.clone(); tokio::spawn(async move { @@ -101,6 +102,55 @@ async fn main() -> anyhow::Result<()> { loop { tick.tick().await; mgr.reap_dead().await; + let pending = mgr.take_pending_restarts().await; + for sup in pending { + info!( + label = %sup.spec.label, + restart_count = sup.restart_count, + "pipeline restart: relaunching" + ); + let inc = mgr.incarnator_handle(); + let disc = std::sync::Arc::new(DiscernPipeline::default_pipeline()); + let ws_label = mgr + .workspace_label(sup.spec.workspace) + .await + .unwrap_or_default(); + let restart_count = sup.restart_count; + let workspace = sup.spec.workspace; + let tap = sup.tap; + let mut new_spec = sup.spec.clone(); + // Mantener restart_on_failure para futuras fallas. + new_spec.restart_on_failure = true; + match shipote_core::pipeline::run_pipeline( + &new_spec, + &ws_label, + tap, + disc, + inc, + Some(mgr.clone()), + ) + .await + { + Ok(launch) => { + mgr.register_pipeline_commands(workspace, launch.pipeline, launch.command_pids.clone()) + .await; + // Re-registrar supervisor con el nuevo pipeline_id, + // preservando restart_count. + let mut s = shipote_core::PipelineSupervisor { + workspace, + spec: new_spec, + tap, + restart_count, + }; + s.restart_count = restart_count; + mgr.register_pipeline_supervisor(launch.pipeline, workspace, s.spec.clone(), tap) + .await; + } + Err(e) => { + warn!(?e, "pipeline restart failed"); + } + } + } } }); } @@ -219,6 +269,7 @@ async fn dispatch( announce_edges_to_broker(pool.as_deref(), &pipeline_id, &launch.edge_discernments); let cmds = launch.command_pids; mgr.register_pipeline_commands(spec.workspace, pipeline_id, cmds.clone()).await; + mgr.register_pipeline_supervisor(pipeline_id, spec.workspace, spec.clone(), tap).await; let edges = launch.edge_discernments.into_iter().map(map_edge_to_info).collect(); Response::PipelineStarted { pipeline: pipeline_id, @@ -318,6 +369,7 @@ async fn dispatch( announce_edges_to_broker(pool.as_deref(), &pipeline_id, &launch.edge_discernments); let cmds = launch.command_pids; mgr.register_pipeline_commands(spec.workspace, pipeline_id, cmds.clone()).await; + mgr.register_pipeline_supervisor(pipeline_id, spec.workspace, spec.clone(), tap).await; let edges = launch.edge_discernments.into_iter().map(map_edge_to_info).collect(); Response::PipelineStarted { pipeline: pipeline_id, diff --git a/crates/modules/shipote/shipote-card/src/lib.rs b/crates/modules/shipote/shipote-card/src/lib.rs index b1fc374..19873e1 100644 --- a/crates/modules/shipote/shipote-card/src/lib.rs +++ b/crates/modules/shipote/shipote-card/src/lib.rs @@ -94,6 +94,34 @@ pub struct WorkspaceSpec { /// Política al terminar el workspace. #[serde(default)] pub on_exit: ExitPolicy, + + /// Política de enforcement automático cuando un recurso excede su + /// rlimit declarado en `soma.rlimits`. Default = sólo accounting + /// (None) — el quota report sigue funcionando, pero no hay kill. + #[serde(default)] + pub quota_enforce: QuotaEnforcement, +} + +/// Acción cuando un recurso excede su límite. Aplica por recurso (mem, +/// nproc, ...). +#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize)] +#[serde(rename_all = "lowercase")] +pub enum QuotaAction { + /// Sólo accounting: la breach aparece en `workspace_quota`. + #[default] + None, + /// Loguear la breach (info-level del daemon). + Log, + /// Matar todos los comandos vivos del workspace (SIGKILL, sin grace). + Kill, +} + +#[derive(Debug, Clone, Default, Serialize, Deserialize)] +pub struct QuotaEnforcement { + #[serde(default)] + pub mem: QuotaAction, + #[serde(default)] + pub nproc: QuotaAction, } #[derive(Debug, Clone, Serialize, Deserialize)] @@ -189,6 +217,11 @@ pub struct PipelineSpec { pub edges: Vec, #[serde(default)] pub discern: DiscernPolicy, + /// Si `true` y cualquier comando del pipeline termina con exit!=0, + /// el daemon relaunch el pipeline ENTERO (stop + nuevo run_pipeline). + /// Útil para pipelines de procesamiento continuo. + #[serde(default)] + pub restart_on_failure: bool, } #[derive(Debug, Clone, Serialize, Deserialize)] @@ -468,6 +501,7 @@ mod subst_tests { }], edges: vec![], discern: DiscernPolicy::default(), + restart_on_failure: false, }; let out = substitute_vars(&spec, &vars).unwrap(); assert_eq!(out.label, "p-renamed"); @@ -487,6 +521,7 @@ mod subst_tests { nodes: vec![], edges: vec![], discern: DiscernPolicy::default(), + restart_on_failure: false, }; let out = substitute_vars(&spec, &vars).unwrap(); assert_eq!(out.label, "p-${UNDEFINED}"); @@ -509,6 +544,7 @@ mod tests { scope: FlowScope::Public, }], on_exit: ExitPolicy::Reap, + quota_enforce: Default::default(), } } @@ -566,6 +602,7 @@ mod tests { to_input: "y".into(), }], discern: DiscernPolicy::default(), + restart_on_failure: false, }; assert!(p.validate().is_err()); } diff --git a/crates/modules/shipote/shipote-core/src/lib.rs b/crates/modules/shipote/shipote-core/src/lib.rs index 239b60f..9673dc3 100644 --- a/crates/modules/shipote/shipote-core/src/lib.rs +++ b/crates/modules/shipote/shipote-core/src/lib.rs @@ -99,6 +99,21 @@ struct Inner { /// con la misma spec (nuevo pid y nuevo command_id se asigna por /// el nuevo state pero el restart_spec sigue ligado al original). restart_specs: HashMap, + /// Supervisores de pipelines con `restart_on_failure`. Indexed por + /// pipeline_id. Cuando `reap_dead` detecta que el pipeline tuvo + /// algún command failure, agrega un entry a `pending_pipeline_restarts`. + pipeline_supervisors: HashMap, + /// Cola de pipelines pendientes de restart. El daemon la drena en + /// cada loop del reaper, hace stop + run_pipeline. + pending_pipeline_restarts: Vec, +} + +#[derive(Debug, Clone)] +pub struct PipelineSupervisor { + pub workspace: WorkspaceId, + pub spec: PipelineSpec, + pub tap: bool, + pub restart_count: u32, } #[derive(Debug, Clone)] @@ -131,6 +146,19 @@ pub struct CommandInfo { pub log_bytes: u64, } +/// Lee VmRSS (bytes) de `/proc//status`. Helper local para +/// reap_dead que no necesita el full stats. Devuelve 0 si el proc no +/// existe o el campo no aparece. +fn read_proc_rss(pid: i32) -> Option { + let status = std::fs::read_to_string(format!("/proc/{pid}/status")).ok()?; + status + .lines() + .find_map(|l| l.strip_prefix("VmRSS:").map(str::trim)) + .and_then(|s| s.split_whitespace().next()) + .and_then(|s| s.parse::().ok()) + .map(|kb| kb * 1024) +} + fn spawn_log_drainer(read_fd: std::os::fd::RawFd, logs: logbuf::LogBuf) { // Marcar non-blocking + envolver en AsyncFd; igual patrón que el tap. // SAFETY: F_SETFL sobre fd válido. @@ -198,11 +226,54 @@ impl WorkspaceManager { saved_pipelines: HashMap::new(), pipeline_flows: HashMap::new(), restart_specs: HashMap::new(), + pipeline_supervisors: HashMap::new(), + pending_pipeline_restarts: Vec::new(), })), incarnator: Arc::new(Incarnator::new(cfg)), } } + /// Registra un supervisor para un pipeline con `restart_on_failure=true`. + /// El daemon llama esto tras `run_pipeline` para que `reap_dead` agregue + /// el pipeline a la cola de restart cuando algún command falle. + pub async fn register_pipeline_supervisor( + &self, + pipeline_id: Ulid, + workspace: WorkspaceId, + spec: PipelineSpec, + tap: bool, + ) { + if !spec.restart_on_failure { + return; + } + tracing::debug!(%pipeline_id, label = %spec.label, "pipeline supervisor registered"); + let mut g = self.inner.lock().await; + g.pipeline_supervisors.insert( + pipeline_id, + PipelineSupervisor { + workspace, + spec, + tap, + restart_count: 0, + }, + ); + } + + /// Drena la cola de pipelines pendientes de restart y retorna las + /// specs a relaunch. El daemon lo llama tras cada `reap_dead`. + pub async fn take_pending_restarts(&self) -> Vec { + let mut g = self.inner.lock().await; + let pending = std::mem::take(&mut g.pending_pipeline_restarts); + let mut out = Vec::with_capacity(pending.len()); + for old_id in pending { + if let Some(mut sup) = g.pipeline_supervisors.remove(&old_id) { + sup.restart_count += 1; + out.push(sup); + } + } + out + } + /// Registra los comandos lanzados por un pipeline en el workspace. /// Esto permite `pipeline_stop` (matar selectivamente sólo los pids /// de un pipeline). `pipeline_id` se setea en cada CommandState. @@ -465,8 +536,22 @@ impl WorkspaceManager { spec: WorkspaceSpec, ) -> Result<(WorkspaceId, Vec), CoreError> { let card = spec.to_card(id)?; - let warnings = self.incarnator.dry_run(&card).warnings; + let mut warnings = self.incarnator.dry_run(&card).warnings; let ttl = spec.ttl; + + // Si el workspace declara cgroup path Y rlimits, intentamos + // crear el cgroup y escribir memory.max/pids.max. El kernel + // hace OOM kill al exceder memory.max — enforcement automático + // sin policy adicional. Falla silenciosa si no hay delegation. + if !spec.soma.cgroup.path.is_empty() { + if let Ok(abs) = ente_incarnate::cgroup::ensure_cgroup(&spec.soma.cgroup) { + let applied = + ente_incarnate::cgroup::apply_rlimits_to_cgroup(&abs, &spec.soma.rlimits); + if !applied.is_empty() { + warnings.push(format!("cgroup limits applied: {}", applied.join(", "))); + } + } + } let state = WorkspaceState { id, spec, @@ -778,6 +863,7 @@ impl WorkspaceManager { /// el daemon o ante SIGCHLD. Marca `alive=false` y guarda exit_status. pub async fn reap_dead(self: &Arc) { let mut to_restart: Vec = Vec::new(); + let mut to_enforce_kill: Vec = Vec::new(); { let mut g = self.inner.lock().await; for ws in g.workspaces.values_mut() { @@ -798,6 +884,93 @@ impl WorkspaceManager { } } } + // Quota enforcement: chequear breach por workspace y aplicar policy. + // Lo hacemos dentro del mismo lock para tener una lectura + // consistente; el kill real va fuera del lock. + for (ws_id, ws) in g.workspaces.iter() { + let rl = &ws.spec.soma.rlimits; + let qe = &ws.spec.quota_enforce; + // Sólo aplicamos si hay al menos una action != None. + if qe.mem == shipote_card::QuotaAction::None + && qe.nproc == shipote_card::QuotaAction::None + { + continue; + } + // Medir RSS y nproc vivos sin pasar por workspace_stats + // (que tomaría el lock recursivo). Hacemos un read directo. + let alive: Vec = ws + .commands + .values() + .filter(|c| c.alive) + .map(|c| c.pid.as_raw()) + .collect(); + let nproc_alive = alive.len() as u32; + let mem_used: u64 = alive + .iter() + .filter_map(|pid| read_proc_rss(*pid)) + .sum(); + + let mem_breach = matches!(rl.mem_bytes, Some(limit) if mem_used > limit); + let nproc_breach = matches!(rl.nproc, Some(limit) if nproc_alive > limit); + + let mut kill_needed = false; + if mem_breach { + match qe.mem { + shipote_card::QuotaAction::Log => { + warn!(%ws_id, used = mem_used, limit = ?rl.mem_bytes, "quota breach: memory"); + } + shipote_card::QuotaAction::Kill => { + warn!(%ws_id, used = mem_used, limit = ?rl.mem_bytes, "quota breach: KILLING"); + kill_needed = true; + } + _ => {} + } + } + if nproc_breach { + match qe.nproc { + shipote_card::QuotaAction::Log => { + warn!(%ws_id, alive = nproc_alive, limit = ?rl.nproc, "quota breach: nproc"); + } + shipote_card::QuotaAction::Kill => { + warn!(%ws_id, alive = nproc_alive, limit = ?rl.nproc, "quota breach: KILLING"); + kill_needed = true; + } + _ => {} + } + } + if kill_needed { + to_enforce_kill.push(*ws_id); + } + } + // Pipeline supervisor: detectar pipelines cuyos comandos tienen + // failure. Marca para restart si tiene supervisor. + // Esto se hace cuando TODOS los comandos del pipeline están + // dead Y al menos uno tiene exit!=0 (sino podría disparar + // restart mientras otros comandos aún corren — incorrecto). + let supervisor_ids: Vec = g.pipeline_supervisors.keys().copied().collect(); + for pipe_id in supervisor_ids { + // ¿Hay algún comando vivo de este pipeline? + let mut all_dead = true; + let mut any_failed = false; + for ws in g.workspaces.values() { + for cmd in ws.commands.values() { + if cmd.pipeline_id != Some(pipe_id) { + continue; + } + if cmd.alive { + all_dead = false; + } else if cmd.exit_status.map_or(false, |s| s != 0) { + any_failed = true; + } + } + } + if all_dead && any_failed { + // Push a queue si no estaba ya. + if !g.pending_pipeline_restarts.contains(&pipe_id) { + g.pending_pipeline_restarts.push(pipe_id); + } + } + } // Detectar restart_specs cuyo command_id ya está dead con exit!=0. let mut to_remove: Vec = Vec::new(); for (cmd_id, spec) in g.restart_specs.iter() { @@ -826,6 +999,10 @@ impl WorkspaceManager { g.restart_specs.remove(&id); } } + // Quota enforcement: kill workspaces fuera del lock. + for ws_id in to_enforce_kill { + let _ = self.stop_with_grace(ws_id, std::time::Duration::ZERO).await; + } // Schedule restart fuera del lock. for mut spec in to_restart { let mgr = self.clone(); @@ -900,6 +1077,7 @@ mod tests { ttl: Some(std::time::Duration::from_millis(120)), flow_dirs: vec![], on_exit: shipote_card::ExitPolicy::Reap, + quota_enforce: Default::default(), }; let (id, _) = mgr.create(spec).await.unwrap(); assert_eq!(mgr.list().await.len(), 1); @@ -922,6 +1100,7 @@ mod tests { ttl: None, flow_dirs: vec![], on_exit: shipote_card::ExitPolicy::Reap, + quota_enforce: Default::default(), }; let (id, _w) = mgr.create(spec).await.unwrap(); let list = mgr.list().await; @@ -939,6 +1118,7 @@ mod tests { ttl: None, flow_dirs: vec![], on_exit: shipote_card::ExitPolicy::Reap, + quota_enforce: Default::default(), }; let (id, _) = mgr.create(spec).await.unwrap(); let summary = mgr @@ -971,6 +1151,7 @@ mod tests { ttl: None, flow_dirs: vec![], on_exit: shipote_card::ExitPolicy::Reap, + quota_enforce: Default::default(), }; let (id, _) = mgr.create(spec).await.unwrap(); // sh -c "echo OUT; echo ERR >&2" @@ -1017,6 +1198,7 @@ mod tests { ttl: None, flow_dirs: vec![], on_exit: shipote_card::ExitPolicy::Reap, + quota_enforce: Default::default(), }; let (id, _) = mgr.create(spec).await.unwrap(); // /bin/false sale con exit=1. Con restart_on_failure=true debería @@ -1042,6 +1224,98 @@ mod tests { panic!("restart never launched a new command"); } + #[tokio::test] + async fn pipeline_supervisor_queues_restart_on_failure() { + use shipote_card::{CommandRef, DiscernPolicy, PipelineSpec}; + let mgr = Arc::new(WorkspaceManager::new(IncarnatorConfig::default())); + let (ws_id, _) = mgr.create(WorkspaceSpec { + label: "psup".into(), + soma: Default::default(), + permissions: Default::default(), + ttl: None, + flow_dirs: vec![], + on_exit: shipote_card::ExitPolicy::Reap, + quota_enforce: Default::default(), + }).await.unwrap(); + let spec = PipelineSpec { + label: "fail-pipeline".into(), + workspace: ws_id, + nodes: vec![CommandRef { + label: "boom".into(), + payload: brahman_card::Payload::Native { + exec: "/bin/false".into(), + argv: vec![], + envp: vec![], + }, + soma: Default::default(), + flows: Default::default(), + supervision: brahman_card::Supervision::OneShot, + }], + edges: vec![], + discern: DiscernPolicy::default(), + restart_on_failure: true, + }; + let pipeline_id = ulid::Ulid::new(); + // Simulamos lo que haría el daemon: registramos un comando como + // si fuera de pipeline. Usamos `register_pipeline_commands` con + // un pid fake — pero como reaper hace waitpid, mejor lanzar de verdad. + // Hack: usar /bin/false via run() y manualmente marcar pipeline_id. + let summary = mgr.run(ws_id, "/bin/false".into(), vec![], vec![]).await.unwrap(); + // Marcar el comando con pipeline_id manualmente. + { + let mut g = mgr.inner.lock().await; + if let Some(ws) = g.workspaces.get_mut(&ws_id) { + if let Some(cmd) = ws.commands.get_mut(&summary.id) { + cmd.pipeline_id = Some(pipeline_id); + } + } + } + mgr.register_pipeline_supervisor(pipeline_id, ws_id, spec, true).await; + // Esperamos que reap detecte la falla y push a pending. + for _ in 0..40 { + tokio::time::sleep(std::time::Duration::from_millis(50)).await; + mgr.reap_dead().await; + let pending = mgr.take_pending_restarts().await; + if !pending.is_empty() { + assert_eq!(pending[0].spec.label, "fail-pipeline"); + return; + } + } + panic!("supervisor never queued a restart"); + } + + #[tokio::test] + async fn quota_enforce_nproc_kill_terminates_commands() { + let mgr = Arc::new(WorkspaceManager::new(IncarnatorConfig::default())); + let mut spec = WorkspaceSpec { + label: "qenforce".into(), + soma: Default::default(), + permissions: Default::default(), + ttl: None, + flow_dirs: vec![], + on_exit: shipote_card::ExitPolicy::Reap, + quota_enforce: shipote_card::QuotaEnforcement { + mem: shipote_card::QuotaAction::None, + nproc: shipote_card::QuotaAction::Kill, + }, + }; + spec.soma.rlimits.nproc = Some(1); + let (id, _) = mgr.create(spec).await.unwrap(); + // Lanzo 2 procesos (cada uno sleep). nproc_limit=1 → breach inmediato. + let _ = mgr.run(id, "/bin/sleep".into(), vec!["5".into()], vec![]).await.unwrap(); + let _ = mgr.run(id, "/bin/sleep".into(), vec!["5".into()], vec![]).await.unwrap(); + // Reaper detecta breach y mata workspace. + for _ in 0..30 { + tokio::time::sleep(std::time::Duration::from_millis(50)).await; + mgr.reap_dead().await; + let alive = mgr.list().await; + if alive.is_empty() { + return; // workspace removido por stop() + } + } + panic!("quota enforce kill never triggered"); + } + #[tokio::test] async fn run_true_in_workspace() { let mgr = Arc::new(WorkspaceManager::new(IncarnatorConfig::default())); @@ -1052,6 +1326,7 @@ mod tests { ttl: None, flow_dirs: vec![], on_exit: shipote_card::ExitPolicy::Reap, + quota_enforce: Default::default(), }; let (id, _) = mgr.create(spec).await.unwrap(); let summary = mgr diff --git a/crates/modules/shipote/shipote-core/src/persist.rs b/crates/modules/shipote/shipote-core/src/persist.rs index a4e1e44..ca8f862 100644 --- a/crates/modules/shipote/shipote-core/src/persist.rs +++ b/crates/modules/shipote/shipote-core/src/persist.rs @@ -162,6 +162,7 @@ mod tests { ttl: None, flow_dirs: vec![], on_exit: ExitPolicy::Reap, + quota_enforce: Default::default(), } } @@ -208,6 +209,7 @@ mod tests { }], edges: vec![], discern: DiscernPolicy::default(), + restart_on_failure: false, }; mgr1.save_pipeline("daily".into(), spec).await; mgr1.save_snapshot(&path).await.unwrap(); diff --git a/crates/modules/shipote/shipote-core/src/pipeline.rs b/crates/modules/shipote/shipote-core/src/pipeline.rs index ece10e6..150c158 100644 --- a/crates/modules/shipote/shipote-core/src/pipeline.rs +++ b/crates/modules/shipote/shipote-core/src/pipeline.rs @@ -594,6 +594,7 @@ mod tests { to_input: "stdin".into(), }], discern: DiscernPolicy::default(), + restart_on_failure: false, }; let disc = Arc::new(DiscernPipeline::default_pipeline()); let inc = Arc::new(Incarnator::new(IncarnatorConfig::default())); @@ -631,6 +632,7 @@ mod tests { }, ], discern: DiscernPolicy::default(), + restart_on_failure: false, }; let disc = Arc::new(DiscernPipeline::default_pipeline()); let inc = Arc::new(Incarnator::new(IncarnatorConfig::default())); @@ -667,6 +669,7 @@ mod tests { }, ], discern: DiscernPolicy::default(), + restart_on_failure: false, }; let disc = Arc::new(DiscernPipeline::default_pipeline()); let inc = Arc::new(Incarnator::new(IncarnatorConfig::default())); @@ -698,6 +701,7 @@ mod tests { replay_chunks: 32, replay_bytes: 0, }, + restart_on_failure: false, }; let disc = Arc::new(DiscernPipeline::default_pipeline()); let inc = Arc::new(Incarnator::new(IncarnatorConfig::default())); diff --git a/crates/modules/shipote/shipote-protocol/src/lib.rs b/crates/modules/shipote/shipote-protocol/src/lib.rs index 51d130c..598c17e 100644 --- a/crates/modules/shipote/shipote-protocol/src/lib.rs +++ b/crates/modules/shipote/shipote-protocol/src/lib.rs @@ -374,6 +374,7 @@ mod tests { ttl: None, flow_dirs: vec![], on_exit: shipote_card::ExitPolicy::Reap, + quota_enforce: Default::default(), }, }; let bytes = postcard::to_allocvec(&req).unwrap(); diff --git a/crates/shared/ente-incarnate/src/cgroup.rs b/crates/shared/ente-incarnate/src/cgroup.rs index 580d478..59e2680 100644 --- a/crates/shared/ente-incarnate/src/cgroup.rs +++ b/crates/shared/ente-incarnate/src/cgroup.rs @@ -1,8 +1,8 @@ //! Resolución y creación de cgroups v2 para el hijo. use crate::error::IncarnateError; -use brahman_card::CgroupSpec; -use std::path::PathBuf; +use brahman_card::{CgroupSpec, ResourceLimits}; +use std::path::{Path, PathBuf}; /// Cgroup actual del proceso que llama. Lo usamos como prefijo para paths /// declarados relativos en `CgroupSpec.path`. @@ -58,8 +58,33 @@ pub fn ensure_cgroup(spec: &CgroupSpec) -> Result { Ok(abs) } +/// Escribe `memory.max` y `pids.max` al cgroup según `rlimits`. Falla +/// silenciosamente si los archivos no son escribibles (cgroup no +/// delegated). El kernel hace OOM kill cuando `memory.max` se excede, +/// y bloquea forks cuando `pids.max` se alcanza. +/// +/// `memory.max` acepta `max` o un número en bytes. `pids.max` igual. +pub fn apply_rlimits_to_cgroup(cgroup_abs: &Path, rlimits: &ResourceLimits) -> Vec { + let mut applied = Vec::new(); + if let Some(mem) = rlimits.mem_bytes { + let path = cgroup_abs.join("memory.max"); + match std::fs::write(&path, format!("{mem}\n")) { + Ok(_) => applied.push(format!("memory.max={mem}")), + Err(e) => tracing::warn!(?e, path = %path.display(), "memory.max write failed"), + } + } + if let Some(np) = rlimits.nproc { + let path = cgroup_abs.join("pids.max"); + match std::fs::write(&path, format!("{np}\n")) { + Ok(_) => applied.push(format!("pids.max={np}")), + Err(e) => tracing::warn!(?e, path = %path.display(), "pids.max write failed"), + } + } + applied +} + /// Mueve `pid` a `cgroup_abs/cgroup.procs`. -pub fn move_to_cgroup(cgroup_abs: &std::path::Path, pid: nix::unistd::Pid) -> Result<(), IncarnateError> { +pub fn move_to_cgroup(cgroup_abs: &Path, pid: nix::unistd::Pid) -> Result<(), IncarnateError> { let procs = cgroup_abs.join("cgroup.procs"); std::fs::write(&procs, format!("{}\n", pid.as_raw())).map_err(|e| match e.kind() { std::io::ErrorKind::PermissionDenied => IncarnateError::CgroupNotWritable {