feat(shipote): data plane + DAG fan-in/out + stats + lifecycle (fases F-I)

Pipeline runtime:
- Fan-out 1→N (splitter task replica al N consumers) y fan-in N→1 (merger
  task con mpsc + reader-per-input). DAGs no lineales soportados.
- Flow channels: Unix socket + tokio broadcast con replay buffer
  configurable por pipeline (DiscernPolicy.replay_chunks). Subscribers
  externos vía `shipote flow tail <socket>`.
- Templating en specs con `${KEY}` (CLI `--var KEY=VALUE`). Walk
  recursivo sobre serde_json::Value, soporta todos los strings del schema.
- Pipelines guardados (`pipeline save/saved-list/drop/run-saved`)
  persisten con el snapshot.

Lifecycle de comandos:
- Log capture per-stream (stdout/stderr separados) via pipe O_CLOEXEC +
  AsyncFd. CLI `shipote logs <ws> <cmd> --stream {stdout,stderr,both}`.
- Stop graceful con tiempo configurable: SIGTERM → grace → SIGKILL.
  Tanto a nivel workspace como pipeline individual.
- TTL auto-stop ya existente (Fase C) sigue funcionando.

ente-incarnate:
- ChildStdio declarativo (Fase C) + ChildPreExec declarativo nuevo:
  NoNewPrivs, ParentDeathSig, Dumpable, NewSession, Chdir, Umask.
- Aplicación pre-execve async-signal-safe en ambos paths (plain via
  Command::pre_exec, namespaced via callback del clone(2)).

Observabilidad:
- WorkspaceStats: RSS + RSS peak (VmHWM o memory.peak cgroup) + CPU usec
  + uptime. Fuente per-proc o cgroup según delegation.
- shipote-shell con sparkline ASCII por workspace (history cap 24),
  card de flow channels activos, vista de comandos + saved pipelines.
- Tap → broker: cada edge enriquecido con TypeRef se anuncia como Card
  efímera vía SidecarPool (graceful si broker no corre).

Discern:
- Integrado en yahweh-provider-fs (mime_type en EntityNode).
- Integrado en nouser-core::cluster::pick_lens como fallback cuando la
  extensión cae a Lens::Grid.

