feat(shuma): captura acotada + reproceso de salidas vía stdin

shuma-exec: cota dura de memoria. CommandSpec.capture_limit (bytes):
pasado el tope se emite RunEvent::Truncated una vez y el resto se
descarta —pero el pipe se sigue drenando, así el proceso no se
bloquea y termina normal. CommandSpec.stdin_data alimenta un texto
por la entrada estándar (escrito en su propio hilo).

shuma-session: CommandRun.truncated.

shuma-shell: tope de captura de 8 MiB por comando. Cada card con
salida muestra «⤳ reprocesar» — al pulsarlo, el próximo comando
filtra esa salida capturada (vía stdin) sin re-ejecutar el original;
un banner marca el modo. Las cards truncadas avisan «⚠ truncado».

shuma-exec: 12 tests (incluye truncado y reproceso por stdin).

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
sergio
2026-05-20 19:53:42 +00:00
parent 0740d2e2af
commit b4be5e1c72
4 changed files with 281 additions and 42 deletions
+100 -10
View File
@@ -36,6 +36,10 @@ use shuma_sysmon::{Snapshot, SystemSampler};
/// Cuántas muestras guarda la curva de cada monitor.
const HISTORY: usize = 80;
/// Tope de captura de salida por comando — 8 MiB. Pasado el tope la
/// salida se descarta: cota dura de memoria ante un stream gigante.
const CAPTURE_LIMIT: usize = 8 * 1024 * 1024;
/// Archivos/directorios que delatan la estructura de un proyecto.
const PROJECT_MARKERS: &[&str] = &[
".git",
@@ -282,6 +286,8 @@ struct Shell {
group_anchor: usize,
/// Patrones detectados por el motor de inferencia (cache).
patterns: Vec<shuma_infer::EmergingPattern>,
/// Si está activo, el próximo comando reprocesa la salida de este run.
reprocess_source: Option<RunId>,
/// Scroll del feed central — sigue al comando más reciente.
scroll: ScrollHandle,
focus: FocusHandle,
@@ -323,6 +329,7 @@ impl Shell {
run_ui: HashMap::new(),
group_anchor: 0,
patterns: Vec::new(),
reprocess_source: None,
scroll: ScrollHandle::new(),
focus: cx.focus_handle(),
focused_once: false,
@@ -372,6 +379,7 @@ impl Shell {
RunEvent::Stderr(l) => {
self.session.append_output(*id, Stream::Stderr, l)
}
RunEvent::Truncated => self.session.mark_truncated(*id),
RunEvent::Exited(code) => self.session.finish_run(*id, code, now),
RunEvent::Failed(msg) => {
self.session.append_output(
@@ -559,7 +567,35 @@ impl Shell {
let id = self.session.begin_run(&line, now);
self.run_ui.insert(id, RunUi::default());
let spec = CommandSpec::bash(&line, self.session.cwd());
let spec = CommandSpec::bash(&line, self.session.cwd()).with_limit(CAPTURE_LIMIT);
self.active.push((id, exec_run(&spec)));
self.scroll.scroll_to_bottom();
}
/// Reprocesa la salida capturada del comando `source`: ejecuta `line`
/// alimentándole esa salida por stdin, sin volver a correr el
/// original. Así un resultado se filtra con distintas herramientas.
fn run_reprocess(&mut self, line: String, source: RunId) {
let line = line.trim().to_string();
if line.is_empty() {
return;
}
let data: String = self
.session
.run(source)
.map(|r| r.lines_of(Stream::Stdout).collect::<Vec<_>>().join("\n"))
.unwrap_or_default();
for ui in self.run_ui.values_mut() {
if !ui.user_touched {
ui.collapsed = true;
}
}
let now = unix_now();
let id = self.session.begin_run(&line, now);
self.run_ui.insert(id, RunUi::default());
let spec = CommandSpec::bash(&line, self.session.cwd())
.with_limit(CAPTURE_LIMIT)
.with_stdin(data);
self.active.push((id, exec_run(&spec)));
self.scroll.scroll_to_bottom();
}
@@ -610,14 +646,19 @@ impl Shell {
}
}
/// Enter — ejecuta el contenido del input.
/// Enter — ejecuta el contenido del input, o reprocesa una salida
/// previa si hay un origen de reproceso activo.
fn submit(&mut self) {
let line = self.line.text().to_string();
self.line.clear();
self.completion = None;
self.show_completion = false;
if let Some(source) = self.reprocess_source.take() {
self.run_reprocess(line, source);
} else {
self.run_command(line);
}
}
fn handle_key(&mut self, event: &KeyDownEvent, _w: &mut Window, cx: &mut Context<Self>) {
let ks = &event.keystroke;
@@ -633,6 +674,7 @@ impl Shell {
}
"escape" => {
self.show_completion = false;
self.reprocess_source = None;
cx.notify();
return;
}
@@ -780,18 +822,25 @@ fn render_run(
RunStatus::Failed => ("", gpui::hsla(2.0 / 360.0, 0.68, 0.60, 1.0)),
};
let stderr_color = gpui::hsla(8.0 / 360.0, 0.62, 0.66, 1.0);
let accent = gpui::hsla(190.0 / 360.0, 0.60, 0.62, 1.0);
// Nota a la derecha: código de salida, y al colapsar, conteo de líneas.
let mut note = match r.exit_code {
Some(0) | None => String::new(),
Some(c) => format!("salió {c}"),
};
// Nota a la derecha: salida no-cero, truncado, y conteo si colapsada.
let mut parts: Vec<String> = Vec::new();
if let Some(c) = r.exit_code {
if c != 0 {
parts.push(format!("salió {c}"));
}
}
if r.truncated {
parts.push("⚠ truncado".to_string());
}
if ui.collapsed {
let n = r.count_of(Stream::Stdout);
if n > 0 {
note = format!("{note} · {n} líneas").trim_start().to_string();
parts.push(format!("{n} líneas"));
}
}
let note = parts.join(" · ");
// Cabecera-acordeón: un clic colapsa/expande.
let caret = if ui.collapsed { "" } else { "" };
@@ -870,6 +919,29 @@ fn render_run(
None
};
// Reprocesar — sólo si el comando dejó algo en stdout que filtrar.
let reprocess_chip = if r.count_of(Stream::Stdout) > 0 {
Some(
div()
.id(SharedString::from(format!("repro-{id}")))
.flex_none()
.px(px(6.))
.py(px(1.))
.rounded(px(3.))
.text_size(px(11.))
.text_color(accent)
.cursor_pointer()
.hover(|s| s.text_color(gpui::hsla(0.0, 0.0, 0.95, 1.0)))
.child("⤳ reprocesar")
.on_click(cx.listener(move |shell, _, _, cx| {
shell.reprocess_source = Some(id);
cx.notify();
})),
)
} else {
None
};
let header = div()
.flex()
.flex_row()
@@ -877,6 +949,7 @@ fn render_run(
.gap(px(6.))
.child(header_left)
.children(stderr_chip)
.children(reprocess_chip)
.children(kill_chip);
// Cuerpo: sólo con el acordeón abierto. El filtro elige el flujo.
@@ -1291,16 +1364,33 @@ impl Render for Shell {
.child(SharedString::from(ghost)),
);
}
let prompt = div()
let input_bar = div()
.h(px(46.))
.flex()
.flex_row()
.items_center()
.px(px(14.))
.bg(panel)
.text_color(text)
.text_size(px(14.))
.children(input_row);
// Banner del modo reproceso — escribí un filtro para la salida.
let banner = self.reprocess_source.map(|src| {
div()
.px(px(14.))
.py(px(3.))
.bg(gpui::hsla(190.0 / 360.0, 0.30, 0.22, 1.0))
.text_size(px(11.))
.text_color(accent)
.child(SharedString::from(format!(
"⤳ reprocesando la salida de #{src} — escribí un filtro · Esc cancela"
)))
});
let prompt = div()
.flex()
.flex_col()
.bg(panel)
.children(banner)
.child(input_bar);
// --- Popup de autocompletado ---
let mut popup_layer: Vec<gpui::Div> = Vec::new();
+167 -31
View File
@@ -10,16 +10,29 @@
//! shell, en cambio, corre líneas de shell ad-hoc y necesita ver la
//! salida fluir. Dos capas distintas, a propósito.
//!
//! **Captura acotada.** Para no cargar en RAM un stream de gigabytes, la
//! captura tiene un límite de bytes ([`CommandSpec::capture_limit`]):
//! pasado el límite se emite [`RunEvent::Truncated`] una vez y el resto
//! se **descarta** — pero el pipe se sigue drenando, así el proceso no
//! se bloquea y termina normal.
//!
//! **Reproceso.** [`CommandSpec::stdin_data`] alimenta un texto por la
//! entrada estándar del proceso: permite reprocesar la salida capturada
//! de un comando previo con otra herramienta, sin volver a correr el
//! comando original.
//!
//! El crate es agnóstico de frontend: el proceso y sus lectores corren
//! en hilos; el consumidor (shell GPUI o TUI) drena el canal cuando
//! quiere — sin `async`, sin acoplarse a ningún runtime.
#![forbid(unsafe_code)]
use std::io::{BufRead, BufReader};
use std::io::{BufRead, BufReader, Read, Write};
use std::process::{Child, Command, Stdio};
use std::sync::mpsc::{self, Receiver, TryRecvError};
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::mpsc::{self, Receiver, Sender, TryRecvError};
use std::sync::{Arc, Mutex};
use std::thread::JoinHandle;
/// Qué ejecutar: una línea de comandos, en un directorio, con un shell.
#[derive(Debug, Clone)]
@@ -30,12 +43,35 @@ pub struct CommandSpec {
pub cwd: String,
/// Programa de shell — `"bash"`, `"sh"`, `"fish"`…
pub shell: String,
/// Tope de bytes a capturar; `0` = sin límite. Pasado el tope, la
/// salida se descarta (se emite [`RunEvent::Truncated`]).
pub capture_limit: usize,
/// Texto a alimentar por stdin — para reprocesar una salida previa.
pub stdin_data: Option<String>,
}
impl CommandSpec {
/// Spec con `bash` como shell.
/// Spec con `bash` como shell, sin límite ni stdin.
pub fn bash(line: impl Into<String>, cwd: impl Into<String>) -> Self {
Self { line: line.into(), cwd: cwd.into(), shell: "bash".into() }
Self {
line: line.into(),
cwd: cwd.into(),
shell: "bash".into(),
capture_limit: 0,
stdin_data: None,
}
}
/// Fija el tope de captura en bytes (encadenable).
pub fn with_limit(mut self, bytes: usize) -> Self {
self.capture_limit = bytes;
self
}
/// Alimenta `data` por la entrada estándar del proceso (encadenable).
pub fn with_stdin(mut self, data: impl Into<String>) -> Self {
self.stdin_data = Some(data.into());
self
}
}
@@ -46,6 +82,8 @@ pub enum RunEvent {
Stdout(String),
/// Una línea de salida de error.
Stderr(String),
/// La captura alcanzó su tope; lo que sigue se descarta.
Truncated,
/// El proceso terminó con este código de salida.
Exited(i32),
/// El proceso no pudo siquiera lanzarse.
@@ -122,6 +160,33 @@ impl RunHandle {
}
}
/// Lanza un hilo lector de un flujo. Cuenta los bytes contra `counter`;
/// pasado `limit` emite `Truncated` una vez (vía `announced`) y descarta
/// el resto, pero **sigue drenando** el pipe para no bloquear al proceso.
fn spawn_reader<R: Read + Send + 'static>(
stream: R,
tx: Sender<RunEvent>,
make: fn(String) -> RunEvent,
limit: usize,
counter: Arc<AtomicUsize>,
announced: Arc<AtomicBool>,
) -> JoinHandle<()> {
std::thread::spawn(move || {
for line in BufReader::new(stream).lines().map_while(Result::ok) {
let total = counter.fetch_add(line.len() + 1, Ordering::Relaxed) + line.len() + 1;
if limit != 0 && total > limit {
if !announced.swap(true, Ordering::Relaxed) {
let _ = tx.send(RunEvent::Truncated);
}
continue; // descarta, pero sigue leyendo el pipe
}
if tx.send(make(line)).is_err() {
break;
}
}
})
}
/// Lanza `spec` y devuelve un [`RunHandle`] desde el que drenar la
/// salida. La función vuelve de inmediato: el proceso corre en hilos.
pub fn run(spec: &CommandSpec) -> RunHandle {
@@ -131,11 +196,16 @@ pub fn run(spec: &CommandSpec) -> RunHandle {
let cell = Arc::clone(&child_cell);
std::thread::spawn(move || {
let stdin_mode = if spec.stdin_data.is_some() {
Stdio::piped()
} else {
Stdio::null()
};
let spawned = Command::new(&spec.shell)
.arg("-c")
.arg(&spec.line)
.current_dir(&spec.cwd)
.stdin(Stdio::null())
.stdin(stdin_mode)
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.spawn();
@@ -148,32 +218,48 @@ pub fn run(spec: &CommandSpec) -> RunHandle {
}
};
// Un hilo lector por flujo: stdout y stderr fluyen en paralelo.
// Si hay datos para reprocesar, se escriben por stdin en su
// propio hilo (la escritura puede bloquear hasta que el proceso
// consuma); al terminar, `stdin` se cierra → EOF.
if let Some(data) = spec.stdin_data.clone() {
if let Some(mut stdin) = child.stdin.take() {
std::thread::spawn(move || {
let _ = stdin.write_all(data.as_bytes());
});
}
}
let stdout = child.stdout.take();
let stderr = child.stderr.take();
// Comparte el proceso para que `RunHandle::kill` pueda alcanzarlo.
if let Ok(mut g) = cell.lock() {
*g = Some(child);
}
// Contador de bytes compartido: el tope vale para stdout+stderr.
let counter = Arc::new(AtomicUsize::new(0));
let announced = Arc::new(AtomicBool::new(false));
let limit = spec.capture_limit;
let out_reader = stdout.map(|s| {
let tx = tx.clone();
std::thread::spawn(move || {
for line in BufReader::new(s).lines().map_while(Result::ok) {
if tx.send(RunEvent::Stdout(line)).is_err() {
break;
}
}
})
spawn_reader(
s,
tx.clone(),
RunEvent::Stdout,
limit,
Arc::clone(&counter),
Arc::clone(&announced),
)
});
let err_reader = stderr.map(|s| {
let tx = tx.clone();
std::thread::spawn(move || {
for line in BufReader::new(s).lines().map_while(Result::ok) {
if tx.send(RunEvent::Stderr(line)).is_err() {
break;
}
}
})
spawn_reader(
s,
tx.clone(),
RunEvent::Stderr,
limit,
Arc::clone(&counter),
Arc::clone(&announced),
)
});
// Los lectores terminan cuando el proceso cierra sus pipes —sea
@@ -203,7 +289,7 @@ mod tests {
/// `sh` está en cualquier entorno POSIX — más portable que bash
/// para los tests.
fn sh(line: &str) -> CommandSpec {
CommandSpec { line: line.into(), cwd: ".".into(), shell: "sh".into() }
CommandSpec { shell: "sh".into(), ..CommandSpec::bash(line, ".") }
}
#[test]
@@ -259,11 +345,7 @@ mod tests {
#[test]
fn missing_shell_fails_gracefully() {
let spec = CommandSpec {
line: "echo x".into(),
cwd: ".".into(),
shell: "/no/existe/shell-xyz".into(),
};
let spec = CommandSpec { shell: "/no/existe/shell-xyz".into(), ..sh("echo x") };
let mut h = run(&spec);
let events = h.wait_all();
assert!(matches!(events.first(), Some(RunEvent::Failed(_))));
@@ -274,18 +356,72 @@ mod tests {
assert!(RunEvent::Exited(0).is_terminal());
assert!(RunEvent::Failed("x".into()).is_terminal());
assert!(!RunEvent::Stdout("x".into()).is_terminal());
assert!(!RunEvent::Truncated.is_terminal());
}
#[test]
fn kill_stops_a_long_running_process() {
let mut h = run(&sh("sleep 30"));
// Espera breve para que el proceso se haya lanzado.
std::thread::sleep(std::time::Duration::from_millis(250));
h.kill();
// wait_all retorna pronto (no espera los 30s) y cierra con un
// evento terminal.
let events = h.wait_all();
assert!(events.last().map(|e| e.is_terminal()).unwrap_or(false));
assert!(h.is_finished());
}
#[test]
fn capture_limit_truncates_but_process_finishes() {
// 20.000 líneas, pero la captura se corta a ~400 bytes.
let mut h = run(&sh("seq 1 20000").with_limit(400));
let events = h.wait_all();
// Se anunció el truncado…
assert!(events.contains(&RunEvent::Truncated));
// …pero el proceso terminó normal (no se bloqueó).
assert!(events.contains(&RunEvent::Exited(0)));
// Y lo capturado quedó acotado.
let captured = events
.iter()
.filter(|e| matches!(e, RunEvent::Stdout(_)))
.count();
assert!(captured < 20000, "la salida quedó acotada");
}
#[test]
fn no_limit_captures_everything() {
let mut h = run(&sh("seq 1 500")); // capture_limit = 0
let events = h.wait_all();
assert!(!events.contains(&RunEvent::Truncated));
let n = events.iter().filter(|e| matches!(e, RunEvent::Stdout(_))).count();
assert_eq!(n, 500);
}
#[test]
fn stdin_data_is_fed_to_the_process() {
// `cat` devuelve por stdout lo que recibe por stdin — es el
// reproceso más simple: tomar una salida y pasarla a otro filtro.
let mut h = run(&sh("cat").with_stdin("alfa\nbeta\ngamma"));
let lines: Vec<String> = h
.wait_all()
.into_iter()
.filter_map(|e| match e {
RunEvent::Stdout(l) => Some(l),
_ => None,
})
.collect();
assert_eq!(lines, vec!["alfa", "beta", "gamma"]);
}
#[test]
fn stdin_data_reprocessed_by_a_filter() {
let mut h = run(&sh("grep beta").with_stdin("alfa\nbeta\nbetabel\ngamma"));
let lines: Vec<String> = h
.wait_all()
.into_iter()
.filter_map(|e| match e {
RunEvent::Stdout(l) => Some(l),
_ => None,
})
.collect();
assert_eq!(lines, vec!["beta", "betabel"]);
}
}
@@ -60,6 +60,8 @@ pub struct CommandRun {
pub exit_code: Option<i32>,
/// Salida — cada línea sabe si es de stdout o de stderr.
pub output: Vec<OutputLine>,
/// `true` si la salida superó el tope de captura y se descartó parte.
pub truncated: bool,
/// Segundo Unix en que arrancó.
pub started_at: u64,
/// Segundo Unix en que terminó.
@@ -167,12 +169,20 @@ impl WorkSession {
status: RunStatus::Running,
exit_code: None,
output: Vec::new(),
truncated: false,
started_at: now,
finished_at: None,
});
id
}
/// Marca que la salida de un comando se truncó al tope de captura.
pub fn mark_truncated(&mut self, id: RunId) {
if let Some(r) = self.run_mut(id) {
r.truncated = true;
}
}
pub fn run(&self, id: RunId) -> Option<&CommandRun> {
self.history.iter().find(|r| r.id == id)
}
+3
View File
@@ -1285,3 +1285,6 @@ Failed to create DBUS proxy: Cannot autolaunch D-Bus without X11 $DISPLAY
[Child 27081, MediaDecoderStateMachine #72] WARNING: 7249567b5160 OpenCubeb() failed to init cubeb: file /home/ubuntu/actions-runner/_work/desktop/desktop/engine/dom/media/AudioStream.cpp:279
[Child 27081, MediaDecoderStateMachine #72] WARNING: Decoder=724971e98800 [OnMediaSinkAudioError]: file /home/ubuntu/actions-runner/_work/desktop/desktop/engine/dom/media/MediaDecoderStateMachine.cpp:4630
[Child 27081, MediaDecoderStateMachine #72] WARNING: Decoder=724971e98800 Decode error: NS_ERROR_DOM_MEDIA_MEDIASINK_ERR (0x806e000b) - OnMediaSinkAudioError: file /home/ubuntu/actions-runner/_work/desktop/desktop/engine/dom/media/MediaDecoderStateMachineBase.cpp:168
[Child 27081, MediaDecoderStateMachine #83] WARNING: 7249567b55e0 OpenCubeb() failed to init cubeb: file /home/ubuntu/actions-runner/_work/desktop/desktop/engine/dom/media/AudioStream.cpp:279
[Child 27081, MediaDecoderStateMachine #83] WARNING: Decoder=724971e98800 [OnMediaSinkAudioError]: file /home/ubuntu/actions-runner/_work/desktop/desktop/engine/dom/media/MediaDecoderStateMachine.cpp:4630
[Child 27081, MediaDecoderStateMachine #83] WARNING: Decoder=724971e98800 Decode error: NS_ERROR_DOM_MEDIA_MEDIASINK_ERR (0x806e000b) - OnMediaSinkAudioError: file /home/ubuntu/actions-runner/_work/desktop/desktop/engine/dom/media/MediaDecoderStateMachineBase.cpp:168