//! `brahman-demo-producer` — registra una Card con un `flow.output` //! y se queda pingueando indefinidamente. //! //! Sirve para que el broker compute matches con el `consumer` demo //! y los explorers vean algo en sesiones + timeline. //! //! Variables de entorno: //! - `BRAHMAN_DEMO_LABEL` — label de la Card. Default `demo-producer`. //! - `BRAHMAN_DEMO_FLOW` — nombre del flow.output. Default `demo-stream`. //! - `BRAHMAN_DEMO_TYPE` — type primitive del flow. Default `json`. use std::collections::BTreeSet; use std::time::Duration; use brahman_card::{ Card, CgroupSpec, Flow, Flows, NamespaceSet, Payload, ResourceLimits, SomaSpec, Supervision, TypeRef, CARD_SCHEMA_VERSION, }; use brahman_handshake::client::Client; use brahman_handshake::transport; use ulid::Ulid; #[tokio::main(flavor = "current_thread")] async fn main() -> Result<(), Box> { tracing_subscriber::fmt() .with_env_filter( tracing_subscriber::EnvFilter::try_from_default_env() .unwrap_or_else(|_| tracing_subscriber::EnvFilter::new("info")), ) .init(); let label = std::env::var("BRAHMAN_DEMO_LABEL").unwrap_or_else(|_| "demo-producer".to_string()); let flow_name = std::env::var("BRAHMAN_DEMO_FLOW").unwrap_or_else(|_| "demo-stream".into()); let type_name = std::env::var("BRAHMAN_DEMO_TYPE").unwrap_or_else(|_| "json".into()); let card = build_producer_card(&label, &flow_name, &type_name); let socket = transport::default_socket_path(); tracing::info!( socket = %socket.display(), label = %label, flow = %flow_name, ty = %type_name, "demo-producer conecta y queda registrado" ); let mut client = Client::connect(&socket, card).await?; tracing::info!(session = %client.session(), "session asignada"); // Ping cada 10s para mantener la sesión viva sin spammear el log. // El broker nos descontaría con un Farewell o EOF, no con timeout // de Ping — pero el ping da una señal visible en `info` logs. loop { tokio::time::sleep(Duration::from_secs(10)).await; match client.ping().await { Ok(ts) => tracing::debug!(server_ts_ms = ts, "ping ok"), Err(e) => { tracing::warn!(?e, "ping falló — saliendo"); break; } } } Ok(()) } fn build_producer_card(label: &str, flow_name: &str, type_name: &str) -> Card { Card { schema_version: CARD_SCHEMA_VERSION, id: Ulid::new(), lineage: None, label: label.into(), provides: BTreeSet::new(), requires: BTreeSet::new(), soma: SomaSpec { cgroup: CgroupSpec { path: "ente.slice/demo".into(), cpu_weight: None, io_weight: None, }, namespaces: NamespaceSet::default(), rlimits: ResourceLimits::default(), cpu_affinity: None, }, payload: Payload::Virtual, supervision: Supervision::OneShot, flow: Flows { input: vec![], output: vec![Flow { name: flow_name.into(), ty: TypeRef::Primitive { name: type_name.into(), }, pin_to: None, }], }, ..Default::default() } }