79 tests pasan: ente-incarnate (16), nouser-core (27), shipote-card (8),
shipote-core (20), shipote-discern (5), yahweh-provider-fs (3).

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
sergio
2026-05-11 00:29:46 +00:00
parent c22d2480b9
commit 36dac00c8d
13 changed files with 2187 additions and 253 deletions
@@ -0,0 +1,320 @@
//! Flow channels: data plane sobre Unix socket por edge enriquecido.
//!
//! Cuando un splitter detecta el TypeRef de un edge, además de replicar a
//! los consumers internos del pipeline, se levanta un FlowChannel que
//! expone los bytes a subscribers externos (otros módulos del fractal).
//!
//! ## Diseño
//!
//! - `tokio::sync::broadcast::channel` para fan-out lock-less entre el
//! splitter (sender) y los N subscribers conectados.
//! - `UnixListener` accept-loop: por cada cliente nuevo, spawn una task
//! que drena el receiver y escribe al socket.
//! - Subscribers lentos pueden perder mensajes (broadcast::Receiver::Lagged)
//! — se loguea warn y se sigue. Esto es deliberado para no bloquear el
//! splitter en consumers lentos.
//!
//! ## Lifetime
//!
//! `FlowChannel` se construye con `new(path)`. Cuando se drop:
//! - El `accept_task` se cancela (vía drop del `tokio::task::JoinHandle`
//! que tenemos abort-on-drop).
//! - El socket file se borra del FS (`Drop` impl).
//!
//! Sender clones son baratos; los subscribers conectados se enteran del
//! cierre cuando todos los senders se dropean.
use std::collections::VecDeque;
use std::path::{Path, PathBuf};
use std::sync::{Arc, Mutex};
use tokio::io::AsyncWriteExt;
use tokio::net::UnixListener;
use tokio::sync::broadcast;
use tokio::task::AbortHandle;
use tracing::{debug, warn};
/// Capacidad del broadcast channel. Si un subscriber está más de N chunks
/// atrasado, queda `Lagged` y empieza a perder mensajes.
const BROADCAST_CAP: usize = 64;
/// Chunks default del replay buffer. Cuando un cliente nuevo se conecta,
/// recibe hasta estos N chunks antes de iniciar el broadcast live.
/// Override via `FlowChannel::with_replay_cap`.
pub const DEFAULT_REPLAY_CHUNKS: usize = 32;
pub struct FlowChannel {
sender: broadcast::Sender<Arc<Vec<u8>>>,
replay: Arc<Mutex<VecDeque<Arc<Vec<u8>>>>>,
replay_cap: usize,
socket_path: PathBuf,
_accept_handle: AbortOnDrop,
}
#[derive(Clone)]
pub struct FlowSender {
sender: broadcast::Sender<Arc<Vec<u8>>>,
replay: Arc<Mutex<VecDeque<Arc<Vec<u8>>>>>,
replay_cap: usize,
}
impl FlowSender {
/// Pushea al broadcast y al replay buffer. Si no hay subscribers,
/// el broadcast::send retorna Err pero igual guardamos en replay
/// (subscribers tarde verán los chunks pasados).
pub fn send(&self, data: Arc<Vec<u8>>) {
let cap = self.replay_cap;
if let Ok(mut g) = self.replay.lock() {
if g.len() >= cap {
g.pop_front();
}
g.push_back(data.clone());
}
let _ = self.sender.send(data);
}
}
impl std::fmt::Debug for FlowChannel {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("FlowChannel")
.field("socket_path", &self.socket_path)
.field("subscribers", &self.sender.receiver_count())
.finish()
}
}
impl FlowChannel {
/// Crea un FlowChannel atado al path `socket_path`. Si el path ya
/// existe, lo borra antes de bind (asume restart limpio).
pub fn new(socket_path: PathBuf) -> std::io::Result<Self> {
Self::with_replay_cap(socket_path, DEFAULT_REPLAY_CHUNKS)
}
pub fn with_replay_cap(socket_path: PathBuf, replay_cap: usize) -> std::io::Result<Self> {
let cap = replay_cap.max(1);
if socket_path.exists() {
let _ = std::fs::remove_file(&socket_path);
}
if let Some(parent) = socket_path.parent() {
let _ = std::fs::create_dir_all(parent);
}
let listener = UnixListener::bind(&socket_path)?;
let (tx, _rx_unused) = broadcast::channel::<Arc<Vec<u8>>>(BROADCAST_CAP);
let replay: Arc<Mutex<VecDeque<Arc<Vec<u8>>>>> =
Arc::new(Mutex::new(VecDeque::with_capacity(cap)));
let tx_for_accept = tx.clone();
let replay_for_accept = replay.clone();
let path_for_log = socket_path.clone();
let join = tokio::spawn(async move {
debug!(path = %path_for_log.display(), "flow channel listening");
loop {
let (mut stream, _addr) = match listener.accept().await {
Ok(p) => p,
Err(e) => {
warn!(?e, "flow channel accept failed");
return;
}
};
// Snapshot del replay buffer Y subscribe al broadcast.
// El orden es crítico: subscribe ANTES de drenar el replay
// para no perder chunks que llegan justo en el medio.
let mut rx = tx_for_accept.subscribe();
let snapshot: Vec<Arc<Vec<u8>>> = {
let g = replay_for_accept.lock().expect("replay lock");
g.iter().cloned().collect()
};
tokio::spawn(async move {
// Fase 1: drenar replay snapshot al subscriber.
for chunk in &snapshot {
if let Err(e) = stream.write_all(chunk).await {
debug!(?e, "flow subscriber dropped during replay");
return;
}
}
// Fase 2: live broadcast.
loop {
match rx.recv().await {
Ok(chunk) => {
if let Err(e) = stream.write_all(&chunk).await {
debug!(?e, "flow subscriber dropped");
return;
}
}
Err(broadcast::error::RecvError::Closed) => return,
Err(broadcast::error::RecvError::Lagged(n)) => {
warn!(skipped = n, "flow subscriber lagged");
}
}
}
});
}
});
Ok(Self {
sender: tx,
replay,
replay_cap: cap,
socket_path,
_accept_handle: AbortOnDrop(join.abort_handle()),
})
}
/// Push un chunk al channel. Si no hay subscribers, drop silencioso.
/// Siempre se guarda en el replay buffer (con cap rotation).
pub fn send(&self, data: Vec<u8>) {
let arc = Arc::new(data);
let cap = self.replay_cap;
if let Ok(mut g) = self.replay.lock() {
if g.len() >= cap {
g.pop_front();
}
g.push_back(arc.clone());
}
let _ = self.sender.send(arc);
}
pub fn socket_path(&self) -> &Path {
&self.socket_path
}
/// Handle clone-able para que tasks externas (splitter) pushen al
/// channel sin tener ownership del FlowChannel. Cada push se guarda
/// también en el replay buffer.
pub fn sender_handle(&self) -> FlowSender {
FlowSender {
sender: self.sender.clone(),
replay: self.replay.clone(),
replay_cap: self.replay_cap,
}
}
pub fn subscriber_count(&self) -> usize {
self.sender.receiver_count()
}
}
impl Drop for FlowChannel {
fn drop(&mut self) {
// El AbortOnDrop cancela el accept loop; sólo nos queda limpiar el
// socket file.
let _ = std::fs::remove_file(&self.socket_path);
}
}
struct AbortOnDrop(AbortHandle);
impl Drop for AbortOnDrop {
fn drop(&mut self) {
self.0.abort();
}
}
/// Path canónico para un flow channel: `$XDG_RUNTIME_DIR/shipote-flow-<id>.sock`.
pub fn default_flow_socket_path(id: &str) -> PathBuf {
let base = std::env::var("XDG_RUNTIME_DIR").unwrap_or_else(|_| {
let uid = nix::unistd::getuid().as_raw();
let p = format!("/run/user/{uid}");
if std::path::Path::new(&p).exists() {
p
} else {
"/tmp".into()
}
});
PathBuf::from(base).join(format!("shipote-flow-{id}.sock"))
}
#[cfg(test)]
mod tests {
use super::*;
use tokio::io::AsyncReadExt;
use tokio::net::UnixStream;
#[tokio::test]
async fn channel_delivers_to_subscriber() {
let tmp = tempfile::tempdir().unwrap();
let path = tmp.path().join("flow.sock");
let ch = FlowChannel::new(path.clone()).unwrap();
// Subscriber se conecta.
let path_clone = path.clone();
let task = tokio::spawn(async move {
let mut stream = UnixStream::connect(&path_clone).await.unwrap();
let mut buf = vec![0u8; 64];
let n = stream.read(&mut buf).await.unwrap();
buf.truncate(n);
buf
});
// Damos tiempo al accept.
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
// Hasta que haya 1 receiver_count, el send no llega.
for _ in 0..50 {
if ch.subscriber_count() >= 1 {
break;
}
tokio::time::sleep(std::time::Duration::from_millis(20)).await;
}
ch.send(b"hello-flow".to_vec());
let received = tokio::time::timeout(std::time::Duration::from_secs(2), task)
.await
.expect("timeout")
.unwrap();
assert_eq!(received, b"hello-flow");
}
#[tokio::test]
async fn replay_buffer_serves_late_subscriber() {
let tmp = tempfile::tempdir().unwrap();
let path = tmp.path().join("flow.sock");
let ch = FlowChannel::new(path.clone()).unwrap();
// Pushes ANTES de cualquier subscriber: van solo al replay.
ch.send(b"chunk-1".to_vec());
ch.send(b"chunk-2".to_vec());
ch.send(b"chunk-3".to_vec());
// Subscriber LATE — debe recibir los 3 chunks del replay.
let path_clone = path.clone();
let task = tokio::spawn(async move {
let mut stream = UnixStream::connect(&path_clone).await.unwrap();
let mut buf = vec![0u8; 256];
// Leemos hasta recibir los 3 chunks (21 bytes esperados).
let mut total = Vec::new();
for _ in 0..20 {
let n = stream.read(&mut buf).await.unwrap();
if n == 0 {
break;
}
total.extend_from_slice(&buf[..n]);
if total.len() >= 21 {
break;
}
}
total
});
let received = tokio::time::timeout(std::time::Duration::from_secs(2), task)
.await
.expect("timeout")
.unwrap();
let s = String::from_utf8_lossy(&received);
assert!(s.contains("chunk-1"), "got: {s:?}");
assert!(s.contains("chunk-2"), "got: {s:?}");
assert!(s.contains("chunk-3"), "got: {s:?}");
}
#[tokio::test]
async fn drop_removes_socket() {
let tmp = tempfile::tempdir().unwrap();
let path = tmp.path().join("flow.sock");
{
let _ch = FlowChannel::new(path.clone()).unwrap();
assert!(path.exists());
}
// Después del drop, el socket file no debe quedar.
// Damos un pelín de tiempo al runtime para que el drop corra
// mientras estamos en task.
tokio::time::sleep(std::time::Duration::from_millis(10)).await;
assert!(!path.exists());
}
}
+318 -47
View File
@@ -11,9 +11,11 @@
// el módulo concreto.
#![deny(unsafe_op_in_unsafe_fn)]
pub mod flow_channel;
pub mod logbuf;
pub mod persist;
pub mod pipeline;
pub mod stats;
use brahman_card::{Card, Payload, Supervision};
use ente_incarnate::{Incarnator, IncarnatorConfig};
@@ -55,10 +57,22 @@ pub struct CommandState {
pub pid: Pid,
pub alive: bool,
pub exit_status: Option<i32>,
/// Ring buffer compartido con la tokio task que drena stdout+stderr
/// del comando. `None` para comandos que no capturan output (futuro:
/// comandos con stdout=inherit).
pub logs: Option<logbuf::LogBuf>,
/// Ring buffer del stdout. `None` para comandos sin captura.
pub stdout: Option<logbuf::LogBuf>,
/// Ring buffer del stderr. Separado de `stdout` para que el CLI
/// pueda filtrarlos. `None` para comandos sin captura.
pub stderr: Option<logbuf::LogBuf>,
/// Si el comando fue lanzado como parte de un Pipeline, su ULID.
pub pipeline_id: Option<Ulid>,
}
/// Stream a leer en `get_command_logs`. `Both` concatena stderr-después-stdout
/// para una vista combinada (orden temporal aproximado).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum LogStream {
Stdout,
Stderr,
Both,
}
pub struct WorkspaceManager {
@@ -72,6 +86,11 @@ struct Inner {
/// que "pipelines vivos" — son specs guardados para reusar con
/// `run-saved`. Sobreviven restart vía snapshot.
saved_pipelines: HashMap<String, PipelineSpec>,
/// Flow channels vivos por pipeline. Se retienen hasta que el
/// pipeline termine — cuando todos los hijos del pipeline murieron,
/// el reaper los borra (futuro). v1: viven hasta `stop_pipeline_flows`
/// explícito o hasta shutdown.
pipeline_flows: HashMap<Ulid, Vec<crate::flow_channel::FlowChannel>>,
}
#[derive(Debug, Clone)]
@@ -156,11 +175,134 @@ impl WorkspaceManager {
inner: Arc::new(Mutex::new(Inner {
workspaces: HashMap::new(),
saved_pipelines: HashMap::new(),
pipeline_flows: HashMap::new(),
})),
incarnator: Arc::new(Incarnator::new(cfg)),
}
}
/// Registra los comandos lanzados por un pipeline en el workspace.
/// Esto permite `pipeline_stop` (matar selectivamente sólo los pids
/// de un pipeline). `pipeline_id` se setea en cada CommandState.
pub async fn register_pipeline_commands(
&self,
workspace: WorkspaceId,
pipeline_id: Ulid,
commands: Vec<(String, i32)>,
) {
let mut g = self.inner.lock().await;
let Some(ws) = g.workspaces.get_mut(&workspace) else { return };
for (label, pid) in commands {
let cmd_id = Ulid::new();
ws.commands.insert(
cmd_id,
CommandState {
id: cmd_id,
label,
pid: Pid::from_raw(pid),
alive: true,
exit_status: None,
stdout: None,
stderr: None,
pipeline_id: Some(pipeline_id),
},
);
}
}
/// Detiene selectivamente los comandos de un pipeline. SIGTERM →
/// `grace` → SIGKILL. Devuelve cantidad reapeada. Si no hay comandos
/// del pipeline en ningún workspace, retorna 0.
pub async fn stop_pipeline(
&self,
pipeline_id: Ulid,
grace: std::time::Duration,
) -> u32 {
// 1) Recolectamos pids de ese pipeline en todos los workspaces.
let mut targets: Vec<Pid> = Vec::new();
{
let g = self.inner.lock().await;
for ws in g.workspaces.values() {
for cmd in ws.commands.values() {
if cmd.alive && cmd.pipeline_id == Some(pipeline_id) {
targets.push(cmd.pid);
}
}
}
}
if targets.is_empty() {
return 0;
}
let initial = if grace.is_zero() { Signal::SIGKILL } else { Signal::SIGTERM };
for pid in &targets {
let _ = kill(*pid, initial);
}
let mut reaped = 0u32;
let mut still = targets.clone();
let deadline = std::time::Instant::now() + grace;
let poll = std::time::Duration::from_millis(20);
while !still.is_empty() && std::time::Instant::now() < deadline {
still.retain(|pid| match waitpid(*pid, Some(WaitPidFlag::WNOHANG)) {
Ok(WaitStatus::StillAlive) => true,
Ok(_) => {
reaped += 1;
false
}
Err(_) => false,
});
if !still.is_empty() {
tokio::time::sleep(poll).await;
}
}
for pid in &still {
let _ = kill(*pid, Signal::SIGKILL);
let _ = waitpid(*pid, None);
reaped += 1;
}
// Marcar como dead en estado in-memory.
let mut g = self.inner.lock().await;
for ws in g.workspaces.values_mut() {
for cmd in ws.commands.values_mut() {
if cmd.pipeline_id == Some(pipeline_id) && cmd.alive {
cmd.alive = false;
}
}
}
// Drop flows del pipeline.
g.pipeline_flows.remove(&pipeline_id);
info!(%pipeline_id, reaped, "pipeline stopped");
reaped
}
/// Retiene los FlowChannels de un pipeline para que sobrevivan al
/// fin del request. Drop = cierre del data plane.
pub async fn retain_pipeline_flows(
&self,
pipeline: Ulid,
flows: Vec<crate::flow_channel::FlowChannel>,
) {
self.inner.lock().await.pipeline_flows.insert(pipeline, flows);
}
/// Lista pipelines vivos con sus sockets activos.
pub async fn list_flow_pipelines(&self) -> Vec<(Ulid, Vec<std::path::PathBuf>)> {
let g = self.inner.lock().await;
g.pipeline_flows
.iter()
.map(|(id, flows)| {
(
*id,
flows.iter().map(|f| f.socket_path().to_path_buf()).collect(),
)
})
.collect()
}
/// Cierra el data plane de un pipeline (drop = remove_file de sockets).
pub async fn drop_pipeline_flows(&self, pipeline: Ulid) -> bool {
self.inner.lock().await.pipeline_flows.remove(&pipeline).is_some()
}
pub fn incarnator(&self) -> &Incarnator {
&self.incarnator
}
@@ -208,6 +350,35 @@ impl WorkspaceManager {
.map(|w| w.spec.label.clone())
}
/// Estadísticas de recursos del workspace: RSS + CPU agregado de sus
/// comandos vivos. Lee `/proc/<pid>/` directamente; si el spec declara
/// `soma.cgroup.path`, también intenta el cgroup (más preciso, incluye
/// descendants).
pub async fn workspace_stats(&self, id: WorkspaceId) -> Option<stats::WorkspaceStats> {
let g = self.inner.lock().await;
let ws = g.workspaces.get(&id)?;
let alive: Vec<i32> = ws
.commands
.values()
.filter(|c| c.alive)
.map(|c| c.pid.as_raw())
.collect();
let total = ws.commands.len() as u32;
let cgroup_path = if ws.spec.soma.cgroup.path.is_empty() {
None
} else {
// resolve_cgroup_path está en ente_incarnate, pero acá basta
// con el path absoluto bajo /sys/fs/cgroup. Resolución gruesa.
Some(std::path::PathBuf::from(format!(
"/sys/fs/cgroup{}",
ws.spec.soma.cgroup.path
)))
};
let mut s = stats::measure(&alive, cgroup_path.as_deref(), ws.started);
s.commands_total = total;
Some(s)
}
pub async fn create(
self: &Arc<Self>,
spec: WorkspaceSpec,
@@ -269,31 +440,66 @@ impl WorkspaceManager {
}
pub async fn stop(&self, id: WorkspaceId) -> Result<u32, CoreError> {
self.stop_with_grace(id, std::time::Duration::from_millis(1000)).await
}
/// Variante con tiempo de gracia configurable. SIGTERM → espera `grace`
/// → SIGKILL si quedan vivos. `grace=0` = SIGKILL inmediato.
pub async fn stop_with_grace(
&self,
id: WorkspaceId,
grace: std::time::Duration,
) -> Result<u32, CoreError> {
let mut g = self.inner.lock().await;
let ws = g.workspaces.remove(&id).ok_or(CoreError::WorkspaceNotFound(id))?;
// También limpiamos flow_channels del workspace si los hubiera —
// por workspace lo retenemos por pipeline, no por workspace.
drop(g);
// 1) SIGTERM (o SIGKILL si grace=0) a todos vivos.
let initial_signal = if grace.is_zero() { Signal::SIGKILL } else { Signal::SIGTERM };
let alive_pids: Vec<Pid> = ws
.commands
.values()
.filter(|c| c.alive)
.map(|c| c.pid)
.collect();
for pid in &alive_pids {
let _ = kill(*pid, initial_signal);
}
// 2) Esperar hasta `grace` haciendo polling WNOHANG.
let mut reaped = 0u32;
for (_cid, cmd) in ws.commands {
if cmd.alive {
let _ = kill(cmd.pid, Signal::SIGTERM);
// Cosecha sin bloquear infinito: WNOHANG en loop con un par de intentos.
for _ in 0..50 {
match waitpid(cmd.pid, Some(WaitPidFlag::WNOHANG)) {
Ok(WaitStatus::StillAlive) => {
std::thread::sleep(std::time::Duration::from_millis(20));
}
Ok(_) => {
reaped += 1;
break;
}
Err(_) => break,
}
let mut still_alive: Vec<Pid> = alive_pids.clone();
let deadline = std::time::Instant::now() + grace;
let poll_interval = std::time::Duration::from_millis(20);
while !still_alive.is_empty() && std::time::Instant::now() < deadline {
still_alive.retain(|pid| match waitpid(*pid, Some(WaitPidFlag::WNOHANG)) {
Ok(WaitStatus::StillAlive) => true,
Ok(_) => {
reaped += 1;
false
}
// Último recurso: SIGKILL.
let _ = kill(cmd.pid, Signal::SIGKILL);
let _ = waitpid(cmd.pid, None);
Err(_) => false,
});
if !still_alive.is_empty() {
tokio::time::sleep(poll_interval).await;
}
}
info!(%id, reaped, "workspace stopped");
// 3) SIGKILL forzoso a los que quedan, y wait blocking.
for pid in &still_alive {
let _ = kill(*pid, Signal::SIGKILL);
let _ = waitpid(*pid, None);
reaped += 1;
}
info!(
%id,
reaped,
grace_ms = grace.as_millis() as u64,
sigkilled = still_alive.len(),
"workspace stopped"
);
Ok(reaped)
}
@@ -321,31 +527,36 @@ impl WorkspaceManager {
};
let card = cmd_ref.to_card(0, &workspace_label)?;
// Pipe para capturar stdout. O_CLOEXEC para que hijos del hijo
// no hereden la copia. v1: stderr=inherit (simplicidad; tail útil
// para stdout solo). Futuro: stderr separado en el ring.
let (capture_r, capture_w) =
// Dos pipes O_CLOEXEC: uno para stdout, otro para stderr.
use std::os::fd::IntoRawFd;
let (sout_r, sout_w) =
nix::unistd::pipe2(nix::fcntl::OFlag::O_CLOEXEC).map_err(|e| {
CoreError::Incarnate(ente_incarnate::IncarnateError::Pipe(e))
})?;
use std::os::fd::IntoRawFd;
let capture_r_fd = capture_r.into_raw_fd();
let capture_w_fd = capture_w.into_raw_fd();
let (serr_r, serr_w) =
nix::unistd::pipe2(nix::fcntl::OFlag::O_CLOEXEC).map_err(|e| {
CoreError::Incarnate(ente_incarnate::IncarnateError::Pipe(e))
})?;
let sout_r_fd = sout_r.into_raw_fd();
let sout_w_fd = sout_w.into_raw_fd();
let serr_r_fd = serr_r.into_raw_fd();
let serr_w_fd = serr_w.into_raw_fd();
let logs = logbuf::LogBuf::new();
let stdout_buf = logbuf::LogBuf::new();
let stderr_buf = logbuf::LogBuf::new();
let stdio = ente_incarnate::ChildStdio {
stdin_fd: None,
stdout_fd: Some(capture_w_fd),
stderr_fd: None,
stdout_fd: Some(sout_w_fd),
stderr_fd: Some(serr_w_fd),
};
let out = self.incarnator.incarnate_with(&card, stdio)?;
let cmd_id = card.id;
let cmd_label = cmd_ref.label.clone();
let pid = out.pid;
// Drainer: tokio task que lee capture_r_fd y appendea al ring.
spawn_log_drainer(capture_r_fd, logs.clone());
spawn_log_drainer(sout_r_fd, stdout_buf.clone());
spawn_log_drainer(serr_r_fd, stderr_buf.clone());
let mut g = self.inner.lock().await;
if let Some(ws) = g.workspaces.get_mut(&id) {
@@ -357,7 +568,9 @@ impl WorkspaceManager {
pid,
alive: true,
exit_status: None,
logs: Some(logs),
stdout: Some(stdout_buf),
stderr: Some(stderr_buf),
pipeline_id: None,
},
);
}
@@ -372,16 +585,28 @@ impl WorkspaceManager {
}
/// Devuelve el tail del log capturado para `(workspace, command)`.
/// `stream` selecciona stdout/stderr/both.
pub async fn get_command_logs(
&self,
workspace: WorkspaceId,
command: Ulid,
tail_bytes: usize,
stream: LogStream,
) -> Option<Vec<u8>> {
let g = self.inner.lock().await;
let ws = g.workspaces.get(&workspace)?;
let cmd = ws.commands.get(&command)?;
cmd.logs.as_ref().map(|lb| lb.tail(tail_bytes))
match stream {
LogStream::Stdout => cmd.stdout.as_ref().map(|lb| lb.tail(tail_bytes)),
LogStream::Stderr => cmd.stderr.as_ref().map(|lb| lb.tail(tail_bytes)),
LogStream::Both => {
let so = cmd.stdout.as_ref().map(|lb| lb.tail(tail_bytes)).unwrap_or_default();
let se = cmd.stderr.as_ref().map(|lb| lb.tail(tail_bytes)).unwrap_or_default();
let mut out = so;
out.extend_from_slice(&se);
Some(out)
}
}
}
/// Lista comandos de un workspace.
@@ -397,7 +622,8 @@ impl WorkspaceManager {
pid: c.pid.as_raw(),
alive: c.alive,
exit_status: c.exit_status,
log_bytes: c.logs.as_ref().map(|l| l.written_total()).unwrap_or(0),
log_bytes: c.stdout.as_ref().map(|l| l.written_total()).unwrap_or(0)
+ c.stderr.as_ref().map(|l| l.written_total()).unwrap_or(0),
})
.collect();
// Orden estable por ULID (temporal).
@@ -435,7 +661,9 @@ impl WorkspaceManager {
pid: out.pid,
alive: true,
exit_status: None,
logs: None, // run_pipeline NO captura logs (los conecta por pipes).
stdout: None, // run_pipeline NO captura (conecta por pipes).
stderr: None,
pipeline_id: None,
},
);
}
@@ -538,19 +766,16 @@ mod tests {
};
let (id, _) = mgr.create(spec).await.unwrap();
let summary = mgr
.run(
id,
"/bin/echo".into(),
vec!["captured-output".into()],
vec![],
)
.run(id, "/bin/echo".into(), vec!["captured-output".into()], vec![])
.await
.unwrap();
// Esperamos a que el comando termine y el drainer drene.
for _ in 0..50 {
tokio::time::sleep(std::time::Duration::from_millis(20)).await;
mgr.reap_dead().await;
let logs = mgr.get_command_logs(id, summary.id, 0).await.unwrap_or_default();
let logs = mgr
.get_command_logs(id, summary.id, 0, LogStream::Stdout)
.await
.unwrap_or_default();
if !logs.is_empty() {
let s = String::from_utf8_lossy(&logs);
assert!(s.contains("captured-output"), "got: {s:?}");
@@ -560,6 +785,52 @@ mod tests {
panic!("logs never captured");
}
#[tokio::test]
async fn run_captures_stderr_separately() {
let mgr = Arc::new(WorkspaceManager::new(IncarnatorConfig::default()));
let spec = WorkspaceSpec {
label: "stderr".into(),
soma: Default::default(),
permissions: Default::default(),
ttl: None,
flow_dirs: vec![],
on_exit: shipote_card::ExitPolicy::Reap,
};
let (id, _) = mgr.create(spec).await.unwrap();
// sh -c "echo OUT; echo ERR >&2"
let summary = mgr
.run(
id,
"/bin/sh".into(),
vec!["-c".into(), "echo OUT; echo ERR >&2".into()],
vec![],
)
.await
.unwrap();
for _ in 0..50 {
tokio::time::sleep(std::time::Duration::from_millis(20)).await;
mgr.reap_dead().await;
let so = mgr
.get_command_logs(id, summary.id, 0, LogStream::Stdout)
.await
.unwrap_or_default();
let se = mgr
.get_command_logs(id, summary.id, 0, LogStream::Stderr)
.await
.unwrap_or_default();
if !so.is_empty() && !se.is_empty() {
let so_s = String::from_utf8_lossy(&so);
let se_s = String::from_utf8_lossy(&se);
assert!(so_s.contains("OUT"), "stdout: {so_s:?}");
assert!(se_s.contains("ERR"), "stderr: {se_s:?}");
assert!(!so_s.contains("ERR"), "stdout no debería tener ERR");
assert!(!se_s.contains("OUT"), "stderr no debería tener OUT");
return;
}
}
panic!("logs never captured on both streams");
}
#[tokio::test]
async fn run_true_in_workspace() {
let mgr = Arc::new(WorkspaceManager::new(IncarnatorConfig::default()));
@@ -12,7 +12,7 @@ use brahman_card::Payload;
use ente_incarnate::{ChildStdio, Incarnator};
use nix::fcntl::OFlag;
use nix::unistd::pipe2;
use shipote_card::{FlowEdge, PipelineSpec};
use shipote_card::PipelineSpec;
use shipote_discern::{DiscernPipeline, Discernment, Hint};
use std::os::fd::{AsRawFd, IntoRawFd, RawFd};
use std::sync::Arc;
@@ -22,7 +22,7 @@ use tracing::{debug, info, warn};
use ulid::Ulid;
/// Resultado de lanzar un pipeline.
#[derive(Debug, Clone)]
#[derive(Debug)]
pub struct PipelineLaunch {
pub pipeline: Ulid,
pub command_pids: Vec<(String, i32)>,
@@ -37,19 +37,29 @@ pub struct EdgeDiscernment {
pub to_label: String,
pub to_input: String,
pub discernment: Option<Discernment>,
/// Path del Unix socket donde otros módulos pueden suscribirse al
/// stream replicado por este edge. `None` cuando tap=false (no hay
/// data plane porque no hay sampling).
pub flow_socket: Option<std::path::PathBuf>,
}
/// Lanza un pipeline conectando nodos por stdin/stdout. Cada nodo se
/// encarna via `Incarnator` (con o sin namespacing según su SomaSpec).
///
/// v1: pipeline lineal (un edge entrante por nodo). Múltiples edges
/// entrantes generan warning y sólo el primero se honra.
/// Soporta:
/// - Pipeline lineal (1 producer → 1 consumer).
/// - **Fan-out** (1 producer → N consumers): shipote interpone un
/// splitter que duplica bytes a cada destino. Cuando `tap=true`, el
/// splitter además samplea para discernir.
/// - Múltiples predecessors por nodo NO se soporta aún (fan-in): sólo se
/// honra el primer edge entrante.
pub async fn run_pipeline(
spec: &PipelineSpec,
workspace_label: &str,
tap: bool,
discerner: Arc<DiscernPipeline>,
incarnator: Arc<Incarnator>,
manager: Option<Arc<crate::WorkspaceManager>>,
) -> Result<PipelineLaunch, CoreError> {
spec.validate()?;
let n = spec.nodes.len();
@@ -60,30 +70,100 @@ pub async fn run_pipeline(
"launching pipeline (incarnated)"
);
// Predecessor: para cada nodo, su edge entrante (si tiene).
let mut predecessor: Vec<Option<&FlowEdge>> = vec![None; n];
for e in &spec.edges {
if predecessor[e.to].is_some() {
warn!(node = e.to, "v1 pipeline: nodo con múltiples predecessors — sólo se honra el primero");
continue;
}
predecessor[e.to] = Some(e);
// Pre-compute grafo:
// - `consumers[i]` = índices de edges salientes de `i`.
// - `predecessors[j]` = índices de edges entrantes a `j`.
let mut consumers: Vec<Vec<usize>> = vec![Vec::new(); n];
let mut predecessors: Vec<Vec<usize>> = vec![Vec::new(); n];
for (idx, e) in spec.edges.iter().enumerate() {
consumers[e.from].push(idx);
predecessors[e.to].push(idx);
}
let mut pids = Vec::with_capacity(n);
let mut taps: Vec<TapHandle> = Vec::new();
// Para cada nodo i que produce, guardamos el FD de read del pipe
// del productor → al armar el consumidor lo consume.
// Pero como puede haber tap intermedio, llevamos un esquema:
// - Sin tap: read FD del pipe productor → stdin del consumidor.
// - Con tap: read FD del pipe productor → tokio proxy → write FD
// del pipe consumidor → stdin del consumidor.
// Para simplicidad lineal, `pending_stdin_for_next` guarda el FD que
// el siguiente consumidor debe usar como stdin.
let mut pending_stdin_for_next: Option<RawFd> = None;
// Por cada edge: par (r_to_consumer, w_from_producer_side).
// El consumer recibe r_to_consumer; el producer escribe a w_from_producer_side
// (directa o vía splitter).
let mut edge_r: Vec<RawFd> = vec![-1; spec.edges.len()];
let mut edge_w: Vec<RawFd> = vec![-1; spec.edges.len()];
for i in 0..spec.edges.len() {
let (r, w) = pipe2(OFlag::O_CLOEXEC).map_err(|e| {
CoreError::Incarnate(ente_incarnate::IncarnateError::Pipe(e))
})?;
edge_r[i] = r.into_raw_fd();
edge_w[i] = w.into_raw_fd();
}
let mut consumer_stdin_fd: Vec<Option<RawFd>> = vec![None; n];
let mut producer_stdout_fd: Vec<Option<RawFd>> = vec![None; n];
let mut splitter_specs: Vec<SplitterSpec> = Vec::new();
let mut merger_specs: Vec<MergerSpec> = Vec::new();
// Stdout del producer: directo a edge_w[único] si tiene 1 consumer y NO tap;
// sino, pipe propio que va al splitter task.
for i in 0..n {
if consumers[i].is_empty() {
continue;
}
if consumers[i].len() == 1 && !tap {
producer_stdout_fd[i] = Some(edge_w[consumers[i][0]]);
continue;
}
// Splitter: pipe propio para el productor → splitter lee y replica a edge_w[*].
let (prod_r, prod_w) = pipe2(OFlag::O_CLOEXEC).map_err(|e| {
CoreError::Incarnate(ente_incarnate::IncarnateError::Pipe(e))
})?;
producer_stdout_fd[i] = Some(prod_w.into_raw_fd());
let prod_r_fd = prod_r.into_raw_fd();
let mut consumer_writes: Vec<RawFd> = Vec::with_capacity(consumers[i].len());
let mut edge_meta: Vec<EdgeMeta> = Vec::with_capacity(consumers[i].len());
for edge_idx in &consumers[i] {
let edge = &spec.edges[*edge_idx];
consumer_writes.push(edge_w[*edge_idx]);
edge_meta.push(EdgeMeta {
from_label: spec.nodes[edge.from].label.clone(),
from_output: edge.from_output.clone(),
to_label: spec.nodes[edge.to].label.clone(),
to_input: edge.to_input.clone(),
});
}
splitter_specs.push(SplitterSpec {
producer_r_fd: prod_r_fd,
consumer_w_fds: consumer_writes,
edges: edge_meta,
tap,
sample_bytes: spec.discern.sample_bytes,
});
}
// Stdin del consumer: edge_r[único] si tiene 1 predecessor; sino, merger.
for j in 0..n {
match predecessors[j].len() {
0 => {}
1 => {
consumer_stdin_fd[j] = Some(edge_r[predecessors[j][0]]);
}
_ => {
// Merger: lee de N edge_r y escribe a un nuevo pipe cuyo
// read end es el stdin del consumer.
let (cons_r, cons_w) = pipe2(OFlag::O_CLOEXEC).map_err(|e| {
CoreError::Incarnate(ente_incarnate::IncarnateError::Pipe(e))
})?;
consumer_stdin_fd[j] = Some(cons_r.into_raw_fd());
let inputs: Vec<RawFd> = predecessors[j]
.iter()
.map(|eidx| edge_r[*eidx])
.collect();
merger_specs.push(MergerSpec {
producer_r_fds: inputs,
consumer_w_fd: cons_w.into_raw_fd(),
});
}
}
}
// Encarnamos cada nodo con su stdin/stdout fd asignado.
let mut pids = Vec::with_capacity(n);
for (i, node) in spec.nodes.iter().enumerate() {
// Validar payload ejecutable.
match &node.payload {
Payload::Native { .. } | Payload::Legacy { .. } => {}
_ => {
@@ -92,91 +172,98 @@ pub async fn run_pipeline(
))
}
}
// Compilamos a Card.
let card = node.to_card(i, workspace_label)?;
// ¿Soy productor? Necesito stdout_fd hacia un pipe nuevo.
let i_is_producer = spec.edges.iter().any(|e| e.from == i);
let stdin_fd: Option<RawFd> = pending_stdin_for_next.take();
let mut stdout_fd: Option<RawFd> = None;
let mut next_pending: Option<RawFd> = None;
// FDs que el PADRE debe cerrar tras spawn (son nuestra copia del
// extremo que pasamos al hijo).
let mut parent_closes: Vec<RawFd> = Vec::new();
if i_is_producer {
let (r, w) = pipe2(OFlag::O_CLOEXEC).map_err(|e| {
CoreError::Incarnate(ente_incarnate::IncarnateError::Pipe(e))
})?;
let r_raw = r.into_raw_fd();
let w_raw = w.into_raw_fd();
stdout_fd = Some(w_raw);
parent_closes.push(w_raw);
if tap {
// Necesitamos un segundo pipe entre tap y consumidor.
let (r2, w2) = pipe2(OFlag::O_CLOEXEC).map_err(|e| {
CoreError::Incarnate(ente_incarnate::IncarnateError::Pipe(e))
})?;
let r2_raw = r2.into_raw_fd();
let w2_raw = w2.into_raw_fd();
next_pending = Some(r2_raw);
// El tap lee de r_raw y escribe a w2_raw.
let edge = predecessor
.iter()
.find_map(|p| *p)
.and_then(|e| if e.from == i { Some(e) } else { None })
// Edge donde i es from:
.or_else(|| spec.edges.iter().find(|e| e.from == i));
let from_label = node.label.clone();
let to_label = edge
.map(|e| spec.nodes[e.to].label.clone())
.unwrap_or_default();
let from_output = edge.map(|e| e.from_output.clone()).unwrap_or_default();
let to_input = edge.map(|e| e.to_input.clone()).unwrap_or_default();
let sample_bytes = spec.discern.sample_bytes;
let disc = discerner.clone();
let h = spawn_tap(
r_raw, w2_raw, sample_bytes, disc, from_label, from_output, to_label, to_input,
);
taps.push(h);
// r_raw y w2_raw pasaron a manos del tokio task. No los
// cerramos en el padre.
} else {
// Sin tap, el read del productor va directo al stdin del
// siguiente consumidor.
next_pending = Some(r_raw);
}
}
let stdio = ChildStdio {
stdin_fd,
stdout_fd,
stdin_fd: consumer_stdin_fd[i],
stdout_fd: producer_stdout_fd[i],
stderr_fd: None,
};
// Incarnator absorbe los fds de `stdio` — no los cerramos acá.
// `parent_closes` queda obsoleto.
let _ = parent_closes;
let outcome = incarnator
.incarnate_with(&card, stdio)
.map_err(CoreError::Incarnate)?;
let pid = outcome.pid;
pids.push((node.label.clone(), pid.as_raw()));
debug!(label = %node.label, pid = pid.as_raw(), "node incarnated");
pending_stdin_for_next = next_pending;
}
let pipeline_id = Ulid::new();
let pipeline_id_for_flows = Ulid::new();
// Si tap=true, creamos un FlowChannel por edge para el data plane.
// Cada splitter pushea al sender del channel correspondiente.
let pipeline_id = pipeline_id_for_flows;
let mut flow_channels: Vec<crate::flow_channel::FlowChannel> = Vec::new();
let mut splitter_channels: Vec<Vec<Option<crate::flow_channel::FlowSender>>> =
Vec::with_capacity(splitter_specs.len());
let mut edge_socket_for_splitter: Vec<Vec<Option<std::path::PathBuf>>> = Vec::new();
for s in &splitter_specs {
let mut senders_per_edge = Vec::with_capacity(s.edges.len());
let mut paths_per_edge = Vec::with_capacity(s.edges.len());
for (i, em) in s.edges.iter().enumerate() {
if !s.tap {
senders_per_edge.push(None);
paths_per_edge.push(None);
continue;
}
let id = format!(
"{}-{}-{}-{}",
short_ulid(&pipeline_id),
em.from_label,
em.from_output,
i
);
let socket = crate::flow_channel::default_flow_socket_path(&id);
match crate::flow_channel::FlowChannel::with_replay_cap(socket.clone(), spec.discern.replay_chunks) {
Ok(fc) => {
senders_per_edge.push(Some(fc.sender_handle()));
paths_per_edge.push(Some(socket));
flow_channels.push(fc);
}
Err(e) => {
warn!(?e, "flow channel new failed");
senders_per_edge.push(None);
paths_per_edge.push(None);
}
}
}
splitter_channels.push(senders_per_edge);
edge_socket_for_splitter.push(paths_per_edge);
}
let mut edge_discernments = Vec::with_capacity(taps.len());
for t in taps {
match t.handle.await {
Ok(d) => edge_discernments.push(d),
Err(e) => warn!(?e, "tap handle joined with error"),
// Registramos los flow_channels en el manager AHORA, antes de await
// las tasks. Esto permite que clientes externos hagan `flow list` y
// se suscriban mientras el pipeline aún produce data.
if let Some(mgr) = &manager {
if !flow_channels.is_empty() {
let drained: Vec<crate::flow_channel::FlowChannel> = flow_channels.drain(..).collect();
mgr.retain_pipeline_flows(pipeline_id, drained).await;
}
}
// Spawn mergers + splitters después del incarnate. Cada task posee
// sus fds y los cierra al terminar (via Drop de OwnedFd).
let mut merger_handles: Vec<tokio::task::JoinHandle<()>> = Vec::new();
for m in merger_specs {
merger_handles.push(spawn_merger(m));
}
let mut tap_handles: Vec<SplitterHandle> = Vec::new();
for (s, senders) in splitter_specs.into_iter().zip(splitter_channels.into_iter()) {
tap_handles.push(spawn_splitter(s, discerner.clone(), senders));
}
let mut edge_discernments = Vec::new();
for (h, paths) in tap_handles.into_iter().zip(edge_socket_for_splitter.into_iter()) {
match h.handle.await {
Ok(eds) => {
for (mut ed, path) in eds.into_iter().zip(paths.into_iter()) {
ed.flow_socket = path;
edge_discernments.push(ed);
}
}
Err(e) => warn!(?e, "splitter handle joined with error"),
}
}
for h in merger_handles {
if let Err(e) = h.await {
warn!(?e, "merger handle joined with error");
}
}
@@ -187,57 +274,156 @@ pub async fn run_pipeline(
})
}
struct TapHandle {
handle: tokio::task::JoinHandle<EdgeDiscernment>,
fn short_ulid(u: &Ulid) -> String {
let s = u.to_string();
s[s.len() - 6..].to_string()
}
#[allow(clippy::too_many_arguments)]
fn spawn_tap(
producer_r_fd: RawFd,
consumer_w_fd: RawFd,
sample_bytes: usize,
discerner: Arc<DiscernPipeline>,
#[derive(Debug, Clone)]
struct EdgeMeta {
from_label: String,
from_output: String,
to_label: String,
to_input: String,
) -> TapHandle {
// Marcar non-blocking ANTES de envolverlos en AsyncFd. Sino tokio
// bloquea el reactor en operaciones lentas.
set_nonblocking(producer_r_fd);
set_nonblocking(consumer_w_fd);
}
struct SplitterSpec {
producer_r_fd: RawFd,
consumer_w_fds: Vec<RawFd>,
edges: Vec<EdgeMeta>,
tap: bool,
sample_bytes: usize,
}
struct SplitterHandle {
handle: tokio::task::JoinHandle<Vec<EdgeDiscernment>>,
}
struct MergerSpec {
producer_r_fds: Vec<RawFd>,
consumer_w_fd: RawFd,
}
fn spawn_merger(spec: MergerSpec) -> tokio::task::JoinHandle<()> {
for fd in &spec.producer_r_fds {
set_nonblocking(*fd);
}
set_nonblocking(spec.consumer_w_fd);
// Patrón: una task lectora por cada producer reenvía bytes a un mpsc.
// El merger principal consume del mpsc y escribe al consumer.
// Esto evita el "block en reader idle" del enfoque round-robin sobre
// AsyncFd::ready() (los readers idle nunca dejan turno).
tokio::spawn(async move {
let (tx, mut rx) = tokio::sync::mpsc::channel::<Vec<u8>>(32);
let nr = spec.producer_r_fds.len();
for fd in spec.producer_r_fds {
let tx = tx.clone();
tokio::spawn(async move {
// SAFETY: ownership transferida.
let owned = unsafe { std::os::fd::OwnedFd::from_raw_fd_compat(fd) };
let r = match AsyncFd::with_interest(owned, Interest::READABLE) {
Ok(a) => a,
Err(e) => {
warn!(?e, "merger reader AsyncFd");
return;
}
};
let mut buf = [0u8; 4096];
loop {
match async_read(&r, &mut buf).await {
Ok(0) => break,
Ok(n) => {
if tx.send(buf[..n].to_vec()).await.is_err() {
break;
}
}
Err(_) => break,
}
}
// Drop de tx → cuando todos los readers cerraron, el rx
// recibe None y el merger termina.
});
}
drop(tx); // sólo los reader tasks tienen sus clones ahora.
// SAFETY: ownership transferida al task.
let w_owned = unsafe { std::os::fd::OwnedFd::from_raw_fd_compat(spec.consumer_w_fd) };
let w = match AsyncFd::with_interest(w_owned, Interest::WRITABLE) {
Ok(a) => a,
Err(e) => {
warn!(?e, "merger AsyncFd w");
return;
}
};
let mut total: u64 = 0;
while let Some(chunk) = rx.recv().await {
if async_write_all(&w, &chunk).await.is_err() {
return;
}
total += chunk.len() as u64;
}
debug!(bytes = total, readers = nr, "merger finished");
})
}
fn spawn_splitter(
spec: SplitterSpec,
discerner: Arc<DiscernPipeline>,
edge_senders: Vec<Option<crate::flow_channel::FlowSender>>,
) -> SplitterHandle {
set_nonblocking(spec.producer_r_fd);
for fd in &spec.consumer_w_fds {
set_nonblocking(*fd);
}
let handle = tokio::spawn(async move {
// SAFETY: el caller transfiere ownership de los fds al task.
let r_std = unsafe { std::os::fd::OwnedFd::from_raw_fd_compat(producer_r_fd) };
let w_std = unsafe { std::os::fd::OwnedFd::from_raw_fd_compat(consumer_w_fd) };
let r = AsyncFd::with_interest(r_std, Interest::READABLE).expect("AsyncFd r");
let w = AsyncFd::with_interest(w_std, Interest::WRITABLE).expect("AsyncFd w");
// SAFETY: ownership transferida al task.
let r_owned = unsafe { std::os::fd::OwnedFd::from_raw_fd_compat(spec.producer_r_fd) };
let r = match AsyncFd::with_interest(r_owned, Interest::READABLE) {
Ok(a) => a,
Err(e) => {
warn!(?e, "splitter AsyncFd r");
return Vec::new();
}
};
let mut writers: Vec<AsyncFd<std::os::fd::OwnedFd>> = Vec::with_capacity(spec.consumer_w_fds.len());
for fd in spec.consumer_w_fds {
let owned = unsafe { std::os::fd::OwnedFd::from_raw_fd_compat(fd) };
match AsyncFd::with_interest(owned, Interest::WRITABLE) {
Ok(a) => writers.push(a),
Err(e) => warn!(?e, "splitter AsyncFd w"),
}
}
let mut sample: Vec<u8> = Vec::with_capacity(sample_bytes);
let mut sample: Vec<u8> = Vec::with_capacity(spec.sample_bytes);
let mut buf = [0u8; 4096];
let mut total: u64 = 0;
// Fase 1: sampling + pump.
let mut eof = false;
while !eof && sample.len() < sample_bytes {
// Fase 1: sampling (sólo si tap=true) + replicación.
while !eof && (spec.tap && sample.len() < spec.sample_bytes) {
let n = match async_read(&r, &mut buf).await {
Ok(0) => { eof = true; 0 }
Ok(n) => n,
Err(e) => { warn!(?e, "tap producer read failed"); break; }
Err(e) => { warn!(?e, "splitter read"); break; }
};
if n == 0 { break; }
let take = n.min(sample_bytes - sample.len());
sample.extend_from_slice(&buf[..take]);
if let Err(e) = async_write_all(&w, &buf[..n]).await {
warn!(?e, "tap consumer write failed");
break;
if spec.tap {
let take = n.min(spec.sample_bytes - sample.len());
sample.extend_from_slice(&buf[..take]);
}
broadcast_chunk(&writers, &edge_senders, &buf[..n]).await;
total += n as u64;
}
let d = discerner.discern(&sample, &Hint { path: None, size_total: None });
// Fase 2: pump-only hasta EOF.
let d = if spec.tap {
discerner.discern(&sample, &Hint { path: None, size_total: None })
} else {
None
};
// Fase 2: replicación pura.
while !eof {
let n = match async_read(&r, &mut buf).await {
Ok(0) => { eof = true; 0 }
@@ -245,19 +431,50 @@ fn spawn_tap(
Err(_) => break,
};
if n == 0 { break; }
if async_write_all(&w, &buf[..n]).await.is_err() { break; }
broadcast_chunk(&writers, &edge_senders, &buf[..n]).await;
total += n as u64;
}
debug!(bytes = total, "tap finished");
EdgeDiscernment {
from_label,
from_output,
to_label,
to_input,
discernment: d,
}
debug!(bytes = total, consumers = writers.len(), "splitter finished");
// Mismo discernment para todos los edges del splitter (es el mismo
// stream replicado). Devolvemos N entries (una por edge) para que
// la UI/CLI los liste todos. flow_socket lo rellena el caller.
spec.edges
.into_iter()
.map(|em| EdgeDiscernment {
from_label: em.from_label,
from_output: em.from_output,
to_label: em.to_label,
to_input: em.to_input,
discernment: d.clone(),
flow_socket: None,
})
.collect()
});
TapHandle { handle }
SplitterHandle { handle }
}
async fn broadcast_chunk(
writers: &[AsyncFd<std::os::fd::OwnedFd>],
edge_senders: &[Option<crate::flow_channel::FlowSender>],
data: &[u8],
) {
// Internal pipes a los consumers del pipeline.
for w in writers {
let _ = async_write_all(w, data).await;
}
// Externos: broadcast a subscribers vía FlowChannel.
// Cada edge tiene su propio sender (mismo data — el sample/discernment
// viaja por broadcast separados para que un subscriber por edge vea su
// stream específico).
if edge_senders.iter().any(|s| s.is_some()) {
let shared = std::sync::Arc::new(data.to_vec());
for s in edge_senders {
if let Some(s) = s {
let _ = s.send(shared.clone());
}
}
}
}
async fn async_read(
@@ -377,7 +594,7 @@ mod tests {
};
let disc = Arc::new(DiscernPipeline::default_pipeline());
let inc = Arc::new(Incarnator::new(IncarnatorConfig::default()));
let launch = run_pipeline(&spec, "ws", false, disc, inc).await.unwrap();
let launch = run_pipeline(&spec, "ws", false, disc, inc, None).await.unwrap();
assert_eq!(launch.command_pids.len(), 2);
// Cosecha.
for (_, pid) in &launch.command_pids {
@@ -385,6 +602,78 @@ mod tests {
}
}
#[tokio::test]
async fn pipeline_fanin_two_to_one() {
// 2 productores → 1 consumer (cat). El merger multiplexa.
let spec = PipelineSpec {
label: "fanin".into(),
workspace: WorkspaceId::new(),
nodes: vec![
cmd("p1", "/bin/echo", &["from-p1"]),
cmd("p2", "/bin/echo", &["from-p2"]),
cmd("c", "/bin/cat", &[]),
],
edges: vec![
FlowEdge {
from: 0,
from_output: "stdout".into(),
to: 2,
to_input: "stdin".into(),
},
FlowEdge {
from: 1,
from_output: "stdout".into(),
to: 2,
to_input: "stdin".into(),
},
],
discern: DiscernPolicy::default(),
};
let disc = Arc::new(DiscernPipeline::default_pipeline());
let inc = Arc::new(Incarnator::new(IncarnatorConfig::default()));
let launch = run_pipeline(&spec, "ws", false, disc, inc, None).await.unwrap();
assert_eq!(launch.command_pids.len(), 3);
for (_, pid) in &launch.command_pids {
let _ = nix::sys::wait::waitpid(nix::unistd::Pid::from_raw(*pid), None);
}
}
#[tokio::test]
async fn pipeline_fanout_one_to_two() {
// 1 productor (echo) → 2 consumers (wc -c). Splitter replica.
let spec = PipelineSpec {
label: "fanout".into(),
workspace: WorkspaceId::new(),
nodes: vec![
cmd("p", "/bin/echo", &["fanout-test"]),
cmd("c1", "/bin/cat", &[]),
cmd("c2", "/bin/cat", &[]),
],
edges: vec![
FlowEdge {
from: 0,
from_output: "stdout".into(),
to: 1,
to_input: "stdin".into(),
},
FlowEdge {
from: 0,
from_output: "stdout".into(),
to: 2,
to_input: "stdin".into(),
},
],
discern: DiscernPolicy::default(),
};
let disc = Arc::new(DiscernPipeline::default_pipeline());
let inc = Arc::new(Incarnator::new(IncarnatorConfig::default()));
let launch = run_pipeline(&spec, "ws", false, disc, inc, None).await.unwrap();
assert_eq!(launch.command_pids.len(), 3);
for (_, pid) in &launch.command_pids {
let _ = nix::sys::wait::waitpid(nix::unistd::Pid::from_raw(*pid), None);
}
}
#[tokio::test]
async fn pipeline_isolated_with_tap_captures_discernment() {
let spec = PipelineSpec {
@@ -403,11 +692,12 @@ mod tests {
discern: DiscernPolicy {
sample_bytes: 4096,
enrich_producer: true,
replay_chunks: 32,
},
};
let disc = Arc::new(DiscernPipeline::default_pipeline());
let inc = Arc::new(Incarnator::new(IncarnatorConfig::default()));
let launch = run_pipeline(&spec, "ws", true, disc, inc).await.unwrap();
let launch = run_pipeline(&spec, "ws", true, disc, inc, None).await.unwrap();
assert_eq!(launch.edge_discernments.len(), 1);
let d = &launch.edge_discernments[0];
let dis = d.discernment.as_ref().expect("discernment present");
@@ -0,0 +1,168 @@
//! Resource accounting por workspace.
//!
//! Dos fuentes:
//! - **Per-proc** (`/proc/<pid>/status` + `stat`): suma RSS y CPU ticks de
//! los comandos vivos del workspace. Siempre disponible. Costo: O(N pids).
//! - **Cgroup v2** (`memory.current`, `cpu.stat`): un read por workspace si
//! `SomaSpec.cgroup.path` está y es leíble. Más preciso (incluye descendants).
//!
//! Si ambos están disponibles, devolvemos el cgroup (más preciso) y dejamos
//! el per-proc como `sample_via_proc`.
use std::path::Path;
use std::time::Instant;
#[derive(Debug, Clone, Default)]
pub struct WorkspaceStats {
pub commands_alive: u32,
pub commands_total: u32,
/// RSS sumado en bytes. `None` si no se pudo medir.
pub rss_bytes: Option<u64>,
/// High-water mark de RSS (peak alguna vez observado). Cgroup v2:
/// `memory.peak` (≥6.5). Per-proc: suma de `VmHWM` de cada pid.
pub rss_peak_bytes: Option<u64>,
/// Tiempo CPU acumulado en microsegundos. `None` si no se pudo medir.
pub cpu_usec: Option<u64>,
/// Fuente del dato: "proc" | "cgroup" | "mixed".
pub source: String,
/// Wall-clock uptime del workspace en milisegundos.
pub uptime_ms: u64,
}
/// Mide stats para un set de PIDs vivos + un path de cgroup opcional.
pub fn measure(
alive_pids: &[i32],
cgroup_path: Option<&Path>,
workspace_started: Instant,
) -> WorkspaceStats {
let mut rss_proc: u64 = 0;
let mut rss_peak_proc: u64 = 0;
let mut cpu_proc: u64 = 0;
let mut proc_ok = false;
for &pid in alive_pids {
if let Some((rss, peak, cpu)) = read_proc_pid(pid) {
rss_proc += rss;
rss_peak_proc += peak;
cpu_proc += cpu;
proc_ok = true;
}
}
let cgroup = cgroup_path.and_then(read_cgroup_stats);
let (rss, rss_peak, cpu, source) = match (cgroup, proc_ok) {
(Some(cg), _) => (Some(cg.rss), cg.rss_peak, Some(cg.cpu_usec), "cgroup".to_string()),
(None, true) => (
Some(rss_proc),
Some(rss_peak_proc),
Some(cpu_proc),
"proc".to_string(),
),
(None, false) => (None, None, None, "none".to_string()),
};
WorkspaceStats {
commands_alive: alive_pids.len() as u32,
commands_total: 0,
rss_bytes: rss,
rss_peak_bytes: rss_peak,
cpu_usec: cpu,
source,
uptime_ms: workspace_started.elapsed().as_millis() as u64,
}
}
struct CgroupStats {
rss: u64,
rss_peak: Option<u64>,
cpu_usec: u64,
}
/// Lee `(rss_bytes, rss_peak_bytes, cpu_usec)` de `/proc/<pid>/`. None si el proc desapareció.
fn read_proc_pid(pid: i32) -> Option<(u64, u64, u64)> {
let (rss_kb, hwm_kb) = {
let status = std::fs::read_to_string(format!("/proc/{pid}/status")).ok()?;
let mut rss = 0u64;
let mut hwm = 0u64;
for l in status.lines() {
if let Some(rest) = l.strip_prefix("VmRSS:") {
rss = rest
.trim()
.split_whitespace()
.next()
.and_then(|s| s.parse().ok())
.unwrap_or(0);
} else if let Some(rest) = l.strip_prefix("VmHWM:") {
hwm = rest
.trim()
.split_whitespace()
.next()
.and_then(|s| s.parse().ok())
.unwrap_or(0);
}
}
(rss, hwm)
};
let cpu_usec = {
let stat = std::fs::read_to_string(format!("/proc/{pid}/stat")).ok()?;
// formato: pid (comm) state ppid pgrp ... utime stime cutime cstime
// Cuidado: comm puede tener espacios y paréntesis. Buscamos la última `)`.
let end_comm = stat.rfind(')')?;
let after = &stat[end_comm + 1..];
let fields: Vec<&str> = after.split_whitespace().collect();
// Tras `)`, índice 0 = state, índice 11 = utime, 12 = stime.
let utime = fields.get(11).and_then(|s| s.parse::<u64>().ok()).unwrap_or(0);
let stime = fields.get(12).and_then(|s| s.parse::<u64>().ok()).unwrap_or(0);
let ticks = utime + stime;
// Convertimos ticks → microsegundos. SC_CLK_TCK típicamente 100.
let clk_tck = unsafe { libc::sysconf(libc::_SC_CLK_TCK) }.max(1) as u64;
ticks * 1_000_000 / clk_tck
};
Some((rss_kb * 1024, hwm_kb * 1024, cpu_usec))
}
/// Lee `CgroupStats` del cgroup. None si no existe o no es leíble.
/// `memory.peak` requiere kernel ≥6.5; si falta, `rss_peak` queda None.
fn read_cgroup_stats(cgroup_path: &Path) -> Option<CgroupStats> {
let mem = std::fs::read_to_string(cgroup_path.join("memory.current"))
.ok()
.and_then(|s| s.trim().parse::<u64>().ok())?;
let cpu_stat = std::fs::read_to_string(cgroup_path.join("cpu.stat")).ok()?;
let cpu_usec = cpu_stat
.lines()
.find_map(|l| l.strip_prefix("usage_usec"))
.and_then(|s| s.split_whitespace().next())
.and_then(|s| s.parse::<u64>().ok())
.unwrap_or(0);
let peak = std::fs::read_to_string(cgroup_path.join("memory.peak"))
.ok()
.and_then(|s| s.trim().parse::<u64>().ok());
Some(CgroupStats {
rss: mem,
rss_peak: peak,
cpu_usec,
})
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn measure_with_no_pids_returns_zero() {
let stats = measure(&[], None, Instant::now());
assert_eq!(stats.commands_alive, 0);
assert_eq!(stats.rss_bytes, None);
assert_eq!(stats.source, "none");
}
#[test]
fn measure_self_pid_returns_data() {
let me = std::process::id() as i32;
let stats = measure(&[me], None, Instant::now());
assert_eq!(stats.commands_alive, 1);
// Nuestro propio RSS debería ser > 0.
assert!(stats.rss_bytes.unwrap_or(0) > 0);
assert_eq!(stats.source, "proc");
}
}