Files
brahman/crates/apps/brahman-demo/src/bin/consumer.rs
T
Sergio 3d55f189c0 feat(brahman-demo): bootstrap script reproducible — broker + producer + consumer + 4 explorers
Iter 22. Cierra el set de hoy: future-me (o cualquier nuevo collab)
levanta el escenario completo con un comando.

crates/apps/brahman-demo/ con 3 binarios:
- brahman-demo-broker: Server::bind standalone con Broker. Reemplaza
  a ente-zero para demos (ente-zero es PID 1 con kernel surface,
  child subreaper, bus, brain, audit — overkill).
- brahman-demo-producer: Card con flow.output[demo-stream:json].
- brahman-demo-consumer: Card con flow.input[demo-feed:json] —
  mismo type → matchea con producer.

Env vars en los 3: BRAHMAN_INIT_SOCKET, BRAHMAN_BROKER_CONTEXT,
BRAHMAN_DEMO_LABEL/FLOW/TYPE, RUST_LOG.

scripts/bootstrap-demo.sh:
- Modes: all (default) / broker / only.
- Cleanup-safe: trap mata todos los PIDs spawneados (SIGTERM grace
  + SIGKILL fallback) y borra el socket.
- Espera al socket antes de spawnear (evita ENOENT en handshake).
- Logs separados por proceso bajo $BRAHMAN_DEMO_LOG_DIR.

Smoke end-to-end (sin DISPLAY): consumer recibe MatchEvent
{ Available, demo-feed ← demo-stream, via: Exact, pinned: false }
automáticamente cuando entra el producer. Match fluye por el push
channel del broker.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-10 16:06:11 +00:00

100 lines
3.3 KiB
Rust

//! `brahman-demo-consumer` — registra una Card con un `flow.input`
//! que matchea al producer demo, se queda escuchando MatchEvents.
//!
//! Variables de entorno:
//! - `BRAHMAN_DEMO_LABEL` — label de la Card. Default `demo-consumer`.
//! - `BRAHMAN_DEMO_FLOW` — nombre del flow.input. Default `demo-feed`.
//! - `BRAHMAN_DEMO_TYPE` — type primitive. Default `json` (debe matchear
//! con el producer para que veamos un Match en el broker).
use std::collections::BTreeSet;
use std::time::Duration;
use brahman_card::{
Card, CgroupSpec, Flow, Flows, NamespaceSet, Payload, ResourceLimits, SomaSpec, Supervision,
TypeRef, CARD_SCHEMA_VERSION,
};
use brahman_handshake::client::Client;
use brahman_handshake::transport;
use ulid::Ulid;
#[tokio::main(flavor = "current_thread")]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
tracing_subscriber::fmt()
.with_env_filter(
tracing_subscriber::EnvFilter::try_from_default_env()
.unwrap_or_else(|_| tracing_subscriber::EnvFilter::new("info")),
)
.init();
let label =
std::env::var("BRAHMAN_DEMO_LABEL").unwrap_or_else(|_| "demo-consumer".to_string());
let flow_name = std::env::var("BRAHMAN_DEMO_FLOW").unwrap_or_else(|_| "demo-feed".into());
let type_name = std::env::var("BRAHMAN_DEMO_TYPE").unwrap_or_else(|_| "json".into());
let card = build_consumer_card(&label, &flow_name, &type_name);
let socket = transport::default_socket_path();
tracing::info!(
socket = %socket.display(),
label = %label,
flow = %flow_name,
ty = %type_name,
"demo-consumer conecta y queda escuchando MatchEvents"
);
let mut client = Client::connect(&socket, card).await?;
tracing::info!(session = %client.session(), "session asignada");
// Drenamos eventos cada 5s. Cada tick los logueamos si hay alguno;
// si no, ping de heartbeat (mismo razonamiento que producer).
loop {
let evt = client.await_event(Duration::from_secs(5)).await?;
match evt {
Some(ev) => tracing::info!(?ev, "MatchEvent recibido"),
None => {
if let Err(e) = client.ping().await {
tracing::warn!(?e, "ping falló tras quiet period — saliendo");
break;
}
}
}
}
Ok(())
}
fn build_consumer_card(label: &str, flow_name: &str, type_name: &str) -> Card {
Card {
schema_version: CARD_SCHEMA_VERSION,
id: Ulid::new(),
lineage: None,
label: label.into(),
provides: BTreeSet::new(),
requires: BTreeSet::new(),
soma: SomaSpec {
cgroup: CgroupSpec {
path: "ente.slice/demo".into(),
cpu_weight: None,
io_weight: None,
},
namespaces: NamespaceSet::default(),
rlimits: ResourceLimits::default(),
cpu_affinity: None,
},
payload: Payload::Virtual,
supervision: Supervision::OneShot,
flow: Flows {
input: vec![Flow {
name: flow_name.into(),
ty: TypeRef::Primitive {
name: type_name.into(),
},
pin_to: None,
}],
output: vec![],
},
..Default::default()
}
}