refactor(monorepo): reorganización lógica + renames + SDDs + split CHANGELOG

Reorganización física de crates/:
- core/ (mezclaba 6 propósitos) se divide en protocol/, init/, runtime/, compat/
- shared/ (3 crates) se redistribuye en protocol/ e init/
- lapaloma (sub-módulo de ui_engine) se promueve a modules/pineal/

Renames de proyectos:
- shipote → shuma (runtime de sandboxes)
- nouser → akasha (explorador de Mónadas)
- yahweh → nahual (motor GPUI, antes ui_engine/)
- lapaloma → pineal (data-viz agnóstica)

Fraccionamiento UI → core agnóstico:
- vista-core (DeckState + snap, 175 LOC, 5 tests verdes)
- barra-core (Task + render_html + sanitize, 90 LOC, 5 tests verdes)
- vista-web y barra-web ahora son thin DOM bindings

Documentación nueva:
- 16 SDDs por subdirectorio (≤80 LOC c/u): protocol/init/runtime/compat
  + 10 módulos + apps/
- docs/STATUS.md con cifras reales por proyecto
- docs/ROADMAP.md con plan a finalización (6 hitos, ~6-8 semanas)
- CHANGELOG.md particionado en docs/changelog/<proyecto>.md (7 buckets)

Automatización:
- scripts/reorg.py — script idempotente que: git mv directorios, renombra
  package names, recomputa path = refs, reescribe imports rust, actualiza
  workspace Cargo.toml. Soporta --dry-run.
- scripts/split-changelog.py — particiona CHANGELOG por componente.

