brahman/crates/modules/shipote/docs/RECIPES.md
sergio d962fe4601 docs(shipote): README + 4 docs in docs/ (ARCHITECTURE, CLI, RECIPES, DEVELOPMENT)
- README.md in crates/modules/shipote/ as the entry point.
- docs/ARCHITECTURE.md: 11 crates, layers, decisions (O_CLOEXEC,
  dirty AtomicBool, whole-pipeline restart, etc.) + snapshot versioning.
- docs/CLI.md: command-by-command reference, flags, env vars.
- docs/RECIPES.md: TOML specs for typical workspaces and pipelines.
- docs/DEVELOPMENT.md: building, running the daemon/shell/CLI, tests,
  manual E2E smoke test, debugging FDs.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-11 17:10:44 +00:00

Spec recipes

TOML specs for common cases. Wherever <WS> appears, it is assumed to name a workspace that already exists (created with shipote workspace create <ws-spec>).

Workspaces

Minimal workspace

label = "demo"
on_exit = "reap"

Workspace with TTL (auto-stop after N ms)

label = "ephemeral"
on_exit = "reap"
ttl = 30000  # 30 s

Workspace with rlimits (accounting only)

label = "bounded"
on_exit = "reap"

[soma.rlimits]
mem_bytes = 10485760    # 10 MiB; visible in `shipote workspace quota`
nproc = 4
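
The byte values under [soma.rlimits] are plain integers, so binary multiples are easy to mistype. A minimal sketch for deriving them in the shell follows; the helper name mib_to_bytes is hypothetical and not part of shipote.

```shell
# mib_to_bytes N: print N MiB as a byte count.
# Hypothetical helper for writing specs; it is not a shipote command.
mib_to_bytes() {
  echo $(( $1 * 1024 * 1024 ))
}

mib_to_bytes 10   # the mem_bytes value used in this recipe
mib_to_bytes 5    # the value used by the stricter recipes
```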

Workspace with automatic enforcement

label = "strict"
on_exit = "reap"

[soma.rlimits]
mem_bytes = 5242880
nproc = 2

[quota_enforce]
mem = "kill"            # None | Log | Kill
nproc = "kill"

Workspace with a delegated cgroup (kernel-enforced)

label = "cgroup-enforced"
on_exit = "reap"

[soma.rlimits]
mem_bytes = 10485760
nproc = 4

[soma.cgroup]
path = "shipote/bounded"   # under $cgroup_actual/shipote/bounded ($cgroup_actual = the current cgroup)

Requires cgroup_delegated: true in shipote caps. Otherwise accounting still works, but the kernel does not enforce the limits.

Workspace with real namespacing

label = "isolated"
on_exit = "reap"

[soma.namespaces]
user = true
pid = true
mount = true
net = false
uts = false
ipc = false
cgroup = false

[soma.rlimits]
mem_bytes = 0
nproc = 0
nofile = 0

[soma.cgroup]
path = ""

Requires user_ns: Allowed (or cap_sys_admin: true).

Pipelines

Linear pipeline with tap (data plane)

label = "echo-cat"
workspace = "<WS>"
discern = { sample_bytes = 4096, enrich_producer = true }

[[nodes]]
label = "producer"
payload.Native = { exec = "/bin/echo", argv = ['{"hello": 1}'], envp = [] }

[[nodes]]
label = "consumer"
payload.Native = { exec = "/bin/cat", argv = [], envp = [] }

[[edges]]
from = 0
from_output = "stdout"
to = 1
to_input = "stdin"

Run:

shipote pipeline run echo-cat.toml --tap
# prints: edge ty=json mime=application/json conf=0.95

Pipeline with fan-out (1 → N)

label = "broadcast"
workspace = "<WS>"
discern = { sample_bytes = 4096, enrich_producer = true }

[[nodes]]
label = "src"
payload.Native = { exec = "/bin/echo", argv = ["mensaje compartido"], envp = [] }

[[nodes]]
label = "wc-c"
payload.Native = { exec = "/usr/bin/wc", argv = ["-c"], envp = [] }

[[nodes]]
label = "wc-l"
payload.Native = { exec = "/usr/bin/wc", argv = ["-l"], envp = [] }

[[edges]]
from = 0
from_output = "stdout"
to = 1
to_input = "stdin"

[[edges]]
from = 0
from_output = "stdout"
to = 2
to_input = "stdin"
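
Assuming each fan-out edge receives a full copy of the producer's stdout (an assumption; this recipe does not spell out the delivery semantics), the two sinks should report the same counts as running each wc directly on the message. That reference result can be computed without shipote:

```shell
# Plain-shell reference for what each sink would count, assuming every
# consumer sees the complete output of the producer.
msg="mensaje compartido"

# $(( ... )) strips the padding that some wc implementations add.
bytes=$(( $(echo "$msg" | wc -c) ))   # what the wc-c node would see
lines=$(( $(echo "$msg" | wc -l) ))   # what the wc-l node would see

echo "wc-c: $bytes, wc-l: $lines"
```

If the counts printed by the pipeline differ from these, the edges are probably splitting the stream rather than copying it.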

Pipeline with fan-in (N → 1)

label = "merge"
workspace = "<WS>"

[[nodes]]
label = "p1"
payload.Native = { exec = "/bin/echo", argv = ["from-p1"], envp = [] }

[[nodes]]
label = "p2"
payload.Native = { exec = "/bin/echo", argv = ["from-p2"], envp = [] }

[[nodes]]
label = "merge-sink"
payload.Native = { exec = "/bin/cat", argv = [], envp = [] }

[[edges]]
from = 0
from_output = "stdout"
to = 2
to_input = "stdin"

[[edges]]
from = 1
from_output = "stdout"
to = 2
to_input = "stdin"

Pipeline with replay and rate limit

label = "throttled"
workspace = "<WS>"

[discern]
sample_bytes = 4096
enrich_producer = true
replay_chunks = 32        # default
replay_bytes = 65536      # additional cap in bytes (0 = chunks only)
max_bytes_per_sec = 1024  # token bucket with a 1 s burst

[[nodes]]
label = "fast"
payload.Native = { exec = "/bin/sh", argv = ["-c", "for i in 1 2 3 4 5; do echo line-$i; done"], envp = [] }

[[nodes]]
label = "sink"
payload.Native = { exec = "/bin/cat", argv = [], envp = [] }

[[edges]]
from = 0
from_output = "stdout"
to = 1
to_input = "stdin"
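
To reason about what max_bytes_per_sec implies, the sketch below estimates a lower bound on transfer time for a token bucket whose capacity is one second of traffic. It assumes the bucket starts full, which this recipe does not confirm, and the helper min_transfer_secs is hypothetical.

```shell
# min_transfer_secs BYTES RATE: lower bound, in whole seconds, on the time
# needed to pass BYTES through a token bucket refilled at RATE bytes/s whose
# capacity is one second of traffic (RATE bytes). Assumes a full bucket at start.
min_transfer_secs() {
  bytes=$1
  rate=$2
  if [ "$bytes" -le "$rate" ]; then
    echo 0                                    # fits in the initial burst
  else
    excess=$(( bytes - rate ))                # bytes that must wait for refill
    echo $(( (excess + rate - 1) / rate ))    # ceil(excess / rate)
  fi
}

# Size of the output produced by the "fast" node in this recipe.
produced=$(( $(for i in 1 2 3 4 5; do echo line-$i; done | wc -c) ))

min_transfer_secs "$produced" 1024   # the recipe's output vs. a 1024 B/s limit
min_transfer_secs 4096 1024          # a larger, hypothetical 4 KiB payload
```

Under these assumptions the five lines emitted by the fast node fit inside the initial burst, so observing throttling would need a producer that writes more than 1024 bytes.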

Supervised pipeline (restart on failure with backoff)

label = "supervised"
workspace = "<WS>"
restart_on_failure = true
restart_backoff_ms = 200       # initial delay; scales x2
restart_max_backoff_ms = 30000 # cap
restart_max = 5                # 0 = unlimited

[[nodes]]
label = "flaky"
payload.Native = { exec = "/bin/false", argv = [], envp = [] }

After 5 restarts (/bin/false always exits with status 1), the daemon logs restart_max reached — giving up and discards the supervisor.
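
The delays implied by these settings can be tabulated with a short sketch. It assumes the first restart waits restart_backoff_ms and that the delay doubles after each attempt until it reaches restart_max_backoff_ms; the exact point at which the daemon doubles is an assumption, and backoff_schedule is a hypothetical helper.

```shell
# backoff_schedule INITIAL_MS CAP_MS COUNT: print COUNT delays, doubling from
# INITIAL_MS and clamping at CAP_MS. Models the spec fields, not the daemon code.
backoff_schedule() {
  delay=$1
  cap=$2
  count=$3
  i=0
  out=""
  while [ "$i" -lt "$count" ]; do
    out="${out:+$out }$delay"
    delay=$(( delay * 2 ))
    if [ "$delay" -gt "$cap" ]; then
      delay=$cap
    fi
    i=$(( i + 1 ))
  done
  echo "$out"
}

backoff_schedule 200 30000 5    # the values from this recipe
backoff_schedule 200 30000 10   # a longer run, to see where the cap applies
```

With the recipe's values the cap is never reached within 5 restarts; under this model it first applies on the ninth delay.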

Pipeline with templating

Spec with placeholders:

label = "tmpl-${VARIANT}"
workspace = "<WS>"
discern = { sample_bytes = 4096, enrich_producer = true }

[[nodes]]
label = "gen-${VARIANT}"
payload.Native = { exec = "/bin/echo", argv = ["greeting from ${VARIANT}"], envp = [] }

[[nodes]]
label = "sink"
payload.Native = { exec = "/bin/cat", argv = [], envp = [] }

[[edges]]
from = 0
from_output = "stdout"
to = 1
to_input = "stdin"

Run with vars:

shipote pipeline run tmpl.toml --var VARIANT=alpha
shipote pipeline run tmpl.toml --var VARIANT=beta

Placeholders with no matching --var are left intact (useful for spotting forgotten variables).

External subscribers

Direct tail of a flow socket
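
The substitution rule can be imitated with sed to preview a rendered spec. This is only a sketch of the documented behaviour, not shipote's implementation; render_var and the note key in the sample input are hypothetical.

```shell
# render_var NAME VALUE: replace every ${NAME} read from stdin with VALUE.
# Hypothetical helper; VALUE must not contain '|', '&' or backslashes, which
# are special inside this sed expression.
render_var() {
  sed "s|[\$]{$1}|$2|g"
}

# ${VARIANT} is substituted; ${MISSING} has no value and passes through unchanged.
printf '%s\n' 'label = "tmpl-${VARIANT}"' 'note = "${MISSING}"' | render_var VARIANT alpha
```

Searching the rendered text for a leftover ${ is then a quick way to catch a forgotten variable before running the pipeline.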

shipote pipeline run mypipe.toml --tap &
sleep 0.3
SOCK=$(shipote flow list | grep shipote-flow | xargs)
shipote flow tail "$SOCK"

If you connect late, the replay buffer delivers the last N chunks (bounded by the spec's replay_chunks and replay_bytes).

Built-in live-tail mode
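
The fixed sleep 0.3 in the snippet above is a guess at how long the socket takes to appear. A polling loop is one alternative; the sketch below uses a hypothetical helper, wait_for_output, and relies only on the shipote commands already shown.

```shell
# wait_for_output CMD [ARGS...]: re-run CMD every 0.1 s until it succeeds and
# prints something, giving up after 30 attempts (about 3 s). Hypothetical helper.
wait_for_output() {
  tries=0
  while [ "$tries" -lt 30 ]; do
    if out=$("$@" 2>/dev/null) && [ -n "$out" ]; then
      printf '%s\n' "$out"
      return 0
    fi
    tries=$(( tries + 1 ))
    sleep 0.1
  done
  return 1
}

# Intended use with the commands from this recipe (not executed here):
#   SOCK=$(wait_for_output sh -c 'shipote flow list | grep shipote-flow') || exit 1
#   shipote flow tail "$SOCK"
wait_for_output echo ready   # stand-in command so the sketch runs anywhere
```

If several flow sockets can be listed at once, the result would need narrowing to a single path before it is passed to shipote flow tail.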

shipote pipeline run mypipe.toml --tail
# dumps the first flow_socket to stdout until the producer exits.

Useful combinations

Workspace with automatic cleanup

label = "burst-and-die"
on_exit = "reap"
ttl = 10000  # auto-stop after 10 s

[soma.rlimits]
mem_bytes = 5242880

JSON-aware pipeline with enriched discern

  • The producer writes JSON.
  • Discern detects application/json with confidence 0.95.
  • An ephemeral card is announced to the broker (if it is running): shipote.flow.<id>.<from>.<output>.json.
  • Downstream subscribers can filter by TypeRef in the broker.