feat(brahman-demo): bootstrap script reproducible — broker + producer + consumer + 4 explorers
Iter 22. Cierra el set de hoy: future-me (o cualquier nuevo collab)
levanta el escenario completo con un comando.
crates/apps/brahman-demo/ con 3 binarios:
- brahman-demo-broker: Server::bind standalone con Broker. Reemplaza
a ente-zero para demos (ente-zero es PID 1 con kernel surface,
child subreaper, bus, brain, audit — overkill).
- brahman-demo-producer: Card con flow.output[demo-stream:json].
- brahman-demo-consumer: Card con flow.input[demo-feed:json] —
mismo type → matchea con producer.
Env vars en los 3: BRAHMAN_INIT_SOCKET, BRAHMAN_BROKER_CONTEXT,
BRAHMAN_DEMO_LABEL/FLOW/TYPE, RUST_LOG.
scripts/bootstrap-demo.sh:
- Modes: all (default) / broker / only.
- Cleanup-safe: trap mata todos los PIDs spawneados (SIGTERM grace
+ SIGKILL fallback) y borra el socket.
- Espera al socket antes de spawnear (evita ENOENT en handshake).
- Logs separados por proceso bajo $BRAHMAN_DEMO_LOG_DIR.
Smoke end-to-end (sin DISPLAY): consumer recibe MatchEvent
{ Available, demo-feed ← demo-stream, via: Exact, pinned: false }
automáticamente cuando entra el producer. Match fluye por el push
channel del broker.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -0,0 +1,68 @@
|
||||
//! `brahman-demo-broker` — broker standalone para demos.
|
||||
//!
|
||||
//! ente-zero (PID 1) es el broker "real" pero pesa toneladas
|
||||
//! (kernel surface, child subreaper, bus, brain, audit, etc). Este
|
||||
//! binario sólo arma `brahman_handshake::Server::bind` con un broker
|
||||
//! configurado, escucha en el socket default, y corre forever.
|
||||
//!
|
||||
//! Suficiente para que el script de bootstrap (y los 4 explorers)
|
||||
//! tengan algo a qué conectarse sin necesitar el bootstrap PID 1
|
||||
//! completo.
|
||||
//!
|
||||
//! Variables de entorno respetadas:
|
||||
//! - `BRAHMAN_INIT_SOCKET` — path del Unix socket. Default: el
|
||||
//! resuelto por `brahman_handshake::transport::default_socket_path`.
|
||||
//! - `BRAHMAN_BROKER_CONTEXT` — context bias del broker (igual que
|
||||
//! ente-zero); afecta priority_contexts si las Cards lo declaran.
|
||||
//! - `RUST_LOG` — filtro de tracing (default `info`).
|
||||
|
||||
use std::sync::Arc;
|
||||
|
||||
use brahman_broker::{Broker, BrokerConfig, MatchStrategy};
|
||||
use brahman_handshake::server::{Server, ServerConfig};
|
||||
use brahman_handshake::transport;
|
||||
use tokio::sync::Mutex;
|
||||
|
||||
#[tokio::main(flavor = "current_thread")]
|
||||
async fn main() -> std::io::Result<()> {
|
||||
tracing_subscriber::fmt()
|
||||
.with_env_filter(
|
||||
tracing_subscriber::EnvFilter::try_from_default_env()
|
||||
.unwrap_or_else(|_| tracing_subscriber::EnvFilter::new("info")),
|
||||
)
|
||||
.init();
|
||||
|
||||
let context = std::env::var("BRAHMAN_BROKER_CONTEXT").ok();
|
||||
let broker = Arc::new(Mutex::new(Broker::new(BrokerConfig {
|
||||
strategy: MatchStrategy::default(),
|
||||
current_context: context.clone(),
|
||||
})));
|
||||
|
||||
let socket = transport::default_socket_path();
|
||||
tracing::info!(
|
||||
socket = %socket.display(),
|
||||
context = ?context,
|
||||
"brahman-demo-broker arranca"
|
||||
);
|
||||
|
||||
let server = Server::bind(
|
||||
&socket,
|
||||
ServerConfig {
|
||||
init_attached: false,
|
||||
broker: Some(broker),
|
||||
net: None,
|
||||
policy: None,
|
||||
},
|
||||
)?;
|
||||
|
||||
// Loop accept-forever. Cada conexión va a su propia tokio task —
|
||||
// sesiones independientes, ninguna bloquea a las otras.
|
||||
loop {
|
||||
let session = server.accept_one().await?;
|
||||
tokio::spawn(async move {
|
||||
if let Err(e) = session.handle().await {
|
||||
tracing::warn!(?e, "session terminó con error");
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,99 @@
|
||||
//! `brahman-demo-consumer` — registra una Card con un `flow.input`
|
||||
//! que matchea al producer demo, se queda escuchando MatchEvents.
|
||||
//!
|
||||
//! Variables de entorno:
|
||||
//! - `BRAHMAN_DEMO_LABEL` — label de la Card. Default `demo-consumer`.
|
||||
//! - `BRAHMAN_DEMO_FLOW` — nombre del flow.input. Default `demo-feed`.
|
||||
//! - `BRAHMAN_DEMO_TYPE` — type primitive. Default `json` (debe matchear
|
||||
//! con el producer para que veamos un Match en el broker).
|
||||
|
||||
use std::collections::BTreeSet;
|
||||
use std::time::Duration;
|
||||
|
||||
use brahman_card::{
|
||||
Card, CgroupSpec, Flow, Flows, NamespaceSet, Payload, ResourceLimits, SomaSpec, Supervision,
|
||||
TypeRef, CARD_SCHEMA_VERSION,
|
||||
};
|
||||
use brahman_handshake::client::Client;
|
||||
use brahman_handshake::transport;
|
||||
use ulid::Ulid;
|
||||
|
||||
#[tokio::main(flavor = "current_thread")]
|
||||
async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||
tracing_subscriber::fmt()
|
||||
.with_env_filter(
|
||||
tracing_subscriber::EnvFilter::try_from_default_env()
|
||||
.unwrap_or_else(|_| tracing_subscriber::EnvFilter::new("info")),
|
||||
)
|
||||
.init();
|
||||
|
||||
let label =
|
||||
std::env::var("BRAHMAN_DEMO_LABEL").unwrap_or_else(|_| "demo-consumer".to_string());
|
||||
let flow_name = std::env::var("BRAHMAN_DEMO_FLOW").unwrap_or_else(|_| "demo-feed".into());
|
||||
let type_name = std::env::var("BRAHMAN_DEMO_TYPE").unwrap_or_else(|_| "json".into());
|
||||
|
||||
let card = build_consumer_card(&label, &flow_name, &type_name);
|
||||
let socket = transport::default_socket_path();
|
||||
|
||||
tracing::info!(
|
||||
socket = %socket.display(),
|
||||
label = %label,
|
||||
flow = %flow_name,
|
||||
ty = %type_name,
|
||||
"demo-consumer conecta y queda escuchando MatchEvents"
|
||||
);
|
||||
|
||||
let mut client = Client::connect(&socket, card).await?;
|
||||
tracing::info!(session = %client.session(), "session asignada");
|
||||
|
||||
// Drenamos eventos cada 5s. Cada tick los logueamos si hay alguno;
|
||||
// si no, ping de heartbeat (mismo razonamiento que producer).
|
||||
loop {
|
||||
let evt = client.await_event(Duration::from_secs(5)).await?;
|
||||
match evt {
|
||||
Some(ev) => tracing::info!(?ev, "MatchEvent recibido"),
|
||||
None => {
|
||||
if let Err(e) = client.ping().await {
|
||||
tracing::warn!(?e, "ping falló tras quiet period — saliendo");
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn build_consumer_card(label: &str, flow_name: &str, type_name: &str) -> Card {
|
||||
Card {
|
||||
schema_version: CARD_SCHEMA_VERSION,
|
||||
id: Ulid::new(),
|
||||
lineage: None,
|
||||
label: label.into(),
|
||||
provides: BTreeSet::new(),
|
||||
requires: BTreeSet::new(),
|
||||
soma: SomaSpec {
|
||||
cgroup: CgroupSpec {
|
||||
path: "ente.slice/demo".into(),
|
||||
cpu_weight: None,
|
||||
io_weight: None,
|
||||
},
|
||||
namespaces: NamespaceSet::default(),
|
||||
rlimits: ResourceLimits::default(),
|
||||
cpu_affinity: None,
|
||||
},
|
||||
payload: Payload::Virtual,
|
||||
supervision: Supervision::OneShot,
|
||||
flow: Flows {
|
||||
input: vec![Flow {
|
||||
name: flow_name.into(),
|
||||
ty: TypeRef::Primitive {
|
||||
name: type_name.into(),
|
||||
},
|
||||
pin_to: None,
|
||||
}],
|
||||
output: vec![],
|
||||
},
|
||||
..Default::default()
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,100 @@
|
||||
//! `brahman-demo-producer` — registra una Card con un `flow.output`
|
||||
//! y se queda pingueando indefinidamente.
|
||||
//!
|
||||
//! Sirve para que el broker compute matches con el `consumer` demo
|
||||
//! y los explorers vean algo en sesiones + timeline.
|
||||
//!
|
||||
//! Variables de entorno:
|
||||
//! - `BRAHMAN_DEMO_LABEL` — label de la Card. Default `demo-producer`.
|
||||
//! - `BRAHMAN_DEMO_FLOW` — nombre del flow.output. Default `demo-stream`.
|
||||
//! - `BRAHMAN_DEMO_TYPE` — type primitive del flow. Default `json`.
|
||||
|
||||
use std::collections::BTreeSet;
|
||||
use std::time::Duration;
|
||||
|
||||
use brahman_card::{
|
||||
Card, CgroupSpec, Flow, Flows, NamespaceSet, Payload, ResourceLimits, SomaSpec, Supervision,
|
||||
TypeRef, CARD_SCHEMA_VERSION,
|
||||
};
|
||||
use brahman_handshake::client::Client;
|
||||
use brahman_handshake::transport;
|
||||
use ulid::Ulid;
|
||||
|
||||
#[tokio::main(flavor = "current_thread")]
|
||||
async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||
tracing_subscriber::fmt()
|
||||
.with_env_filter(
|
||||
tracing_subscriber::EnvFilter::try_from_default_env()
|
||||
.unwrap_or_else(|_| tracing_subscriber::EnvFilter::new("info")),
|
||||
)
|
||||
.init();
|
||||
|
||||
let label =
|
||||
std::env::var("BRAHMAN_DEMO_LABEL").unwrap_or_else(|_| "demo-producer".to_string());
|
||||
let flow_name = std::env::var("BRAHMAN_DEMO_FLOW").unwrap_or_else(|_| "demo-stream".into());
|
||||
let type_name = std::env::var("BRAHMAN_DEMO_TYPE").unwrap_or_else(|_| "json".into());
|
||||
|
||||
let card = build_producer_card(&label, &flow_name, &type_name);
|
||||
let socket = transport::default_socket_path();
|
||||
|
||||
tracing::info!(
|
||||
socket = %socket.display(),
|
||||
label = %label,
|
||||
flow = %flow_name,
|
||||
ty = %type_name,
|
||||
"demo-producer conecta y queda registrado"
|
||||
);
|
||||
|
||||
let mut client = Client::connect(&socket, card).await?;
|
||||
tracing::info!(session = %client.session(), "session asignada");
|
||||
|
||||
// Ping cada 10s para mantener la sesión viva sin spammear el log.
|
||||
// El broker nos descontaría con un Farewell o EOF, no con timeout
|
||||
// de Ping — pero el ping da una señal visible en `info` logs.
|
||||
loop {
|
||||
tokio::time::sleep(Duration::from_secs(10)).await;
|
||||
match client.ping().await {
|
||||
Ok(ts) => tracing::debug!(server_ts_ms = ts, "ping ok"),
|
||||
Err(e) => {
|
||||
tracing::warn!(?e, "ping falló — saliendo");
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn build_producer_card(label: &str, flow_name: &str, type_name: &str) -> Card {
|
||||
Card {
|
||||
schema_version: CARD_SCHEMA_VERSION,
|
||||
id: Ulid::new(),
|
||||
lineage: None,
|
||||
label: label.into(),
|
||||
provides: BTreeSet::new(),
|
||||
requires: BTreeSet::new(),
|
||||
soma: SomaSpec {
|
||||
cgroup: CgroupSpec {
|
||||
path: "ente.slice/demo".into(),
|
||||
cpu_weight: None,
|
||||
io_weight: None,
|
||||
},
|
||||
namespaces: NamespaceSet::default(),
|
||||
rlimits: ResourceLimits::default(),
|
||||
cpu_affinity: None,
|
||||
},
|
||||
payload: Payload::Virtual,
|
||||
supervision: Supervision::OneShot,
|
||||
flow: Flows {
|
||||
input: vec![],
|
||||
output: vec![Flow {
|
||||
name: flow_name.into(),
|
||||
ty: TypeRef::Primitive {
|
||||
name: type_name.into(),
|
||||
},
|
||||
pin_to: None,
|
||||
}],
|
||||
},
|
||||
..Default::default()
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user