diff --git a/CHANGELOG.md b/CHANGELOG.md index b2bd633..1e26cf6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,49 @@ ratio/diff ver `git show `. ## 2026-05-10 +### feat(brahman-demo): bootstrap script reproducible — broker + producer + consumer + 4 explorers +Iter 22. Cierra el set de iteraciones de hoy: cualquier persona (o +future-me retomando el repo) puede levantar el escenario completo +con un comando. + +Crate nuevo `crates/apps/brahman-demo/` con 3 binarios: +- **`brahman-demo-broker`**: standalone `Server::bind` con un Broker + configurado, escucha en el socket default. Reemplaza a + `ente-zero` para fines de demo (ente-zero pesa kernel surface + + child subreaper + bus + brain + audit; el demo no lo necesita). +- **`brahman-demo-producer`**: registra una Card con `flow.output[demo-stream:json]` + y queda pingueando. +- **`brahman-demo-consumer`**: registra una Card con `flow.input[demo-feed:json]` + (mismo type → matchea con el producer) y queda escuchando + `MatchEvent`s. + +Variables de entorno respetadas en los 3: `BRAHMAN_INIT_SOCKET`, +`BRAHMAN_BROKER_CONTEXT` (sólo broker), `BRAHMAN_DEMO_LABEL/FLOW/TYPE`, +`RUST_LOG`. + +Script nuevo `scripts/bootstrap-demo.sh`: +- Modos: `all` (default — broker + producer + consumer + 4 explorers), + `broker` (sin GUIs, sólo backend), `only` (sólo broker, sin + producer/consumer ni GUIs). +- Cleanup-safe: trap `EXIT INT TERM` mata todos los procesos + spawneados (con SIGTERM grace + SIGKILL fallback) y borra el socket. +- Espera activa hasta 5s a que el socket aparezca antes de spawnear + los siguientes (evita ENOENT en el handshake). +- Logs separados por proceso bajo `$BRAHMAN_DEMO_LOG_DIR` (default + `/tmp/brahman-demo`). Re-invocaciones limpian los logs viejos. +- Re-build automático opcional (comentado por default — asume + `cargo build --workspace` ya hecho). + +Smoke verificado end-to-end (sin DISPLAY, sólo backend): +- Broker arranca, bind del socket OK. +- Consumer conecta, asigna session. +- Producer conecta, asigna session. +- Consumer recibe `MatchEvent { Available, demo-feed ← demo-stream, + via: Exact, pinned: false }` automáticamente — el broker computó + el match y lo pusheó por el push channel. + +Stack tests: brahman-demo (0 unit), workspace verde. + ### feat(brahman-handshake): ListMatches endpoint + timeline en broker-explorer Iter 21. Cierra el loop de observabilidad iniciado en iter 20: ahora se ven no sólo las sesiones registradas sino también qué matches diff --git a/Cargo.lock b/Cargo.lock index 39f68e3..d35c30c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1288,6 +1288,20 @@ dependencies = [ "yahweh-meta-schema", ] +[[package]] +name = "brahman-demo" +version = "0.1.0" +dependencies = [ + "brahman-broker", + "brahman-card", + "brahman-handshake", + "brahman-sidecar", + "tokio", + "tracing", + "tracing-subscriber", + "ulid", +] + [[package]] name = "brahman-handshake" version = "0.1.0" diff --git a/Cargo.toml b/Cargo.toml index ee99d5d..ab92a77 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -97,6 +97,7 @@ members = [ "crates/apps/nakui-ui", "crates/apps/minga-explorer", "crates/apps/brahman-broker-explorer", + "crates/apps/brahman-demo", ] [workspace.package] diff --git a/crates/apps/brahman-demo/Cargo.toml b/crates/apps/brahman-demo/Cargo.toml new file mode 100644 index 0000000..e3fc6bc --- /dev/null +++ b/crates/apps/brahman-demo/Cargo.toml @@ -0,0 +1,28 @@ +[package] +name = "brahman-demo" +version.workspace = true +edition.workspace = true +license.workspace = true +description = "Demo binaries de brahman: broker standalone + producer/consumer dummy. Pensados para que `scripts/bootstrap-demo.sh` arranque un escenario reproducible donde los 5 explorers ven sesiones, matches, y timeline." + +[dependencies] +brahman-broker = { path = "../../core/brahman-broker" } +brahman-card = { path = "../../core/brahman-card" } +brahman-handshake = { path = "../../core/brahman-handshake" } +brahman-sidecar = { path = "../../shared/brahman-sidecar" } +tokio = { workspace = true } +tracing = { workspace = true } +tracing-subscriber = { workspace = true } +ulid = { workspace = true } + +[[bin]] +name = "brahman-demo-broker" +path = "src/bin/broker.rs" + +[[bin]] +name = "brahman-demo-producer" +path = "src/bin/producer.rs" + +[[bin]] +name = "brahman-demo-consumer" +path = "src/bin/consumer.rs" diff --git a/crates/apps/brahman-demo/src/bin/broker.rs b/crates/apps/brahman-demo/src/bin/broker.rs new file mode 100644 index 0000000..d8781e1 --- /dev/null +++ b/crates/apps/brahman-demo/src/bin/broker.rs @@ -0,0 +1,68 @@ +//! `brahman-demo-broker` — broker standalone para demos. +//! +//! ente-zero (PID 1) es el broker "real" pero pesa toneladas +//! (kernel surface, child subreaper, bus, brain, audit, etc). Este +//! binario sólo arma `brahman_handshake::Server::bind` con un broker +//! configurado, escucha en el socket default, y corre forever. +//! +//! Suficiente para que el script de bootstrap (y los 4 explorers) +//! tengan algo a qué conectarse sin necesitar el bootstrap PID 1 +//! completo. +//! +//! Variables de entorno respetadas: +//! - `BRAHMAN_INIT_SOCKET` — path del Unix socket. Default: el +//! resuelto por `brahman_handshake::transport::default_socket_path`. +//! - `BRAHMAN_BROKER_CONTEXT` — context bias del broker (igual que +//! ente-zero); afecta priority_contexts si las Cards lo declaran. +//! - `RUST_LOG` — filtro de tracing (default `info`). + +use std::sync::Arc; + +use brahman_broker::{Broker, BrokerConfig, MatchStrategy}; +use brahman_handshake::server::{Server, ServerConfig}; +use brahman_handshake::transport; +use tokio::sync::Mutex; + +#[tokio::main(flavor = "current_thread")] +async fn main() -> std::io::Result<()> { + tracing_subscriber::fmt() + .with_env_filter( + tracing_subscriber::EnvFilter::try_from_default_env() + .unwrap_or_else(|_| tracing_subscriber::EnvFilter::new("info")), + ) + .init(); + + let context = std::env::var("BRAHMAN_BROKER_CONTEXT").ok(); + let broker = Arc::new(Mutex::new(Broker::new(BrokerConfig { + strategy: MatchStrategy::default(), + current_context: context.clone(), + }))); + + let socket = transport::default_socket_path(); + tracing::info!( + socket = %socket.display(), + context = ?context, + "brahman-demo-broker arranca" + ); + + let server = Server::bind( + &socket, + ServerConfig { + init_attached: false, + broker: Some(broker), + net: None, + policy: None, + }, + )?; + + // Loop accept-forever. Cada conexión va a su propia tokio task — + // sesiones independientes, ninguna bloquea a las otras. + loop { + let session = server.accept_one().await?; + tokio::spawn(async move { + if let Err(e) = session.handle().await { + tracing::warn!(?e, "session terminó con error"); + } + }); + } +} diff --git a/crates/apps/brahman-demo/src/bin/consumer.rs b/crates/apps/brahman-demo/src/bin/consumer.rs new file mode 100644 index 0000000..9399416 --- /dev/null +++ b/crates/apps/brahman-demo/src/bin/consumer.rs @@ -0,0 +1,99 @@ +//! `brahman-demo-consumer` — registra una Card con un `flow.input` +//! que matchea al producer demo, se queda escuchando MatchEvents. +//! +//! Variables de entorno: +//! - `BRAHMAN_DEMO_LABEL` — label de la Card. Default `demo-consumer`. +//! - `BRAHMAN_DEMO_FLOW` — nombre del flow.input. Default `demo-feed`. +//! - `BRAHMAN_DEMO_TYPE` — type primitive. Default `json` (debe matchear +//! con el producer para que veamos un Match en el broker). + +use std::collections::BTreeSet; +use std::time::Duration; + +use brahman_card::{ + Card, CgroupSpec, Flow, Flows, NamespaceSet, Payload, ResourceLimits, SomaSpec, Supervision, + TypeRef, CARD_SCHEMA_VERSION, +}; +use brahman_handshake::client::Client; +use brahman_handshake::transport; +use ulid::Ulid; + +#[tokio::main(flavor = "current_thread")] +async fn main() -> Result<(), Box> { + tracing_subscriber::fmt() + .with_env_filter( + tracing_subscriber::EnvFilter::try_from_default_env() + .unwrap_or_else(|_| tracing_subscriber::EnvFilter::new("info")), + ) + .init(); + + let label = + std::env::var("BRAHMAN_DEMO_LABEL").unwrap_or_else(|_| "demo-consumer".to_string()); + let flow_name = std::env::var("BRAHMAN_DEMO_FLOW").unwrap_or_else(|_| "demo-feed".into()); + let type_name = std::env::var("BRAHMAN_DEMO_TYPE").unwrap_or_else(|_| "json".into()); + + let card = build_consumer_card(&label, &flow_name, &type_name); + let socket = transport::default_socket_path(); + + tracing::info!( + socket = %socket.display(), + label = %label, + flow = %flow_name, + ty = %type_name, + "demo-consumer conecta y queda escuchando MatchEvents" + ); + + let mut client = Client::connect(&socket, card).await?; + tracing::info!(session = %client.session(), "session asignada"); + + // Drenamos eventos cada 5s. Cada tick los logueamos si hay alguno; + // si no, ping de heartbeat (mismo razonamiento que producer). + loop { + let evt = client.await_event(Duration::from_secs(5)).await?; + match evt { + Some(ev) => tracing::info!(?ev, "MatchEvent recibido"), + None => { + if let Err(e) = client.ping().await { + tracing::warn!(?e, "ping falló tras quiet period — saliendo"); + break; + } + } + } + } + + Ok(()) +} + +fn build_consumer_card(label: &str, flow_name: &str, type_name: &str) -> Card { + Card { + schema_version: CARD_SCHEMA_VERSION, + id: Ulid::new(), + lineage: None, + label: label.into(), + provides: BTreeSet::new(), + requires: BTreeSet::new(), + soma: SomaSpec { + cgroup: CgroupSpec { + path: "ente.slice/demo".into(), + cpu_weight: None, + io_weight: None, + }, + namespaces: NamespaceSet::default(), + rlimits: ResourceLimits::default(), + cpu_affinity: None, + }, + payload: Payload::Virtual, + supervision: Supervision::OneShot, + flow: Flows { + input: vec![Flow { + name: flow_name.into(), + ty: TypeRef::Primitive { + name: type_name.into(), + }, + pin_to: None, + }], + output: vec![], + }, + ..Default::default() + } +} diff --git a/crates/apps/brahman-demo/src/bin/producer.rs b/crates/apps/brahman-demo/src/bin/producer.rs new file mode 100644 index 0000000..ec6993d --- /dev/null +++ b/crates/apps/brahman-demo/src/bin/producer.rs @@ -0,0 +1,100 @@ +//! `brahman-demo-producer` — registra una Card con un `flow.output` +//! y se queda pingueando indefinidamente. +//! +//! Sirve para que el broker compute matches con el `consumer` demo +//! y los explorers vean algo en sesiones + timeline. +//! +//! Variables de entorno: +//! - `BRAHMAN_DEMO_LABEL` — label de la Card. Default `demo-producer`. +//! - `BRAHMAN_DEMO_FLOW` — nombre del flow.output. Default `demo-stream`. +//! - `BRAHMAN_DEMO_TYPE` — type primitive del flow. Default `json`. + +use std::collections::BTreeSet; +use std::time::Duration; + +use brahman_card::{ + Card, CgroupSpec, Flow, Flows, NamespaceSet, Payload, ResourceLimits, SomaSpec, Supervision, + TypeRef, CARD_SCHEMA_VERSION, +}; +use brahman_handshake::client::Client; +use brahman_handshake::transport; +use ulid::Ulid; + +#[tokio::main(flavor = "current_thread")] +async fn main() -> Result<(), Box> { + tracing_subscriber::fmt() + .with_env_filter( + tracing_subscriber::EnvFilter::try_from_default_env() + .unwrap_or_else(|_| tracing_subscriber::EnvFilter::new("info")), + ) + .init(); + + let label = + std::env::var("BRAHMAN_DEMO_LABEL").unwrap_or_else(|_| "demo-producer".to_string()); + let flow_name = std::env::var("BRAHMAN_DEMO_FLOW").unwrap_or_else(|_| "demo-stream".into()); + let type_name = std::env::var("BRAHMAN_DEMO_TYPE").unwrap_or_else(|_| "json".into()); + + let card = build_producer_card(&label, &flow_name, &type_name); + let socket = transport::default_socket_path(); + + tracing::info!( + socket = %socket.display(), + label = %label, + flow = %flow_name, + ty = %type_name, + "demo-producer conecta y queda registrado" + ); + + let mut client = Client::connect(&socket, card).await?; + tracing::info!(session = %client.session(), "session asignada"); + + // Ping cada 10s para mantener la sesión viva sin spammear el log. + // El broker nos descontaría con un Farewell o EOF, no con timeout + // de Ping — pero el ping da una señal visible en `info` logs. + loop { + tokio::time::sleep(Duration::from_secs(10)).await; + match client.ping().await { + Ok(ts) => tracing::debug!(server_ts_ms = ts, "ping ok"), + Err(e) => { + tracing::warn!(?e, "ping falló — saliendo"); + break; + } + } + } + + Ok(()) +} + +fn build_producer_card(label: &str, flow_name: &str, type_name: &str) -> Card { + Card { + schema_version: CARD_SCHEMA_VERSION, + id: Ulid::new(), + lineage: None, + label: label.into(), + provides: BTreeSet::new(), + requires: BTreeSet::new(), + soma: SomaSpec { + cgroup: CgroupSpec { + path: "ente.slice/demo".into(), + cpu_weight: None, + io_weight: None, + }, + namespaces: NamespaceSet::default(), + rlimits: ResourceLimits::default(), + cpu_affinity: None, + }, + payload: Payload::Virtual, + supervision: Supervision::OneShot, + flow: Flows { + input: vec![], + output: vec![Flow { + name: flow_name.into(), + ty: TypeRef::Primitive { + name: type_name.into(), + }, + pin_to: None, + }], + }, + ..Default::default() + } +} diff --git a/scripts/bootstrap-demo.sh b/scripts/bootstrap-demo.sh new file mode 100755 index 0000000..1f8e35c --- /dev/null +++ b/scripts/bootstrap-demo.sh @@ -0,0 +1,146 @@ +#!/usr/bin/env bash +# bootstrap-demo.sh — arranca un escenario reproducible de brahman: +# broker standalone + 1 producer demo + 1 consumer demo (mismo flow +# tipo, así el broker computa un Match y el broker-explorer lo ve en +# sesiones + timeline) + abre los 4 explorers GPUI. +# +# Uso: +# scripts/bootstrap-demo.sh # default: abre todos +# scripts/bootstrap-demo.sh broker # sólo broker + producers +# scripts/bootstrap-demo.sh broker only # ni siquiera producers +# +# Limpieza: Ctrl-C dispara `cleanup` que mata todos los procesos +# spawneados por este script (broker, producers, explorers). Logs +# de cada uno quedan bajo $LOG_DIR (default /tmp/brahman-demo) para +# que después se puedan revisar — el script los borra cuando se +# vuelve a invocar (no acumula). +# +# Asume `cargo build --workspace` ya hecho. Si no, descomentá la +# línea `cargo build` debajo (toma minutos en cold cache). + +set -euo pipefail + +LOG_DIR="${BRAHMAN_DEMO_LOG_DIR:-/tmp/brahman-demo}" +SOCKET="${BRAHMAN_INIT_SOCKET:-/tmp/brahman-init.sock}" +MODE="${1:-all}" + +# Path resolution: si el script vive en repo/scripts/, el repo es el padre. +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +REPO_DIR="$(cd "$SCRIPT_DIR/.." && pwd)" +cd "$REPO_DIR" + +# Limpieza de logs viejos. Mantenemos el dir, sólo vaciamos. +mkdir -p "$LOG_DIR" +rm -f "$LOG_DIR"/*.log + +# Track de PIDs spawneados para que el trap los mate todos. +PIDS=() + +cleanup() { + echo + echo "[bootstrap-demo] cleanup — matando ${#PIDS[@]} procesos" + for pid in "${PIDS[@]}"; do + if kill -0 "$pid" 2>/dev/null; then + kill "$pid" 2>/dev/null || true + fi + done + # Espera breve para que cierren cleanly antes de SIGKILL. + sleep 1 + for pid in "${PIDS[@]}"; do + if kill -0 "$pid" 2>/dev/null; then + kill -9 "$pid" 2>/dev/null || true + fi + done + # El socket lo limpia el broker al hacer Drop, pero defendemos + # contra crashes. + rm -f "$SOCKET" + echo "[bootstrap-demo] terminado. logs: $LOG_DIR" +} +trap cleanup EXIT INT TERM + +# Si querés re-build automático, descomentá: +# echo "[bootstrap-demo] cargo build (puede tomar minutos en cold cache)" +# cargo build -p brahman-demo -p brahman-broker-explorer \ +# -p nakui-explorer -p nouser-explorer -p minga-explorer + +# 1. Broker. Bind del socket. Si ya hay un proceso ocupando el +# socket, esto falla rápido — el cleanup del trap se encarga. +echo "[bootstrap-demo] arrancando broker → $SOCKET" +BRAHMAN_INIT_SOCKET="$SOCKET" \ + cargo run --quiet -p brahman-demo --bin brahman-demo-broker \ + > "$LOG_DIR/broker.log" 2>&1 & +PIDS+=($!) + +# Esperar a que el socket aparezca (el broker hace bind tras boot). +# Sin esto, los siguientes connects rebotan ENOENT. +for _ in $(seq 1 50); do + [ -S "$SOCKET" ] && break + sleep 0.1 +done +if [ ! -S "$SOCKET" ]; then + echo "[bootstrap-demo] ERROR: broker no creó el socket en 5s — ver $LOG_DIR/broker.log" + exit 1 +fi +echo "[bootstrap-demo] broker UP (pid ${PIDS[-1]})" + +# 2. Producer + consumer (a menos que MODE=only). +if [ "$MODE" != "only" ]; then + echo "[bootstrap-demo] arrancando producer (flow demo-stream/json)" + BRAHMAN_INIT_SOCKET="$SOCKET" \ + BRAHMAN_DEMO_LABEL=demo-producer \ + BRAHMAN_DEMO_FLOW=demo-stream \ + BRAHMAN_DEMO_TYPE=json \ + cargo run --quiet -p brahman-demo --bin brahman-demo-producer \ + > "$LOG_DIR/producer.log" 2>&1 & + PIDS+=($!) + + echo "[bootstrap-demo] arrancando consumer (flow demo-feed/json — matchea con producer)" + BRAHMAN_INIT_SOCKET="$SOCKET" \ + BRAHMAN_DEMO_LABEL=demo-consumer \ + BRAHMAN_DEMO_FLOW=demo-feed \ + BRAHMAN_DEMO_TYPE=json \ + cargo run --quiet -p brahman-demo --bin brahman-demo-consumer \ + > "$LOG_DIR/consumer.log" 2>&1 & + PIDS+=($!) + + # Pequeño grace para que los handshakes completen antes que los + # explorers se conecten (sino el primer ListSessions devuelve algo + # transitorio sin las sesiones demo). + sleep 1 +fi + +# 3. Explorers GPUI (a menos que MODE=broker). +if [ "$MODE" != "broker" ] && [ "$MODE" != "only" ]; then + echo "[bootstrap-demo] abriendo brahman-broker-explorer" + BRAHMAN_INIT_SOCKET="$SOCKET" \ + cargo run --quiet -p brahman-broker-explorer \ + > "$LOG_DIR/broker-explorer.log" 2>&1 & + PIDS+=($!) + + echo "[bootstrap-demo] abriendo nouser-explorer (descubrirá vía broker si nouserd corre)" + BRAHMAN_INIT_SOCKET="$SOCKET" \ + cargo run --quiet -p nouser-explorer \ + > "$LOG_DIR/nouser-explorer.log" 2>&1 & + PIDS+=($!) + + # nakui-explorer y minga-explorer son standalone (no usan broker) + # — los abrimos igual para tener el dashboard completo. Apuntan a + # paths default; si no hay nakui-events.jsonl o repo minga, sólo + # mostrarán el banner "esperando…". + echo "[bootstrap-demo] abriendo nakui-explorer (standalone)" + cargo run --quiet -p nakui-explorer > "$LOG_DIR/nakui-explorer.log" 2>&1 & + PIDS+=($!) + + echo "[bootstrap-demo] abriendo minga-explorer (standalone)" + cargo run --quiet -p minga-explorer > "$LOG_DIR/minga-explorer.log" 2>&1 & + PIDS+=($!) +fi + +echo +echo "[bootstrap-demo] todo arriba — Ctrl-C para parar todo" +echo "[bootstrap-demo] logs en $LOG_DIR/" +echo + +# Wait sobre el broker: si éste cae, todo el demo deja de tener +# sentido. El trap se encarga del resto. +wait "${PIDS[0]}"