docs(shipote): README + 4 docs en docs/ (ARCHITECTURE, CLI, RECIPES, DEVELOPMENT)

- README.md en crates/modules/shipote/ como entry point.
- docs/ARCHITECTURE.md — 11 crates, capas, decisiones (O_CLOEXEC,
  dirty AtomicBool, pipeline restart entero, etc.) + snapshot versioning.
- docs/CLI.md — referencia comando por comando, flags, env vars.
- docs/RECIPES.md — specs TOML para workspaces y pipelines típicos.
- docs/DEVELOPMENT.md — compilar, correr daemon/shell/CLI, tests,
  smoke E2E manual, debugging FDs.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
sergio
2026-05-11 17:10:44 +00:00
parent a9124449f9
commit d962fe4601
5 changed files with 888 additions and 0 deletions
+158
View File
@@ -0,0 +1,158 @@
# Arquitectura de shipote
## Diagrama de capas
```
┌──────────────────────────────────────────────────────────────────┐
│ Apps (binarios) │
│ │
│ shipote-daemon shipote (CLI) shipote-shell │
│ crates/apps/... crates/apps/... crates/apps/... │
└─────────┬─────────────────────┬──────────────────────┬───────────┘
│ │ │
│ postcard frames (length-prefixed)
│ │ │
└─────────────────────┴──────────────────────┘
shipote-protocol — wire types
crates/modules/shipote/shipote-protocol
┌──────────────────────────────────────────────────────────────────┐
│ shipote-core — runtime │
│ crates/modules/shipote/shipote-core │
│ │
│ WorkspaceManager (Arc<Mutex<Inner>>) │
│ ├ workspaces: HashMap<WorkspaceId, WorkspaceState> │
│ ├ saved_pipelines, pipeline_flows, pipeline_supervisors │
│ ├ restart_specs, pending_pipeline_restarts │
│ └ dirty: AtomicBool (snapshot incremental) │
│ │
│ Modules: │
│ - pipeline.rs (run_pipeline, splitter, merger, TokenBucket) │
│ - flow_channel.rs (Unix socket + tokio::broadcast + replay) │
│ - logbuf.rs (ring buffer 64 KiB para stdout/stderr) │
│ - stats.rs (WorkspaceStats: RSS/CPU/peak/cores) │
│ - persist.rs (ShipoteSnapshot v4 con JSON atomic write) │
└──────────────────────────────────────────────────────────────────┘
┌──────────────────────────────────────────────────────────────────┐
│ Soporte compartido │
│ │
│ shipote-card shipote-discern │
│ - WorkspaceSpec - DiscernPipeline │
│ - PipelineSpec - MagicBytes, JsonProbe, etc. │
│ - CommandRef - Detecta brahman:card │
│ - DiscernPolicy │
│ │
│ ente-incarnate (crates/shared/) — extraído de ente-soma │
│ - Incarnator { caps, env, stdio, pre_exec } │
│ - CapabilitySet::detect (user_ns, cgroup, kernel version) │
│ - ChildStdio, ChildSetup (NoNewPrivs, Chdir, ...) │
│ - clone(2) + namespaces + cgroup move (post-clone setup) │
└──────────────────────────────────────────────────────────────────┘
brahman-card · ente-incarnate · nix · libc
```
## Crates (11 nuevos)
### Core del runtime
- **`crates/shared/ente-incarnate`** — rutina extraída del Init para encarnar Cards en procesos aislados. Reusable por cualquier supervisor. NO requiere ser PID 1.
- **`crates/modules/shipote/shipote-card`** — tipos: `WorkspaceSpec`, `PipelineSpec`, `CommandRef`, `FlowEdge`, `DiscernPolicy`, `QuotaEnforcement`. Compilan a `brahman_card::Card`.
- **`crates/modules/shipote/shipote-protocol`** — wire postcard length-prefixed sobre Unix socket en `$XDG_RUNTIME_DIR/shipote.sock`.
- **`crates/modules/shipote/shipote-discern`** — `DiscernPipeline` + discerners default. Reusado por `yahweh-provider-fs` y `nouser-core::cluster`.
- **`crates/modules/shipote/shipote-core`** — `WorkspaceManager` + módulos `pipeline`, `flow_channel`, `logbuf`, `persist`, `stats`.
### Binarios
- **`crates/apps/shipote-daemon`** — long-running. Escucha admin socket, ejecuta el dispatch, reaper periódico.
- **`crates/apps/shipote-cli`** — `shipote` (clap). Para administración + scripting.
- **`crates/apps/shipote-shell`** — GUI con `yahweh_launcher`. Dashboard: estado, workspaces, comandos, flows, quotas, sparkline.
### Consumidores del discerner (externos a shipote pero usándolo)
- **`crates/modules/ui_engine/libs/providers/fs`** — `FileDataProvider` ahora puebla `mime_type` con `DiscernPipeline`.
- **`crates/modules/nouser/core/src/cluster.rs`** — `pick_lens` usa el discerner como fallback cuando la extensión cae a `Lens::Grid`.
## Modelo conceptual
### Workspace
Espacio aislado raíz. Una **Card con `kind: Ente`, `payload: Virtual`** que aloja N comandos hijos. Su `SomaSpec` define el sandbox compartido.
```toml
label = "demo"
on_exit = "reap"
ttl = 60000 # opcional, ms
[soma.rlimits]
mem_bytes = 10485760 # 10 MiB
nproc = 4
[soma.cgroup]
path = "shipote/demo"
[quota_enforce]
mem = "kill" # None | Log | Kill
```
### CommandRef
Un comando dentro del workspace. Su `SomaSpec` se **intersecta** con el del padre (OR de namespaces, MIN de rlimits).
### PipelineSpec
DAG de `CommandRef` con edges tipados. Soporta:
- **1→1**: pipe directo (sin task intermedio).
- **1→N (fan-out)**: splitter task replica.
- **N→1 (fan-in)**: merger task con mpsc (una sub-task por reader → channel → escribir al consumer.stdin).
### Flow channel (data plane)
Unix socket en `$XDG_RUNTIME_DIR/shipote-flow-<pipeline_id>-<edge_idx>.sock`. Cada subscriber recibe primero el replay (cap por chunks y/o bytes), después el broadcast live. Tokio `broadcast::Sender<Arc<Vec<u8>>>` — zero-copy entre subscribers.
### Discerner
Pipeline ordenado: `MagicBytes``CardProbe``JsonProbe``TomlProbe``Utf8Probe`. Cada uno devuelve `Discernment { ty: TypeRef, confidence, mime, lens }`.
## Decisiones clave de arquitectura
### Por qué `ente-incarnate` es separado
La rutina de aislamiento (clone+ns+cgroup+rlimits) vivía dentro del Init (`ente-soma`). Era global y atada a PID 1. La separamos en Fase 0 para que shipote (y futuros supervisores) puedan reusarla sin implicar privilegio.
Hoy `ente-soma` es un wrapper de ~30 líneas sobre `ente-incarnate` que preserva la semántica histórica (`set_bus_sock` global + `strict_caps=false`). `ente-zero` sigue funcionando intocado.
### Por qué pipeline-level restart relanza el pipeline ENTERO
Los pipes intermedios de un comando que muere ya están cerrados. Restart parcial no funciona. La identidad ULID del pipeline cambia entre intentos; `restart_count` y `current_backoff_ms` se preservan en el supervisor.
### Por qué replay buffer guarda ANTES del broadcast
Si guardas después, un subscriber que conecta entre `broadcast::send` y `replay.push_back` puede perder ese chunk. El orden importa: replay → broadcast.
### Por qué `pipe2(O_CLOEXEC)` siempre
Sin `O_CLOEXEC`, el siguiente comando del pipeline hereda copias del write-end del pipe anterior. El read-end nunca ve EOF y el consumidor cuelga. Bug encontrado y arreglado en Fase F.
### Por qué AsyncFd + non-blocking en taps/mergers
`tokio::fs::File::from_std` con un FD blocking bloquea el reactor entero. La solución: `fcntl(F_SETFL, O_NONBLOCK)` antes de envolver en `AsyncFd<OwnedFd>`. Patrón usado en splitter, merger y log drainer.
### Por qué token-bucket real reemplazó el sleep simple
El sleep `chunk_size / rate` era uniforme y no permitía burst legítimo. `TokenBucket` con refill por wall time + `capacity=rate_bps` (1s burst) permite burst real y throttle suave.
### Por qué `dirty: AtomicBool` para snapshot incremental
Cada SIGTERM hacía full re-serialize aunque no hubiera cambios. `Ordering::Relaxed` está bien porque el peor caso es un save innecesario. `restore_snapshot` resetea dirty=false (acabamos de hidratar, no es mutation).
### Por qué auth con SO_PEERCRED y `target: "audit"`
SO_PEERCRED es automático en Unix sockets — leer `ucred.uid` del peer y comparar con el propio uid. Sin grupos, sin caps. `SHIPOTE_TRUST_ANYONE=1` es el escape hatch.
`target: "audit"` separa el log de mutaciones del log normal. Filtrable con `EnvFilter`: `SHIPOTE_LOG=warn,audit=info`.
## Versión de snapshot
| Versión | Campos | Forward-compat |
|---------|--------|----------------|
| v1 | `workspaces` | ✓ |
| v2 | + `saved_pipelines` | ✓ (default vacío) |
| v3 | + `live_pipelines` | ✓ (default vacío) |
| v4 | + `stats_history` per workspace (cap 16) | ✓ (default vacío) |
`SNAPSHOT_VERSION = 4`. Reader rechaza versiones > 4 (forward-incompat). Todos los campos nuevos usan `#[serde(default)]`.
## Wire protocol
Todos los mensajes son enums tagged externamente, serializados con `postcard`. Framing: u32 BE length-prefix + payload. Max frame: 1 MiB.
### Requests
- Reads: `Ping`, `Health`, `Capabilities`, `WorkspaceList`, `WorkspaceStats`, `WorkspaceStatsHistory`, `WorkspaceFullSummary`, `WorkspaceQuota`, `CommandList`, `CommandLogs`, `PipelineSavedList`, `FlowList`, `FlowThroughput`, `Discern`.
- Mutaciones (loguean audit): `WorkspaceCreate`, `WorkspaceStop`, `Run`, `PipelineRun`, `PipelineRunSaved`, `PipelineStop`, `PipelineSave`, `PipelineDrop`, `FlowDrop`.
+134
View File
@@ -0,0 +1,134 @@
# Referencia CLI — `shipote`
El binario `shipote` (compilado por `cargo build -p shipote-cli`) se conecta al daemon vía Unix socket. Por default usa `$XDG_RUNTIME_DIR/shipote.sock`; override con `--socket <path>`.
```sh
shipote [--socket PATH] <comando> [...]
```
## Comandos globales
### `shipote ping`
Health-check rápido. Imprime `pong` si el daemon responde.
### `shipote health`
Health endpoint estructurado.
```
version: 0.1.0
uptime: 11411 ms
alive_workspaces: 1
alive_commands: 1
alive_pipelines: 0
active_flows: 0
dirty: true
```
### `shipote caps`
Capacidades runtime detectadas: kernel version, `user_ns` status, `cgroup_v2`, `cgroup_delegated`, `cap_sys_admin`.
### `shipote discern <path>`
Discierne ad-hoc un archivo (no requiere workspace). Imprime `ty`, `confidence`, `mime`, `lens`.
## Workspaces
### `shipote workspace create <spec.toml|.json>`
Crea workspace desde un spec. Imprime el ULID asignado. Si el cgroup está delegated y el spec declara rlimits, se aplica `memory.max`/`pids.max`.
### `shipote workspace list`
Lista workspaces vivos.
### `shipote workspace stats <id>`
Resource accounting:
```
commands: 1 alive / 1 total
rss: 2.05 MiB
rss_peak: 58.89 MiB
cpu: 0.300 s
cpu_pct: 98.7 % (24.7% total / 4 cores)
source: proc
uptime: 1002 ms
```
### `shipote workspace quota <id>`
Reporta breaches del `soma.rlimits`. Si `quota_enforce.{mem,nproc}=Kill`, el daemon mata automáticamente al detectar.
### `shipote workspace stop <id> [--grace-ms N]`
Stop graceful. Default `grace_ms=1000`. `0` = SIGKILL inmediato.
## Comandos directos
### `shipote run -w <ws-id> [--restart-on-failure] <exec> -- <argv>...`
One-shot dentro del workspace. Si `--restart-on-failure`, el reaper relanza con backoff exponencial cuando exit != 0.
**Nota**: usar `--` antes de los argv del comando para que clap no los confunda con flags suyos:
```sh
shipote run -w 01... /bin/sh -- -c "echo hola"
```
### `shipote commands <ws-id>`
Lista comandos del workspace (vivos + exited) con pid, status, bytes_log.
### `shipote logs <ws-id> <cmd-id> [--stream {stdout|stderr|both}] [--tail N] [--follow]`
Tail del ring buffer de logs.
- `--stream` default `stdout`. `both` concatena stderr-tras-stdout.
- `--tail N`: últimos N bytes (0 = todo).
- `--follow` (`-f`): poll cada 200ms, imprime suffix nuevo hasta que el comando termine.
## Pipelines
### `shipote pipeline run <spec.toml> [--tap] [--tail] [--var KEY=VAL ...]`
Lanza pipeline.
- `--tap`: crea flow channels (data plane) por cada edge.
- `--tail`: auto-implica `--tap`, suscribe al primer flow socket y vuelca a stdout.
- `--var KEY=VAL` (repetible): sustituye `${KEY}` en el spec antes de lanzar.
### `shipote pipeline stop <pipeline-id> [--grace-ms N]`
Stop selectivo del pipeline (no afecta otros comandos del workspace).
### `shipote pipeline save <name> <spec.toml>`
Persiste el spec bajo `name` (sobrevive restart vía snapshot).
### `shipote pipeline saved-list`
Lista pipelines guardados.
### `shipote pipeline drop <name>`
Elimina un saved pipeline.
### `shipote pipeline run-saved <name> [--tap] [--tail] [--var KEY=VAL ...]`
Ejecuta un pipeline guardado con vars opcionales.
## Flows
### `shipote flow list`
Lista pipelines vivos con sockets activos (uno por edge con `--tap`).
### `shipote flow throughput`
Bytes acumulados + rate por socket.
```
shipote-flow-<ULID>-0.sock 0.1 KiB total 0.07 KiB/s
```
### `shipote flow tail <socket-path>`
Conecta directo al Unix socket y vuelca hasta EOF. Si el splitter tiene replay buffer, lo recibís primero.
### `shipote flow drop <pipeline-id>`
Cierra el data plane de un pipeline (drop de FlowChannels).
## Variables de entorno
- `SHIPOTE_LOG` — filtro `tracing-subscriber` (`info`, `warn`, `audit=info,warn`, ...).
- `SHIPOTE_TRUST_ANYONE=1` — daemon acepta peers con cualquier uid (testing).
- `XDG_RUNTIME_DIR` — dónde se crea el socket admin y los flow sockets.
- `XDG_STATE_HOME` — dónde se guarda el snapshot. Fallback: `$HOME/.local/state/shipote/state.json`.
## Trucos del shell zsh
`shipote run` toma flags propios antes del `--`. Si el comando que ejecutás tiene un argumento que arranca con `-`, usá `--`:
```sh
# Mal (clap intenta parsear `-c` como flag de shipote):
shipote run -w <ws> /bin/sh -c "echo hi"
# Bien:
shipote run -w <ws> /bin/sh -- -c "echo hi"
```
+244
View File
@@ -0,0 +1,244 @@
# Development guide — shipote
Cómo compilar, correr, testear y debuggear shipote desde la raíz del monorepo (`/home/sergio/brahman`).
## Compilar
Todos los crates de shipote y su soporte:
```sh
cargo build -p shipote-daemon -p shipote-cli -p shipote-shell
```
Sólo lo esencial (daemon + cli, sin GUI):
```sh
cargo build -p shipote-daemon -p shipote-cli
```
Workspace completo:
```sh
cargo check --workspace
```
## Correr el daemon
```sh
SHIPOTE_LOG=info ./target/debug/shipote-daemon
```
Filtros útiles:
```sh
SHIPOTE_LOG=warn,shipote_core=info ./target/debug/shipote-daemon # menos ruido
SHIPOTE_LOG=warn,audit=info ./target/debug/shipote-daemon # sólo audit
SHIPOTE_LOG=info,shipote_core::pipeline=debug ./target/debug/shipote-daemon # debug del pipeline
```
El daemon:
- Escucha `$XDG_RUNTIME_DIR/shipote.sock` (fallback `/run/user/$UID/shipote.sock`).
- Restaura snapshot de `$XDG_STATE_HOME/shipote/state.json` (fallback `$HOME/.local/state/shipote/state.json`).
- Relanza pipelines `live` (con `restart_on_failure=true`) automáticamente.
- Reaper cada 500 ms (cosecha zombies + drena pending restarts + chequea quota breaches).
- SIGTERM/SIGINT → snapshot + `stop_with_grace(1s)` de todos los workspaces + exit.
## Correr el shell (GUI)
```sh
cargo run -p shipote-shell
```
Polling cada 2 s al daemon. Si el daemon no está corriendo, muestra "Daemon DOWN" banner rojo. Sparkline de RSS por workspace. Cards: Estado, Capabilities, Workspaces, Comandos, Saved pipelines, Flow channels, Quota breaches, Live tail.
Requiere DISPLAY (X11/Wayland) — no corre en sandbox sin gráfico.
## Correr el CLI
```sh
./target/debug/shipote ping
./target/debug/shipote health
./target/debug/shipote workspace create demo.toml
```
Ver [CLI.md](CLI.md) para referencia completa.
## Tests
Suite completa (85 tests al cierre de Fase R):
```sh
cargo test -p ente-incarnate \
-p shipote-card \
-p shipote-protocol \
-p shipote-discern \
-p shipote-core \
-p yahweh-provider-fs \
-p nouser-core \
--no-fail-fast
```
Breakdown:
| Crate | Tests |
|---|---|
| `ente-incarnate` | 16 (clone+ns, stdio, pre_exec, caps detect, cgroup paths) |
| `nouser-core` | 27 (pre-existentes — discerner integrado sin regresiones) |
| `shipote-card` | 8 (roundtrip TOML/JSON, compile-to-card, intersect_soma) |
| `shipote-core` | 26 (workspaces, pipeline fan-in/out, flow_channel replay, stats history, restart, quota enforce, snapshot dirty-skip, ...) |
| `shipote-discern` | 5 (MagicBytes, JsonProbe, CardProbe, TomlProbe, Utf8Probe) |
| `yahweh-provider-fs` | 3 (discern_head + integración) |
Tests específicos útiles:
```sh
# Fan-in (merger):
cargo test -p shipote-core --lib pipeline_fanin -- --nocapture
# Replay buffer:
cargo test -p shipote-core --lib replay -- --nocapture
# Quota enforcement:
cargo test -p shipote-core --lib quota_enforce -- --nocapture
# Snapshot incremental:
cargo test -p shipote-core --lib save_snapshot_skips
```
> Algunos tests del pipeline (fan-in/fan-out) son tokio-async y necesitan `--test-threads=1` cuando todos los tests corren juntos. La suite por crate ya funciona bien.
## Smoke E2E manual
Demo end-to-end de los features principales:
```sh
# 1. Limpiar estado previo
rm -f $HOME/.local/state/shipote/state.json
pkill -f shipote-daemon
# 2. Arrancar daemon (en background o terminal aparte)
SHIPOTE_LOG=info ./target/debug/shipote-daemon &
sleep 0.3
# 3. Crear workspace
cat > /tmp/ws.toml <<'EOF'
label = "demo"
on_exit = "reap"
EOF
WS=$(./target/debug/shipote workspace create /tmp/ws.toml)
echo "workspace: $WS"
# 4. Health check
./target/debug/shipote health
# 5. Run comando con log capture
./target/debug/shipote run -w $WS /bin/echo -- "hello shipote"
sleep 0.3
CMD=$(./target/debug/shipote commands $WS | head -1 | awk '{print $1}')
./target/debug/shipote logs $WS $CMD
# 6. Pipeline con tap (data plane real)
cat > /tmp/pipe.toml <<EOF
label = "demo-pipe"
workspace = "$WS"
discern = { sample_bytes = 4096, enrich_producer = true }
[[nodes]]
label = "p1"
payload.Native = { exec = "/bin/echo", argv = ['{"hello": 1}'], envp = [] }
[[nodes]]
label = "p2"
payload.Native = { exec = "/bin/cat", argv = [], envp = [] }
[[edges]]
from = 0
from_output = "stdout"
to = 1
to_input = "stdin"
EOF
./target/debug/shipote pipeline run /tmp/pipe.toml --tap
# 7. Flow throughput
./target/debug/shipote flow throughput
# 8. Stats con CPU%
./target/debug/shipote workspace stats $WS
# 9. SIGTERM (drain + snapshot)
pkill -TERM -f shipote-daemon
```
## Debug del daemon
### Logs del daemon en archivo
```sh
SHIPOTE_LOG=info ./target/debug/shipote-daemon 2>&1 | tee /tmp/shipote-daemon.log
```
### Filtrar audit log
```sh
SHIPOTE_LOG=warn,audit=info ./target/debug/shipote-daemon 2>&1 | grep audit
```
### Verificar caps runtime
```sh
shipote caps
```
Te dice si `user_ns` está bloqueado por sysctl/LSM y si tu cgroup está delegado.
### Inspeccionar el snapshot
```sh
cat $HOME/.local/state/shipote/state.json | jq .
```
JSON con: version, timestamp_ms, workspaces, saved_pipelines, live_pipelines, stats_history persistida.
### Debugging FDs
```sh
ls -la /proc/$(pgrep -x shipote-daemon)/fd | head -20
```
Si ves muchos `pipe:[N]` o `socket:[N]` huérfanos, hay leak en algún spawn.
## Arquitectura del repositorio
```
crates/
├── apps/
│ ├── shipote-daemon/ ← binario long-running
│ ├── shipote-cli/ ← binario `shipote`
│ └── shipote-shell/ ← GUI GPUI
├── modules/shipote/
│ ├── shipote-card/ ← tipos WorkspaceSpec/PipelineSpec/...
│ ├── shipote-protocol/ ← wire postcard
│ ├── shipote-discern/ ← MagicBytes/Json/Toml/Card/Utf8
│ ├── shipote-core/ ← WorkspaceManager + pipeline + flow_channel + ...
│ ├── README.md ← entry point (este archivo es vecino)
│ └── docs/
│ ├── ARCHITECTURE.md
│ ├── CLI.md
│ ├── RECIPES.md
│ └── DEVELOPMENT.md ← estás acá
└── shared/
└── ente-incarnate/ ← extraído de ente-soma; reusable por shipote y otros
```
## Memoria del proyecto
Toda la historia de fases (F a R) está documentada en:
```
/home/sergio/.claude/projects/-home-sergio-brahman/memory/project_shipote.md
```
Esa memoria persiste entre conversaciones con Claude Code. Si arrancás una sesión nueva, Claude la consulta automáticamente.
## Issues conocidos
### El remote `origin` está mal configurado
`https:/sergio:...` con un solo `/`. Para subir los 10 commits locales:
```sh
git remote set-url origin https://sergio:<password>@gitea.gioser.net/sergio/brahman
git push origin main
```
### El daemon no se conecta al broker
Si el Init (`brahman-init.sock`) no está corriendo, el sidecar loguea `no conectado` y el daemon sigue standalone. Esto es **diseñado** (graceful degradation), no un bug. Los announcements de edge-card al broker tampoco llegan en este modo.
### Pipeline restart pierde ULIDs
Cada relaunch genera un pipeline_id nuevo. Trackers externos que dependen del ULID se rompen. Workaround: usar el `label` del pipeline (estable entre restarts).
### `tokio::test` + `waitpid` síncrono
Algunos tests del pipeline necesitan `--test-threads=1` porque el waitpid síncrono dentro de un `current_thread` runtime bloquea el reactor antes de que las tareas spawneadas corran.
+279
View File
@@ -0,0 +1,279 @@
# Recetas de specs
Specs TOML para casos comunes. Todas asumen que `<WS>` ya existe (creado con `shipote workspace create <ws-spec>`).
## Workspaces
### Workspace mínimo
```toml
label = "demo"
on_exit = "reap"
```
### Workspace con TTL (auto-stop tras N ms)
```toml
label = "ephemeral"
on_exit = "reap"
ttl = 30000 # 30 s
```
### Workspace con rlimits (sólo accounting)
```toml
label = "bounded"
on_exit = "reap"
[soma.rlimits]
mem_bytes = 10485760 # 10 MiB — visible en `shipote workspace quota`
nproc = 4
```
### Workspace con enforcement automático
```toml
label = "strict"
on_exit = "reap"
[soma.rlimits]
mem_bytes = 5242880
nproc = 2
[quota_enforce]
mem = "kill" # None | Log | Kill
nproc = "kill"
```
### Workspace con cgroup delegado (kernel enforces)
```toml
label = "cgroup-enforced"
on_exit = "reap"
[soma.rlimits]
mem_bytes = 10485760
nproc = 4
[soma.cgroup]
path = "shipote/bounded" # bajo $cgroup_actual/shipote/bounded
```
> Requiere `cgroup_delegated: true` en `shipote caps`. Sino el accounting funciona pero el kernel no enforces.
### Workspace con namespacing real
```toml
label = "isolated"
on_exit = "reap"
[soma.namespaces]
user = true
pid = true
mount = true
net = false
uts = false
ipc = false
cgroup = false
[soma.rlimits]
mem_bytes = 0
nproc = 0
nofile = 0
[soma.cgroup]
path = ""
```
> Requiere `user_ns: Allowed` (o `cap_sys_admin: true`).
## Pipelines
### Pipeline lineal con tap (data plane)
```toml
label = "echo-cat"
workspace = "<WS>"
discern = { sample_bytes = 4096, enrich_producer = true }
[[nodes]]
label = "producer"
payload.Native = { exec = "/bin/echo", argv = ['{"hello": 1}'], envp = [] }
[[nodes]]
label = "consumer"
payload.Native = { exec = "/bin/cat", argv = [], envp = [] }
[[edges]]
from = 0
from_output = "stdout"
to = 1
to_input = "stdin"
```
Run:
```sh
shipote pipeline run echo-cat.toml --tap
# imprime: edge ty=json mime=application/json conf=0.95
```
### Pipeline con fan-out (1 → N)
```toml
label = "broadcast"
workspace = "<WS>"
discern = { sample_bytes = 4096, enrich_producer = true }
[[nodes]]
label = "src"
payload.Native = { exec = "/bin/echo", argv = ["mensaje compartido"], envp = [] }
[[nodes]]
label = "wc-c"
payload.Native = { exec = "/usr/bin/wc", argv = ["-c"], envp = [] }
[[nodes]]
label = "wc-l"
payload.Native = { exec = "/usr/bin/wc", argv = ["-l"], envp = [] }
[[edges]]
from = 0
from_output = "stdout"
to = 1
to_input = "stdin"
[[edges]]
from = 0
from_output = "stdout"
to = 2
to_input = "stdin"
```
### Pipeline con fan-in (N → 1)
```toml
label = "merge"
workspace = "<WS>"
[[nodes]]
label = "p1"
payload.Native = { exec = "/bin/echo", argv = ["from-p1"], envp = [] }
[[nodes]]
label = "p2"
payload.Native = { exec = "/bin/echo", argv = ["from-p2"], envp = [] }
[[nodes]]
label = "merge-sink"
payload.Native = { exec = "/bin/cat", argv = [], envp = [] }
[[edges]]
from = 0
from_output = "stdout"
to = 2
to_input = "stdin"
[[edges]]
from = 1
from_output = "stdout"
to = 2
to_input = "stdin"
```
### Pipeline con replay y rate-limit
```toml
label = "throttled"
workspace = "<WS>"
[discern]
sample_bytes = 4096
enrich_producer = true
replay_chunks = 32 # default
replay_bytes = 65536 # cap adicional por bytes (0 = sólo chunks)
max_bytes_per_sec = 1024 # token-bucket con burst de 1s
[[nodes]]
label = "fast"
payload.Native = { exec = "/bin/sh", argv = ["-c", "for i in 1 2 3 4 5; do echo line-$i; done"], envp = [] }
[[nodes]]
label = "sink"
payload.Native = { exec = "/bin/cat", argv = [], envp = [] }
[[edges]]
from = 0
from_output = "stdout"
to = 1
to_input = "stdin"
```
### Pipeline supervisado (restart on failure con backoff)
```toml
label = "supervised"
workspace = "<WS>"
restart_on_failure = true
restart_backoff_ms = 200 # inicial; escala x2
restart_max_backoff_ms = 30000 # cap
restart_max = 5 # 0 = infinito
[[nodes]]
label = "flaky"
payload.Native = { exec = "/bin/false", argv = [], envp = [] }
```
Después de 5 restarts (`/bin/false` siempre exit=1), el daemon loguea `restart_max reached — giving up` y el supervisor se descarta.
### Pipeline con templating
Spec con placeholders:
```toml
label = "tmpl-${VARIANT}"
workspace = "<WS>"
discern = { sample_bytes = 4096, enrich_producer = true }
[[nodes]]
label = "gen-${VARIANT}"
payload.Native = { exec = "/bin/echo", argv = ["greeting from ${VARIANT}"], envp = [] }
[[nodes]]
label = "sink"
payload.Native = { exec = "/bin/cat", argv = [], envp = [] }
[[edges]]
from = 0
from_output = "stdout"
to = 1
to_input = "stdin"
```
Run con vars:
```sh
shipote pipeline run tmpl.toml --var VARIANT=alpha
shipote pipeline run tmpl.toml --var VARIANT=beta
```
Variables sin match quedan intactas (útil para detectar olvidos).
## Subscribers externos
### Tail directo a un flow socket
```sh
shipote pipeline run mypipe.toml --tap &
sleep 0.3
SOCK=$(shipote flow list | grep shipote-flow | xargs)
shipote flow tail "$SOCK"
```
Si conectás tarde, el replay buffer te entrega los últimos N chunks (según `replay_chunks` y `replay_bytes` del spec).
### Modo live-tail integrado
```sh
shipote pipeline run mypipe.toml --tail
# vuelca el primer flow_socket a stdout hasta que el productor termine.
```
## Combinatorias útiles
### Workspace con cleanup automático
```toml
label = "burst-and-die"
on_exit = "reap"
ttl = 10000 # auto-stop a los 10s
[soma.rlimits]
mem_bytes = 5242880
```
### Pipeline JSON-aware con discern enriched
- Producer escribe JSON.
- Discern detecta `application/json` con confidence 0.95.
- Card efímera anunciada al broker (si está corriendo): `shipote.flow.<id>.<from>.<output>.json`.
- Subscribers downstream pueden filtrar por TypeRef en el broker.