From b4be5e1c72d728d5e3aaf2f294b35a3c34c7de99 Mon Sep 17 00:00:00 2001 From: sergio Date: Wed, 20 May 2026 19:53:42 +0000 Subject: [PATCH] =?UTF-8?q?feat(shuma):=20captura=20acotada=20+=20reproces?= =?UTF-8?q?o=20de=20salidas=20v=C3=ADa=20stdin?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit shuma-exec: cota dura de memoria. CommandSpec.capture_limit (bytes): pasado el tope se emite RunEvent::Truncated una vez y el resto se descarta —pero el pipe se sigue drenando, así el proceso no se bloquea y termina normal. CommandSpec.stdin_data alimenta un texto por la entrada estándar (escrito en su propio hilo). shuma-session: CommandRun.truncated. shuma-shell: tope de captura de 8 MiB por comando. Cada card con salida muestra «⤳ reprocesar» — al pulsarlo, el próximo comando filtra esa salida capturada (vía stdin) sin re-ejecutar el original; un banner marca el modo. Las cards truncadas avisan «⚠ truncado». shuma-exec: 12 tests (incluye truncado y reproceso por stdin). Co-Authored-By: Claude Opus 4.7 --- crates/apps/shuma-shell/src/main.rs | 112 +++++++++- crates/modules/shuma/shuma-exec/src/lib.rs | 198 +++++++++++++++--- crates/modules/shuma/shuma-session/src/lib.rs | 10 + nohup.out | 3 + 4 files changed, 281 insertions(+), 42 deletions(-) diff --git a/crates/apps/shuma-shell/src/main.rs b/crates/apps/shuma-shell/src/main.rs index 901c0d6..089f325 100644 --- a/crates/apps/shuma-shell/src/main.rs +++ b/crates/apps/shuma-shell/src/main.rs @@ -36,6 +36,10 @@ use shuma_sysmon::{Snapshot, SystemSampler}; /// Cuántas muestras guarda la curva de cada monitor. const HISTORY: usize = 80; +/// Tope de captura de salida por comando — 8 MiB. Pasado el tope la +/// salida se descarta: cota dura de memoria ante un stream gigante. +const CAPTURE_LIMIT: usize = 8 * 1024 * 1024; + /// Archivos/directorios que delatan la estructura de un proyecto. const PROJECT_MARKERS: &[&str] = &[ ".git", @@ -282,6 +286,8 @@ struct Shell { group_anchor: usize, /// Patrones detectados por el motor de inferencia (cache). patterns: Vec, + /// Si está activo, el próximo comando reprocesa la salida de este run. + reprocess_source: Option, /// Scroll del feed central — sigue al comando más reciente. scroll: ScrollHandle, focus: FocusHandle, @@ -323,6 +329,7 @@ impl Shell { run_ui: HashMap::new(), group_anchor: 0, patterns: Vec::new(), + reprocess_source: None, scroll: ScrollHandle::new(), focus: cx.focus_handle(), focused_once: false, @@ -372,6 +379,7 @@ impl Shell { RunEvent::Stderr(l) => { self.session.append_output(*id, Stream::Stderr, l) } + RunEvent::Truncated => self.session.mark_truncated(*id), RunEvent::Exited(code) => self.session.finish_run(*id, code, now), RunEvent::Failed(msg) => { self.session.append_output( @@ -559,7 +567,35 @@ impl Shell { let id = self.session.begin_run(&line, now); self.run_ui.insert(id, RunUi::default()); - let spec = CommandSpec::bash(&line, self.session.cwd()); + let spec = CommandSpec::bash(&line, self.session.cwd()).with_limit(CAPTURE_LIMIT); + self.active.push((id, exec_run(&spec))); + self.scroll.scroll_to_bottom(); + } + + /// Reprocesa la salida capturada del comando `source`: ejecuta `line` + /// alimentándole esa salida por stdin, sin volver a correr el + /// original. Así un resultado se filtra con distintas herramientas. + fn run_reprocess(&mut self, line: String, source: RunId) { + let line = line.trim().to_string(); + if line.is_empty() { + return; + } + let data: String = self + .session + .run(source) + .map(|r| r.lines_of(Stream::Stdout).collect::>().join("\n")) + .unwrap_or_default(); + for ui in self.run_ui.values_mut() { + if !ui.user_touched { + ui.collapsed = true; + } + } + let now = unix_now(); + let id = self.session.begin_run(&line, now); + self.run_ui.insert(id, RunUi::default()); + let spec = CommandSpec::bash(&line, self.session.cwd()) + .with_limit(CAPTURE_LIMIT) + .with_stdin(data); self.active.push((id, exec_run(&spec))); self.scroll.scroll_to_bottom(); } @@ -610,13 +646,18 @@ impl Shell { } } - /// Enter — ejecuta el contenido del input. + /// Enter — ejecuta el contenido del input, o reprocesa una salida + /// previa si hay un origen de reproceso activo. fn submit(&mut self) { let line = self.line.text().to_string(); self.line.clear(); self.completion = None; self.show_completion = false; - self.run_command(line); + if let Some(source) = self.reprocess_source.take() { + self.run_reprocess(line, source); + } else { + self.run_command(line); + } } fn handle_key(&mut self, event: &KeyDownEvent, _w: &mut Window, cx: &mut Context) { @@ -633,6 +674,7 @@ impl Shell { } "escape" => { self.show_completion = false; + self.reprocess_source = None; cx.notify(); return; } @@ -780,18 +822,25 @@ fn render_run( RunStatus::Failed => ("✗", gpui::hsla(2.0 / 360.0, 0.68, 0.60, 1.0)), }; let stderr_color = gpui::hsla(8.0 / 360.0, 0.62, 0.66, 1.0); + let accent = gpui::hsla(190.0 / 360.0, 0.60, 0.62, 1.0); - // Nota a la derecha: código de salida, y al colapsar, conteo de líneas. - let mut note = match r.exit_code { - Some(0) | None => String::new(), - Some(c) => format!("salió {c}"), - }; + // Nota a la derecha: salida no-cero, truncado, y conteo si colapsada. + let mut parts: Vec = Vec::new(); + if let Some(c) = r.exit_code { + if c != 0 { + parts.push(format!("salió {c}")); + } + } + if r.truncated { + parts.push("⚠ truncado".to_string()); + } if ui.collapsed { let n = r.count_of(Stream::Stdout); if n > 0 { - note = format!("{note} · {n} líneas").trim_start().to_string(); + parts.push(format!("{n} líneas")); } } + let note = parts.join(" · "); // Cabecera-acordeón: un clic colapsa/expande. let caret = if ui.collapsed { "▸" } else { "▾" }; @@ -870,6 +919,29 @@ fn render_run( None }; + // Reprocesar — sólo si el comando dejó algo en stdout que filtrar. + let reprocess_chip = if r.count_of(Stream::Stdout) > 0 { + Some( + div() + .id(SharedString::from(format!("repro-{id}"))) + .flex_none() + .px(px(6.)) + .py(px(1.)) + .rounded(px(3.)) + .text_size(px(11.)) + .text_color(accent) + .cursor_pointer() + .hover(|s| s.text_color(gpui::hsla(0.0, 0.0, 0.95, 1.0))) + .child("⤳ reprocesar") + .on_click(cx.listener(move |shell, _, _, cx| { + shell.reprocess_source = Some(id); + cx.notify(); + })), + ) + } else { + None + }; + let header = div() .flex() .flex_row() @@ -877,6 +949,7 @@ fn render_run( .gap(px(6.)) .child(header_left) .children(stderr_chip) + .children(reprocess_chip) .children(kill_chip); // Cuerpo: sólo con el acordeón abierto. El filtro elige el flujo. @@ -1291,16 +1364,33 @@ impl Render for Shell { .child(SharedString::from(ghost)), ); } - let prompt = div() + let input_bar = div() .h(px(46.)) .flex() .flex_row() .items_center() .px(px(14.)) - .bg(panel) .text_color(text) .text_size(px(14.)) .children(input_row); + // Banner del modo reproceso — escribí un filtro para la salida. + let banner = self.reprocess_source.map(|src| { + div() + .px(px(14.)) + .py(px(3.)) + .bg(gpui::hsla(190.0 / 360.0, 0.30, 0.22, 1.0)) + .text_size(px(11.)) + .text_color(accent) + .child(SharedString::from(format!( + "⤳ reprocesando la salida de #{src} — escribí un filtro · Esc cancela" + ))) + }); + let prompt = div() + .flex() + .flex_col() + .bg(panel) + .children(banner) + .child(input_bar); // --- Popup de autocompletado --- let mut popup_layer: Vec = Vec::new(); diff --git a/crates/modules/shuma/shuma-exec/src/lib.rs b/crates/modules/shuma/shuma-exec/src/lib.rs index 07fbc48..1f4c0d1 100644 --- a/crates/modules/shuma/shuma-exec/src/lib.rs +++ b/crates/modules/shuma/shuma-exec/src/lib.rs @@ -10,16 +10,29 @@ //! shell, en cambio, corre líneas de shell ad-hoc y necesita ver la //! salida fluir. Dos capas distintas, a propósito. //! +//! **Captura acotada.** Para no cargar en RAM un stream de gigabytes, la +//! captura tiene un límite de bytes ([`CommandSpec::capture_limit`]): +//! pasado el límite se emite [`RunEvent::Truncated`] una vez y el resto +//! se **descarta** — pero el pipe se sigue drenando, así el proceso no +//! se bloquea y termina normal. +//! +//! **Reproceso.** [`CommandSpec::stdin_data`] alimenta un texto por la +//! entrada estándar del proceso: permite reprocesar la salida capturada +//! de un comando previo con otra herramienta, sin volver a correr el +//! comando original. +//! //! El crate es agnóstico de frontend: el proceso y sus lectores corren //! en hilos; el consumidor (shell GPUI o TUI) drena el canal cuando //! quiere — sin `async`, sin acoplarse a ningún runtime. #![forbid(unsafe_code)] -use std::io::{BufRead, BufReader}; +use std::io::{BufRead, BufReader, Read, Write}; use std::process::{Child, Command, Stdio}; -use std::sync::mpsc::{self, Receiver, TryRecvError}; +use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; +use std::sync::mpsc::{self, Receiver, Sender, TryRecvError}; use std::sync::{Arc, Mutex}; +use std::thread::JoinHandle; /// Qué ejecutar: una línea de comandos, en un directorio, con un shell. #[derive(Debug, Clone)] @@ -30,12 +43,35 @@ pub struct CommandSpec { pub cwd: String, /// Programa de shell — `"bash"`, `"sh"`, `"fish"`… pub shell: String, + /// Tope de bytes a capturar; `0` = sin límite. Pasado el tope, la + /// salida se descarta (se emite [`RunEvent::Truncated`]). + pub capture_limit: usize, + /// Texto a alimentar por stdin — para reprocesar una salida previa. + pub stdin_data: Option, } impl CommandSpec { - /// Spec con `bash` como shell. + /// Spec con `bash` como shell, sin límite ni stdin. pub fn bash(line: impl Into, cwd: impl Into) -> Self { - Self { line: line.into(), cwd: cwd.into(), shell: "bash".into() } + Self { + line: line.into(), + cwd: cwd.into(), + shell: "bash".into(), + capture_limit: 0, + stdin_data: None, + } + } + + /// Fija el tope de captura en bytes (encadenable). + pub fn with_limit(mut self, bytes: usize) -> Self { + self.capture_limit = bytes; + self + } + + /// Alimenta `data` por la entrada estándar del proceso (encadenable). + pub fn with_stdin(mut self, data: impl Into) -> Self { + self.stdin_data = Some(data.into()); + self } } @@ -46,6 +82,8 @@ pub enum RunEvent { Stdout(String), /// Una línea de salida de error. Stderr(String), + /// La captura alcanzó su tope; lo que sigue se descarta. + Truncated, /// El proceso terminó con este código de salida. Exited(i32), /// El proceso no pudo siquiera lanzarse. @@ -122,6 +160,33 @@ impl RunHandle { } } +/// Lanza un hilo lector de un flujo. Cuenta los bytes contra `counter`; +/// pasado `limit` emite `Truncated` una vez (vía `announced`) y descarta +/// el resto, pero **sigue drenando** el pipe para no bloquear al proceso. +fn spawn_reader( + stream: R, + tx: Sender, + make: fn(String) -> RunEvent, + limit: usize, + counter: Arc, + announced: Arc, +) -> JoinHandle<()> { + std::thread::spawn(move || { + for line in BufReader::new(stream).lines().map_while(Result::ok) { + let total = counter.fetch_add(line.len() + 1, Ordering::Relaxed) + line.len() + 1; + if limit != 0 && total > limit { + if !announced.swap(true, Ordering::Relaxed) { + let _ = tx.send(RunEvent::Truncated); + } + continue; // descarta, pero sigue leyendo el pipe + } + if tx.send(make(line)).is_err() { + break; + } + } + }) +} + /// Lanza `spec` y devuelve un [`RunHandle`] desde el que drenar la /// salida. La función vuelve de inmediato: el proceso corre en hilos. pub fn run(spec: &CommandSpec) -> RunHandle { @@ -131,11 +196,16 @@ pub fn run(spec: &CommandSpec) -> RunHandle { let cell = Arc::clone(&child_cell); std::thread::spawn(move || { + let stdin_mode = if spec.stdin_data.is_some() { + Stdio::piped() + } else { + Stdio::null() + }; let spawned = Command::new(&spec.shell) .arg("-c") .arg(&spec.line) .current_dir(&spec.cwd) - .stdin(Stdio::null()) + .stdin(stdin_mode) .stdout(Stdio::piped()) .stderr(Stdio::piped()) .spawn(); @@ -148,32 +218,48 @@ pub fn run(spec: &CommandSpec) -> RunHandle { } }; - // Un hilo lector por flujo: stdout y stderr fluyen en paralelo. + // Si hay datos para reprocesar, se escriben por stdin en su + // propio hilo (la escritura puede bloquear hasta que el proceso + // consuma); al terminar, `stdin` se cierra → EOF. + if let Some(data) = spec.stdin_data.clone() { + if let Some(mut stdin) = child.stdin.take() { + std::thread::spawn(move || { + let _ = stdin.write_all(data.as_bytes()); + }); + } + } + let stdout = child.stdout.take(); let stderr = child.stderr.take(); // Comparte el proceso para que `RunHandle::kill` pueda alcanzarlo. if let Ok(mut g) = cell.lock() { *g = Some(child); } + + // Contador de bytes compartido: el tope vale para stdout+stderr. + let counter = Arc::new(AtomicUsize::new(0)); + let announced = Arc::new(AtomicBool::new(false)); + let limit = spec.capture_limit; + let out_reader = stdout.map(|s| { - let tx = tx.clone(); - std::thread::spawn(move || { - for line in BufReader::new(s).lines().map_while(Result::ok) { - if tx.send(RunEvent::Stdout(line)).is_err() { - break; - } - } - }) + spawn_reader( + s, + tx.clone(), + RunEvent::Stdout, + limit, + Arc::clone(&counter), + Arc::clone(&announced), + ) }); let err_reader = stderr.map(|s| { - let tx = tx.clone(); - std::thread::spawn(move || { - for line in BufReader::new(s).lines().map_while(Result::ok) { - if tx.send(RunEvent::Stderr(line)).is_err() { - break; - } - } - }) + spawn_reader( + s, + tx.clone(), + RunEvent::Stderr, + limit, + Arc::clone(&counter), + Arc::clone(&announced), + ) }); // Los lectores terminan cuando el proceso cierra sus pipes —sea @@ -203,7 +289,7 @@ mod tests { /// `sh` está en cualquier entorno POSIX — más portable que bash /// para los tests. fn sh(line: &str) -> CommandSpec { - CommandSpec { line: line.into(), cwd: ".".into(), shell: "sh".into() } + CommandSpec { shell: "sh".into(), ..CommandSpec::bash(line, ".") } } #[test] @@ -259,11 +345,7 @@ mod tests { #[test] fn missing_shell_fails_gracefully() { - let spec = CommandSpec { - line: "echo x".into(), - cwd: ".".into(), - shell: "/no/existe/shell-xyz".into(), - }; + let spec = CommandSpec { shell: "/no/existe/shell-xyz".into(), ..sh("echo x") }; let mut h = run(&spec); let events = h.wait_all(); assert!(matches!(events.first(), Some(RunEvent::Failed(_)))); @@ -274,18 +356,72 @@ mod tests { assert!(RunEvent::Exited(0).is_terminal()); assert!(RunEvent::Failed("x".into()).is_terminal()); assert!(!RunEvent::Stdout("x".into()).is_terminal()); + assert!(!RunEvent::Truncated.is_terminal()); } #[test] fn kill_stops_a_long_running_process() { let mut h = run(&sh("sleep 30")); - // Espera breve para que el proceso se haya lanzado. std::thread::sleep(std::time::Duration::from_millis(250)); h.kill(); - // wait_all retorna pronto (no espera los 30s) y cierra con un - // evento terminal. let events = h.wait_all(); assert!(events.last().map(|e| e.is_terminal()).unwrap_or(false)); assert!(h.is_finished()); } + + #[test] + fn capture_limit_truncates_but_process_finishes() { + // 20.000 líneas, pero la captura se corta a ~400 bytes. + let mut h = run(&sh("seq 1 20000").with_limit(400)); + let events = h.wait_all(); + // Se anunció el truncado… + assert!(events.contains(&RunEvent::Truncated)); + // …pero el proceso terminó normal (no se bloqueó). + assert!(events.contains(&RunEvent::Exited(0))); + // Y lo capturado quedó acotado. + let captured = events + .iter() + .filter(|e| matches!(e, RunEvent::Stdout(_))) + .count(); + assert!(captured < 20000, "la salida quedó acotada"); + } + + #[test] + fn no_limit_captures_everything() { + let mut h = run(&sh("seq 1 500")); // capture_limit = 0 + let events = h.wait_all(); + assert!(!events.contains(&RunEvent::Truncated)); + let n = events.iter().filter(|e| matches!(e, RunEvent::Stdout(_))).count(); + assert_eq!(n, 500); + } + + #[test] + fn stdin_data_is_fed_to_the_process() { + // `cat` devuelve por stdout lo que recibe por stdin — es el + // reproceso más simple: tomar una salida y pasarla a otro filtro. + let mut h = run(&sh("cat").with_stdin("alfa\nbeta\ngamma")); + let lines: Vec = h + .wait_all() + .into_iter() + .filter_map(|e| match e { + RunEvent::Stdout(l) => Some(l), + _ => None, + }) + .collect(); + assert_eq!(lines, vec!["alfa", "beta", "gamma"]); + } + + #[test] + fn stdin_data_reprocessed_by_a_filter() { + let mut h = run(&sh("grep beta").with_stdin("alfa\nbeta\nbetabel\ngamma")); + let lines: Vec = h + .wait_all() + .into_iter() + .filter_map(|e| match e { + RunEvent::Stdout(l) => Some(l), + _ => None, + }) + .collect(); + assert_eq!(lines, vec!["beta", "betabel"]); + } } diff --git a/crates/modules/shuma/shuma-session/src/lib.rs b/crates/modules/shuma/shuma-session/src/lib.rs index be39669..23e8af9 100644 --- a/crates/modules/shuma/shuma-session/src/lib.rs +++ b/crates/modules/shuma/shuma-session/src/lib.rs @@ -60,6 +60,8 @@ pub struct CommandRun { pub exit_code: Option, /// Salida — cada línea sabe si es de stdout o de stderr. pub output: Vec, + /// `true` si la salida superó el tope de captura y se descartó parte. + pub truncated: bool, /// Segundo Unix en que arrancó. pub started_at: u64, /// Segundo Unix en que terminó. @@ -167,12 +169,20 @@ impl WorkSession { status: RunStatus::Running, exit_code: None, output: Vec::new(), + truncated: false, started_at: now, finished_at: None, }); id } + /// Marca que la salida de un comando se truncó al tope de captura. + pub fn mark_truncated(&mut self, id: RunId) { + if let Some(r) = self.run_mut(id) { + r.truncated = true; + } + } + pub fn run(&self, id: RunId) -> Option<&CommandRun> { self.history.iter().find(|r| r.id == id) } diff --git a/nohup.out b/nohup.out index 92ac61f..b09ffd6 100644 --- a/nohup.out +++ b/nohup.out @@ -1285,3 +1285,6 @@ Failed to create DBUS proxy: Cannot autolaunch D-Bus without X11 $DISPLAY [Child 27081, MediaDecoderStateMachine #72] WARNING: 7249567b5160 OpenCubeb() failed to init cubeb: file /home/ubuntu/actions-runner/_work/desktop/desktop/engine/dom/media/AudioStream.cpp:279 [Child 27081, MediaDecoderStateMachine #72] WARNING: Decoder=724971e98800 [OnMediaSinkAudioError]: file /home/ubuntu/actions-runner/_work/desktop/desktop/engine/dom/media/MediaDecoderStateMachine.cpp:4630 [Child 27081, MediaDecoderStateMachine #72] WARNING: Decoder=724971e98800 Decode error: NS_ERROR_DOM_MEDIA_MEDIASINK_ERR (0x806e000b) - OnMediaSinkAudioError: file /home/ubuntu/actions-runner/_work/desktop/desktop/engine/dom/media/MediaDecoderStateMachineBase.cpp:168 +[Child 27081, MediaDecoderStateMachine #83] WARNING: 7249567b55e0 OpenCubeb() failed to init cubeb: file /home/ubuntu/actions-runner/_work/desktop/desktop/engine/dom/media/AudioStream.cpp:279 +[Child 27081, MediaDecoderStateMachine #83] WARNING: Decoder=724971e98800 [OnMediaSinkAudioError]: file /home/ubuntu/actions-runner/_work/desktop/desktop/engine/dom/media/MediaDecoderStateMachine.cpp:4630 +[Child 27081, MediaDecoderStateMachine #83] WARNING: Decoder=724971e98800 Decode error: NS_ERROR_DOM_MEDIA_MEDIASINK_ERR (0x806e000b) - OnMediaSinkAudioError: file /home/ubuntu/actions-runner/_work/desktop/desktop/engine/dom/media/MediaDecoderStateMachineBase.cpp:168