feat(shuma): shuma-session + shuma-exec — sesiones de trabajo y ejecución

shuma-session: el shell trabaja dentro de una WorkSession — directorio
actual (que es el identificador de aislamiento, hash estable del cwd),
historial de comandos ejecutados (CommandRun con salida y estado) y
grupos de comandos guardados y reutilizables (CommandGroup).

shuma-exec: ejecutor con salida en streaming — lanza bash -c en un
directorio y entrega stdout/stderr línea a línea por un canal, sin
esperar al final. Es la capa que sandokan (poll-based, orquesta Cards)
deliberadamente no cubre.

15 tests. Agnósticos de UI, #![forbid(unsafe_code)].

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
sergio
2026-05-20 18:32:59 +00:00
parent 8250ecbc0f
commit 0f2f1033eb
7 changed files with 601 additions and 0 deletions
Generated
+11
View File
@@ -11380,6 +11380,10 @@ dependencies = [
"toml 0.8.23", "toml 0.8.23",
] ]
[[package]]
name = "shuma-exec"
version = "0.1.0"
[[package]] [[package]]
name = "shuma-gateway" name = "shuma-gateway"
version = "0.1.0" version = "0.1.0"
@@ -11420,6 +11424,13 @@ dependencies = [
"ulid", "ulid",
] ]
[[package]]
name = "shuma-session"
version = "0.1.0"
dependencies = [
"serde",
]
[[package]] [[package]]
name = "shuma-shell" name = "shuma-shell"
version = "0.1.0" version = "0.1.0"
+2
View File
@@ -182,6 +182,8 @@ members = [
"crates/modules/shuma/shuma-intent", "crates/modules/shuma/shuma-intent",
"crates/modules/shuma/shuma-line", "crates/modules/shuma/shuma-line",
"crates/modules/shuma/shuma-sysmon", "crates/modules/shuma/shuma-sysmon",
"crates/modules/shuma/shuma-session",
"crates/modules/shuma/shuma-exec",
"crates/modules/shuma/shuma-shell-render", "crates/modules/shuma/shuma-shell-render",
# ============================================================ # ============================================================
@@ -0,0 +1,10 @@
[package]
name = "shuma-exec"
version.workspace = true
edition.workspace = true
license.workspace = true
authors.workspace = true
publish.workspace = true
description = "shuma — ejecutor de comandos del shell con salida en streaming: lanza un proceso y entrega stdout/stderr línea a línea por un canal. Agnóstico de UI."
[dependencies]
+256
View File
@@ -0,0 +1,256 @@
//! `shuma-exec` — ejecución de comandos del shell con salida en streaming.
//!
//! Lanza una línea de comandos en un shell (`bash -c …`) dentro de un
//! directorio, y entrega su salida **a medida que ocurre**: cada línea
//! de stdout o stderr llega como un [`RunEvent`] por un canal, sin
//! esperar a que el proceso termine.
//!
//! Esto es lo que `sandokan` no hace: el orquestador es poll-based y
//! orquesta *Cards* de brahman (entidades aisladas y supervisadas). El
//! shell, en cambio, corre líneas de shell ad-hoc y necesita ver la
//! salida fluir. Dos capas distintas, a propósito.
//!
//! El crate es agnóstico de frontend: el proceso y sus lectores corren
//! en hilos; el consumidor (shell GPUI o TUI) drena el canal cuando
//! quiere — sin `async`, sin acoplarse a ningún runtime.
#![forbid(unsafe_code)]
use std::io::{BufRead, BufReader};
use std::process::{Command, Stdio};
use std::sync::mpsc::{self, Receiver, TryRecvError};
/// Qué ejecutar: una línea de comandos, en un directorio, con un shell.
#[derive(Debug, Clone)]
pub struct CommandSpec {
/// La línea completa — se pasa como `shell -c "<line>"`.
pub line: String,
/// Directorio de trabajo del proceso.
pub cwd: String,
/// Programa de shell — `"bash"`, `"sh"`, `"fish"`…
pub shell: String,
}
impl CommandSpec {
/// Spec con `bash` como shell.
pub fn bash(line: impl Into<String>, cwd: impl Into<String>) -> Self {
Self { line: line.into(), cwd: cwd.into(), shell: "bash".into() }
}
}
/// Un evento de la ejecución de un comando.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum RunEvent {
/// Una línea de salida estándar.
Stdout(String),
/// Una línea de salida de error.
Stderr(String),
/// El proceso terminó con este código de salida.
Exited(i32),
/// El proceso no pudo siquiera lanzarse.
Failed(String),
}
impl RunEvent {
/// `true` si el evento cierra la ejecución (`Exited` o `Failed`).
pub fn is_terminal(&self) -> bool {
matches!(self, RunEvent::Exited(_) | RunEvent::Failed(_))
}
}
/// Asa de un comando en ejecución. El consumidor la conserva y drena sus
/// eventos cuando le conviene.
pub struct RunHandle {
rx: Receiver<RunEvent>,
finished: bool,
}
impl RunHandle {
/// Drena todos los eventos disponibles ahora mismo, sin bloquear.
/// Marca el asa como terminada al ver un evento terminal.
pub fn try_events(&mut self) -> Vec<RunEvent> {
let mut out = Vec::new();
loop {
match self.rx.try_recv() {
Ok(ev) => {
if ev.is_terminal() {
self.finished = true;
}
out.push(ev);
}
Err(TryRecvError::Empty) => break,
Err(TryRecvError::Disconnected) => {
self.finished = true;
break;
}
}
}
out
}
/// Bloquea hasta que el proceso termine y devuelve todos sus eventos
/// en orden. Pensado para tests y para usos sincrónicos.
pub fn wait_all(&mut self) -> Vec<RunEvent> {
let mut out = Vec::new();
while let Ok(ev) = self.rx.recv() {
let terminal = ev.is_terminal();
out.push(ev);
if terminal {
self.finished = true;
}
}
self.finished = true;
out
}
/// `true` si ya se observó el evento terminal.
pub fn is_finished(&self) -> bool {
self.finished
}
}
/// Lanza `spec` y devuelve un [`RunHandle`] desde el que drenar la
/// salida. La función vuelve de inmediato: el proceso corre en hilos.
pub fn run(spec: &CommandSpec) -> RunHandle {
let (tx, rx) = mpsc::channel();
let spec = spec.clone();
std::thread::spawn(move || {
let spawned = Command::new(&spec.shell)
.arg("-c")
.arg(&spec.line)
.current_dir(&spec.cwd)
.stdin(Stdio::null())
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.spawn();
let mut child = match spawned {
Ok(c) => c,
Err(e) => {
let _ = tx.send(RunEvent::Failed(e.to_string()));
return;
}
};
// Un hilo lector por flujo: stdout y stderr fluyen en paralelo.
let stdout = child.stdout.take();
let stderr = child.stderr.take();
let out_reader = stdout.map(|s| {
let tx = tx.clone();
std::thread::spawn(move || {
for line in BufReader::new(s).lines().map_while(Result::ok) {
if tx.send(RunEvent::Stdout(line)).is_err() {
break;
}
}
})
});
let err_reader = stderr.map(|s| {
let tx = tx.clone();
std::thread::spawn(move || {
for line in BufReader::new(s).lines().map_while(Result::ok) {
if tx.send(RunEvent::Stderr(line)).is_err() {
break;
}
}
})
});
if let Some(h) = out_reader {
let _ = h.join();
}
if let Some(h) = err_reader {
let _ = h.join();
}
let code = child
.wait()
.ok()
.and_then(|s| s.code())
.unwrap_or(-1);
let _ = tx.send(RunEvent::Exited(code));
});
RunHandle { rx, finished: false }
}
#[cfg(test)]
mod tests {
use super::*;
/// `sh` está en cualquier entorno POSIX — más portable que bash
/// para los tests.
fn sh(line: &str) -> CommandSpec {
CommandSpec { line: line.into(), cwd: ".".into(), shell: "sh".into() }
}
#[test]
fn captures_stdout_and_exit_code() {
let mut h = run(&sh("echo hola-mundo"));
let events = h.wait_all();
assert!(events.contains(&RunEvent::Stdout("hola-mundo".into())));
assert!(events.contains(&RunEvent::Exited(0)));
assert!(h.is_finished());
}
#[test]
fn captures_stderr() {
let mut h = run(&sh("echo problema 1>&2"));
let events = h.wait_all();
assert!(events.contains(&RunEvent::Stderr("problema".into())));
}
#[test]
fn nonzero_exit_is_reported() {
let mut h = run(&sh("exit 3"));
let events = h.wait_all();
assert!(events.contains(&RunEvent::Exited(3)));
}
#[test]
fn multiple_output_lines_arrive_in_order() {
let mut h = run(&sh("echo uno; echo dos; echo tres"));
let lines: Vec<String> = h
.wait_all()
.into_iter()
.filter_map(|e| match e {
RunEvent::Stdout(l) => Some(l),
_ => None,
})
.collect();
assert_eq!(lines, vec!["uno", "dos", "tres"]);
}
#[test]
fn pipes_run_through_the_shell() {
let mut h = run(&sh("printf 'b\\na\\nc\\n' | sort"));
let lines: Vec<String> = h
.wait_all()
.into_iter()
.filter_map(|e| match e {
RunEvent::Stdout(l) => Some(l),
_ => None,
})
.collect();
assert_eq!(lines, vec!["a", "b", "c"]);
}
#[test]
fn missing_shell_fails_gracefully() {
let spec = CommandSpec {
line: "echo x".into(),
cwd: ".".into(),
shell: "/no/existe/shell-xyz".into(),
};
let mut h = run(&spec);
let events = h.wait_all();
assert!(matches!(events.first(), Some(RunEvent::Failed(_))));
}
#[test]
fn terminal_event_detection() {
assert!(RunEvent::Exited(0).is_terminal());
assert!(RunEvent::Failed("x".into()).is_terminal());
assert!(!RunEvent::Stdout("x".into()).is_terminal());
}
}
@@ -0,0 +1,11 @@
[package]
name = "shuma-session"
version.workspace = true
edition.workspace = true
license.workspace = true
authors.workspace = true
publish.workspace = true
description = "shuma — el modelo de la sesión de trabajo: directorio actual (identificador de aislamiento), historial de comandos ejecutados y grupos reutilizables."
[dependencies]
serde = { workspace = true }
@@ -0,0 +1,308 @@
//! `shuma-session` — la sesión de trabajo del shell.
//!
//! El shell no es una sucesión suelta de comandos: trabaja *dentro de
//! una sesión*. Una [`WorkSession`] contiene:
//!
//! - el **directorio actual** — que es además el identificador de
//! aislamiento (cada directorio es un contexto separado);
//! - el **historial** de comandos ejecutados, cada uno con su salida y
//! su estado ([`CommandRun`]);
//! - los **grupos** de comandos guardados y reutilizables
//! ([`CommandGroup`]).
//!
//! Modelo puro y agnóstico: la ejecución real la hace `shuma-exec`, el
//! tiempo lo inyecta el caller. Determinista y testeable.
#![forbid(unsafe_code)]
use serde::{Deserialize, Serialize};
/// Identificador de un comando dentro de su sesión.
pub type RunId = u64;
/// Estado de un comando ejecutado.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum RunStatus {
/// Ejecutándose; su salida sigue llegando.
Running,
/// Terminó con código 0.
Ok,
/// Terminó con código distinto de 0, o no pudo lanzarse.
Failed,
}
/// Un comando ejecutado: la línea, el directorio, el estado y la salida.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct CommandRun {
pub id: RunId,
/// La línea de comandos tal como se escribió.
pub line: String,
/// El directorio en que se ejecutó.
pub cwd: String,
pub status: RunStatus,
/// Código de salida, una vez terminado.
pub exit_code: Option<i32>,
/// Salida combinada (stdout + stderr), una línea por elemento.
pub output: Vec<String>,
/// Segundo Unix en que arrancó.
pub started_at: u64,
/// Segundo Unix en que terminó.
pub finished_at: Option<u64>,
}
impl CommandRun {
/// `true` si el comando sigue corriendo.
pub fn is_running(&self) -> bool {
self.status == RunStatus::Running
}
/// Cantidad de líneas de salida.
pub fn line_count(&self) -> usize {
self.output.len()
}
}
/// Un grupo de comandos guardado para reutilizar — una receta.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct CommandGroup {
pub name: String,
/// Las líneas de comando, en orden de ejecución.
pub lines: Vec<String>,
}
/// La sesión de trabajo: directorio actual + historial + grupos.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WorkSession {
pub name: String,
cwd: String,
history: Vec<CommandRun>,
groups: Vec<CommandGroup>,
next_id: RunId,
}
/// FNV-1a de 64 bits — base del identificador de aislamiento.
fn fnv1a(bytes: &[u8]) -> u64 {
let mut h: u64 = 0xcbf2_9ce4_8422_2325;
for &b in bytes {
h ^= b as u64;
h = h.wrapping_mul(0x100_0000_01b3);
}
h
}
impl WorkSession {
/// Abre una sesión con un nombre y un directorio inicial.
pub fn new(name: impl Into<String>, cwd: impl Into<String>) -> Self {
Self {
name: name.into(),
cwd: cwd.into(),
history: Vec::new(),
groups: Vec::new(),
next_id: 1,
}
}
/// Directorio actual de la sesión.
pub fn cwd(&self) -> &str {
&self.cwd
}
/// Cambia el directorio actual — y con él, el contexto de aislamiento.
pub fn set_cwd(&mut self, cwd: impl Into<String>) {
self.cwd = cwd.into();
}
/// Identificador de aislamiento del directorio actual: un hash corto
/// y estable del `cwd`. Cada directorio es un contexto distinto, así
/// que el id cambia al hacer `cd`.
pub fn isolation_id(&self) -> String {
format!("{:012x}", fnv1a(self.cwd.as_bytes()) & 0xffff_ffff_ffff)
}
// --- Historial de comandos ---
/// Registra el inicio de un comando (estado `Running`) en el `cwd`
/// actual. Devuelve su id.
pub fn begin_run(&mut self, line: impl Into<String>, now: u64) -> RunId {
let id = self.next_id;
self.next_id += 1;
self.history.push(CommandRun {
id,
line: line.into(),
cwd: self.cwd.clone(),
status: RunStatus::Running,
exit_code: None,
output: Vec::new(),
started_at: now,
finished_at: None,
});
id
}
pub fn run(&self, id: RunId) -> Option<&CommandRun> {
self.history.iter().find(|r| r.id == id)
}
fn run_mut(&mut self, id: RunId) -> Option<&mut CommandRun> {
self.history.iter_mut().find(|r| r.id == id)
}
/// Añade una línea de salida a un comando en curso.
pub fn append_output(&mut self, id: RunId, line: impl Into<String>) {
if let Some(r) = self.run_mut(id) {
r.output.push(line.into());
}
}
/// Marca un comando como terminado con su código de salida.
pub fn finish_run(&mut self, id: RunId, exit_code: i32, now: u64) {
if let Some(r) = self.run_mut(id) {
r.exit_code = Some(exit_code);
r.status = if exit_code == 0 { RunStatus::Ok } else { RunStatus::Failed };
r.finished_at = Some(now);
}
}
/// Historial completo, del más antiguo al más reciente.
pub fn history(&self) -> &[CommandRun] {
&self.history
}
/// Comandos que siguen corriendo.
pub fn running(&self) -> Vec<RunId> {
self.history
.iter()
.filter(|r| r.is_running())
.map(|r| r.id)
.collect()
}
/// Vacía el historial (no toca los grupos ni el `cwd`).
pub fn clear_history(&mut self) {
self.history.clear();
}
// --- Grupos reutilizables ---
/// Guarda un grupo de comandos. Si ya existe uno con ese nombre, lo
/// reemplaza.
pub fn save_group(&mut self, name: impl Into<String>, lines: Vec<String>) {
let name = name.into();
self.groups.retain(|g| g.name != name);
self.groups.push(CommandGroup { name, lines });
}
/// Guarda como grupo las líneas de los últimos `n` comandos del
/// historial — la forma natural de "convertir lo que acabo de hacer
/// en una receta".
pub fn save_recent_as_group(&mut self, name: impl Into<String>, n: usize) {
let lines: Vec<String> = self
.history
.iter()
.rev()
.take(n)
.rev()
.map(|r| r.line.clone())
.collect();
self.save_group(name, lines);
}
pub fn groups(&self) -> &[CommandGroup] {
&self.groups
}
pub fn group(&self, name: &str) -> Option<&CommandGroup> {
self.groups.iter().find(|g| g.name == name)
}
/// Quita un grupo. `true` si existía.
pub fn remove_group(&mut self, name: &str) -> bool {
let before = self.groups.len();
self.groups.retain(|g| g.name != name);
self.groups.len() != before
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn isolation_id_follows_the_directory() {
let mut s = WorkSession::new("trabajo", "/home/sergio/brahman");
let id_a = s.isolation_id();
s.set_cwd("/tmp");
let id_b = s.isolation_id();
assert_ne!(id_a, id_b, "cd cambia el contexto de aislamiento");
// Estable: el mismo directorio da el mismo id.
s.set_cwd("/home/sergio/brahman");
assert_eq!(s.isolation_id(), id_a);
}
#[test]
fn a_run_records_its_directory() {
let mut s = WorkSession::new("t", "/home");
let id = s.begin_run("ls -la", 1000);
assert_eq!(s.run(id).unwrap().cwd, "/home");
assert_eq!(s.run(id).unwrap().status, RunStatus::Running);
}
#[test]
fn output_accumulates_and_run_finishes() {
let mut s = WorkSession::new("t", "/home");
let id = s.begin_run("echo hola", 1000);
s.append_output(id, "hola");
s.finish_run(id, 0, 1001);
let r = s.run(id).unwrap();
assert_eq!(r.output, vec!["hola"]);
assert_eq!(r.status, RunStatus::Ok);
assert_eq!(r.exit_code, Some(0));
assert_eq!(r.finished_at, Some(1001));
}
#[test]
fn nonzero_exit_marks_failed() {
let mut s = WorkSession::new("t", "/home");
let id = s.begin_run("false", 0);
s.finish_run(id, 1, 1);
assert_eq!(s.run(id).unwrap().status, RunStatus::Failed);
}
#[test]
fn running_lists_unfinished_commands() {
let mut s = WorkSession::new("t", "/home");
let a = s.begin_run("sleep 1", 0);
let b = s.begin_run("sleep 2", 0);
s.finish_run(a, 0, 1);
assert_eq!(s.running(), vec![b]);
}
#[test]
fn save_and_recall_a_group() {
let mut s = WorkSession::new("t", "/home");
s.save_group("deploy", vec!["cargo build".into(), "scp target host:/srv".into()]);
assert_eq!(s.group("deploy").unwrap().lines.len(), 2);
// Guardar con el mismo nombre reemplaza.
s.save_group("deploy", vec!["echo nuevo".into()]);
assert_eq!(s.group("deploy").unwrap().lines, vec!["echo nuevo"]);
}
#[test]
fn save_recent_history_as_a_group() {
let mut s = WorkSession::new("t", "/home");
for line in ["git add .", "git commit", "git push"] {
s.begin_run(line, 0);
}
s.save_recent_as_group("publicar", 2);
// Los 2 últimos, en orden cronológico.
assert_eq!(s.group("publicar").unwrap().lines, vec!["git commit", "git push"]);
}
#[test]
fn remove_group() {
let mut s = WorkSession::new("t", "/home");
s.save_group("x", vec!["echo x".into()]);
assert!(s.remove_group("x"));
assert!(!s.remove_group("x"));
}
}
+3
View File
@@ -952,3 +952,6 @@ Crash Annotation GraphicsCriticalError: |[C0][GFX1-]: Managed to allocate after
[Child 27081, MediaDecoderStateMachine #42] WARNING: 72497533eee0 OpenCubeb() failed to init cubeb: file /home/ubuntu/actions-runner/_work/desktop/desktop/engine/dom/media/AudioStream.cpp:279 [Child 27081, MediaDecoderStateMachine #42] WARNING: 72497533eee0 OpenCubeb() failed to init cubeb: file /home/ubuntu/actions-runner/_work/desktop/desktop/engine/dom/media/AudioStream.cpp:279
[Child 27081, MediaDecoderStateMachine #42] WARNING: Decoder=724971e98800 [OnMediaSinkAudioError]: file /home/ubuntu/actions-runner/_work/desktop/desktop/engine/dom/media/MediaDecoderStateMachine.cpp:4630 [Child 27081, MediaDecoderStateMachine #42] WARNING: Decoder=724971e98800 [OnMediaSinkAudioError]: file /home/ubuntu/actions-runner/_work/desktop/desktop/engine/dom/media/MediaDecoderStateMachine.cpp:4630
[Child 27081, MediaDecoderStateMachine #42] WARNING: Decoder=724971e98800 Decode error: NS_ERROR_DOM_MEDIA_MEDIASINK_ERR (0x806e000b) - OnMediaSinkAudioError: file /home/ubuntu/actions-runner/_work/desktop/desktop/engine/dom/media/MediaDecoderStateMachineBase.cpp:168 [Child 27081, MediaDecoderStateMachine #42] WARNING: Decoder=724971e98800 Decode error: NS_ERROR_DOM_MEDIA_MEDIASINK_ERR (0x806e000b) - OnMediaSinkAudioError: file /home/ubuntu/actions-runner/_work/desktop/desktop/engine/dom/media/MediaDecoderStateMachineBase.cpp:168
[Child 27081, MediaDecoderStateMachine #49] WARNING: 72496a597ca0 OpenCubeb() failed to init cubeb: file /home/ubuntu/actions-runner/_work/desktop/desktop/engine/dom/media/AudioStream.cpp:279
[Child 27081, MediaDecoderStateMachine #49] WARNING: Decoder=724971e98800 [OnMediaSinkAudioError]: file /home/ubuntu/actions-runner/_work/desktop/desktop/engine/dom/media/MediaDecoderStateMachine.cpp:4630
[Child 27081, MediaDecoderStateMachine #49] WARNING: Decoder=724971e98800 Decode error: NS_ERROR_DOM_MEDIA_MEDIASINK_ERR (0x806e000b) - OnMediaSinkAudioError: file /home/ubuntu/actions-runner/_work/desktop/desktop/engine/dom/media/MediaDecoderStateMachineBase.cpp:168