This commit is contained in:
sergio
2026-05-10 21:58:16 +00:00
parent 3d55f189c0
commit c22d2480b9
36 changed files with 5158 additions and 363 deletions
@@ -0,0 +1,420 @@
//! Pipeline runtime: encadena nodos con pipes y opcionalmente intercepta
//! cada flow para discernir su contenido.
//!
//! Cada nodo se encarna via [`ente_incarnate::Incarnator`] — eso significa
//! que **cada comando puede tener su propio SomaSpec** (namespaces, cgroup,
//! rlimits) heredado del workspace. La conexión stdin↔stdout se hace con
//! `pipe2(2)` + `ChildStdio` declarativo: el callback de clone(2) hace los
//! `dup2` pre-execve sin romper la regla async-signal-safe.
use crate::CoreError;
use brahman_card::Payload;
use ente_incarnate::{ChildStdio, Incarnator};
use nix::fcntl::OFlag;
use nix::unistd::pipe2;
use shipote_card::{FlowEdge, PipelineSpec};
use shipote_discern::{DiscernPipeline, Discernment, Hint};
use std::os::fd::{AsRawFd, IntoRawFd, RawFd};
use std::sync::Arc;
use tokio::io::unix::AsyncFd;
use tokio::io::Interest;
use tracing::{debug, info, warn};
use ulid::Ulid;
/// Resultado de lanzar un pipeline.
#[derive(Debug, Clone)]
pub struct PipelineLaunch {
pub pipeline: Ulid,
pub command_pids: Vec<(String, i32)>,
/// Discernments por edge, en el mismo orden que `spec.edges`.
pub edge_discernments: Vec<EdgeDiscernment>,
}
#[derive(Debug, Clone)]
pub struct EdgeDiscernment {
pub from_label: String,
pub from_output: String,
pub to_label: String,
pub to_input: String,
pub discernment: Option<Discernment>,
}
/// Lanza un pipeline conectando nodos por stdin/stdout. Cada nodo se
/// encarna via `Incarnator` (con o sin namespacing según su SomaSpec).
///
/// v1: pipeline lineal (un edge entrante por nodo). Múltiples edges
/// entrantes generan warning y sólo el primero se honra.
pub async fn run_pipeline(
spec: &PipelineSpec,
workspace_label: &str,
tap: bool,
discerner: Arc<DiscernPipeline>,
incarnator: Arc<Incarnator>,
) -> Result<PipelineLaunch, CoreError> {
spec.validate()?;
let n = spec.nodes.len();
info!(
nodes = n,
edges = spec.edges.len(),
tap,
"launching pipeline (incarnated)"
);
// Predecessor: para cada nodo, su edge entrante (si tiene).
let mut predecessor: Vec<Option<&FlowEdge>> = vec![None; n];
for e in &spec.edges {
if predecessor[e.to].is_some() {
warn!(node = e.to, "v1 pipeline: nodo con múltiples predecessors — sólo se honra el primero");
continue;
}
predecessor[e.to] = Some(e);
}
let mut pids = Vec::with_capacity(n);
let mut taps: Vec<TapHandle> = Vec::new();
// Para cada nodo i que produce, guardamos el FD de read del pipe
// del productor → al armar el consumidor lo consume.
// Pero como puede haber tap intermedio, llevamos un esquema:
// - Sin tap: read FD del pipe productor → stdin del consumidor.
// - Con tap: read FD del pipe productor → tokio proxy → write FD
// del pipe consumidor → stdin del consumidor.
// Para simplicidad lineal, `pending_stdin_for_next` guarda el FD que
// el siguiente consumidor debe usar como stdin.
let mut pending_stdin_for_next: Option<RawFd> = None;
for (i, node) in spec.nodes.iter().enumerate() {
// Validar payload ejecutable.
match &node.payload {
Payload::Native { .. } | Payload::Legacy { .. } => {}
_ => {
return Err(CoreError::Incarnate(
ente_incarnate::IncarnateError::NonExecutablePayload,
))
}
}
// Compilamos a Card.
let card = node.to_card(i, workspace_label)?;
// ¿Soy productor? Necesito stdout_fd hacia un pipe nuevo.
let i_is_producer = spec.edges.iter().any(|e| e.from == i);
let stdin_fd: Option<RawFd> = pending_stdin_for_next.take();
let mut stdout_fd: Option<RawFd> = None;
let mut next_pending: Option<RawFd> = None;
// FDs que el PADRE debe cerrar tras spawn (son nuestra copia del
// extremo que pasamos al hijo).
let mut parent_closes: Vec<RawFd> = Vec::new();
if i_is_producer {
let (r, w) = pipe2(OFlag::O_CLOEXEC).map_err(|e| {
CoreError::Incarnate(ente_incarnate::IncarnateError::Pipe(e))
})?;
let r_raw = r.into_raw_fd();
let w_raw = w.into_raw_fd();
stdout_fd = Some(w_raw);
parent_closes.push(w_raw);
if tap {
// Necesitamos un segundo pipe entre tap y consumidor.
let (r2, w2) = pipe2(OFlag::O_CLOEXEC).map_err(|e| {
CoreError::Incarnate(ente_incarnate::IncarnateError::Pipe(e))
})?;
let r2_raw = r2.into_raw_fd();
let w2_raw = w2.into_raw_fd();
next_pending = Some(r2_raw);
// El tap lee de r_raw y escribe a w2_raw.
let edge = predecessor
.iter()
.find_map(|p| *p)
.and_then(|e| if e.from == i { Some(e) } else { None })
// Edge donde i es from:
.or_else(|| spec.edges.iter().find(|e| e.from == i));
let from_label = node.label.clone();
let to_label = edge
.map(|e| spec.nodes[e.to].label.clone())
.unwrap_or_default();
let from_output = edge.map(|e| e.from_output.clone()).unwrap_or_default();
let to_input = edge.map(|e| e.to_input.clone()).unwrap_or_default();
let sample_bytes = spec.discern.sample_bytes;
let disc = discerner.clone();
let h = spawn_tap(
r_raw, w2_raw, sample_bytes, disc, from_label, from_output, to_label, to_input,
);
taps.push(h);
// r_raw y w2_raw pasaron a manos del tokio task. No los
// cerramos en el padre.
} else {
// Sin tap, el read del productor va directo al stdin del
// siguiente consumidor.
next_pending = Some(r_raw);
}
}
let stdio = ChildStdio {
stdin_fd,
stdout_fd,
stderr_fd: None,
};
// Incarnator absorbe los fds de `stdio` — no los cerramos acá.
// `parent_closes` queda obsoleto.
let _ = parent_closes;
let outcome = incarnator
.incarnate_with(&card, stdio)
.map_err(CoreError::Incarnate)?;
let pid = outcome.pid;
pids.push((node.label.clone(), pid.as_raw()));
debug!(label = %node.label, pid = pid.as_raw(), "node incarnated");
pending_stdin_for_next = next_pending;
}
let pipeline_id = Ulid::new();
let mut edge_discernments = Vec::with_capacity(taps.len());
for t in taps {
match t.handle.await {
Ok(d) => edge_discernments.push(d),
Err(e) => warn!(?e, "tap handle joined with error"),
}
}
Ok(PipelineLaunch {
pipeline: pipeline_id,
command_pids: pids,
edge_discernments,
})
}
struct TapHandle {
handle: tokio::task::JoinHandle<EdgeDiscernment>,
}
#[allow(clippy::too_many_arguments)]
fn spawn_tap(
producer_r_fd: RawFd,
consumer_w_fd: RawFd,
sample_bytes: usize,
discerner: Arc<DiscernPipeline>,
from_label: String,
from_output: String,
to_label: String,
to_input: String,
) -> TapHandle {
// Marcar non-blocking ANTES de envolverlos en AsyncFd. Sino tokio
// bloquea el reactor en operaciones lentas.
set_nonblocking(producer_r_fd);
set_nonblocking(consumer_w_fd);
let handle = tokio::spawn(async move {
// SAFETY: el caller transfiere ownership de los fds al task.
let r_std = unsafe { std::os::fd::OwnedFd::from_raw_fd_compat(producer_r_fd) };
let w_std = unsafe { std::os::fd::OwnedFd::from_raw_fd_compat(consumer_w_fd) };
let r = AsyncFd::with_interest(r_std, Interest::READABLE).expect("AsyncFd r");
let w = AsyncFd::with_interest(w_std, Interest::WRITABLE).expect("AsyncFd w");
let mut sample: Vec<u8> = Vec::with_capacity(sample_bytes);
let mut buf = [0u8; 4096];
let mut total: u64 = 0;
// Fase 1: sampling + pump.
let mut eof = false;
while !eof && sample.len() < sample_bytes {
let n = match async_read(&r, &mut buf).await {
Ok(0) => { eof = true; 0 }
Ok(n) => n,
Err(e) => { warn!(?e, "tap producer read failed"); break; }
};
if n == 0 { break; }
let take = n.min(sample_bytes - sample.len());
sample.extend_from_slice(&buf[..take]);
if let Err(e) = async_write_all(&w, &buf[..n]).await {
warn!(?e, "tap consumer write failed");
break;
}
total += n as u64;
}
let d = discerner.discern(&sample, &Hint { path: None, size_total: None });
// Fase 2: pump-only hasta EOF.
while !eof {
let n = match async_read(&r, &mut buf).await {
Ok(0) => { eof = true; 0 }
Ok(n) => n,
Err(_) => break,
};
if n == 0 { break; }
if async_write_all(&w, &buf[..n]).await.is_err() { break; }
total += n as u64;
}
debug!(bytes = total, "tap finished");
EdgeDiscernment {
from_label,
from_output,
to_label,
to_input,
discernment: d,
}
});
TapHandle { handle }
}
async fn async_read(
afd: &AsyncFd<std::os::fd::OwnedFd>,
buf: &mut [u8],
) -> std::io::Result<usize> {
loop {
let mut guard = afd.readable().await?;
let fd = afd.as_raw_fd();
// SAFETY: lectura sobre fd válido propiedad del AsyncFd.
let r = unsafe { libc::read(fd, buf.as_mut_ptr() as *mut _, buf.len()) };
if r >= 0 {
return Ok(r as usize);
}
let err = std::io::Error::last_os_error();
if err.kind() == std::io::ErrorKind::WouldBlock {
guard.clear_ready();
continue;
}
return Err(err);
}
}
async fn async_write_all(
afd: &AsyncFd<std::os::fd::OwnedFd>,
mut buf: &[u8],
) -> std::io::Result<()> {
while !buf.is_empty() {
let mut guard = afd.writable().await?;
let fd = afd.as_raw_fd();
// SAFETY: escritura sobre fd válido propiedad del AsyncFd.
let r = unsafe { libc::write(fd, buf.as_ptr() as *const _, buf.len()) };
if r > 0 {
buf = &buf[r as usize..];
continue;
}
if r == 0 {
return Err(std::io::Error::new(
std::io::ErrorKind::WriteZero,
"write 0",
));
}
let err = std::io::Error::last_os_error();
if err.kind() == std::io::ErrorKind::WouldBlock {
guard.clear_ready();
continue;
}
return Err(err);
}
Ok(())
}
fn set_nonblocking(fd: RawFd) {
// SAFETY: fcntl con F_SETFL es seguro para fds válidos.
unsafe {
let flags = libc::fcntl(fd, libc::F_GETFL, 0);
if flags >= 0 {
libc::fcntl(fd, libc::F_SETFL, flags | libc::O_NONBLOCK);
}
}
}
// Extension trait para abstraer la API de OwnedFd entre versiones (compat).
trait OwnedFdFromRawCompat: Sized {
unsafe fn from_raw_fd_compat(fd: RawFd) -> Self;
}
impl OwnedFdFromRawCompat for std::os::fd::OwnedFd {
unsafe fn from_raw_fd_compat(fd: RawFd) -> Self {
use std::os::fd::FromRawFd;
// SAFETY: el caller transfiere ownership de `fd` a la `OwnedFd`.
unsafe { std::os::fd::OwnedFd::from_raw_fd(fd) }
}
}
// Re-export para que el unused warning del AsRawFd se calle si no se usa.
#[allow(dead_code)]
fn _keep_raw(_: &dyn AsRawFd) {}
#[cfg(test)]
mod tests {
use super::*;
use brahman_card::Payload;
use ente_incarnate::IncarnatorConfig;
use shipote_card::{CommandRef, DiscernPolicy, FlowEdge, PipelineSpec, WorkspaceId};
fn cmd(label: &str, exec: &str, argv: &[&str]) -> CommandRef {
CommandRef {
label: label.into(),
payload: Payload::Native {
exec: exec.into(),
argv: argv.iter().map(|s| s.to_string()).collect(),
envp: vec![],
},
soma: Default::default(),
flows: Default::default(),
supervision: brahman_card::Supervision::OneShot,
}
}
#[tokio::test]
async fn pipeline_isolated_echo_to_cat_runs() {
let spec = PipelineSpec {
label: "echo-cat".into(),
workspace: WorkspaceId::new(),
nodes: vec![
cmd("p1", "/bin/echo", &["hola pipeline aislado"]),
cmd("p2", "/bin/cat", &[]),
],
edges: vec![FlowEdge {
from: 0,
from_output: "stdout".into(),
to: 1,
to_input: "stdin".into(),
}],
discern: DiscernPolicy::default(),
};
let disc = Arc::new(DiscernPipeline::default_pipeline());
let inc = Arc::new(Incarnator::new(IncarnatorConfig::default()));
let launch = run_pipeline(&spec, "ws", false, disc, inc).await.unwrap();
assert_eq!(launch.command_pids.len(), 2);
// Cosecha.
for (_, pid) in &launch.command_pids {
let _ = nix::sys::wait::waitpid(nix::unistd::Pid::from_raw(*pid), None);
}
}
#[tokio::test]
async fn pipeline_isolated_with_tap_captures_discernment() {
let spec = PipelineSpec {
label: "json-cat".into(),
workspace: WorkspaceId::new(),
nodes: vec![
cmd("p1", "/bin/echo", &["{\"hello\": 1}"]),
cmd("p2", "/bin/cat", &[]),
],
edges: vec![FlowEdge {
from: 0,
from_output: "stdout".into(),
to: 1,
to_input: "stdin".into(),
}],
discern: DiscernPolicy {
sample_bytes: 4096,
enrich_producer: true,
},
};
let disc = Arc::new(DiscernPipeline::default_pipeline());
let inc = Arc::new(Incarnator::new(IncarnatorConfig::default()));
let launch = run_pipeline(&spec, "ws", true, disc, inc).await.unwrap();
assert_eq!(launch.edge_discernments.len(), 1);
let d = &launch.edge_discernments[0];
let dis = d.discernment.as_ref().expect("discernment present");
assert_eq!(dis.mime.as_deref(), Some("application/json"));
// Cosecha.
for (_, pid) in &launch.command_pids {
let _ = nix::sys::wait::waitpid(nix::unistd::Pid::from_raw(*pid), None);
}
}
}