diff --git a/Cargo.lock b/Cargo.lock index ae516db..a8a61bd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1139,6 +1139,22 @@ dependencies = [ "syn 2.0.117", ] +[[package]] +name = "brahman-admin" +version = "0.1.0" +dependencies = [ + "anyhow", + "brahman-broker", + "brahman-card", + "serde", + "serde_json", + "tempfile", + "thiserror 2.0.18", + "tokio", + "tracing", + "ulid", +] + [[package]] name = "brahman-broker" version = "0.1.0" @@ -1176,6 +1192,17 @@ dependencies = [ "ulid", ] +[[package]] +name = "brahman-sidecar" +version = "0.1.0" +dependencies = [ + "brahman-card", + "brahman-handshake", + "tokio", + "tracing", + "tracing-subscriber", +] + [[package]] name = "bs58" version = "0.5.1" @@ -2772,6 +2799,7 @@ name = "ente-zero" version = "0.0.1" dependencies = [ "anyhow", + "brahman-admin", "brahman-broker", "brahman-handshake", "ente-brain", @@ -5785,6 +5813,8 @@ dependencies = [ name = "nakui-core" version = "0.1.0" dependencies = [ + "brahman-card", + "brahman-sidecar", "petgraph", "rhai", "serde", @@ -5793,6 +5823,7 @@ dependencies = [ "surrealdb", "thiserror 1.0.69", "tokio", + "ulid", "uuid", ] @@ -11440,7 +11471,7 @@ name = "yahweh-shell" version = "0.1.0" dependencies = [ "brahman-card", - "brahman-handshake", + "brahman-sidecar", "gpui", "notify", "serde", diff --git a/Cargo.toml b/Cargo.toml index 4907797..2be928c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,6 +7,8 @@ members = [ "crates/core/brahman-card", "crates/core/brahman-handshake", "crates/core/brahman-broker", + "crates/core/brahman-admin", + "crates/shared/brahman-sidecar", "crates/core/ente-card", "crates/core/ente-bus", "crates/core/ente-cas", diff --git a/crates/apps/yahweh-shell/Cargo.toml b/crates/apps/yahweh-shell/Cargo.toml index f42a371..7aba7b3 100644 --- a/crates/apps/yahweh-shell/Cargo.toml +++ b/crates/apps/yahweh-shell/Cargo.toml @@ -28,7 +28,7 @@ notify = { workspace = true } # Brahman protocol — sidecar thread que se presenta al Init. brahman-card = { path = "../../core/brahman-card" } -brahman-handshake = { path = "../../core/brahman-handshake" } +brahman-sidecar = { path = "../../shared/brahman-sidecar" } ulid = { workspace = true } [[bin]] diff --git a/crates/apps/yahweh-shell/src/brahman_client.rs b/crates/apps/yahweh-shell/src/brahman_client.rs index cc87c90..525ec2d 100644 --- a/crates/apps/yahweh-shell/src/brahman_client.rs +++ b/crates/apps/yahweh-shell/src/brahman_client.rs @@ -1,84 +1,19 @@ -//! Sidecar brahman: yahweh se presenta al Init como módulo `Widget`. +//! Card de yahweh-shell + spawn del sidecar brahman compartido. //! -//! Vive en un thread aparte con tokio runtime current_thread, desacoplado -//! de GPUI. Si el Init no está disponible, loggea y termina — yahweh -//! sigue funcionando standalone. Si conecta, mantiene la sesión viva -//! con pings periódicos hasta que la GUI termine o el server caiga. -//! -//! Card declarada: -//! - label: `brahman.ui_engine` -//! - lifecycle: `Widget` -//! - flow.input: `render-data` (json) -//! - flow.output: `user-intent` (json) -//! - permissions: filesystem read-write (yahweh persiste `layout.json`), -//! IPC `wit-v1`. +//! La lógica de thread + tokio + ping-loop vive en `brahman-sidecar`; +//! aquí sólo declaramos la identidad de yahweh como módulo Widget. use std::collections::BTreeSet; -use std::time::Duration; use brahman_card::{ Card, Flow, Flows, FsPolicy, IpcPolicy, Lifecycle, Payload, Permissions, Priority, Supervision, TypeRef, CARD_SCHEMA_VERSION, }; -use brahman_handshake::{client::Client, transport}; use ulid::Ulid; -/// Período entre pings al Init. -const PING_INTERVAL: Duration = Duration::from_secs(30); - -/// Spawn del sidecar brahman. No-op si el thread no se puede crear. -/// Devuelve inmediatamente; la conexión se establece en background. +/// Spawn del sidecar con la Card de yahweh. pub fn spawn() { - let result = std::thread::Builder::new() - .name("brahman-client".into()) - .spawn(run_thread); - if let Err(e) = result { - eprintln!("[brahman] no se pudo spawnear el sidecar: {e}"); - } -} - -fn run_thread() { - let rt = match tokio::runtime::Builder::new_current_thread() - .enable_io() - .enable_time() - .build() - { - Ok(rt) => rt, - Err(e) => { - eprintln!("[brahman] tokio runtime falló: {e}"); - return; - } - }; - rt.block_on(run_client()); -} - -async fn run_client() { - let path = transport::default_socket_path(); - let card = build_card(); - - let mut client = match Client::connect(&path, card).await { - Ok(c) => { - eprintln!( - "[brahman] attached: session={} init_attached={} server={}", - c.session(), - c.server_info().init_attached, - c.server_info().server_version - ); - c - } - Err(e) => { - eprintln!("[brahman] no conectado a {} ({e})", path.display()); - return; - } - }; - - loop { - tokio::time::sleep(PING_INTERVAL).await; - if let Err(e) = client.ping().await { - eprintln!("[brahman] ping falló: {e}"); - return; - } - } + brahman_sidecar::spawn(build_card()); } fn build_card() -> Card { diff --git a/crates/core/brahman-admin/Cargo.toml b/crates/core/brahman-admin/Cargo.toml new file mode 100644 index 0000000..3c0d90f --- /dev/null +++ b/crates/core/brahman-admin/Cargo.toml @@ -0,0 +1,27 @@ +[package] +name = "brahman-admin" +version.workspace = true +edition.workspace = true +rust-version.workspace = true +license.workspace = true +authors.workspace = true +publish.workspace = true +description = "Brahman — admin API: snapshot del estado del broker (sesiones + matches) por Unix socket, formato JSON." + +[dependencies] +brahman-broker = { path = "../brahman-broker" } +brahman-card = { path = "../brahman-card" } +serde = { workspace = true } +serde_json = { workspace = true } +tokio = { workspace = true } +thiserror = { workspace = true } +tracing = { workspace = true } + +[dev-dependencies] +tempfile = { workspace = true } +ulid = { workspace = true } +anyhow = { workspace = true } + +[[example]] +name = "brahman-status" +path = "examples/brahman-status.rs" diff --git a/crates/core/brahman-admin/examples/brahman-status.rs b/crates/core/brahman-admin/examples/brahman-status.rs new file mode 100644 index 0000000..21c7f57 --- /dev/null +++ b/crates/core/brahman-admin/examples/brahman-status.rs @@ -0,0 +1,55 @@ +//! `brahman-status` — CLI para inspeccionar el estado del Init. +//! +//! Conecta al socket admin (default `$XDG_RUNTIME_DIR/brahman-admin.sock`, +//! override con `$BRAHMAN_ADMIN_SOCKET`), recibe el snapshot, y lo imprime. + +use brahman_admin::{client, transport}; + +#[tokio::main(flavor = "current_thread")] +async fn main() -> anyhow::Result<()> { + let path = transport::default_socket_path(); + let snap = client::query(&path).await?; + + println!( + "Init: server={} protocol={} attached={}", + snap.server_version, snap.protocol_version, snap.init_attached + ); + println!(); + println!("Sessions ({}):", snap.sessions.len()); + if snap.sessions.is_empty() { + println!(" (ninguna)"); + } else { + for s in &snap.sessions { + println!( + " {} {} lifecycle={:?} priority={:?}", + s.session, s.label, s.lifecycle, s.priority + ); + for f in &s.inputs { + println!(" in {}: {:?}", f.name, f.ty); + } + for f in &s.outputs { + println!(" out {}: {:?}", f.name, f.ty); + } + } + } + println!(); + println!("Matches ({}):", snap.matches.len()); + if snap.matches.is_empty() { + println!(" (ninguno)"); + } else { + for m in &snap.matches { + let pin_marker = if m.pinned { "📌" } else { " " }; + println!( + " {} {}.{} ← {}.{} via {:?}", + pin_marker, + m.consumer_label, + m.consumer.flow_name, + m.producer_label, + m.producer.flow_name, + m.via + ); + } + } + + Ok(()) +} diff --git a/crates/core/brahman-admin/src/client.rs b/crates/core/brahman-admin/src/client.rs new file mode 100644 index 0000000..2c5c7d3 --- /dev/null +++ b/crates/core/brahman-admin/src/client.rs @@ -0,0 +1,32 @@ +//! Cliente admin: lee un `StatusSnapshot` desde un socket admin. + +use std::path::Path; + +use thiserror::Error; +use tokio::io::{AsyncBufReadExt, BufReader}; +use tokio::net::UnixStream; + +use crate::snapshot::StatusSnapshot; + +#[derive(Debug, Error)] +pub enum AdminError { + #[error("E/S: {0}")] + Io(#[from] std::io::Error), + #[error("respuesta vacía")] + Empty, + #[error("JSON inválido: {0}")] + Json(#[from] serde_json::Error), +} + +/// Conecta al socket admin, lee la línea JSON y deserializa. +pub async fn query(path: impl AsRef) -> Result { + let stream = UnixStream::connect(path).await?; + let mut reader = BufReader::new(stream); + let mut line = String::new(); + let n = reader.read_line(&mut line).await?; + if n == 0 { + return Err(AdminError::Empty); + } + let snapshot = serde_json::from_str(&line)?; + Ok(snapshot) +} diff --git a/crates/core/brahman-admin/src/lib.rs b/crates/core/brahman-admin/src/lib.rs new file mode 100644 index 0000000..1cb1d5b --- /dev/null +++ b/crates/core/brahman-admin/src/lib.rs @@ -0,0 +1,23 @@ +//! `brahman-admin` — observabilidad del broker. +//! +//! Expone un Unix socket separado (no se mezcla con el handshake) en el +//! que cada conexión recibe un `StatusSnapshot` JSON y se cierra. Es +//! single-shot por conexión: pensado para herramientas como +//! `brahman-status`, dashboards y health-checks. +//! +//! Wire format: una línea JSON por conexión, terminada en `\n`. Esto +//! hace trivial inspeccionar con `nc` o `socat` además del cliente +//! tipado de este crate. + +#![forbid(unsafe_code)] +#![warn(rust_2018_idioms)] + +pub mod client; +pub mod server; +pub mod snapshot; +pub mod transport; + +pub use snapshot::StatusSnapshot; + +/// Versión del crate de admin. +pub const ADMIN_VERSION: &str = env!("CARGO_PKG_VERSION"); diff --git a/crates/core/brahman-admin/src/server.rs b/crates/core/brahman-admin/src/server.rs new file mode 100644 index 0000000..a04285e --- /dev/null +++ b/crates/core/brahman-admin/src/server.rs @@ -0,0 +1,107 @@ +//! Servidor admin: emite un `StatusSnapshot` JSON por conexión y cierra. + +use std::path::{Path, PathBuf}; +use std::sync::Arc; + +use brahman_broker::Broker; +use tokio::io::AsyncWriteExt; +use tokio::net::{UnixListener, UnixStream}; +use tokio::sync::Mutex; +use tracing::warn; + +use crate::snapshot::StatusSnapshot; + +/// Configuración del servidor admin. +#[derive(Debug, Clone, Default)] +pub struct AdminConfig { + /// `true` si el Init está atado al servidor que aloja este admin. + pub init_attached: bool, +} + +/// Servidor admin escuchando en un Unix socket. +pub struct AdminServer { + listener: UnixListener, + socket_path: PathBuf, + broker: Arc>, + config: AdminConfig, +} + +impl AdminServer { + /// Crea el listener. Si `path` existe, lo elimina (asume socket stale). + pub fn bind( + path: impl Into, + broker: Arc>, + config: AdminConfig, + ) -> std::io::Result { + let socket_path = path.into(); + if socket_path.exists() { + std::fs::remove_file(&socket_path)?; + } + if let Some(parent) = socket_path.parent() { + if !parent.as_os_str().is_empty() { + std::fs::create_dir_all(parent)?; + } + } + let listener = UnixListener::bind(&socket_path)?; + Ok(Self { + listener, + socket_path, + broker, + config, + }) + } + + pub fn socket_path(&self) -> &Path { + &self.socket_path + } + + /// Loop de aceptación: cada conexión recibe un snapshot y se cierra. + pub async fn run(self) -> std::io::Result<()> { + loop { + let (stream, _addr) = self.listener.accept().await?; + let broker = self.broker.clone(); + let config = self.config.clone(); + tokio::spawn(async move { + if let Err(e) = handle_conn(stream, broker, config).await { + warn!(error = %e, "admin conn falló"); + } + }); + } + } +} + +impl Drop for AdminServer { + fn drop(&mut self) { + if let Err(e) = std::fs::remove_file(&self.socket_path) { + if e.kind() != std::io::ErrorKind::NotFound { + warn!(path = %self.socket_path.display(), error = %e, "no se pudo limpiar admin socket"); + } + } + } +} + +async fn handle_conn( + mut stream: UnixStream, + broker: Arc>, + config: AdminConfig, +) -> std::io::Result<()> { + let snapshot = build_snapshot(&broker, &config).await; + let mut json = serde_json::to_string(&snapshot)?; + json.push('\n'); + stream.write_all(json.as_bytes()).await?; + stream.shutdown().await?; + Ok(()) +} + +async fn build_snapshot(broker: &Arc>, config: &AdminConfig) -> StatusSnapshot { + let b = broker.lock().await; + let sessions: Vec<_> = b.cards().cloned().collect(); + let matches = b.all_matches(); + StatusSnapshot { + server_version: crate::ADMIN_VERSION.to_string(), + protocol_version: brahman_card::PROTOCOL_VERSION.to_string(), + init_attached: config.init_attached, + sessions, + matches, + } +} diff --git a/crates/core/brahman-admin/src/snapshot.rs b/crates/core/brahman-admin/src/snapshot.rs new file mode 100644 index 0000000..8a4df41 --- /dev/null +++ b/crates/core/brahman-admin/src/snapshot.rs @@ -0,0 +1,19 @@ +//! Tipos del snapshot que el admin server emite. + +use brahman_broker::{BrokeredCard, Match}; +use serde::{Deserialize, Serialize}; + +/// Snapshot completo del estado del Init en un instante. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct StatusSnapshot { + /// Versión del crate del Init que respondió. + pub server_version: String, + /// Versión del protocolo brahman. + pub protocol_version: String, + /// `true` si el Init está atado al servidor. + pub init_attached: bool, + /// Cards actualmente registradas (sesiones vivas). + pub sessions: Vec, + /// Matches consumer↔producer derivados del set actual. + pub matches: Vec, +} diff --git a/crates/core/brahman-admin/src/transport.rs b/crates/core/brahman-admin/src/transport.rs new file mode 100644 index 0000000..d2afed6 --- /dev/null +++ b/crates/core/brahman-admin/src/transport.rs @@ -0,0 +1,20 @@ +//! Convenciones de transporte para el socket admin. + +use std::path::PathBuf; + +/// Variable de entorno que sobreescribe la ruta del socket admin. +pub const SOCKET_ENV: &str = "BRAHMAN_ADMIN_SOCKET"; + +/// Nombre del socket admin dentro del runtime dir. +pub const SOCKET_NAME: &str = "brahman-admin.sock"; + +/// Ruta canónica al socket admin del Init. +pub fn default_socket_path() -> PathBuf { + if let Ok(p) = std::env::var(SOCKET_ENV) { + return PathBuf::from(p); + } + let base = std::env::var_os("XDG_RUNTIME_DIR") + .map(PathBuf::from) + .unwrap_or_else(std::env::temp_dir); + base.join(SOCKET_NAME) +} diff --git a/crates/core/brahman-broker/src/lib.rs b/crates/core/brahman-broker/src/lib.rs index 3a7ff4b..852c7ec 100644 --- a/crates/core/brahman-broker/src/lib.rs +++ b/crates/core/brahman-broker/src/lib.rs @@ -30,7 +30,7 @@ use std::collections::BTreeMap; -use brahman_card::{Card, Flow, Priority, TypeRef}; +use brahman_card::{Card, Flow, Lifecycle, Priority, TypeRef}; use serde::{Deserialize, Serialize}; use ulid::Ulid; @@ -60,10 +60,11 @@ pub struct BrokerConfig { } /// Vista mínima de una Card que el broker necesita. -#[derive(Debug, Clone)] +#[derive(Debug, Clone, Serialize, Deserialize)] pub struct BrokeredCard { pub session: SessionId, pub label: String, + pub lifecycle: Lifecycle, pub priority: Priority, pub inputs: Vec, pub outputs: Vec, @@ -74,6 +75,7 @@ impl BrokeredCard { Self { session, label: card.label.clone(), + lifecycle: card.lifecycle, priority: card.priority, inputs: card.flow.input.clone(), outputs: card.flow.output.clone(), @@ -82,14 +84,14 @@ impl BrokeredCard { } /// Punto extremo de un flujo: qué sesión + nombre del flow dentro de su Card. -#[derive(Debug, Clone, PartialEq, Eq, Hash)] +#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)] pub struct Endpoint { pub session: SessionId, pub flow_name: String, } /// Match concreto entre un consumidor y un productor. -#[derive(Debug, Clone)] +#[derive(Debug, Clone, Serialize, Deserialize)] pub struct Match { pub consumer: Endpoint, pub consumer_label: String, @@ -147,6 +149,11 @@ impl Broker { self.cards.keys().copied() } + /// Iterador sobre las Cards registradas (vista compartida). + pub fn cards(&self) -> impl Iterator + '_ { + self.cards.values() + } + /// Busca el mejor productor para un input específico de un consumidor. /// /// Algoritmo: diff --git a/crates/core/brahman-card/src/lib.rs b/crates/core/brahman-card/src/lib.rs index 65d058b..3c2c597 100644 --- a/crates/core/brahman-card/src/lib.rs +++ b/crates/core/brahman-card/src/lib.rs @@ -26,6 +26,10 @@ use serde::{Deserialize, Serialize}; use thiserror::Error; use ulid::Ulid; +// Re-export para que los consumidores no necesiten depender de `ulid` +// directamente. +pub use ::ulid; + /// Versión del esquema de la Card. pub const CARD_SCHEMA_VERSION: u16 = 1; diff --git a/crates/core/ente-zero/Cargo.toml b/crates/core/ente-zero/Cargo.toml index 04bb4bf..8ea901a 100644 --- a/crates/core/ente-zero/Cargo.toml +++ b/crates/core/ente-zero/Cargo.toml @@ -24,6 +24,7 @@ ente-echo = { path = "../ente-echo" } # solo para constantes del demo # Brahman protocol — handshake para módulos brahman conscientes brahman-handshake = { path = "../brahman-handshake" } brahman-broker = { path = "../brahman-broker" } +brahman-admin = { path = "../brahman-admin" } # Runtime / utilidades de PID 1 serde = { workspace = true } diff --git a/crates/core/ente-zero/src/main.rs b/crates/core/ente-zero/src/main.rs index 18efbe8..b0c156a 100644 --- a/crates/core/ente-zero/src/main.rs +++ b/crates/core/ente-zero/src/main.rs @@ -169,6 +169,29 @@ async fn primordial_loop( } } + // Brahman admin: socket separado para snapshots de estado (sesiones + + // matches del broker). Misma política de degradación grácil. + let admin_sock = brahman_admin::transport::default_socket_path(); + match brahman_admin::server::AdminServer::bind( + &admin_sock, + brahman_broker.clone(), + brahman_admin::server::AdminConfig { + init_attached: true, + }, + ) { + Ok(admin) => { + info!(socket = %admin_sock.display(), "brahman admin escuchando"); + tokio::spawn(async move { + if let Err(e) = admin.run().await { + warn!(?e, "brahman admin server cayó"); + } + }); + } + Err(e) => { + warn!(?e, socket = %admin_sock.display(), "brahman admin deshabilitado"); + } + } + let mut graph = EnteGraph::new(seed_card); graph.instantiate_seed_dependencies(&graph_tx).await?; diff --git a/crates/modules/nakui/core/Cargo.toml b/crates/modules/nakui/core/Cargo.toml index 3d4b869..1a566c1 100644 --- a/crates/modules/nakui/core/Cargo.toml +++ b/crates/modules/nakui/core/Cargo.toml @@ -21,6 +21,11 @@ sha2 = "0.10" surrealdb = { version = "2", default-features = false, features = ["kv-mem"] } tokio = { version = "1", features = ["rt", "macros"] } +# Brahman protocol — presencia ante el Init cuando `nakui run` arranca. +brahman-card = { path = "../../../core/brahman-card" } +brahman-sidecar = { path = "../../../shared/brahman-sidecar" } +ulid = { version = "1" } + [[bin]] name = "nakui" path = "src/bin/nakui.rs" diff --git a/crates/modules/nakui/core/src/bin/nakui.rs b/crates/modules/nakui/core/src/bin/nakui.rs index 6ee5714..52f47f6 100644 --- a/crates/modules/nakui/core/src/bin/nakui.rs +++ b/crates/modules/nakui/core/src/bin/nakui.rs @@ -313,6 +313,10 @@ fn cmd_run(args: &[String]) -> Result<(), CliError> { .unwrap_or_else(|| "".into()), ); + // Sidecar brahman: nakui se presenta al Init mientras el daemon vive. + // No bloquea; si el Init no está, el sidecar termina silenciosamente. + brahman_sidecar::spawn(brahman_card_for_nakui()); + let executor = Executor::load_module(&module_dir) .map_err(|e| CliError::Op(format!("load module {}: {}", module_dir.display(), e)))?; let log = EventLog::open(&log_path).map_err(|e| CliError::Op(format!("open log: {}", e)))?; @@ -453,3 +457,59 @@ fn cmd_verify_log(args: &[String]) -> Result<(), CliError> { Err(e) => Err(CliError::Op(format!("verify failed: {}", e))), } } + +/// Card que nakui presenta al Init brahman cuando arranca como daemon. +/// +/// Lifecycle Daemon (proceso largo). Flujos JSON: consume `command` +/// (queries del UI), produce `report` (resultados de cómputo). Los +/// nombres están escogidos para que el broker pueda matchearlos contra +/// `user-intent` / `render-data` de yahweh-shell por compatibilidad de +/// tipo (todos `json`). +fn brahman_card_for_nakui() -> brahman_card::Card { + use brahman_card::{ + Card, Flow, Flows, FsPolicy, IpcPolicy, Lifecycle, Payload, Permissions, Priority, + Supervision, TypeRef, CARD_SCHEMA_VERSION, + }; + use std::collections::BTreeSet; + use std::time::Duration; + + Card { + schema_version: CARD_SCHEMA_VERSION, + id: ulid::Ulid::new(), + lineage: None, + label: "brahman.nakui_erp".into(), + provides: BTreeSet::new(), + requires: BTreeSet::new(), + payload: Payload::Virtual, + supervision: Supervision::Restart { + initial: Duration::from_millis(200), + max: Duration::from_secs(30), + }, + lifecycle: Lifecycle::Daemon, + priority: Priority::Normal, + permissions: Permissions { + filesystem: FsPolicy::ReadWrite, + ipc: IpcPolicy { + allow: vec!["wit-v1".into()], + }, + ..Default::default() + }, + flow: Flows { + input: vec![Flow { + name: "command".into(), + ty: TypeRef::Primitive { + name: "json".into(), + }, + pin_to: None, + }], + output: vec![Flow { + name: "report".into(), + ty: TypeRef::Primitive { + name: "json".into(), + }, + pin_to: None, + }], + }, + ..Default::default() + } +} diff --git a/crates/shared/brahman-sidecar/Cargo.toml b/crates/shared/brahman-sidecar/Cargo.toml new file mode 100644 index 0000000..da80c1b --- /dev/null +++ b/crates/shared/brahman-sidecar/Cargo.toml @@ -0,0 +1,22 @@ +[package] +name = "brahman-sidecar" +version.workspace = true +edition.workspace = true +rust-version.workspace = true +license.workspace = true +authors.workspace = true +publish.workspace = true +description = "Brahman — sidecar reusable: thread + tokio runtime que mantiene viva la sesión de un módulo contra el Init." + +[dependencies] +brahman-card = { path = "../../core/brahman-card" } +brahman-handshake = { path = "../../core/brahman-handshake" } +tokio = { workspace = true } +tracing = { workspace = true } + +[dev-dependencies] +tracing-subscriber = { workspace = true } + +[[example]] +name = "presence" +path = "examples/presence.rs" diff --git a/crates/shared/brahman-sidecar/examples/presence.rs b/crates/shared/brahman-sidecar/examples/presence.rs new file mode 100644 index 0000000..5eb0452 --- /dev/null +++ b/crates/shared/brahman-sidecar/examples/presence.rs @@ -0,0 +1,69 @@ +//! `presence` — módulo brahman dummy para pruebas y demos. +//! +//! Declara una Card mínima con label tomado del primer argumento (default +//! `presence-default`) y mantiene la sesión viva hasta SIGTERM/SIGINT. +//! Útil para poblar el broker con sesiones de prueba. +//! +//! Uso: +//! ```sh +//! cargo run -p brahman-sidecar --example presence -- mi-modulo +//! ``` + +use std::collections::BTreeSet; +use std::time::Duration; + +use brahman_card::{ + ulid::Ulid, Card, Flow, Flows, Lifecycle, Payload, Priority, Supervision, TypeRef, + CARD_SCHEMA_VERSION, +}; +use brahman_sidecar::{spawn_with_handle, SidecarConfig}; + +fn main() { + tracing_subscriber::fmt() + .with_env_filter( + tracing_subscriber::EnvFilter::try_from_default_env() + .unwrap_or_else(|_| "info".into()), + ) + .init(); + + let label = std::env::args() + .nth(1) + .unwrap_or_else(|| "presence-default".into()); + + let card = Card { + schema_version: CARD_SCHEMA_VERSION, + id: Ulid::new(), + label: label.clone(), + payload: Payload::Virtual, + supervision: Supervision::OneShot, + lifecycle: Lifecycle::Daemon, + priority: Priority::Normal, + provides: BTreeSet::new(), + requires: BTreeSet::new(), + flow: Flows { + input: vec![Flow { + name: "in".into(), + ty: TypeRef::Primitive { + name: "json".into(), + }, + pin_to: None, + }], + output: vec![Flow { + name: "out".into(), + ty: TypeRef::Primitive { + name: "json".into(), + }, + pin_to: None, + }], + }, + ..Default::default() + }; + + let _handle = spawn_with_handle(SidecarConfig { + card, + ping_interval: Duration::from_secs(5), + }); + + eprintln!("presence({label}): sidecar lanzado, durmiendo (Ctrl-C para salir)"); + std::thread::park(); +} diff --git a/crates/shared/brahman-sidecar/src/lib.rs b/crates/shared/brahman-sidecar/src/lib.rs new file mode 100644 index 0000000..0da806f --- /dev/null +++ b/crates/shared/brahman-sidecar/src/lib.rs @@ -0,0 +1,109 @@ +//! `brahman-sidecar` — boilerplate del cliente brahman extraído. +//! +//! Cualquier módulo que quiera presentarse al Init brahman pero que tenga +//! su propio runtime (GPUI, current_thread tokio, std-thread loop, etc.) +//! puede llamar [`spawn`] con su [`brahman_card::Card`]. Eso arma un +//! thread aparte con un runtime tokio current_thread, conecta al Init, +//! y mantiene la sesión viva con pings periódicos. +//! +//! Si el Init no está disponible, el thread loggea y termina — el módulo +//! sigue funcionando standalone. +//! +//! Errores de conexión / ping se loggean vía `tracing::warn!`. Si querés +//! capturar la salida del thread (por ejemplo para test), usá +//! [`spawn_with_handle`] que devuelve un `JoinHandle`. + +#![forbid(unsafe_code)] +#![warn(rust_2018_idioms)] + +use std::thread::JoinHandle; +use std::time::Duration; + +use brahman_card::Card; +use brahman_handshake::{client::Client, transport}; +use tracing::{info, warn}; + +/// Período entre pings al Init. +pub const DEFAULT_PING_INTERVAL: Duration = Duration::from_secs(30); + +/// Configuración del sidecar. +#[derive(Debug, Clone)] +pub struct SidecarConfig { + /// Card que se presenta al Init. + pub card: Card, + /// Período entre pings. + pub ping_interval: Duration, +} + +impl SidecarConfig { + /// Configuración con defaults razonables: ping cada 30s. + pub fn new(card: Card) -> Self { + Self { + card, + ping_interval: DEFAULT_PING_INTERVAL, + } + } +} + +/// Spawn fire-and-forget. Devuelve inmediatamente; el handle se descarta. +/// Si el thread no se puede crear (raro), loggea y sigue. +pub fn spawn(card: Card) { + if let Err(e) = spawn_with_handle(SidecarConfig::new(card)) { + warn!(error = %e, "no se pudo spawnear el sidecar brahman"); + } +} + +/// Spawn devolviendo el `JoinHandle` para tests o cleanup explícito. +pub fn spawn_with_handle(config: SidecarConfig) -> std::io::Result> { + std::thread::Builder::new() + .name("brahman-sidecar".into()) + .spawn(move || run_thread(config)) +} + +fn run_thread(config: SidecarConfig) { + let rt = match tokio::runtime::Builder::new_current_thread() + .enable_io() + .enable_time() + .build() + { + Ok(rt) => rt, + Err(e) => { + warn!(error = %e, "tokio runtime falló"); + return; + } + }; + rt.block_on(run_client(config)); +} + +async fn run_client(config: SidecarConfig) { + let path = transport::default_socket_path(); + let mut client = match Client::connect(&path, config.card).await { + Ok(c) => { + info!( + target: "brahman_sidecar", + session = %c.session(), + init_attached = c.server_info().init_attached, + server = %c.server_info().server_version, + "attached" + ); + c + } + Err(e) => { + warn!( + target: "brahman_sidecar", + error = %e, + socket = %path.display(), + "no conectado" + ); + return; + } + }; + + loop { + tokio::time::sleep(config.ping_interval).await; + if let Err(e) = client.ping().await { + warn!(target: "brahman_sidecar", error = %e, "ping falló — terminando sidecar"); + return; + } + } +}