From 22a0ae8c58014af5cb12dbdd13408f9cd2745e31 Mon Sep 17 00:00:00 2001 From: sergio Date: Wed, 20 May 2026 20:15:30 +0000 Subject: [PATCH] feat(shuma-exec): spill con splice (copia cero) + limpieza de temporales MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit El volcado de la salida excedente ya no copia por espacio de usuario: pasado el tope, el lector escribe la línea que cruzó + lo bufereado y luego mueve el resto del pipe al archivo con splice(2) —kernel a kernel, sin copia—. Se aplica a stdout (el contenido principal). shuma-shell limpia sus archivos de volcado al cerrar la sesión (Drop). Los spills llevan el pid en el nombre para no chocar entre instancias. shuma-exec: 11 tests verdes (el de spill ahora verifica el camino splice). Co-Authored-By: Claude Opus 4.7 --- Cargo.lock | 3 + crates/apps/shuma-shell/src/main.rs | 20 ++++- crates/modules/shuma/shuma-exec/Cargo.toml | 2 + crates/modules/shuma/shuma-exec/src/lib.rs | 91 ++++++++++++---------- 4 files changed, 74 insertions(+), 42 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 114751b..c0b9e30 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -11383,6 +11383,9 @@ dependencies = [ [[package]] name = "shuma-exec" version = "0.1.0" +dependencies = [ + "nix 0.29.0", +] [[package]] name = "shuma-gateway" diff --git a/crates/apps/shuma-shell/src/main.rs b/crates/apps/shuma-shell/src/main.rs index aab6042..8397ee1 100644 --- a/crates/apps/shuma-shell/src/main.rs +++ b/crates/apps/shuma-shell/src/main.rs @@ -642,8 +642,10 @@ impl Shell { /// aplica la política de captura de la sesión. fn build_spec(&self, line: &str, stdin: Option, run_id: RunId) -> CommandSpec { let policy = self.session.capture(); - let spill_path = (policy.spill && policy.limit_bytes > 0) - .then(|| std::env::temp_dir().join(format!("shuma-spill-{run_id}.log"))); + let spill_path = (policy.spill && policy.limit_bytes > 0).then(|| { + std::env::temp_dir() + .join(format!("shuma-spill-{}-{run_id}.log", std::process::id())) + }); CommandSpec { exec: plan_exec(line), cwd: self.session.cwd().to_string(), @@ -1621,6 +1623,20 @@ impl Render for Shell { } } +impl Drop for Shell { + /// Al cerrar la sesión, limpia sus archivos de volcado temporales. + fn drop(&mut self) { + let prefix = format!("shuma-spill-{}-", std::process::id()); + if let Ok(entries) = std::fs::read_dir(std::env::temp_dir()) { + for e in entries.flatten() { + if e.file_name().to_string_lossy().starts_with(&prefix) { + let _ = std::fs::remove_file(e.path()); + } + } + } + } +} + fn main() { launch_app("brahman · shuma shell", (1100., 700.), Shell::new); } diff --git a/crates/modules/shuma/shuma-exec/Cargo.toml b/crates/modules/shuma/shuma-exec/Cargo.toml index f042f40..c4527c0 100644 --- a/crates/modules/shuma/shuma-exec/Cargo.toml +++ b/crates/modules/shuma/shuma-exec/Cargo.toml @@ -8,3 +8,5 @@ publish.workspace = true description = "shuma — ejecutor de comandos del shell con salida en streaming: lanza un proceso y entrega stdout/stderr línea a línea por un canal. Agnóstico de UI." [dependencies] +# `zerocopy` habilita `splice` — volcado pipe→archivo sin copia. +nix = { workspace = true, features = ["zerocopy"] } diff --git a/crates/modules/shuma/shuma-exec/src/lib.rs b/crates/modules/shuma/shuma-exec/src/lib.rs index 00eb62a..676e22b 100644 --- a/crates/modules/shuma/shuma-exec/src/lib.rs +++ b/crates/modules/shuma/shuma-exec/src/lib.rs @@ -25,13 +25,16 @@ use std::fs::File; use std::io::{BufRead, BufReader, Read, Write}; -use std::path::PathBuf; +use std::os::fd::AsFd; +use std::path::{Path, PathBuf}; use std::process::{Child, Command, Stdio}; use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; use std::sync::mpsc::{self, Receiver, Sender, TryRecvError}; use std::sync::{Arc, Mutex}; use std::thread::JoinHandle; +use nix::fcntl::{splice, SpliceFFlags}; + /// Una etapa del pipe en ejecución directa: un binario y sus argumentos /// ya resueltos (sin comillas, sin metacaracteres). #[derive(Debug, Clone)] @@ -128,25 +131,6 @@ impl RunEvent { } } -/// Destino de volcado de la salida excedente — compartido entre lectores. -struct SpillSink { - path: PathBuf, - file: Mutex>, -} - -impl SpillSink { - /// Escribe una línea excedente al archivo (lo abre perezosamente). - fn write_line(&self, line: &str) { - let mut g = self.file.lock().expect("spill lock"); - if g.is_none() { - *g = File::create(&self.path).ok(); - } - if let Some(f) = g.as_mut() { - let _ = writeln!(f, "{line}"); - } - } -} - /// Asa de un comando en ejecución. El consumidor la conserva y drena sus /// eventos cuando le conviene. pub struct RunHandle { @@ -208,41 +192,71 @@ impl RunHandle { } } -/// Lanza un hilo lector de un flujo, con captura acotada. Pasado el tope -/// emite (una vez) `Truncated` o `Spilled` y deriva el resto al sumidero -/// de volcado o a la basura — pero **sigue drenando** el pipe. +/// Vuelca el resto de un pipe a un archivo con **copia cero** (`splice`): +/// los bytes van de pipe a archivo sin pasar por espacio de usuario. +fn spill_rest(reader: &mut BufReader, path: &Path, first_line: &str) { + let Ok(file) = File::create(path) else { + return; + }; + let mut file = file; + // La línea que cruzó el tope y lo ya bufereado van primero… + let _ = file.write_all(first_line.as_bytes()); + let buffered: Vec = reader.buffer().to_vec(); + let _ = file.write_all(&buffered); + reader.consume(buffered.len()); + // …y el resto del pipe se mueve con `splice`, kernel a kernel. + loop { + match splice(reader.get_ref(), None, &file, None, 1 << 20, SpliceFFlags::empty()) { + Ok(0) | Err(_) => break, + Ok(_) => {} + } + } +} + +/// Lanza un hilo lector de un flujo, con captura acotada. Pasado el tope: +/// si hay `spill`, el resto se vuelca al archivo con `splice` (copia +/// cero); si no, se descarta. En ambos casos el pipe se **sigue +/// drenando** — el proceso nunca se bloquea. #[allow(clippy::too_many_arguments)] -fn spawn_reader( +fn spawn_reader( stream: R, tx: Sender, make: fn(String) -> RunEvent, limit: usize, counter: Arc, announced: Arc, - spill: Option>, + spill: Option, ) -> JoinHandle<()> { std::thread::spawn(move || { - for line in BufReader::new(stream).lines().map_while(Result::ok) { - let total = - counter.fetch_add(line.len() + 1, Ordering::Relaxed) + line.len() + 1; + let mut reader = BufReader::new(stream); + let mut buf = String::new(); + loop { + buf.clear(); + let n = match reader.read_line(&mut buf) { + Ok(0) => break, // EOF + Ok(n) => n, + Err(_) => break, + }; + let total = counter.fetch_add(n, Ordering::Relaxed) + n; if limit != 0 && total > limit { let first = !announced.swap(true, Ordering::Relaxed); match &spill { - Some(sink) => { + Some(path) => { if first { - let _ = tx - .send(RunEvent::Spilled(sink.path.display().to_string())); + let _ = tx.send(RunEvent::Spilled(path.display().to_string())); } - sink.write_line(&line); + spill_rest(&mut reader, path, &buf); + break; // splice se llevó el resto } None => { if first { let _ = tx.send(RunEvent::Truncated); } + continue; // descarta, pero sigue drenando } } - continue; // descarta/vuelca, pero sigue leyendo el pipe } + let line = buf.trim_end_matches(['\n', '\r']).to_string(); if tx.send(make(line)).is_err() { break; } @@ -361,13 +375,10 @@ pub fn run(spec: &CommandSpec) -> RunHandle { } // Captura acotada: contador y aviso compartidos por todos los - // lectores; un sumidero de volcado opcional. + // lectores. El volcado a archivo se aplica sólo a stdout (el + // contenido principal); stderr excedente se descarta. let counter = Arc::new(AtomicUsize::new(0)); let announced = Arc::new(AtomicBool::new(false)); - let spill = spec - .spill_path - .clone() - .map(|path| Arc::new(SpillSink { path, file: Mutex::new(None) })); let limit = spec.capture_limit; let mut readers: Vec> = Vec::new(); @@ -379,7 +390,7 @@ pub fn run(spec: &CommandSpec) -> RunHandle { limit, Arc::clone(&counter), Arc::clone(&announced), - spill.clone(), + spec.spill_path.clone(), )); } for s in stderrs { @@ -390,7 +401,7 @@ pub fn run(spec: &CommandSpec) -> RunHandle { limit, Arc::clone(&counter), Arc::clone(&announced), - spill.clone(), + None, )); } for h in readers {