36dac00c8d
Pipeline runtime:
- Fan-out 1→N (splitter task replica al N consumers) y fan-in N→1 (merger
task con mpsc + reader-per-input). DAGs no lineales soportados.
- Flow channels: Unix socket + tokio broadcast con replay buffer
configurable por pipeline (DiscernPolicy.replay_chunks). Subscribers
externos vía `shipote flow tail <socket>`.
- Templating en specs con `${KEY}` (CLI `--var KEY=VALUE`). Walk
recursivo sobre serde_json::Value, soporta todos los strings del schema.
- Pipelines guardados (`pipeline save/saved-list/drop/run-saved`)
persisten con el snapshot.
Lifecycle de comandos:
- Log capture per-stream (stdout/stderr separados) via pipe O_CLOEXEC +
AsyncFd. CLI `shipote logs <ws> <cmd> --stream {stdout,stderr,both}`.
- Stop graceful con tiempo configurable: SIGTERM → grace → SIGKILL.
Tanto a nivel workspace como pipeline individual.
- TTL auto-stop ya existente (Fase C) sigue funcionando.
ente-incarnate:
- ChildStdio declarativo (Fase C) + ChildPreExec declarativo nuevo:
NoNewPrivs, ParentDeathSig, Dumpable, NewSession, Chdir, Umask.
- Aplicación pre-execve async-signal-safe en ambos paths (plain via
Command::pre_exec, namespaced via callback del clone(2)).
Observabilidad:
- WorkspaceStats: RSS + RSS peak (VmHWM o memory.peak cgroup) + CPU usec
+ uptime. Fuente per-proc o cgroup según delegation.
- shipote-shell con sparkline ASCII por workspace (history cap 24),
card de flow channels activos, vista de comandos + saved pipelines.
- Tap → broker: cada edge enriquecido con TypeRef se anuncia como Card
efímera vía SidecarPool (graceful si broker no corre).
Discern:
- Integrado en yahweh-provider-fs (mime_type en EntityNode).
- Integrado en nouser-core::cluster::pick_lens como fallback cuando la
extensión cae a Lens::Grid.
79 tests pasan: ente-incarnate (16), nouser-core (27), shipote-card (8),
shipote-core (20), shipote-discern (5), yahweh-provider-fs (3).
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
61 lines
2.0 KiB
Rust
61 lines
2.0 KiB
Rust
//! Path simple: spawn directo, sin namespacing.
|
|
|
|
use crate::env::{build_env, EnvSpec};
|
|
use crate::error::IncarnateError;
|
|
use crate::pre_exec::{apply_unchecked, ChildSetup};
|
|
use crate::ChildStdio;
|
|
use brahman_card::{Card, Payload};
|
|
use nix::unistd::Pid;
|
|
use std::os::fd::FromRawFd;
|
|
use std::os::unix::process::CommandExt;
|
|
use std::process::{Command, Stdio};
|
|
|
|
pub fn incarnate_plain(
|
|
card: &Card,
|
|
env_spec: &EnvSpec,
|
|
stdio: &ChildStdio,
|
|
setup: &ChildSetup,
|
|
) -> Result<Pid, IncarnateError> {
|
|
let (exec, argv, base_envp) = match &card.payload {
|
|
Payload::Native { exec, argv, envp } => (exec.clone(), argv.clone(), envp.clone()),
|
|
Payload::Legacy { exec, argv, .. } => (exec.clone(), argv.clone(), Vec::new()),
|
|
_ => return Err(IncarnateError::NonExecutablePayload),
|
|
};
|
|
let env = build_env(card, &base_envp, env_spec);
|
|
let mut cmd = Command::new(&exec);
|
|
cmd.args(&argv);
|
|
cmd.env_clear();
|
|
for (k, v) in &env {
|
|
cmd.env(k, v);
|
|
}
|
|
if let Some(fd) = stdio.stdin_fd {
|
|
// SAFETY: el caller garantiza que `fd` está abierto y le
|
|
// transfiere ownership al child. `Command` lo cierra tras spawn.
|
|
cmd.stdin(unsafe { Stdio::from_raw_fd(fd) });
|
|
}
|
|
if let Some(fd) = stdio.stdout_fd {
|
|
cmd.stdout(unsafe { Stdio::from_raw_fd(fd) });
|
|
}
|
|
if let Some(fd) = stdio.stderr_fd {
|
|
cmd.stderr(unsafe { Stdio::from_raw_fd(fd) });
|
|
}
|
|
if !setup.is_empty() {
|
|
// Clone para que la closure sea 'static (Command::pre_exec lo exige).
|
|
let ops = setup.ops.clone();
|
|
// SAFETY: pre_exec corre post-fork pre-exec. apply_unchecked sólo
|
|
// hace syscalls async-signal-safe.
|
|
unsafe {
|
|
cmd.pre_exec(move || {
|
|
let r = apply_unchecked(&ops);
|
|
if r != 0 {
|
|
Err(std::io::Error::from_raw_os_error(libc::EINVAL))
|
|
} else {
|
|
Ok(())
|
|
}
|
|
});
|
|
}
|
|
}
|
|
let child = cmd.spawn()?;
|
|
Ok(Pid::from_raw(child.id() as i32))
|
|
}
|