Validación:
- cargo check --workspace pasa (124 crates + 2 nuevos cores).
- 10 tests adicionales (5 en vista-core + 5 en barra-core) verdes.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
sergio
2026-05-19 14:48:34 +00:00
parent 86fb6ae20b
commit 550c98f275
375 changed files with 8512 additions and 7155 deletions
@@ -0,0 +1,478 @@
//! Flow channels: data plane sobre Unix socket por edge enriquecido.
//!
//! Cuando un splitter detecta el TypeRef de un edge, además de replicar a
//! los consumers internos del pipeline, se levanta un FlowChannel que
//! expone los bytes a subscribers externos (otros módulos del fractal).
//!
//! ## Diseño
//!
//! - `tokio::sync::broadcast::channel` para fan-out lock-less entre el
//! splitter (sender) y los N subscribers conectados.
//! - `UnixListener` accept-loop: por cada cliente nuevo, spawn una task
//! que drena el receiver y escribe al socket.
//! - Subscribers lentos pueden perder mensajes (broadcast::Receiver::Lagged)
//! — se loguea warn y se sigue. Esto es deliberado para no bloquear el
//! splitter en consumers lentos.
//!
//! ## Lifetime
//!
//! `FlowChannel` se construye con `new(path)`. Cuando se drop:
//! - El `accept_task` se cancela (vía drop del `tokio::task::JoinHandle`
//! que tenemos abort-on-drop).
//! - El socket file se borra del FS (`Drop` impl).
//!
//! Sender clones son baratos; los subscribers conectados se enteran del
//! cierre cuando todos los senders se dropean.
use std::collections::VecDeque;
use std::path::{Path, PathBuf};
use std::sync::{Arc, Mutex};
use tokio::io::AsyncWriteExt;
use tokio::net::UnixListener;
use tokio::sync::broadcast;
use tokio::task::AbortHandle;
use tracing::{debug, warn};
/// Capacidad del broadcast channel. Si un subscriber está más de N chunks
/// atrasado, queda `Lagged` y empieza a perder mensajes.
const BROADCAST_CAP: usize = 64;
/// Chunks default del replay buffer. Cuando un cliente nuevo se conecta,
/// recibe hasta estos N chunks antes de iniciar el broadcast live.
/// Override via `FlowChannel::with_replay_cap`.
pub const DEFAULT_REPLAY_CHUNKS: usize = 32;
pub struct FlowChannel {
sender: broadcast::Sender<Arc<Vec<u8>>>,
replay: Arc<Mutex<VecDeque<Arc<Vec<u8>>>>>,
replay_caps: ReplayCaps,
socket_path: PathBuf,
meter: Arc<FlowMeter>,
_accept_handle: AbortOnDrop,
}
/// Contador de bytes y rate (bytes/s ventana 1s).
#[derive(Debug)]
pub struct FlowMeter {
/// Bytes acumulados desde la creación del FlowChannel.
total_bytes: std::sync::atomic::AtomicU64,
/// Ring buffer de (timestamp_ms, bytes_acumulados) para calcular
/// el rate sobre los últimos N samples.
rate_window: Mutex<VecDeque<(u64, u64)>>,
}
const RATE_WINDOW_SAMPLES: usize = 32;
impl FlowMeter {
fn new() -> Self {
Self {
total_bytes: std::sync::atomic::AtomicU64::new(0),
rate_window: Mutex::new(VecDeque::with_capacity(RATE_WINDOW_SAMPLES)),
}
}
fn record(&self, delta: u64) {
let now = self.total_bytes
.fetch_add(delta, std::sync::atomic::Ordering::Relaxed)
+ delta;
let ts = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.map(|d| d.as_millis() as u64)
.unwrap_or(0);
if let Ok(mut w) = self.rate_window.lock() {
if w.len() >= RATE_WINDOW_SAMPLES {
w.pop_front();
}
w.push_back((ts, now));
}
}
/// Bytes totales acumulados desde la creación.
pub fn total_bytes(&self) -> u64 {
self.total_bytes.load(std::sync::atomic::Ordering::Relaxed)
}
/// Bytes por segundo (rolling sobre la ventana). 0 si no hay
/// historia suficiente o si el último sample es muy viejo (>5s).
pub fn bytes_per_sec(&self) -> f64 {
let now_ms = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.map(|d| d.as_millis() as u64)
.unwrap_or(0);
let w = match self.rate_window.lock() {
Ok(w) => w,
Err(_) => return 0.0,
};
if w.len() < 2 {
return 0.0;
}
let last = w.back().copied().unwrap();
// Si el último sample tiene >5s, asumimos idle.
if now_ms.saturating_sub(last.0) > 5000 {
return 0.0;
}
let first = w.front().copied().unwrap();
let dt_ms = last.0.saturating_sub(first.0).max(1);
let d_bytes = last.1.saturating_sub(first.1);
(d_bytes as f64 * 1000.0) / dt_ms as f64
}
}
#[derive(Debug, Clone, Copy)]
pub struct ReplayCaps {
/// Máximo de chunks retenidos.
pub chunks: usize,
/// Máximo de bytes (sumando len de chunks). `0` = sin tope.
pub bytes: usize,
}
impl ReplayCaps {
pub fn chunks_only(chunks: usize) -> Self {
Self { chunks: chunks.max(1), bytes: 0 }
}
pub fn new(chunks: usize, bytes: usize) -> Self {
Self { chunks: chunks.max(1), bytes }
}
}
#[derive(Clone)]
pub struct FlowSender {
sender: broadcast::Sender<Arc<Vec<u8>>>,
replay: Arc<Mutex<VecDeque<Arc<Vec<u8>>>>>,
replay_caps: ReplayCaps,
meter: Arc<FlowMeter>,
}
impl FlowSender {
/// Pushea al broadcast y al replay buffer. Si no hay subscribers,
/// el broadcast::send retorna Err pero igual guardamos en replay
/// (subscribers tarde verán los chunks pasados).
pub fn send(&self, data: Arc<Vec<u8>>) {
let incoming = data.len();
let caps = self.replay_caps;
if let Ok(mut g) = self.replay.lock() {
evict_for_incoming(&mut g, caps, incoming);
g.push_back(data.clone());
}
self.meter.record(incoming as u64);
let _ = self.sender.send(data);
}
}
/// Evict los chunks más viejos para hacer espacio a un chunk entrante de
/// `incoming` bytes — el buffer post-push queda dentro de los caps.
fn evict_for_incoming(buf: &mut VecDeque<Arc<Vec<u8>>>, caps: ReplayCaps, incoming: usize) {
// 1) chunks: dejar lugar para 1 más.
while buf.len() + 1 > caps.chunks {
if buf.pop_front().is_none() {
break;
}
}
// 2) bytes (si está activado).
if caps.bytes > 0 {
let mut current: usize = buf.iter().map(|a| a.len()).sum();
while current + incoming > caps.bytes {
match buf.pop_front() {
Some(c) => current = current.saturating_sub(c.len()),
None => break,
}
}
}
}
impl std::fmt::Debug for FlowChannel {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("FlowChannel")
.field("socket_path", &self.socket_path)
.field("subscribers", &self.sender.receiver_count())
.finish()
}
}
impl FlowChannel {
/// Crea un FlowChannel atado al path `socket_path`. Si el path ya
/// existe, lo borra antes de bind (asume restart limpio).
pub fn new(socket_path: PathBuf) -> std::io::Result<Self> {
Self::with_replay_caps(socket_path, ReplayCaps::chunks_only(DEFAULT_REPLAY_CHUNKS))
}
pub fn with_replay_cap(socket_path: PathBuf, chunks: usize) -> std::io::Result<Self> {
Self::with_replay_caps(socket_path, ReplayCaps::chunks_only(chunks))
}
pub fn with_replay_caps(socket_path: PathBuf, caps: ReplayCaps) -> std::io::Result<Self> {
if socket_path.exists() {
let _ = std::fs::remove_file(&socket_path);
}
if let Some(parent) = socket_path.parent() {
let _ = std::fs::create_dir_all(parent);
}
let listener = UnixListener::bind(&socket_path)?;
let (tx, _rx_unused) = broadcast::channel::<Arc<Vec<u8>>>(BROADCAST_CAP);
let replay: Arc<Mutex<VecDeque<Arc<Vec<u8>>>>> =
Arc::new(Mutex::new(VecDeque::with_capacity(caps.chunks)));
let tx_for_accept = tx.clone();
let replay_for_accept = replay.clone();
let path_for_log = socket_path.clone();
let join = tokio::spawn(async move {
debug!(path = %path_for_log.display(), "flow channel listening");
loop {
let (mut stream, _addr) = match listener.accept().await {
Ok(p) => p,
Err(e) => {
warn!(?e, "flow channel accept failed");
return;
}
};
// Snapshot del replay buffer Y subscribe al broadcast.
// El orden es crítico: subscribe ANTES de drenar el replay
// para no perder chunks que llegan justo en el medio.
let mut rx = tx_for_accept.subscribe();
let snapshot: Vec<Arc<Vec<u8>>> = {
let g = replay_for_accept.lock().expect("replay lock");
g.iter().cloned().collect()
};
tokio::spawn(async move {
// Fase 1: drenar replay snapshot al subscriber.
for chunk in &snapshot {
if let Err(e) = stream.write_all(chunk).await {
debug!(?e, "flow subscriber dropped during replay");
return;
}
}
// Fase 2: live broadcast.
loop {
match rx.recv().await {
Ok(chunk) => {
if let Err(e) = stream.write_all(&chunk).await {
debug!(?e, "flow subscriber dropped");
return;
}
}
Err(broadcast::error::RecvError::Closed) => return,
Err(broadcast::error::RecvError::Lagged(n)) => {
warn!(skipped = n, "flow subscriber lagged");
}
}
}
});
}
});
Ok(Self {
sender: tx,
replay,
replay_caps: caps,
socket_path,
meter: Arc::new(FlowMeter::new()),
_accept_handle: AbortOnDrop(join.abort_handle()),
})
}
pub fn meter(&self) -> &FlowMeter {
&self.meter
}
/// Push un chunk al channel. Si no hay subscribers, drop silencioso.
/// Siempre se guarda en el replay buffer (con cap rotation por chunks
/// y opcionalmente por bytes).
pub fn send(&self, data: Vec<u8>) {
let incoming = data.len();
let arc = Arc::new(data);
let caps = self.replay_caps;
if let Ok(mut g) = self.replay.lock() {
evict_for_incoming(&mut g, caps, incoming);
g.push_back(arc.clone());
}
self.meter.record(incoming as u64);
let _ = self.sender.send(arc);
}
pub fn socket_path(&self) -> &Path {
&self.socket_path
}
/// Handle clone-able para que tasks externas (splitter) pushen al
/// channel sin tener ownership del FlowChannel. Cada push se guarda
/// también en el replay buffer y se contabiliza en el meter.
pub fn sender_handle(&self) -> FlowSender {
FlowSender {
sender: self.sender.clone(),
replay: self.replay.clone(),
replay_caps: self.replay_caps,
meter: self.meter.clone(),
}
}
pub fn subscriber_count(&self) -> usize {
self.sender.receiver_count()
}
}
impl Drop for FlowChannel {
fn drop(&mut self) {
// El AbortOnDrop cancela el accept loop; sólo nos queda limpiar el
// socket file.
let _ = std::fs::remove_file(&self.socket_path);
}
}
struct AbortOnDrop(AbortHandle);
impl Drop for AbortOnDrop {
fn drop(&mut self) {
self.0.abort();
}
}
/// Path canónico para un flow channel: `$XDG_RUNTIME_DIR/shuma-flow-<id>.sock`.
pub fn default_flow_socket_path(id: &str) -> PathBuf {
let base = std::env::var("XDG_RUNTIME_DIR").unwrap_or_else(|_| {
let uid = nix::unistd::getuid().as_raw();
let p = format!("/run/user/{uid}");
if std::path::Path::new(&p).exists() {
p
} else {
"/tmp".into()
}
});
PathBuf::from(base).join(format!("shuma-flow-{id}.sock"))
}
#[cfg(test)]
mod tests {
use super::*;
use tokio::io::AsyncReadExt;
use tokio::net::UnixStream;
#[tokio::test]
async fn channel_delivers_to_subscriber() {
let tmp = tempfile::tempdir().unwrap();
let path = tmp.path().join("flow.sock");
let ch = FlowChannel::new(path.clone()).unwrap();
// Subscriber se conecta.
let path_clone = path.clone();
let task = tokio::spawn(async move {
let mut stream = UnixStream::connect(&path_clone).await.unwrap();
let mut buf = vec![0u8; 64];
let n = stream.read(&mut buf).await.unwrap();
buf.truncate(n);
buf
});
// Damos tiempo al accept.
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
// Hasta que haya 1 receiver_count, el send no llega.
for _ in 0..50 {
if ch.subscriber_count() >= 1 {
break;
}
tokio::time::sleep(std::time::Duration::from_millis(20)).await;
}
ch.send(b"hello-flow".to_vec());
let received = tokio::time::timeout(std::time::Duration::from_secs(2), task)
.await
.expect("timeout")
.unwrap();
assert_eq!(received, b"hello-flow");
}
#[tokio::test]
async fn replay_buffer_serves_late_subscriber() {
let tmp = tempfile::tempdir().unwrap();
let path = tmp.path().join("flow.sock");
let ch = FlowChannel::new(path.clone()).unwrap();
// Pushes ANTES de cualquier subscriber: van solo al replay.
ch.send(b"chunk-1".to_vec());
ch.send(b"chunk-2".to_vec());
ch.send(b"chunk-3".to_vec());
// Subscriber LATE — debe recibir los 3 chunks del replay.
let path_clone = path.clone();
let task = tokio::spawn(async move {
let mut stream = UnixStream::connect(&path_clone).await.unwrap();
let mut buf = vec![0u8; 256];
// Leemos hasta recibir los 3 chunks (21 bytes esperados).
let mut total = Vec::new();
for _ in 0..20 {
let n = stream.read(&mut buf).await.unwrap();
if n == 0 {
break;
}
total.extend_from_slice(&buf[..n]);
if total.len() >= 21 {
break;
}
}
total
});
let received = tokio::time::timeout(std::time::Duration::from_secs(2), task)
.await
.expect("timeout")
.unwrap();
let s = String::from_utf8_lossy(&received);
assert!(s.contains("chunk-1"), "got: {s:?}");
assert!(s.contains("chunk-2"), "got: {s:?}");
assert!(s.contains("chunk-3"), "got: {s:?}");
}
#[tokio::test]
async fn replay_evicts_by_bytes_cap() {
let tmp = tempfile::tempdir().unwrap();
let path = tmp.path().join("flow.sock");
// chunks=100 (no limita), bytes=20: deberíamos retener sólo los
// últimos chunks cuyos bytes sumen ≤ 20.
let ch = FlowChannel::with_replay_caps(path.clone(), ReplayCaps::new(100, 20)).unwrap();
ch.send(b"AAAAAAAA".to_vec()); // 8 bytes
ch.send(b"BBBBBBBB".to_vec()); // 8 → total 16
ch.send(b"CCCCCCCC".to_vec()); // 8 → total 24 > 20, evict A → 16
ch.send(b"DDDDDDDD".to_vec()); // 8 → total 24 > 20, evict B → 16
let path_clone = path.clone();
let task = tokio::spawn(async move {
let mut stream = UnixStream::connect(&path_clone).await.unwrap();
let mut buf = vec![0u8; 64];
let mut total = Vec::new();
for _ in 0..20 {
let n = stream.read(&mut buf).await.unwrap();
if n == 0 {
break;
}
total.extend_from_slice(&buf[..n]);
if total.len() >= 16 {
break;
}
}
total
});
let got = tokio::time::timeout(std::time::Duration::from_secs(2), task)
.await
.expect("timeout")
.unwrap();
let s = String::from_utf8_lossy(&got);
// Sólo C y D (los más viejos A y B fueron evicted).
assert!(!s.contains("AAAA"), "should have evicted A: {s:?}");
assert!(!s.contains("BBBB"), "should have evicted B: {s:?}");
assert!(s.contains("CCCC"), "should keep C: {s:?}");
assert!(s.contains("DDDD"), "should keep D: {s:?}");
}
#[tokio::test]
async fn drop_removes_socket() {
let tmp = tempfile::tempdir().unwrap();
let path = tmp.path().join("flow.sock");
{
let _ch = FlowChannel::new(path.clone()).unwrap();
assert!(path.exists());
}
// Después del drop, el socket file no debe quedar.
// Damos un pelín de tiempo al runtime para que el drop corra
// mientras estamos en task.
tokio::time::sleep(std::time::Duration::from_millis(10)).await;
assert!(!path.exists());
}
}
File diff suppressed because it is too large Load Diff
@@ -0,0 +1,122 @@
//! Ring buffer en memoria para capturar stdout/stderr de comandos.
//!
//! Tamaño fijo por comando (config: `MAX_LOG_BYTES`). Cuando se llena,
//! descarta los bytes más viejos. Pensado para diagnostico rápido, no
//! para retención histórica — eso es trabajo de un journald-like aparte.
use std::sync::{Arc, Mutex};
/// Bytes máximos retenidos por comando. 64 KiB cubre logs típicos sin
/// abusar de memoria si el daemon tiene cientos de comandos vivos.
pub const MAX_LOG_BYTES: usize = 64 * 1024;
#[derive(Debug, Clone)]
pub struct LogBuf {
inner: Arc<Mutex<Inner>>,
}
#[derive(Debug)]
struct Inner {
/// Bytes raw. Cuando se acerca al cap, descartamos head para mantener
/// el tail.
buf: Vec<u8>,
cap: usize,
/// Total escrito alguna vez (no decrementado al recortar).
written_total: u64,
}
impl LogBuf {
pub fn new() -> Self {
Self::with_cap(MAX_LOG_BYTES)
}
pub fn with_cap(cap: usize) -> Self {
Self {
inner: Arc::new(Mutex::new(Inner {
buf: Vec::with_capacity(cap.min(4096)),
cap,
written_total: 0,
})),
}
}
pub fn append(&self, data: &[u8]) {
let Ok(mut g) = self.inner.lock() else { return };
g.written_total += data.len() as u64;
g.buf.extend_from_slice(data);
// Recorte cuando excede cap (con un pequeño slack para evitar
// shift en cada append). El usuario ve sólo el tail.
if g.buf.len() > g.cap + 1024 {
let drop = g.buf.len() - g.cap;
g.buf.drain(..drop);
}
}
/// Devuelve el tail de hasta `n` bytes (o todo si `n=0`).
pub fn tail(&self, n: usize) -> Vec<u8> {
let g = match self.inner.lock() {
Ok(g) => g,
Err(_) => return Vec::new(),
};
if n == 0 || n >= g.buf.len() {
return g.buf.clone();
}
g.buf[g.buf.len() - n..].to_vec()
}
/// Cuántos bytes hay actualmente en el buffer.
pub fn len(&self) -> usize {
self.inner.lock().map(|g| g.buf.len()).unwrap_or(0)
}
pub fn is_empty(&self) -> bool {
self.len() == 0
}
pub fn written_total(&self) -> u64 {
self.inner.lock().map(|g| g.written_total).unwrap_or(0)
}
}
impl Default for LogBuf {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn append_and_tail_basic() {
let lb = LogBuf::with_cap(100);
lb.append(b"hello ");
lb.append(b"world\n");
let t = lb.tail(0);
assert_eq!(t, b"hello world\n");
}
#[test]
fn cap_drops_oldest() {
let lb = LogBuf::with_cap(10);
lb.append(&[b'a'; 8]);
lb.append(&[b'b'; 8]);
// Después del recorte, debe quedar ~10 bytes pero el slack
// permite hasta 10+1024. Como pasamos slack, no se recorta aún
// en este caso (16 bytes < 10+1024). Forzamos un append grande.
lb.append(&[b'c'; 2048]);
assert!(lb.len() <= 10 + 1024);
let t = lb.tail(0);
// El tail debe contener 'c's (los más recientes).
assert!(t.iter().filter(|&&b| b == b'c').count() > 0);
}
#[test]
fn written_total_tracks_all() {
let lb = LogBuf::with_cap(10);
lb.append(b"abcdef");
lb.append(b"ghijkl");
assert_eq!(lb.written_total(), 12);
}
}
@@ -0,0 +1,383 @@
//! Persistencia del estado del WorkspaceManager.
//!
//! v1: sólo `WorkspaceSpec`s vivos. Los comandos (PIDs) NO se persisten —
//! el kernel los mata al cerrar el daemon. Sólo la *intención declarada*
//! (Workspaces creados con su spec) sobrevive a un reboot del daemon.
use crate::WorkspaceManager;
use serde::{Deserialize, Serialize};
use shuma_card::{PipelineSpec, WorkspaceId, WorkspaceSpec};
use std::path::{Path, PathBuf};
use tracing::{info, warn};
/// v2 agregó `saved_pipelines`. v3 agrega `live_pipelines`. v4 agrega
/// `stats_history` por workspace (sparkline survives daemon restart).
/// Versiones inferiores leen campos ausentes como vacío.
pub const SNAPSHOT_VERSION: u16 = 4;
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ShipoteSnapshot {
pub version: u16,
pub timestamp_ms: u64,
pub workspaces: Vec<WorkspaceEntry>,
#[serde(default)]
pub saved_pipelines: Vec<PipelineEntry>,
/// Pipelines vivos con supervisor (`restart_on_failure=true`) al
/// momento del snapshot. El daemon los relanza al restore.
#[serde(default)]
pub live_pipelines: Vec<LivePipelineEntry>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WorkspaceEntry {
pub id: WorkspaceId,
pub spec: WorkspaceSpec,
/// Stats history persistida — cap reasonable para no inflar el JSON.
/// Sólo se guardan campos serializables (no Instant).
#[serde(default)]
pub stats_history: Vec<PersistedStats>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PersistedStats {
pub commands_alive: u32,
pub commands_total: u32,
pub rss_bytes: Option<u64>,
pub rss_peak_bytes: Option<u64>,
pub cpu_usec: Option<u64>,
pub cpu_percent: Option<f32>,
pub cpu_cores: u32,
pub uptime_ms: u64,
}
impl From<&crate::stats::WorkspaceStats> for PersistedStats {
fn from(s: &crate::stats::WorkspaceStats) -> Self {
Self {
commands_alive: s.commands_alive,
commands_total: s.commands_total,
rss_bytes: s.rss_bytes,
rss_peak_bytes: s.rss_peak_bytes,
cpu_usec: s.cpu_usec,
cpu_percent: s.cpu_percent,
cpu_cores: s.cpu_cores,
uptime_ms: s.uptime_ms,
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PipelineEntry {
pub name: String,
pub spec: PipelineSpec,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LivePipelineEntry {
pub workspace: WorkspaceId,
pub spec: PipelineSpec,
pub tap: bool,
}
impl ShipoteSnapshot {
pub fn write(&self, path: &Path) -> anyhow::Result<()> {
let bytes = serde_json::to_vec_pretty(self)?;
if let Some(parent) = path.parent() {
std::fs::create_dir_all(parent).ok();
}
let tmp = path.with_extension("tmp");
std::fs::write(&tmp, &bytes)?;
std::fs::rename(&tmp, path)?;
Ok(())
}
pub fn read(path: &Path) -> anyhow::Result<Self> {
let bytes = std::fs::read(path)?;
let snap: ShipoteSnapshot = serde_json::from_slice(&bytes)?;
// v1 y v2 son compatibles forward (v1 sin saved_pipelines lee como vec vacío).
if snap.version > SNAPSHOT_VERSION {
anyhow::bail!(
"snapshot version {} no soportada (esperada ≤ {})",
snap.version,
SNAPSHOT_VERSION
);
}
Ok(snap)
}
}
/// Path canónico del snapshot: `$XDG_STATE_HOME/shuma/state.json`,
/// fallback `$HOME/.local/state/shuma/state.json`,
/// fallback `/tmp/shuma-state-$UID.json`.
pub fn default_snapshot_path() -> PathBuf {
if let Ok(state) = std::env::var("XDG_STATE_HOME") {
return PathBuf::from(state).join("shuma/state.json");
}
if let Ok(home) = std::env::var("HOME") {
return PathBuf::from(home).join(".local/state/shuma/state.json");
}
let uid = nix::unistd::getuid().as_raw();
PathBuf::from(format!("/tmp/shuma-state-{uid}.json"))
}
fn now_ms() -> u64 {
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.map(|d| d.as_millis() as u64)
.unwrap_or(0)
}
impl WorkspaceManager {
/// Toma snapshot del estado actual.
pub async fn snapshot(&self) -> ShipoteSnapshot {
const PERSIST_STATS_CAP: usize = 16;
let g = self.inner.lock().await;
let workspaces = g
.workspaces
.iter()
.map(|(id, ws)| {
// Persist sólo los últimos N samples — el resto crece
// y el JSON se infla.
let take = ws.stats_history.len().min(PERSIST_STATS_CAP);
let skip = ws.stats_history.len() - take;
let stats_history: Vec<PersistedStats> = ws
.stats_history
.iter()
.skip(skip)
.map(PersistedStats::from)
.collect();
WorkspaceEntry {
id: *id,
spec: ws.spec.clone(),
stats_history,
}
})
.collect();
let saved_pipelines = g
.saved_pipelines
.iter()
.map(|(name, spec)| PipelineEntry {
name: name.clone(),
spec: spec.clone(),
})
.collect();
// Pipelines vivos con supervisor — preserva la intención. Los
// pids/sockets/discernments son ephemeral y se regeneran al
// restore (relaunch desde cero).
let live_pipelines = g
.pipeline_supervisors
.values()
.map(|sup| LivePipelineEntry {
workspace: sup.workspace,
spec: sup.spec.clone(),
tap: sup.tap,
})
.collect();
ShipoteSnapshot {
version: SNAPSHOT_VERSION,
timestamp_ms: now_ms(),
workspaces,
saved_pipelines,
live_pipelines,
}
}
/// Escribe snapshot a disco. Si `is_dirty()` es false **y** el path
/// existe (snapshot previo válido), skip la escritura.
pub async fn save_snapshot(&self, path: &Path) -> anyhow::Result<()> {
if !self.is_dirty() && path.exists() {
info!(path = %path.display(), "snapshot SKIPPED (clean)");
return Ok(());
}
let snap = self.snapshot().await;
snap.write(path)?;
// Clear dirty: lo que está en disco es el current state.
self.dirty
.store(false, std::sync::atomic::Ordering::Relaxed);
info!(path = %path.display(), workspaces = snap.workspaces.len(), "snapshot saved");
Ok(())
}
/// Carga snapshot desde disco y restaura los Workspaces + saved
/// pipelines. Devuelve los `live_pipelines` para que el caller
/// (daemon) los relance — no podemos relanzarlos desde acá porque
/// `run_pipeline` necesita `Incarnator` + `DiscernPipeline`.
/// Errores no-fatales (workspaces inválidos) se loguean y se saltan.
pub async fn restore_snapshot(
self: &std::sync::Arc<Self>,
path: &Path,
) -> anyhow::Result<RestoreOutcome> {
let snap = match ShipoteSnapshot::read(path) {
Ok(s) => s,
Err(e) => {
warn!(?e, path = %path.display(), "no snapshot — start fresh");
return Ok(RestoreOutcome::default());
}
};
let mut out = RestoreOutcome::default();
for entry in snap.workspaces {
// v2+: reusamos el id original así clients que tracking
// workspace_id no se rompen al restart.
let label = entry.spec.label.clone();
let id = entry.id;
let history = entry.stats_history;
match self.create_with_id(id, entry.spec).await {
Ok(_) => {
out.workspaces_restored += 1;
// Hidratar history persistida. Convertimos
// PersistedStats → WorkspaceStats (perdemos
// los campos no serializables como `source`).
if !history.is_empty() {
let mut g = self.inner.lock().await;
if let Some(ws) = g.workspaces.get_mut(&id) {
for ps in history {
ws.stats_history.push_back(crate::stats::WorkspaceStats {
commands_alive: ps.commands_alive,
commands_total: ps.commands_total,
rss_bytes: ps.rss_bytes,
rss_peak_bytes: ps.rss_peak_bytes,
cpu_usec: ps.cpu_usec,
cpu_percent: ps.cpu_percent,
cpu_cores: ps.cpu_cores,
source: "persisted".into(),
uptime_ms: ps.uptime_ms,
});
}
}
}
}
Err(e) => warn!(?e, %label, "skipped workspace en restore"),
}
}
for entry in snap.saved_pipelines {
self.save_pipeline(entry.name, entry.spec).await;
out.saved_pipelines_restored += 1;
}
out.live_pipelines = snap.live_pipelines;
// Restore no cuenta como mutación — lo que está en disco es lo
// que acabamos de cargar. Sin esto, el próximo SIGTERM siempre
// re-escribiría aunque no hubiese cambios reales.
self.dirty
.store(false, std::sync::atomic::Ordering::Relaxed);
info!(
workspaces = out.workspaces_restored,
saved_pipelines = out.saved_pipelines_restored,
live_pipelines = out.live_pipelines.len(),
"snapshot restored"
);
Ok(out)
}
}
/// Lo que el caller del restore obtiene. Las `live_pipelines` requieren
/// `Incarnator + DiscernPipeline` para relanzarlas → el caller las
/// procesa (típicamente el daemon).
#[derive(Debug, Default)]
pub struct RestoreOutcome {
pub workspaces_restored: usize,
pub saved_pipelines_restored: usize,
pub live_pipelines: Vec<LivePipelineEntry>,
}
#[cfg(test)]
mod tests {
use super::*;
use crate::WorkspaceManager;
use ente_incarnate::IncarnatorConfig;
use shuma_card::{ExitPolicy, WorkspaceSpec};
use std::sync::Arc;
fn sample_ws(label: &str) -> WorkspaceSpec {
WorkspaceSpec {
label: label.into(),
soma: Default::default(),
permissions: Default::default(),
ttl: None,
flow_dirs: vec![],
on_exit: ExitPolicy::Reap,
quota_enforce: Default::default(),
}
}
#[tokio::test]
async fn roundtrip_snapshot_preserves_ulids() {
let tmp = tempfile::tempdir().unwrap();
let path = tmp.path().join("state.json");
let mgr1 = Arc::new(WorkspaceManager::new(IncarnatorConfig::default()));
let (id1, _) = mgr1.create(sample_ws("a")).await.unwrap();
let (id2, _) = mgr1.create(sample_ws("b")).await.unwrap();
mgr1.save_snapshot(&path).await.unwrap();
let mgr2 = Arc::new(WorkspaceManager::new(IncarnatorConfig::default()));
let out = mgr2.restore_snapshot(&path).await.unwrap();
assert_eq!(out.workspaces_restored, 2);
let listed = mgr2.list().await;
let restored_ids: std::collections::HashSet<_> = listed.iter().map(|s| s.id).collect();
assert!(restored_ids.contains(&id1));
assert!(restored_ids.contains(&id2));
}
#[tokio::test]
async fn save_snapshot_skips_when_clean() {
let tmp = tempfile::tempdir().unwrap();
let path = tmp.path().join("state.json");
let mgr = Arc::new(WorkspaceManager::new(IncarnatorConfig::default()));
let _ = mgr.create(sample_ws("dirty-test")).await.unwrap();
assert!(mgr.is_dirty(), "create debería marcar dirty");
mgr.save_snapshot(&path).await.unwrap();
assert!(!mgr.is_dirty(), "save_snapshot debería limpiar dirty");
let mtime1 = std::fs::metadata(&path).unwrap().modified().unwrap();
// Esperamos un pelín para que mtime cambie si fuera re-escrito.
tokio::time::sleep(std::time::Duration::from_millis(20)).await;
// Segundo save sin mutación → skip.
mgr.save_snapshot(&path).await.unwrap();
let mtime2 = std::fs::metadata(&path).unwrap().modified().unwrap();
assert_eq!(mtime1, mtime2, "skip cuando clean — mtime no cambia");
}
#[tokio::test]
async fn snapshot_includes_saved_pipelines() {
use shuma_card::{CommandRef, DiscernPolicy, PipelineSpec};
let tmp = tempfile::tempdir().unwrap();
let path = tmp.path().join("state.json");
let mgr1 = Arc::new(WorkspaceManager::new(IncarnatorConfig::default()));
let (ws_id, _) = mgr1.create(sample_ws("ws")).await.unwrap();
let spec = PipelineSpec {
label: "echo-cat".into(),
workspace: ws_id,
nodes: vec![CommandRef {
label: "n1".into(),
payload: brahman_card::Payload::Native {
exec: "/bin/echo".into(),
argv: vec!["hi".into()],
envp: vec![],
},
soma: Default::default(),
flows: Default::default(),
supervision: brahman_card::Supervision::OneShot,
}],
edges: vec![],
discern: DiscernPolicy::default(),
restart_on_failure: false,
restart_backoff_ms: 200,
restart_max_backoff_ms: 30_000,
restart_max: 0,
};
mgr1.save_pipeline("daily".into(), spec).await;
mgr1.save_snapshot(&path).await.unwrap();
let mgr2 = Arc::new(WorkspaceManager::new(IncarnatorConfig::default()));
mgr2.restore_snapshot(&path).await.unwrap();
let saved = mgr2.list_saved_pipelines().await;
assert_eq!(saved, vec!["daily".to_string()]);
let got = mgr2.get_saved_pipeline("daily").await.expect("saved");
assert_eq!(got.label, "echo-cat");
}
#[test]
fn default_path_ends_with_state_json() {
let p = default_snapshot_path();
assert!(p.to_string_lossy().ends_with("state.json"));
}
}
@@ -0,0 +1,808 @@
//! Pipeline runtime: encadena nodos con pipes y opcionalmente intercepta
//! cada flow para discernir su contenido.
//!
//! Cada nodo se encarna via [`ente_incarnate::Incarnator`] — eso significa
//! que **cada comando puede tener su propio SomaSpec** (namespaces, cgroup,
//! rlimits) heredado del workspace. La conexión stdin↔stdout se hace con
//! `pipe2(2)` + `ChildStdio` declarativo: el callback de clone(2) hace los
//! `dup2` pre-execve sin romper la regla async-signal-safe.
use crate::CoreError;
use brahman_card::Payload;
use ente_incarnate::{ChildStdio, Incarnator};
use nix::fcntl::OFlag;
use nix::unistd::pipe2;
use shuma_card::PipelineSpec;
use shuma_discern::{DiscernPipeline, Discernment, Hint};
use std::os::fd::{AsRawFd, IntoRawFd, RawFd};
use std::sync::Arc;
use tokio::io::unix::AsyncFd;
use tokio::io::Interest;
use tracing::{debug, info, warn};
use ulid::Ulid;
/// Resultado de lanzar un pipeline.
#[derive(Debug)]
pub struct PipelineLaunch {
pub pipeline: Ulid,
pub command_pids: Vec<(String, i32)>,
/// Discernments por edge, en el mismo orden que `spec.edges`.
pub edge_discernments: Vec<EdgeDiscernment>,
}
#[derive(Debug, Clone)]
pub struct EdgeDiscernment {
pub from_label: String,
pub from_output: String,
pub to_label: String,
pub to_input: String,
pub discernment: Option<Discernment>,
/// Path del Unix socket donde otros módulos pueden suscribirse al
/// stream replicado por este edge. `None` cuando tap=false (no hay
/// data plane porque no hay sampling).
pub flow_socket: Option<std::path::PathBuf>,
}
/// Lanza un pipeline conectando nodos por stdin/stdout. Cada nodo se
/// encarna via `Incarnator` (con o sin namespacing según su SomaSpec).
///
/// Soporta:
/// - Pipeline lineal (1 producer → 1 consumer).
/// - **Fan-out** (1 producer → N consumers): shuma interpone un
/// splitter que duplica bytes a cada destino. Cuando `tap=true`, el
/// splitter además samplea para discernir.
/// - Múltiples predecessors por nodo NO se soporta aún (fan-in): sólo se
/// honra el primer edge entrante.
pub async fn run_pipeline(
spec: &PipelineSpec,
workspace_label: &str,
tap: bool,
discerner: Arc<DiscernPipeline>,
incarnator: Arc<Incarnator>,
manager: Option<Arc<crate::WorkspaceManager>>,
) -> Result<PipelineLaunch, CoreError> {
spec.validate()?;
let n = spec.nodes.len();
info!(
nodes = n,
edges = spec.edges.len(),
tap,
"launching pipeline (incarnated)"
);
// Pre-compute grafo:
// - `consumers[i]` = índices de edges salientes de `i`.
// - `predecessors[j]` = índices de edges entrantes a `j`.
let mut consumers: Vec<Vec<usize>> = vec![Vec::new(); n];
let mut predecessors: Vec<Vec<usize>> = vec![Vec::new(); n];
for (idx, e) in spec.edges.iter().enumerate() {
consumers[e.from].push(idx);
predecessors[e.to].push(idx);
}
// Por cada edge: par (r_to_consumer, w_from_producer_side).
// El consumer recibe r_to_consumer; el producer escribe a w_from_producer_side
// (directa o vía splitter).
let mut edge_r: Vec<RawFd> = vec![-1; spec.edges.len()];
let mut edge_w: Vec<RawFd> = vec![-1; spec.edges.len()];
for i in 0..spec.edges.len() {
let (r, w) = pipe2(OFlag::O_CLOEXEC).map_err(|e| {
CoreError::Incarnate(ente_incarnate::IncarnateError::Pipe(e))
})?;
edge_r[i] = r.into_raw_fd();
edge_w[i] = w.into_raw_fd();
}
let mut consumer_stdin_fd: Vec<Option<RawFd>> = vec![None; n];
let mut producer_stdout_fd: Vec<Option<RawFd>> = vec![None; n];
let mut splitter_specs: Vec<SplitterSpec> = Vec::new();
let mut merger_specs: Vec<MergerSpec> = Vec::new();
// Stdout del producer: directo a edge_w[único] si tiene 1 consumer y NO tap;
// sino, pipe propio que va al splitter task.
for i in 0..n {
if consumers[i].is_empty() {
continue;
}
if consumers[i].len() == 1 && !tap {
producer_stdout_fd[i] = Some(edge_w[consumers[i][0]]);
continue;
}
// Splitter: pipe propio para el productor → splitter lee y replica a edge_w[*].
let (prod_r, prod_w) = pipe2(OFlag::O_CLOEXEC).map_err(|e| {
CoreError::Incarnate(ente_incarnate::IncarnateError::Pipe(e))
})?;
producer_stdout_fd[i] = Some(prod_w.into_raw_fd());
let prod_r_fd = prod_r.into_raw_fd();
let mut consumer_writes: Vec<RawFd> = Vec::with_capacity(consumers[i].len());
let mut edge_meta: Vec<EdgeMeta> = Vec::with_capacity(consumers[i].len());
for edge_idx in &consumers[i] {
let edge = &spec.edges[*edge_idx];
consumer_writes.push(edge_w[*edge_idx]);
edge_meta.push(EdgeMeta {
from_label: spec.nodes[edge.from].label.clone(),
from_output: edge.from_output.clone(),
to_label: spec.nodes[edge.to].label.clone(),
to_input: edge.to_input.clone(),
});
}
splitter_specs.push(SplitterSpec {
producer_r_fd: prod_r_fd,
consumer_w_fds: consumer_writes,
edges: edge_meta,
tap,
sample_bytes: spec.discern.sample_bytes,
max_bytes_per_sec: spec.discern.max_bytes_per_sec,
});
}
// Stdin del consumer: edge_r[único] si tiene 1 predecessor; sino, merger.
for j in 0..n {
match predecessors[j].len() {
0 => {}
1 => {
consumer_stdin_fd[j] = Some(edge_r[predecessors[j][0]]);
}
_ => {
// Merger: lee de N edge_r y escribe a un nuevo pipe cuyo
// read end es el stdin del consumer.
let (cons_r, cons_w) = pipe2(OFlag::O_CLOEXEC).map_err(|e| {
CoreError::Incarnate(ente_incarnate::IncarnateError::Pipe(e))
})?;
consumer_stdin_fd[j] = Some(cons_r.into_raw_fd());
let inputs: Vec<RawFd> = predecessors[j]
.iter()
.map(|eidx| edge_r[*eidx])
.collect();
merger_specs.push(MergerSpec {
producer_r_fds: inputs,
consumer_w_fd: cons_w.into_raw_fd(),
});
}
}
}
// Encarnamos cada nodo con su stdin/stdout fd asignado.
let mut pids = Vec::with_capacity(n);
for (i, node) in spec.nodes.iter().enumerate() {
match &node.payload {
Payload::Native { .. } | Payload::Legacy { .. } => {}
_ => {
return Err(CoreError::Incarnate(
ente_incarnate::IncarnateError::NonExecutablePayload,
))
}
}
let card = node.to_card(i, workspace_label)?;
let stdio = ChildStdio {
stdin_fd: consumer_stdin_fd[i],
stdout_fd: producer_stdout_fd[i],
stderr_fd: None,
};
let outcome = incarnator
.incarnate_with(&card, stdio)
.map_err(CoreError::Incarnate)?;
let pid = outcome.pid;
pids.push((node.label.clone(), pid.as_raw()));
debug!(label = %node.label, pid = pid.as_raw(), "node incarnated");
}
let pipeline_id_for_flows = Ulid::new();
// Si tap=true, creamos un FlowChannel por edge para el data plane.
// Cada splitter pushea al sender del channel correspondiente.
let pipeline_id = pipeline_id_for_flows;
let mut flow_channels: Vec<crate::flow_channel::FlowChannel> = Vec::new();
let mut splitter_channels: Vec<Vec<Option<crate::flow_channel::FlowSender>>> =
Vec::with_capacity(splitter_specs.len());
let mut edge_socket_for_splitter: Vec<Vec<Option<std::path::PathBuf>>> = Vec::new();
for s in &splitter_specs {
let mut senders_per_edge = Vec::with_capacity(s.edges.len());
let mut paths_per_edge = Vec::with_capacity(s.edges.len());
for (i, _em) in s.edges.iter().enumerate() {
if !s.tap {
senders_per_edge.push(None);
paths_per_edge.push(None);
continue;
}
// Socket name = pipeline_id full (26 chars ULID) + edge_idx.
// ULID es único globalmente → cero colisiones entre runs.
// Edge_idx desambigua múltiples sockets del mismo pipeline.
// No incluimos from_label en el name (puede tener chars que
// no van en paths Unix — los hints van en `EdgeDiscernment`).
let id = format!("{}-{}", pipeline_id, i);
let mut socket = crate::flow_channel::default_flow_socket_path(&id);
// Fallback: si el path existe (raro — daemon crashed sin
// cleanup), agregar suffix numérico hasta encontrar libre.
let mut suffix = 1u32;
while socket.exists() {
let alt = format!("{id}-{suffix}");
socket = crate::flow_channel::default_flow_socket_path(&alt);
suffix += 1;
if suffix > 1000 {
warn!(orig = id, "flow socket collision: 1000 retries — using as-is");
break;
}
}
match crate::flow_channel::FlowChannel::with_replay_caps(
socket.clone(),
crate::flow_channel::ReplayCaps::new(spec.discern.replay_chunks, spec.discern.replay_bytes),
) {
Ok(fc) => {
senders_per_edge.push(Some(fc.sender_handle()));
paths_per_edge.push(Some(socket));
flow_channels.push(fc);
}
Err(e) => {
warn!(?e, "flow channel new failed");
senders_per_edge.push(None);
paths_per_edge.push(None);
}
}
}
splitter_channels.push(senders_per_edge);
edge_socket_for_splitter.push(paths_per_edge);
}
// Registramos los flow_channels en el manager AHORA, antes de await
// las tasks. Esto permite que clientes externos hagan `flow list` y
// se suscriban mientras el pipeline aún produce data.
if let Some(mgr) = &manager {
if !flow_channels.is_empty() {
let drained: Vec<crate::flow_channel::FlowChannel> = flow_channels.drain(..).collect();
mgr.retain_pipeline_flows(pipeline_id, drained).await;
}
}
// Spawn mergers + splitters después del incarnate. Cada task posee
// sus fds y los cierra al terminar (via Drop de OwnedFd).
let mut merger_handles: Vec<tokio::task::JoinHandle<()>> = Vec::new();
for m in merger_specs {
merger_handles.push(spawn_merger(m));
}
let mut tap_handles: Vec<SplitterHandle> = Vec::new();
for (s, senders) in splitter_specs.into_iter().zip(splitter_channels.into_iter()) {
tap_handles.push(spawn_splitter(s, discerner.clone(), senders));
}
let mut edge_discernments = Vec::new();
for (h, paths) in tap_handles.into_iter().zip(edge_socket_for_splitter.into_iter()) {
match h.handle.await {
Ok(eds) => {
for (mut ed, path) in eds.into_iter().zip(paths.into_iter()) {
ed.flow_socket = path;
edge_discernments.push(ed);
}
}
Err(e) => warn!(?e, "splitter handle joined with error"),
}
}
for h in merger_handles {
if let Err(e) = h.await {
warn!(?e, "merger handle joined with error");
}
}
Ok(PipelineLaunch {
pipeline: pipeline_id,
command_pids: pids,
edge_discernments,
})
}
#[allow(dead_code)]
fn short_ulid(u: &Ulid) -> String {
let s = u.to_string();
s[s.len() - 6..].to_string()
}
#[derive(Debug, Clone)]
struct EdgeMeta {
from_label: String,
from_output: String,
to_label: String,
to_input: String,
}
struct SplitterSpec {
producer_r_fd: RawFd,
consumer_w_fds: Vec<RawFd>,
edges: Vec<EdgeMeta>,
tap: bool,
sample_bytes: usize,
/// Rate-limit en bytes/s (0 = sin limit). Tras cada chunk de `n`
/// bytes, splitter sleeps `n / max_bytes_per_sec` segundos.
max_bytes_per_sec: u64,
}
struct SplitterHandle {
handle: tokio::task::JoinHandle<Vec<EdgeDiscernment>>,
}
struct MergerSpec {
producer_r_fds: Vec<RawFd>,
consumer_w_fd: RawFd,
}
fn spawn_merger(spec: MergerSpec) -> tokio::task::JoinHandle<()> {
for fd in &spec.producer_r_fds {
set_nonblocking(*fd);
}
set_nonblocking(spec.consumer_w_fd);
// Patrón: una task lectora por cada producer reenvía bytes a un mpsc.
// El merger principal consume del mpsc y escribe al consumer.
// Esto evita el "block en reader idle" del enfoque round-robin sobre
// AsyncFd::ready() (los readers idle nunca dejan turno).
tokio::spawn(async move {
let (tx, mut rx) = tokio::sync::mpsc::channel::<Vec<u8>>(32);
let nr = spec.producer_r_fds.len();
for fd in spec.producer_r_fds {
let tx = tx.clone();
tokio::spawn(async move {
// SAFETY: ownership transferida.
let owned = unsafe { std::os::fd::OwnedFd::from_raw_fd_compat(fd) };
let r = match AsyncFd::with_interest(owned, Interest::READABLE) {
Ok(a) => a,
Err(e) => {
warn!(?e, "merger reader AsyncFd");
return;
}
};
let mut buf = [0u8; 4096];
loop {
match async_read(&r, &mut buf).await {
Ok(0) => break,
Ok(n) => {
if tx.send(buf[..n].to_vec()).await.is_err() {
break;
}
}
Err(_) => break,
}
}
// Drop de tx → cuando todos los readers cerraron, el rx
// recibe None y el merger termina.
});
}
drop(tx); // sólo los reader tasks tienen sus clones ahora.
// SAFETY: ownership transferida al task.
let w_owned = unsafe { std::os::fd::OwnedFd::from_raw_fd_compat(spec.consumer_w_fd) };
let w = match AsyncFd::with_interest(w_owned, Interest::WRITABLE) {
Ok(a) => a,
Err(e) => {
warn!(?e, "merger AsyncFd w");
return;
}
};
let mut total: u64 = 0;
while let Some(chunk) = rx.recv().await {
if async_write_all(&w, &chunk).await.is_err() {
return;
}
total += chunk.len() as u64;
}
debug!(bytes = total, readers = nr, "merger finished");
})
}
fn spawn_splitter(
spec: SplitterSpec,
discerner: Arc<DiscernPipeline>,
edge_senders: Vec<Option<crate::flow_channel::FlowSender>>,
) -> SplitterHandle {
set_nonblocking(spec.producer_r_fd);
for fd in &spec.consumer_w_fds {
set_nonblocking(*fd);
}
let handle = tokio::spawn(async move {
// SAFETY: ownership transferida al task.
let r_owned = unsafe { std::os::fd::OwnedFd::from_raw_fd_compat(spec.producer_r_fd) };
let r = match AsyncFd::with_interest(r_owned, Interest::READABLE) {
Ok(a) => a,
Err(e) => {
warn!(?e, "splitter AsyncFd r");
return Vec::new();
}
};
let mut writers: Vec<AsyncFd<std::os::fd::OwnedFd>> = Vec::with_capacity(spec.consumer_w_fds.len());
for fd in spec.consumer_w_fds {
let owned = unsafe { std::os::fd::OwnedFd::from_raw_fd_compat(fd) };
match AsyncFd::with_interest(owned, Interest::WRITABLE) {
Ok(a) => writers.push(a),
Err(e) => warn!(?e, "splitter AsyncFd w"),
}
}
let mut sample: Vec<u8> = Vec::with_capacity(spec.sample_bytes);
let mut buf = [0u8; 4096];
let mut total: u64 = 0;
let mut eof = false;
let mut bucket = if spec.max_bytes_per_sec > 0 {
Some(TokenBucket::new(spec.max_bytes_per_sec))
} else {
None
};
// Fase 1: sampling (sólo si tap=true) + replicación.
while !eof && (spec.tap && sample.len() < spec.sample_bytes) {
let n = match async_read(&r, &mut buf).await {
Ok(0) => { eof = true; 0 }
Ok(n) => n,
Err(e) => { warn!(?e, "splitter read"); break; }
};
if n == 0 { break; }
if spec.tap {
let take = n.min(spec.sample_bytes - sample.len());
sample.extend_from_slice(&buf[..take]);
}
// Token bucket: reserva ANTES de broadcast — si hay debt,
// sleep antes de mandar al subscriber.
if let Some(b) = bucket.as_mut() {
let wait = b.reserve(n as u64);
if !wait.is_zero() {
tokio::time::sleep(wait).await;
}
}
broadcast_chunk(&writers, &edge_senders, &buf[..n]).await;
total += n as u64;
}
let d = if spec.tap {
discerner.discern(&sample, &Hint { path: None, size_total: None })
} else {
None
};
// Fase 2: replicación pura.
while !eof {
let n = match async_read(&r, &mut buf).await {
Ok(0) => { eof = true; 0 }
Ok(n) => n,
Err(_) => break,
};
if n == 0 { break; }
if let Some(b) = bucket.as_mut() {
let wait = b.reserve(n as u64);
if !wait.is_zero() {
tokio::time::sleep(wait).await;
}
}
broadcast_chunk(&writers, &edge_senders, &buf[..n]).await;
total += n as u64;
}
debug!(bytes = total, consumers = writers.len(), "splitter finished");
// Mismo discernment para todos los edges del splitter (es el mismo
// stream replicado). Devolvemos N entries (una por edge) para que
// la UI/CLI los liste todos. flow_socket lo rellena el caller.
spec.edges
.into_iter()
.map(|em| EdgeDiscernment {
from_label: em.from_label,
from_output: em.from_output,
to_label: em.to_label,
to_input: em.to_input,
discernment: d.clone(),
flow_socket: None,
})
.collect()
});
SplitterHandle { handle }
}
/// Token-bucket real con capacidad de burst.
/// - `rate_bps`: tokens (bytes) por segundo de refill.
/// - `capacity`: máx tokens acumulables. Default = 1 segundo de rate.
/// - `tokens`: tokens disponibles (puede negativos para "debt").
/// - `last_refill`: para calcular cuántos refill desde la última call.
struct TokenBucket {
rate_bps: u64,
capacity: u64,
tokens: f64,
last_refill: std::time::Instant,
}
impl TokenBucket {
fn new(rate_bps: u64) -> Self {
Self {
rate_bps,
capacity: rate_bps, // 1 second worth of burst.
tokens: rate_bps as f64,
last_refill: std::time::Instant::now(),
}
}
/// Refill desde la última call según wall time. Reserva `cost`
/// tokens; si no alcanza, retorna el sleep necesario.
fn reserve(&mut self, cost: u64) -> std::time::Duration {
let now = std::time::Instant::now();
let elapsed_secs = now.duration_since(self.last_refill).as_secs_f64();
self.tokens = (self.tokens + elapsed_secs * self.rate_bps as f64)
.min(self.capacity as f64);
self.last_refill = now;
self.tokens -= cost as f64;
if self.tokens >= 0.0 {
std::time::Duration::ZERO
} else {
// Debt: tiempo para recuperar a 0 tokens.
let secs_needed = -self.tokens / self.rate_bps as f64;
std::time::Duration::from_secs_f64(secs_needed)
}
}
}
async fn broadcast_chunk(
writers: &[AsyncFd<std::os::fd::OwnedFd>],
edge_senders: &[Option<crate::flow_channel::FlowSender>],
data: &[u8],
) {
// Internal pipes a los consumers del pipeline.
for w in writers {
let _ = async_write_all(w, data).await;
}
// Externos: broadcast a subscribers vía FlowChannel.
// Cada edge tiene su propio sender (mismo data — el sample/discernment
// viaja por broadcast separados para que un subscriber por edge vea su
// stream específico).
if edge_senders.iter().any(|s| s.is_some()) {
let shared = std::sync::Arc::new(data.to_vec());
for s in edge_senders {
if let Some(s) = s {
let _ = s.send(shared.clone());
}
}
}
}
async fn async_read(
afd: &AsyncFd<std::os::fd::OwnedFd>,
buf: &mut [u8],
) -> std::io::Result<usize> {
loop {
let mut guard = afd.readable().await?;
let fd = afd.as_raw_fd();
// SAFETY: lectura sobre fd válido propiedad del AsyncFd.
let r = unsafe { libc::read(fd, buf.as_mut_ptr() as *mut _, buf.len()) };
if r >= 0 {
return Ok(r as usize);
}
let err = std::io::Error::last_os_error();
if err.kind() == std::io::ErrorKind::WouldBlock {
guard.clear_ready();
continue;
}
return Err(err);
}
}
async fn async_write_all(
afd: &AsyncFd<std::os::fd::OwnedFd>,
mut buf: &[u8],
) -> std::io::Result<()> {
while !buf.is_empty() {
let mut guard = afd.writable().await?;
let fd = afd.as_raw_fd();
// SAFETY: escritura sobre fd válido propiedad del AsyncFd.
let r = unsafe { libc::write(fd, buf.as_ptr() as *const _, buf.len()) };
if r > 0 {
buf = &buf[r as usize..];
continue;
}
if r == 0 {
return Err(std::io::Error::new(
std::io::ErrorKind::WriteZero,
"write 0",
));
}
let err = std::io::Error::last_os_error();
if err.kind() == std::io::ErrorKind::WouldBlock {
guard.clear_ready();
continue;
}
return Err(err);
}
Ok(())
}
fn set_nonblocking(fd: RawFd) {
// SAFETY: fcntl con F_SETFL es seguro para fds válidos.
unsafe {
let flags = libc::fcntl(fd, libc::F_GETFL, 0);
if flags >= 0 {
libc::fcntl(fd, libc::F_SETFL, flags | libc::O_NONBLOCK);
}
}
}
// Extension trait para abstraer la API de OwnedFd entre versiones (compat).
trait OwnedFdFromRawCompat: Sized {
unsafe fn from_raw_fd_compat(fd: RawFd) -> Self;
}
impl OwnedFdFromRawCompat for std::os::fd::OwnedFd {
unsafe fn from_raw_fd_compat(fd: RawFd) -> Self {
use std::os::fd::FromRawFd;
// SAFETY: el caller transfiere ownership de `fd` a la `OwnedFd`.
unsafe { std::os::fd::OwnedFd::from_raw_fd(fd) }
}
}
// Re-export para que el unused warning del AsRawFd se calle si no se usa.
#[allow(dead_code)]
fn _keep_raw(_: &dyn AsRawFd) {}
#[cfg(test)]
mod tests {
use super::*;
use brahman_card::Payload;
use ente_incarnate::IncarnatorConfig;
use shuma_card::{CommandRef, DiscernPolicy, FlowEdge, PipelineSpec, WorkspaceId};
fn cmd(label: &str, exec: &str, argv: &[&str]) -> CommandRef {
CommandRef {
label: label.into(),
payload: Payload::Native {
exec: exec.into(),
argv: argv.iter().map(|s| s.to_string()).collect(),
envp: vec![],
},
soma: Default::default(),
flows: Default::default(),
supervision: brahman_card::Supervision::OneShot,
}
}
#[tokio::test]
async fn pipeline_isolated_echo_to_cat_runs() {
let spec = PipelineSpec {
label: "echo-cat".into(),
workspace: WorkspaceId::new(),
nodes: vec![
cmd("p1", "/bin/echo", &["hola pipeline aislado"]),
cmd("p2", "/bin/cat", &[]),
],
edges: vec![FlowEdge {
from: 0,
from_output: "stdout".into(),
to: 1,
to_input: "stdin".into(),
}],
discern: DiscernPolicy::default(),
restart_on_failure: false,
restart_backoff_ms: 200,
restart_max_backoff_ms: 30_000,
restart_max: 0,
};
let disc = Arc::new(DiscernPipeline::default_pipeline());
let inc = Arc::new(Incarnator::new(IncarnatorConfig::default()));
let launch = run_pipeline(&spec, "ws", false, disc, inc, None).await.unwrap();
assert_eq!(launch.command_pids.len(), 2);
// Cosecha.
for (_, pid) in &launch.command_pids {
let _ = nix::sys::wait::waitpid(nix::unistd::Pid::from_raw(*pid), None);
}
}
#[tokio::test]
async fn pipeline_fanin_two_to_one() {
// 2 productores → 1 consumer (cat). El merger multiplexa.
let spec = PipelineSpec {
label: "fanin".into(),
workspace: WorkspaceId::new(),
nodes: vec![
cmd("p1", "/bin/echo", &["from-p1"]),
cmd("p2", "/bin/echo", &["from-p2"]),
cmd("c", "/bin/cat", &[]),
],
edges: vec![
FlowEdge {
from: 0,
from_output: "stdout".into(),
to: 2,
to_input: "stdin".into(),
},
FlowEdge {
from: 1,
from_output: "stdout".into(),
to: 2,
to_input: "stdin".into(),
},
],
discern: DiscernPolicy::default(),
restart_on_failure: false,
restart_backoff_ms: 200,
restart_max_backoff_ms: 30_000,
restart_max: 0,
};
let disc = Arc::new(DiscernPipeline::default_pipeline());
let inc = Arc::new(Incarnator::new(IncarnatorConfig::default()));
let launch = run_pipeline(&spec, "ws", false, disc, inc, None).await.unwrap();
assert_eq!(launch.command_pids.len(), 3);
for (_, pid) in &launch.command_pids {
let _ = nix::sys::wait::waitpid(nix::unistd::Pid::from_raw(*pid), None);
}
}
#[tokio::test]
async fn pipeline_fanout_one_to_two() {
// 1 productor (echo) → 2 consumers (wc -c). Splitter replica.
let spec = PipelineSpec {
label: "fanout".into(),
workspace: WorkspaceId::new(),
nodes: vec![
cmd("p", "/bin/echo", &["fanout-test"]),
cmd("c1", "/bin/cat", &[]),
cmd("c2", "/bin/cat", &[]),
],
edges: vec![
FlowEdge {
from: 0,
from_output: "stdout".into(),
to: 1,
to_input: "stdin".into(),
},
FlowEdge {
from: 0,
from_output: "stdout".into(),
to: 2,
to_input: "stdin".into(),
},
],
discern: DiscernPolicy::default(),
restart_on_failure: false,
restart_backoff_ms: 200,
restart_max_backoff_ms: 30_000,
restart_max: 0,
};
let disc = Arc::new(DiscernPipeline::default_pipeline());
let inc = Arc::new(Incarnator::new(IncarnatorConfig::default()));
let launch = run_pipeline(&spec, "ws", false, disc, inc, None).await.unwrap();
assert_eq!(launch.command_pids.len(), 3);
for (_, pid) in &launch.command_pids {
let _ = nix::sys::wait::waitpid(nix::unistd::Pid::from_raw(*pid), None);
}
}
#[tokio::test]
async fn pipeline_isolated_with_tap_captures_discernment() {
let spec = PipelineSpec {
label: "json-cat".into(),
workspace: WorkspaceId::new(),
nodes: vec![
cmd("p1", "/bin/echo", &["{\"hello\": 1}"]),
cmd("p2", "/bin/cat", &[]),
],
edges: vec![FlowEdge {
from: 0,
from_output: "stdout".into(),
to: 1,
to_input: "stdin".into(),
}],
discern: DiscernPolicy {
sample_bytes: 4096,
enrich_producer: true,
replay_chunks: 32,
replay_bytes: 0,
max_bytes_per_sec: 0,
},
restart_on_failure: false,
restart_backoff_ms: 200,
restart_max_backoff_ms: 30_000,
restart_max: 0,
};
let disc = Arc::new(DiscernPipeline::default_pipeline());
let inc = Arc::new(Incarnator::new(IncarnatorConfig::default()));
let launch = run_pipeline(&spec, "ws", true, disc, inc, None).await.unwrap();
assert_eq!(launch.edge_discernments.len(), 1);
let d = &launch.edge_discernments[0];
let dis = d.discernment.as_ref().expect("discernment present");
assert_eq!(dis.mime.as_deref(), Some("application/json"));
// Cosecha.
for (_, pid) in &launch.command_pids {
let _ = nix::sys::wait::waitpid(nix::unistd::Pid::from_raw(*pid), None);
}
}
}
@@ -0,0 +1,210 @@
//! Resource accounting por workspace.
//!
//! Dos fuentes:
//! - **Per-proc** (`/proc/<pid>/status` + `stat`): suma RSS y CPU ticks de
//! los comandos vivos del workspace. Siempre disponible. Costo: O(N pids).
//! - **Cgroup v2** (`memory.current`, `cpu.stat`): un read por workspace si
//! `SomaSpec.cgroup.path` está y es leíble. Más preciso (incluye descendants).
//!
//! Si ambos están disponibles, devolvemos el cgroup (más preciso) y dejamos
//! el per-proc como `sample_via_proc`.
use std::path::Path;
use std::time::Instant;
#[derive(Debug, Clone, Default)]
pub struct WorkspaceStats {
pub commands_alive: u32,
pub commands_total: u32,
/// RSS sumado en bytes. `None` si no se pudo medir.
pub rss_bytes: Option<u64>,
/// High-water mark de RSS (peak alguna vez observado). Cgroup v2:
/// `memory.peak` (≥6.5). Per-proc: suma de `VmHWM` de cada pid.
pub rss_peak_bytes: Option<u64>,
/// Tiempo CPU acumulado en microsegundos. `None` si no se pudo medir.
pub cpu_usec: Option<u64>,
/// %CPU instantáneo derivado entre dos samples consecutivos. `None`
/// en el primer sample (no hay baseline). `100.0` = 1 core saturado.
/// `400.0` con 4 cores activos = la máquina al 100%.
pub cpu_percent: Option<f32>,
/// Cores online detectados (sysconf `_SC_NPROCESSORS_ONLN`). Útil
/// para normalizar `cpu_percent / cpu_cores` → 0..100 absoluto.
pub cpu_cores: u32,
/// Fuente del dato: "proc" | "cgroup" | "mixed".
pub source: String,
/// Wall-clock uptime del workspace en milisegundos.
pub uptime_ms: u64,
}
impl WorkspaceStats {
/// CPU% normalizado al 100% total de la máquina (no por core).
/// Útil para comparar workspaces independiente del paralelismo.
pub fn cpu_percent_total(&self) -> Option<f32> {
self.cpu_percent
.map(|p| if self.cpu_cores == 0 { p } else { p / self.cpu_cores as f32 })
}
}
/// Reporte de quotas: comparación entre el accounting real y los
/// `rlimits` declarados en `SomaSpec`. NO hace enforcement automático
/// en v1 — sólo accounting + reporting. El caller decide qué hacer.
#[derive(Debug, Clone, Default)]
pub struct QuotaReport {
/// Límite de memoria declarado (bytes). None = sin límite.
pub mem_limit: Option<u64>,
/// Límite de procesos declarado.
pub nproc_limit: Option<u32>,
/// Lista de violaciones detectadas (strings humano-legibles).
/// Empty = todo dentro de quota.
pub breaches: Vec<String>,
}
/// Detecta cores online runtime. Cacheado vía OnceLock — el valor no
/// cambia salvo hotplug, que es raro y aceptamos sample stale.
fn online_cores() -> u32 {
static CACHED: std::sync::OnceLock<u32> = std::sync::OnceLock::new();
*CACHED.get_or_init(|| {
let n = unsafe { libc::sysconf(libc::_SC_NPROCESSORS_ONLN) };
if n > 0 { n as u32 } else { 1 }
})
}
/// Mide stats para un set de PIDs vivos + un path de cgroup opcional.
pub fn measure(
alive_pids: &[i32],
cgroup_path: Option<&Path>,
workspace_started: Instant,
) -> WorkspaceStats {
let mut rss_proc: u64 = 0;
let mut rss_peak_proc: u64 = 0;
let mut cpu_proc: u64 = 0;
let mut proc_ok = false;
for &pid in alive_pids {
if let Some((rss, peak, cpu)) = read_proc_pid(pid) {
rss_proc += rss;
rss_peak_proc += peak;
cpu_proc += cpu;
proc_ok = true;
}
}
let cgroup = cgroup_path.and_then(read_cgroup_stats);
let (rss, rss_peak, cpu, source) = match (cgroup, proc_ok) {
(Some(cg), _) => (Some(cg.rss), cg.rss_peak, Some(cg.cpu_usec), "cgroup".to_string()),
(None, true) => (
Some(rss_proc),
Some(rss_peak_proc),
Some(cpu_proc),
"proc".to_string(),
),
(None, false) => (None, None, None, "none".to_string()),
};
WorkspaceStats {
commands_alive: alive_pids.len() as u32,
commands_total: 0,
rss_bytes: rss,
rss_peak_bytes: rss_peak,
cpu_usec: cpu,
cpu_percent: None, // El caller lo rellena con el diff vs prev sample.
cpu_cores: online_cores(),
source,
uptime_ms: workspace_started.elapsed().as_millis() as u64,
}
}
struct CgroupStats {
rss: u64,
rss_peak: Option<u64>,
cpu_usec: u64,
}
/// Lee `(rss_bytes, rss_peak_bytes, cpu_usec)` de `/proc/<pid>/`. None si el proc desapareció.
fn read_proc_pid(pid: i32) -> Option<(u64, u64, u64)> {
let (rss_kb, hwm_kb) = {
let status = std::fs::read_to_string(format!("/proc/{pid}/status")).ok()?;
let mut rss = 0u64;
let mut hwm = 0u64;
for l in status.lines() {
if let Some(rest) = l.strip_prefix("VmRSS:") {
rss = rest
.trim()
.split_whitespace()
.next()
.and_then(|s| s.parse().ok())
.unwrap_or(0);
} else if let Some(rest) = l.strip_prefix("VmHWM:") {
hwm = rest
.trim()
.split_whitespace()
.next()
.and_then(|s| s.parse().ok())
.unwrap_or(0);
}
}
(rss, hwm)
};
let cpu_usec = {
let stat = std::fs::read_to_string(format!("/proc/{pid}/stat")).ok()?;
// formato: pid (comm) state ppid pgrp ... utime stime cutime cstime
// Cuidado: comm puede tener espacios y paréntesis. Buscamos la última `)`.
let end_comm = stat.rfind(')')?;
let after = &stat[end_comm + 1..];
let fields: Vec<&str> = after.split_whitespace().collect();
// Tras `)`, índice 0 = state, índice 11 = utime, 12 = stime.
let utime = fields.get(11).and_then(|s| s.parse::<u64>().ok()).unwrap_or(0);
let stime = fields.get(12).and_then(|s| s.parse::<u64>().ok()).unwrap_or(0);
let ticks = utime + stime;
// Convertimos ticks → microsegundos. SC_CLK_TCK típicamente 100.
let clk_tck = unsafe { libc::sysconf(libc::_SC_CLK_TCK) }.max(1) as u64;
ticks * 1_000_000 / clk_tck
};
Some((rss_kb * 1024, hwm_kb * 1024, cpu_usec))
}
/// Lee `CgroupStats` del cgroup. None si no existe o no es leíble.
/// `memory.peak` requiere kernel ≥6.5; si falta, `rss_peak` queda None.
fn read_cgroup_stats(cgroup_path: &Path) -> Option<CgroupStats> {
let mem = std::fs::read_to_string(cgroup_path.join("memory.current"))
.ok()
.and_then(|s| s.trim().parse::<u64>().ok())?;
let cpu_stat = std::fs::read_to_string(cgroup_path.join("cpu.stat")).ok()?;
let cpu_usec = cpu_stat
.lines()
.find_map(|l| l.strip_prefix("usage_usec"))
.and_then(|s| s.split_whitespace().next())
.and_then(|s| s.parse::<u64>().ok())
.unwrap_or(0);
let peak = std::fs::read_to_string(cgroup_path.join("memory.peak"))
.ok()
.and_then(|s| s.trim().parse::<u64>().ok());
Some(CgroupStats {
rss: mem,
rss_peak: peak,
cpu_usec,
})
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn measure_with_no_pids_returns_zero() {
let stats = measure(&[], None, Instant::now());
assert_eq!(stats.commands_alive, 0);
assert_eq!(stats.rss_bytes, None);
assert_eq!(stats.source, "none");
}
#[test]
fn measure_self_pid_returns_data() {
let me = std::process::id() as i32;
let stats = measure(&[me], None, Instant::now());
assert_eq!(stats.commands_alive, 1);
// Nuestro propio RSS debería ser > 0.
assert!(stats.rss_bytes.unwrap_or(0) > 0);
assert_eq!(stats.source, "proc");
}
}