feat(shuma-exec): spill con splice (copia cero) + limpieza de temporales
El volcado de la salida excedente ya no copia por espacio de usuario: pasado el tope, el lector escribe la línea que cruzó + lo bufereado y luego mueve el resto del pipe al archivo con splice(2) —kernel a kernel, sin copia—. Se aplica a stdout (el contenido principal). shuma-shell limpia sus archivos de volcado al cerrar la sesión (Drop). Los spills llevan el pid en el nombre para no chocar entre instancias. shuma-exec: 11 tests verdes (el de spill ahora verifica el camino splice). Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
@@ -25,13 +25,16 @@
|
||||
|
||||
use std::fs::File;
|
||||
use std::io::{BufRead, BufReader, Read, Write};
|
||||
use std::path::PathBuf;
|
||||
use std::os::fd::AsFd;
|
||||
use std::path::{Path, PathBuf};
|
||||
use std::process::{Child, Command, Stdio};
|
||||
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
|
||||
use std::sync::mpsc::{self, Receiver, Sender, TryRecvError};
|
||||
use std::sync::{Arc, Mutex};
|
||||
use std::thread::JoinHandle;
|
||||
|
||||
use nix::fcntl::{splice, SpliceFFlags};
|
||||
|
||||
/// Una etapa del pipe en ejecución directa: un binario y sus argumentos
|
||||
/// ya resueltos (sin comillas, sin metacaracteres).
|
||||
#[derive(Debug, Clone)]
|
||||
@@ -128,25 +131,6 @@ impl RunEvent {
|
||||
}
|
||||
}
|
||||
|
||||
/// Destino de volcado de la salida excedente — compartido entre lectores.
|
||||
struct SpillSink {
|
||||
path: PathBuf,
|
||||
file: Mutex<Option<File>>,
|
||||
}
|
||||
|
||||
impl SpillSink {
|
||||
/// Escribe una línea excedente al archivo (lo abre perezosamente).
|
||||
fn write_line(&self, line: &str) {
|
||||
let mut g = self.file.lock().expect("spill lock");
|
||||
if g.is_none() {
|
||||
*g = File::create(&self.path).ok();
|
||||
}
|
||||
if let Some(f) = g.as_mut() {
|
||||
let _ = writeln!(f, "{line}");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Asa de un comando en ejecución. El consumidor la conserva y drena sus
|
||||
/// eventos cuando le conviene.
|
||||
pub struct RunHandle {
|
||||
@@ -208,41 +192,71 @@ impl RunHandle {
|
||||
}
|
||||
}
|
||||
|
||||
/// Lanza un hilo lector de un flujo, con captura acotada. Pasado el tope
|
||||
/// emite (una vez) `Truncated` o `Spilled` y deriva el resto al sumidero
|
||||
/// de volcado o a la basura — pero **sigue drenando** el pipe.
|
||||
/// Vuelca el resto de un pipe a un archivo con **copia cero** (`splice`):
|
||||
/// los bytes van de pipe a archivo sin pasar por espacio de usuario.
|
||||
fn spill_rest<R: Read + AsFd>(reader: &mut BufReader<R>, path: &Path, first_line: &str) {
|
||||
let Ok(file) = File::create(path) else {
|
||||
return;
|
||||
};
|
||||
let mut file = file;
|
||||
// La línea que cruzó el tope y lo ya bufereado van primero…
|
||||
let _ = file.write_all(first_line.as_bytes());
|
||||
let buffered: Vec<u8> = reader.buffer().to_vec();
|
||||
let _ = file.write_all(&buffered);
|
||||
reader.consume(buffered.len());
|
||||
// …y el resto del pipe se mueve con `splice`, kernel a kernel.
|
||||
loop {
|
||||
match splice(reader.get_ref(), None, &file, None, 1 << 20, SpliceFFlags::empty()) {
|
||||
Ok(0) | Err(_) => break,
|
||||
Ok(_) => {}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Lanza un hilo lector de un flujo, con captura acotada. Pasado el tope:
|
||||
/// si hay `spill`, el resto se vuelca al archivo con `splice` (copia
|
||||
/// cero); si no, se descarta. En ambos casos el pipe se **sigue
|
||||
/// drenando** — el proceso nunca se bloquea.
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
fn spawn_reader<R: Read + Send + 'static>(
|
||||
fn spawn_reader<R: Read + AsFd + Send + 'static>(
|
||||
stream: R,
|
||||
tx: Sender<RunEvent>,
|
||||
make: fn(String) -> RunEvent,
|
||||
limit: usize,
|
||||
counter: Arc<AtomicUsize>,
|
||||
announced: Arc<AtomicBool>,
|
||||
spill: Option<Arc<SpillSink>>,
|
||||
spill: Option<PathBuf>,
|
||||
) -> JoinHandle<()> {
|
||||
std::thread::spawn(move || {
|
||||
for line in BufReader::new(stream).lines().map_while(Result::ok) {
|
||||
let total =
|
||||
counter.fetch_add(line.len() + 1, Ordering::Relaxed) + line.len() + 1;
|
||||
let mut reader = BufReader::new(stream);
|
||||
let mut buf = String::new();
|
||||
loop {
|
||||
buf.clear();
|
||||
let n = match reader.read_line(&mut buf) {
|
||||
Ok(0) => break, // EOF
|
||||
Ok(n) => n,
|
||||
Err(_) => break,
|
||||
};
|
||||
let total = counter.fetch_add(n, Ordering::Relaxed) + n;
|
||||
if limit != 0 && total > limit {
|
||||
let first = !announced.swap(true, Ordering::Relaxed);
|
||||
match &spill {
|
||||
Some(sink) => {
|
||||
Some(path) => {
|
||||
if first {
|
||||
let _ = tx
|
||||
.send(RunEvent::Spilled(sink.path.display().to_string()));
|
||||
let _ = tx.send(RunEvent::Spilled(path.display().to_string()));
|
||||
}
|
||||
sink.write_line(&line);
|
||||
spill_rest(&mut reader, path, &buf);
|
||||
break; // splice se llevó el resto
|
||||
}
|
||||
None => {
|
||||
if first {
|
||||
let _ = tx.send(RunEvent::Truncated);
|
||||
}
|
||||
continue; // descarta, pero sigue drenando
|
||||
}
|
||||
}
|
||||
continue; // descarta/vuelca, pero sigue leyendo el pipe
|
||||
}
|
||||
let line = buf.trim_end_matches(['\n', '\r']).to_string();
|
||||
if tx.send(make(line)).is_err() {
|
||||
break;
|
||||
}
|
||||
@@ -361,13 +375,10 @@ pub fn run(spec: &CommandSpec) -> RunHandle {
|
||||
}
|
||||
|
||||
// Captura acotada: contador y aviso compartidos por todos los
|
||||
// lectores; un sumidero de volcado opcional.
|
||||
// lectores. El volcado a archivo se aplica sólo a stdout (el
|
||||
// contenido principal); stderr excedente se descarta.
|
||||
let counter = Arc::new(AtomicUsize::new(0));
|
||||
let announced = Arc::new(AtomicBool::new(false));
|
||||
let spill = spec
|
||||
.spill_path
|
||||
.clone()
|
||||
.map(|path| Arc::new(SpillSink { path, file: Mutex::new(None) }));
|
||||
let limit = spec.capture_limit;
|
||||
|
||||
let mut readers: Vec<JoinHandle<()>> = Vec::new();
|
||||
@@ -379,7 +390,7 @@ pub fn run(spec: &CommandSpec) -> RunHandle {
|
||||
limit,
|
||||
Arc::clone(&counter),
|
||||
Arc::clone(&announced),
|
||||
spill.clone(),
|
||||
spec.spill_path.clone(),
|
||||
));
|
||||
}
|
||||
for s in stderrs {
|
||||
@@ -390,7 +401,7 @@ pub fn run(spec: &CommandSpec) -> RunHandle {
|
||||
limit,
|
||||
Arc::clone(&counter),
|
||||
Arc::clone(&announced),
|
||||
spill.clone(),
|
||||
None,
|
||||
));
|
||||
}
|
||||
for h in readers {
|
||||
|
||||
Reference in New Issue
Block a user