From 006640057ac6badcb64daedc60dc5807e8a17fe8 Mon Sep 17 00:00:00 2001 From: Sergio Date: Sat, 9 May 2026 02:30:48 +0000 Subject: [PATCH] feat(sidecar): API reusable de discovery via broker Promueve el patron ad-hoc discover_producer_socket que vivia inline en 'nouser attract --remote' a un modulo publico brahman_sidecar::discovery. Cualquier consumer ahora puede preguntar al broker "quien provee este TypeRef?" sin reimplementar el patron a mano. API: - build_consumer_card(label, flow_name, type_name) construye una Card minima (Ente, Oneshot, Virtual) con un input flow. Asigna Ulid::new() real (no nil), evitando colisiones en el broker. - await_provider(card, timeout) async: conecta al init, espera MatchEvent::Available, devuelve producer_service_socket, manda Farewell. Ignora eventos Lost durante el await. - await_provider_blocking(card, timeout) wrapper para mundos no-async (CLIs, std-thread loops). Crea su propio runtime current_thread. - ConsumerError tipado: Connect{socket,source}, NoProvider{flow,type_ref, timeout}, Client(ClientError), Runtime(String). Adios al Box. Refactor en nouser daemon: discover_producer_socket inline (60 LOC) -> 5 LOC delegando en el helper. remote_embed ya no construye su propio runtime. Tests: 4 unitarios (id no-nil, id unico por llamada, formateo de Wit TypeRef, fallback sin input). Build verde para sidecar y nouser-core. --- CHANGELOG.md | 56 +++++ Cargo.lock | 1 + crates/modules/nouser/core/src/bin/nouser.rs | 90 ++------ crates/shared/brahman-sidecar/Cargo.toml | 1 + .../shared/brahman-sidecar/src/discovery.rs | 216 ++++++++++++++++++ crates/shared/brahman-sidecar/src/lib.rs | 5 + 6 files changed, 294 insertions(+), 75 deletions(-) create mode 100644 crates/shared/brahman-sidecar/src/discovery.rs diff --git a/CHANGELOG.md b/CHANGELOG.md index b4726d3..6a80821 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,62 @@ ratio/diff ver `git show `. ## 2026-05-09 +### feat(sidecar): API reusable de discovery vía broker +Promueve el patrón ad-hoc `discover_producer_socket` (que vivía +inline en `nouser attract --remote`) a un módulo público +`brahman_sidecar::discovery`. Cualquier consumer puede ahora +preguntar al broker "¿quién provee este TypeRef?" con dos llamadas: + + // Construir un consumer Card mínimo (Ente, Oneshot, Virtual) + let card = brahman_sidecar::build_consumer_card( + "mi-cli", + "embed-result", // flow.input.name + "json", // TypeRef::Primitive { name } + ); + + // Bloqueante (CLIs, std-thread loops): + let socket: PathBuf = brahman_sidecar::await_provider_blocking( + card, Duration::from_secs(3), + )?; + // O async (módulos con runtime tokio propio): + let socket = brahman_sidecar::await_provider(card, timeout).await?; + +API: +- `build_consumer_card(label, flow_name, type_name) -> Card` + abstrae la verbosidad del struct-literal repetido en cada caller. + Genera un `id: Ulid::new()` real (no nil → seguro contra + colisiones en el broker). +- `await_provider(card, timeout) -> Result` + conecta al init, espera `MatchEvent::Available`, devuelve + `producer_service_socket`, manda Farewell. Ignora eventos + `Lost` durante el await (no aplican al arranque). +- `await_provider_blocking(card, timeout)` arma su propio + runtime `current_thread` para mundos no-async. +- `ConsumerError` con variantes tipadas: `Connect { socket, source }`, + `NoProvider { flow, type_ref, timeout }`, `Client(ClientError)`, + `Runtime(String)`. Adiós al `Box` de antes. + +Refactor en `nouser daemon`: +- `discover_producer_socket` (60 LOC inline en `bin/nouser.rs`) → 5 + líneas que delegan en el helper. +- `remote_embed` ya no construye su propio runtime tokio. + +Próximo consumer natural: `nouser-explorer`. Hoy renderea +`StatusSnapshot` vía socket admin (introspección pura). El día que +quiera **interactuar** con un Ente — p. ej., disparar un re-embed +desde la UI — usa este helper para resolver el socket del provider +sin hardcodear paths. + +Nota sobre identidad: este commit fuerza `Ulid::new()` para los +consumer Cards generados, evitando la trampa documentada del +`Card::default()` que devuelve `Ulid::nil()`. La fijación global de +`Default` queda como cleanup separado (requiere auditar que ningún +caller dependa del determinismo de `nil`). + +Tests: 4 unitarios nuevos en `discovery::tests` (id no-nil, id +único por llamada, formateo de TypeRef::Wit, fallback sin input). +Workspace verde. + ### feat(nouser+sidecar): watcher con debounce + re-publish al broker Cierra las dos limitaciones del watcher previo: ya no spamea N veces por una sola edición, y el broker ve los cambios estructurales en lugar de diff --git a/Cargo.lock b/Cargo.lock index a2ad1b9..6fabaee 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1217,6 +1217,7 @@ dependencies = [ "brahman-card", "brahman-card-wit", "brahman-handshake", + "thiserror 2.0.18", "tokio", "tracing", "tracing-subscriber", diff --git a/crates/modules/nouser/core/src/bin/nouser.rs b/crates/modules/nouser/core/src/bin/nouser.rs index 62d4d38..b52b12b 100644 --- a/crates/modules/nouser/core/src/bin/nouser.rs +++ b/crates/modules/nouser/core/src/bin/nouser.rs @@ -651,12 +651,12 @@ fn cmd_attract(args: &[String]) -> Cmd { /// Pipeline completo del modo `--remote`: /// 1. Si `NOUSER_NOUS_SOCKET` está set, lo usa directo (override /// explícito, atajo para tests). -/// 2. Si no, abre Client al brahman-init, anuncia un consumer Card -/// con `flow.input = embed-result:json`, espera el primer -/// `MatchEvent::Available`, y usa el `producer_service_socket` -/// del evento. Esto activa la lógica de `priority_contexts`: si -/// el broker corre bajo `BRAHMAN_BROKER_CONTEXT=test/prod`, el -/// proveedor electo cambia sin que este consumer toque su código. +/// 2. Si no, delega en `brahman_sidecar::await_provider_blocking` — +/// el sidecar se conecta al broker, registra un consumer Card con +/// `flow.input = embed-result:json`, espera el primer +/// `MatchEvent::Available` y devuelve el socket. Esto activa la +/// lógica de `priority_contexts`: bajo `BRAHMAN_BROKER_CONTEXT=test/prod`, +/// el proveedor electo cambia sin que este código toque nada. /// 3. Con el socket resuelto, dispara la RPC `EmbedFile`. /// /// Devuelve `(embedding, model_id)` — el caller necesita ambos para @@ -669,11 +669,15 @@ fn remote_embed( return embed_via(&sock, file); } - let rt = tokio::runtime::Builder::new_current_thread() - .enable_io() - .enable_time() - .build()?; - let producer_sock = rt.block_on(discover_producer_socket())?; + let consumer = brahman_sidecar::build_consumer_card( + "nouser.attract-cli", + nouser_nous::FLOW_EMBED_RESULT, + nouser_nous::FLOW_TYPE_NAME, + ); + let producer_sock = brahman_sidecar::await_provider_blocking( + consumer, + std::time::Duration::from_secs(3), + )?; embed_via(&producer_sock, file) } @@ -720,70 +724,6 @@ fn embed_via( Err(format!("nouser-nous: {}", err.error).into()) } -/// Conecta al brahman-init, anuncia un consumer Card y espera el -/// primer `MatchEvent::Available`. Devuelve el `producer_service_socket` -/// que el broker emite. Timeout 3s. -async fn discover_producer_socket() -> Result> { - use brahman_card::{ - ulid::Ulid, Card, CardKind, Flow, Flows, Lifecycle, Payload, Priority, Supervision, - TypeRef, - }; - use brahman_handshake::client::Client; - use brahman_handshake::messages::MatchEventKind; - - let consumer_card = Card { - schema_version: brahman_card::CARD_SCHEMA_VERSION, - id: Ulid::new(), - label: "nouser.attract-cli".into(), - payload: Payload::Virtual, - supervision: Supervision::OneShot, - lifecycle: Lifecycle::Oneshot, - priority: Priority::Normal, - kind: CardKind::Ente, - flow: Flows { - input: vec![Flow { - name: nouser_nous::FLOW_EMBED_RESULT.into(), - ty: TypeRef::Primitive { - name: nouser_nous::FLOW_TYPE_NAME.into(), - }, - pin_to: None, - }], - output: vec![], - }, - ..Default::default() - }; - - let init_path = brahman_handshake::transport::default_socket_path(); - let mut client = Client::connect(&init_path, consumer_card) - .await - .map_err(|e| format!("conectar a brahman-init en {}: {e}", init_path.display()))?; - - // El broker empuja MatchEvents tras registrar la sesión. Iteramos - // hasta encontrar Available; ignoramos Lost (no aplica al arranque). - let deadline = std::time::Instant::now() + std::time::Duration::from_secs(3); - let socket = loop { - let remaining = deadline.saturating_duration_since(std::time::Instant::now()); - if remaining.is_zero() { - break None; - } - match client.await_event(remaining).await? { - Some(ev) if ev.kind == MatchEventKind::Available => { - break ev.producer_service_socket; - } - Some(_) => continue, // Lost u otros — seguir esperando - None => break None, - } - }; - - let _ = client.farewell().await; // best-effort cleanup - - socket.ok_or_else(|| { - "ningún proveedor con service_socket matcheó el input embed-result \ - (¿está corriendo nouser-nous-mock o nouser-nous-real?)" - .into() - }) -} - /// Card del propio engine (kind=Ente). Es el "ser" que produce y /// administra Mónadas; aparece en brahman-status junto a sus Mónadas. fn build_engine_card() -> brahman_card::Card { diff --git a/crates/shared/brahman-sidecar/Cargo.toml b/crates/shared/brahman-sidecar/Cargo.toml index e4af588..944d033 100644 --- a/crates/shared/brahman-sidecar/Cargo.toml +++ b/crates/shared/brahman-sidecar/Cargo.toml @@ -11,6 +11,7 @@ description = "Brahman — sidecar reusable: thread + tokio runtime que mantiene [dependencies] brahman-card = { path = "../../core/brahman-card" } brahman-handshake = { path = "../../core/brahman-handshake" } +thiserror = { workspace = true } tokio = { workspace = true } tracing = { workspace = true } diff --git a/crates/shared/brahman-sidecar/src/discovery.rs b/crates/shared/brahman-sidecar/src/discovery.rs new file mode 100644 index 0000000..f044131 --- /dev/null +++ b/crates/shared/brahman-sidecar/src/discovery.rs @@ -0,0 +1,216 @@ +//! `brahman-sidecar::discovery` — API reusable para que un módulo +//! consumer encuentre proveedores vivos vía broker, sin hardcodear +//! sockets ni reimplementar el patrón a mano. +//! +//! Es la generalización de `discover_producer_socket` del CLI +//! `nouser attract --remote`: declarás el `TypeRef` que querés +//! consumir y el broker te empuja un `MatchEvent::Available` con el +//! `producer_service_socket` del primer proveedor matched. +//! +//! Pipeline: +//! 1. `build_consumer_card(label, flow_name, type_name)` arma una +//! Card mínima (Ente, Oneshot, Virtual) con un input flow. +//! 2. `await_provider(card, timeout)` se conecta al brahman-init, +//! espera hasta `timeout` por `MatchEvent::Available`, devuelve +//! el socket del proveedor electo, y envía Farewell. +//! 3. Para mundos blocking (CLIs, tests, std-thread loops) hay +//! `await_provider_blocking` que arma su propio runtime +//! `current_thread`. +//! +//! Quién elige al proveedor es el broker, no este módulo. Si el +//! broker tiene `priority_contexts` activo, podés cambiar de +//! proveedor sin tocar el consumer; el matching dinámico se respeta. + +use std::path::PathBuf; +use std::time::{Duration, Instant}; + +use brahman_card::{ + ulid::Ulid, Card, CardKind, Flow, Flows, Lifecycle, Payload, Priority, Supervision, TypeRef, + CARD_SCHEMA_VERSION, +}; +use brahman_handshake::client::{Client, ClientError}; +use brahman_handshake::messages::MatchEventKind; +use brahman_handshake::transport; + +#[derive(Debug, thiserror::Error)] +pub enum ConsumerError { + #[error("no se pudo conectar al init en {socket}: {source}")] + Connect { + socket: PathBuf, + #[source] + source: ClientError, + }, + #[error("error en cliente brahman: {0}")] + Client(#[from] ClientError), + #[error("timeout {timeout:?} sin proveedor disponible para flow '{flow}' (type '{type_ref}')")] + NoProvider { + flow: String, + type_ref: String, + timeout: Duration, + }, + #[error("no se pudo crear runtime tokio: {0}")] + Runtime(String), +} + +/// Construye una Card mínima de consumer que declara un input flow +/// con el `TypeRef::Primitive { name }` solicitado. Usá esto para +/// el caso común; si necesitás algo más rico (output flows, +/// permissions, references) construí la Card a mano y pasala a +/// [`await_provider`] directamente. +pub fn build_consumer_card( + consumer_label: impl Into, + flow_name: impl Into, + type_name: impl Into, +) -> Card { + Card { + schema_version: CARD_SCHEMA_VERSION, + id: Ulid::new(), + label: consumer_label.into(), + payload: Payload::Virtual, + supervision: Supervision::OneShot, + lifecycle: Lifecycle::Oneshot, + priority: Priority::Normal, + kind: CardKind::Ente, + flow: Flows { + input: vec![Flow { + name: flow_name.into(), + ty: TypeRef::Primitive { + name: type_name.into(), + }, + pin_to: None, + }], + output: vec![], + }, + ..Default::default() + } +} + +/// Conecta al brahman-init, registra `consumer_card`, espera el +/// primer `MatchEvent::Available` y devuelve el `producer_service_socket` +/// que el broker emitió. Cierra la sesión con Farewell antes de +/// retornar (best-effort). +/// +/// La `consumer_card` debe declarar al menos un `flow.input`; si no, +/// el broker no puede hacer matching y el await siempre dará timeout. +pub async fn await_provider( + consumer_card: Card, + timeout: Duration, +) -> Result { + let init_path = transport::default_socket_path(); + + // Capturamos descriptor para el mensaje de error antes de mover + // la card al cliente. + let (flow_name, type_ref_name) = describe_first_input(&consumer_card); + + let mut client = Client::connect(&init_path, consumer_card) + .await + .map_err(|source| ConsumerError::Connect { + socket: init_path.clone(), + source, + })?; + + let deadline = Instant::now() + timeout; + let socket = loop { + let remaining = deadline.saturating_duration_since(Instant::now()); + if remaining.is_zero() { + break None; + } + match client.await_event(remaining).await? { + Some(ev) if ev.kind == MatchEventKind::Available => { + break ev.producer_service_socket; + } + Some(_) => continue, // Lost u otros: seguir esperando hasta el deadline + None => break None, + } + }; + + let _ = client.farewell().await; // best-effort cleanup + + socket.ok_or(ConsumerError::NoProvider { + flow: flow_name, + type_ref: type_ref_name, + timeout, + }) +} + +/// Wrapper bloqueante de [`await_provider`]. Crea un runtime tokio +/// `current_thread` efímero y bloquea el thread llamador. Útil para +/// CLIs, tests y módulos std-thread (p. ej. el frontend GPUI antes +/// de tener su propio runtime async). +pub fn await_provider_blocking( + consumer_card: Card, + timeout: Duration, +) -> Result { + let rt = tokio::runtime::Builder::new_current_thread() + .enable_io() + .enable_time() + .build() + .map_err(|e| ConsumerError::Runtime(e.to_string()))?; + rt.block_on(await_provider(consumer_card, timeout)) +} + +fn describe_first_input(card: &Card) -> (String, String) { + match card.flow.input.first() { + Some(flow) => { + let type_name = match &flow.ty { + TypeRef::Primitive { name } => name.clone(), + TypeRef::Wit { package, name, .. } => format!("{package}#{name}"), + }; + (flow.name.clone(), type_name) + } + None => ("(sin input)".into(), "(sin tipo)".into()), + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn builder_sets_input_flow_with_primitive_type() { + let c = build_consumer_card("nouser.cli", "embed-result", "json"); + assert_eq!(c.label, "nouser.cli"); + assert_eq!(c.kind, CardKind::Ente); + assert!(matches!(c.lifecycle, Lifecycle::Oneshot)); + assert!(matches!(c.supervision, Supervision::OneShot)); + assert_eq!(c.flow.input.len(), 1); + let f = &c.flow.input[0]; + assert_eq!(f.name, "embed-result"); + match &f.ty { + TypeRef::Primitive { name } => assert_eq!(name, "json"), + _ => panic!("expected primitive type"), + } + assert!(c.flow.output.is_empty()); + // El builder asigna un id real (no nil) — fundamental para que + // el broker no colisione con otros consumers. + assert!(c.id != Ulid::nil(), "consumer card id no debe ser nil"); + } + + #[test] + fn builder_assigns_distinct_ids_per_call() { + let a = build_consumer_card("a", "f", "t"); + let b = build_consumer_card("a", "f", "t"); + assert_ne!(a.id, b.id, "cada Card debería tener id propio"); + } + + #[test] + fn describe_falls_back_when_no_input_flow() { + let mut c = build_consumer_card("x", "f", "t"); + c.flow.input.clear(); + let (flow, ty) = describe_first_input(&c); + assert_eq!(flow, "(sin input)"); + assert_eq!(ty, "(sin tipo)"); + } + + #[test] + fn describe_formats_wit_type() { + let mut c = build_consumer_card("x", "f", "t"); + c.flow.input[0].ty = TypeRef::Wit { + package: "brahman:dht".into(), + interface: None, + name: "entity-result".into(), + }; + let (_, ty) = describe_first_input(&c); + assert_eq!(ty, "brahman:dht#entity-result"); + } +} diff --git a/crates/shared/brahman-sidecar/src/lib.rs b/crates/shared/brahman-sidecar/src/lib.rs index 5fcf099..2ea7f67 100644 --- a/crates/shared/brahman-sidecar/src/lib.rs +++ b/crates/shared/brahman-sidecar/src/lib.rs @@ -16,6 +16,11 @@ #![forbid(unsafe_code)] #![warn(rust_2018_idioms)] +pub mod discovery; +pub use discovery::{ + await_provider, await_provider_blocking, build_consumer_card, ConsumerError, +}; + use std::collections::HashMap; use std::sync::{mpsc, Arc, Mutex}; use std::thread::JoinHandle;