Spec recipes
TOML specs for common cases. All of them assume that <WS> already exists (created with shipote workspace create <ws-spec>).
Workspaces
Minimal workspace
label = "demo"
on_exit = "reap"
Workspace with TTL (auto-stop after N ms)
label = "ephemeral"
on_exit = "reap"
ttl = 30000 # 30 s
Workspace with rlimits (accounting only)
label = "bounded"
on_exit = "reap"
[soma.rlimits]
mem_bytes = 10485760 # 10 MiB, visible in `shipote workspace quota`
nproc = 4
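The recipes write mem_bytes as a raw byte count and ttl in milliseconds. A minimal Python sketch for deriving those literals is shown here; the helper names are hypothetical and not part of shipote.

```python
MIB = 1024 * 1024  # bytes per mebibyte


def mib_to_bytes(mib: int) -> int:
    """Convert mebibytes to the raw byte count written in mem_bytes."""
    return mib * MIB


def seconds_to_ms(seconds: float) -> int:
    """Convert seconds to the millisecond value written in ttl."""
    return int(round(seconds * 1000))


print(mib_to_bytes(10))   # 10485760, the value used in the "bounded" recipe
print(mib_to_bytes(5))    # 5242880, the value used in the "strict" recipe
print(seconds_to_ms(30))  # 30000, the ttl of the "ephemeral" recipe
```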
Workspace with automatic enforcement
label = "strict"
on_exit = "reap"
[soma.rlimits]
mem_bytes = 5242880
nproc = 2
[quota_enforce]
mem = "kill" # None | Log | Kill
nproc = "kill"
Workspace with delegated cgroup (kernel-enforced)
label = "cgroup-enforced"
on_exit = "reap"
[soma.rlimits]
mem_bytes = 10485760
nproc = 4
[soma.cgroup]
path = "shipote/bounded" # under $cgroup_actual/shipote/bounded ($cgroup_actual = the current cgroup)
Requires cgroup_delegated: true in shipote caps. Otherwise accounting still works, but the kernel does not enforce the limits.
Workspace with real namespacing
label = "isolated"
on_exit = "reap"
[soma.namespaces]
user = true
pid = true
mount = true
net = false
uts = false
ipc = false
cgroup = false
[soma.rlimits]
mem_bytes = 0
nproc = 0
nofile = 0
[soma.cgroup]
path = ""
Requires user_ns: Allowed (or cap_sys_admin: true).
Pipelines
Linear pipeline with tap (data plane)
label = "echo-cat"
workspace = "<WS>"
discern = { sample_bytes = 4096, enrich_producer = true }
[[nodes]]
label = "producer"
payload.Native = { exec = "/bin/echo", argv = ['{"hello": 1}'], envp = [] }
[[nodes]]
label = "consumer"
payload.Native = { exec = "/bin/cat", argv = [], envp = [] }
[[edges]]
from = 0
from_output = "stdout"
to = 1
to_input = "stdin"
Run:
shipote pipeline run echo-cat.toml --tap
# prints: edge ty=json mime=application/json conf=0.95
Pipeline with fan-out (1 → N)
label = "broadcast"
workspace = "<WS>"
discern = { sample_bytes = 4096, enrich_producer = true }
[[nodes]]
label = "src"
payload.Native = { exec = "/bin/echo", argv = ["mensaje compartido"], envp = [] }
[[nodes]]
label = "wc-c"
payload.Native = { exec = "/usr/bin/wc", argv = ["-c"], envp = [] }
[[nodes]]
label = "wc-l"
payload.Native = { exec = "/usr/bin/wc", argv = ["-l"], envp = [] }
[[edges]]
from = 0
from_output = "stdout"
to = 1
to_input = "stdin"
[[edges]]
from = 0
from_output = "stdout"
to = 2
to_input = "stdin"
Pipeline with fan-in (N → 1)
label = "merge"
workspace = "<WS>"
[[nodes]]
label = "p1"
payload.Native = { exec = "/bin/echo", argv = ["from-p1"], envp = [] }
[[nodes]]
label = "p2"
payload.Native = { exec = "/bin/echo", argv = ["from-p2"], envp = [] }
[[nodes]]
label = "merge-sink"
payload.Native = { exec = "/bin/cat", argv = [], envp = [] }
[[edges]]
from = 0
from_output = "stdout"
to = 2
to_input = "stdin"
[[edges]]
from = 1
from_output = "stdout"
to = 2
to_input = "stdin"
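The fan-out and fan-in recipes differ only in how their [[edges]] tables connect node indices. The Python sketch below classifies an edge list and checks that every index points at a declared node. It assumes that from and to are zero-based positions in the [[nodes]] array, in declaration order, which is what the recipes suggest; the function names are hypothetical and this is not shipote's own validation.

```python
from collections import defaultdict

# Edges copied from the two recipes: (from, from_output, to, to_input).
FAN_OUT = [(0, "stdout", 1, "stdin"), (0, "stdout", 2, "stdin")]
FAN_IN = [(0, "stdout", 2, "stdin"), (1, "stdout", 2, "stdin")]


def validate(edges, node_count):
    """Return the edges whose endpoints fall outside 0..node_count-1."""
    return [e for e in edges
            if not (0 <= e[0] < node_count and 0 <= e[2] < node_count)]


def classify(edges):
    """Group consumers per output and producers per input."""
    readers = defaultdict(set)  # (node, output) -> consumer node indices
    writers = defaultdict(set)  # (node, input)  -> producer node indices
    for src, out, dst, inp in edges:
        readers[(src, out)].add(dst)
        writers[(dst, inp)].add(src)
    fan_out = {k: sorted(v) for k, v in readers.items() if len(v) > 1}
    fan_in = {k: sorted(v) for k, v in writers.items() if len(v) > 1}
    return fan_out, fan_in


print(classify(FAN_OUT))  # ({(0, 'stdout'): [1, 2]}, {})
print(classify(FAN_IN))   # ({}, {(2, 'stdin'): [0, 1]})
```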
Pipeline with replay and rate limit
label = "throttled"
workspace = "<WS>"
[discern]
sample_bytes = 4096
enrich_producer = true
replay_chunks = 32 # default
replay_bytes = 65536 # additional cap in bytes (0 = chunks only)
max_bytes_per_sec = 1024 # token bucket with a 1 s burst
[[nodes]]
label = "fast"
payload.Native = { exec = "/bin/sh", argv = ["-c", "for i in 1 2 3 4 5; do echo line-$i; done"], envp = [] }
[[nodes]]
label = "sink"
payload.Native = { exec = "/bin/cat", argv = [], envp = [] }
[[edges]]
from = 0
from_output = "stdout"
to = 1
to_input = "stdin"
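To illustrate what "token bucket with a 1 s burst" means for max_bytes_per_sec, here is a generic Python sketch. It is not shipote's implementation: the class, its method, and the choice to let the bucket go into debt are assumptions made for the example. The clock is passed in explicitly so the behaviour is deterministic.

```python
class TokenBucket:
    """Generic token bucket whose capacity equals one second of traffic."""

    def __init__(self, max_bytes_per_sec: int, now: float = 0.0):
        self.rate = max_bytes_per_sec
        self.capacity = max_bytes_per_sec  # burst of 1 s worth of bytes
        self.tokens = float(self.capacity)
        self.last = now

    def wait_time(self, nbytes: int, now: float) -> float:
        """Charge nbytes and return how many seconds the sender should wait."""
        elapsed = now - self.last
        self.tokens = min(self.capacity, self.tokens + elapsed * self.rate)
        self.last = now
        self.tokens -= nbytes  # may go negative: that is the debt to wait out
        if self.tokens >= 0:
            return 0.0
        return -self.tokens / self.rate


bucket = TokenBucket(1024)
print(bucket.wait_time(1024, now=0.0))  # 0.0, the full burst passes at once
print(bucket.wait_time(512, now=0.0))   # 0.5, 512 bytes of debt at 1024 B/s
```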
Supervised pipeline (restart on failure with backoff)
label = "supervised"
workspace = "<WS>"
restart_on_failure = true
restart_backoff_ms = 200 # initial; scales x2
restart_max_backoff_ms = 30000 # cap
restart_max = 5 # 0 = unlimited
[[nodes]]
label = "flaky"
payload.Native = { exec = "/bin/false", argv = [], envp = [] }
After 5 restarts (/bin/false always exits with status 1), the daemon logs "restart_max reached — giving up" and the supervisor is discarded.
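The delays implied by the three restart_* settings can be sketched in Python. This assumes the first restart waits restart_backoff_ms and each later one waits twice as long, up to restart_max_backoff_ms; the exact point at which the daemon applies the delay is not stated here, and the function name is hypothetical.

```python
import itertools


def backoff_delays(restart_backoff_ms, restart_max_backoff_ms, restart_max):
    """Yield the delay before each restart; restart_max = 0 never stops."""
    attempts = itertools.count() if restart_max == 0 else range(restart_max)
    delay = restart_backoff_ms
    for _ in attempts:
        yield min(delay, restart_max_backoff_ms)
        delay = min(delay * 2, restart_max_backoff_ms)  # double, then cap


# Values from the "supervised" recipe.
print(list(backoff_delays(200, 30000, 5)))  # [200, 400, 800, 1600, 3200]
```

With these values the cap of 30000 ms is never reached; it would first apply on the ninth restart.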
Pipeline with templating
Spec with placeholders:
label = "tmpl-${VARIANT}"
workspace = "<WS>"
discern = { sample_bytes = 4096, enrich_producer = true }
[[nodes]]
label = "gen-${VARIANT}"
payload.Native = { exec = "/bin/echo", argv = ["greeting from ${VARIANT}"], envp = [] }
[[nodes]]
label = "sink"
payload.Native = { exec = "/bin/cat", argv = [], envp = [] }
[[edges]]
from = 0
from_output = "stdout"
to = 1
to_input = "stdin"
Run with vars:
shipote pipeline run tmpl.toml --var VARIANT=alpha
shipote pipeline run tmpl.toml --var VARIANT=beta
Variables with no match are left intact (useful for spotting omissions).
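The substitution rule described above (replace ${NAME} when a value is supplied, leave it untouched otherwise) can be sketched in Python. The regular expression, including which characters are allowed in a variable name, and both function names are assumptions for illustration, not shipote's actual parser.

```python
import re

# Assumed placeholder syntax: ${NAME} with an identifier-like NAME.
PLACEHOLDER = re.compile(r"\$\{([A-Za-z_][A-Za-z0-9_]*)\}")


def render(template: str, variables: dict) -> str:
    """Replace known placeholders; unknown ones are kept verbatim."""
    return PLACEHOLDER.sub(
        lambda m: variables.get(m.group(1), m.group(0)), template
    )


def unresolved(rendered: str) -> list:
    """List placeholder names still present after rendering."""
    return sorted(set(PLACEHOLDER.findall(rendered)))


spec = 'label = "tmpl-${VARIANT}"\nowner = "${OWNER}"'
out = render(spec, {"VARIANT": "alpha"})
print(out)              # label = "tmpl-alpha" / owner = "${OWNER}"
print(unresolved(out))  # ['OWNER']
```

Checking the rendered text for leftover placeholders is one way to catch a missing --var before the spec is submitted.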
External subscribers
Direct tail on a flow socket
shipote pipeline run mypipe.toml --tap &
sleep 0.3
SOCK=$(shipote flow list | grep shipote-flow | xargs)
shipote flow tail "$SOCK"
If you connect late, the replay buffer delivers the last N chunks (as set by replay_chunks and replay_bytes in the spec).
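How a replay buffer bounded by both replay_chunks and replay_bytes might behave can be sketched in Python. This is a generic model, not shipote's code: the class name, its methods, and the oldest-first eviction order are assumptions.

```python
from collections import deque


class ReplayBuffer:
    """Keep the most recent chunks, bounded by chunk count and total bytes."""

    def __init__(self, replay_chunks: int, replay_bytes: int = 0):
        self.max_chunks = replay_chunks
        self.max_bytes = replay_bytes  # 0 = chunk count is the only cap
        self.chunks = deque()
        self.size = 0

    def push(self, chunk: bytes) -> None:
        self.chunks.append(chunk)
        self.size += len(chunk)
        # Evict the oldest chunks until both caps hold again.
        while len(self.chunks) > self.max_chunks or (
            self.max_bytes and self.size > self.max_bytes
        ):
            self.size -= len(self.chunks.popleft())

    def snapshot(self) -> list:
        """What a late subscriber would receive on connect."""
        return list(self.chunks)


buf = ReplayBuffer(replay_chunks=3)
for line in (b"line-1\n", b"line-2\n", b"line-3\n", b"line-4\n"):
    buf.push(line)
print(buf.snapshot())  # [b'line-2\n', b'line-3\n', b'line-4\n']
```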
Built-in live-tail mode
shipote pipeline run mypipe.toml --tail
# dumps the first flow_socket to stdout until the producer finishes.
Useful combinations
Workspace with automatic cleanup
label = "burst-and-die"
on_exit = "reap"
ttl = 10000 # auto-stop after 10 s
[soma.rlimits]
mem_bytes = 5242880
JSON-aware pipeline with enriched discern
- The producer writes JSON.
- Discern detects application/json with confidence 0.95.
- An ephemeral card is announced to the broker (if it is running): shipote.flow.<id>.<from>.<output>.json.
- Downstream subscribers can filter by TypeRef in the broker.
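As a rough illustration of sample-based type detection, the Python sketch below inspects at most sample_bytes of a stream and reports JSON when the sample parses. It is not how discern actually works: the function name, the fallback values, and the parse-the-whole-sample strategy are assumptions. Only the (json, application/json, 0.95) triple comes from the output shown earlier.

```python
import json


def sniff(data: bytes, sample_bytes: int = 4096):
    """Return (ty, mime, conf) for the first sample_bytes of a stream."""
    head = data[:sample_bytes]
    try:
        json.loads(head.decode("utf-8"))
    except (UnicodeDecodeError, ValueError):
        # Hypothetical fallback; the real fallback values are not documented here.
        return ("unknown", "application/octet-stream", 0.0)
    return ("json", "application/json", 0.95)


print(sniff(b'{"hello": 1}\n'))        # ('json', 'application/json', 0.95)
print(sniff(b"mensaje compartido\n"))  # ('unknown', 'application/octet-stream', 0.0)
```

A JSON document longer than the sample would be cut mid-value and fail to parse in this sketch, so a real detector needs a more tolerant strategy.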