feat(sidecar): API reusable de discovery via broker

Promueve el patron ad-hoc discover_producer_socket que vivia inline en
'nouser attract --remote' a un modulo publico brahman_sidecar::discovery.
Cualquier consumer ahora puede preguntar al broker "quien provee este
TypeRef?" sin reimplementar el patron a mano.

API:
- build_consumer_card(label, flow_name, type_name) construye una Card
  minima (Ente, Oneshot, Virtual) con un input flow. Asigna Ulid::new()
  real (no nil), evitando colisiones en el broker.
- await_provider(card, timeout) async: conecta al init, espera
  MatchEvent::Available, devuelve producer_service_socket, manda
  Farewell. Ignora eventos Lost durante el await.
- await_provider_blocking(card, timeout) wrapper para mundos no-async
  (CLIs, std-thread loops). Crea su propio runtime current_thread.
- ConsumerError tipado: Connect{socket,source}, NoProvider{flow,type_ref,
  timeout}, Client(ClientError), Runtime(String). Adios al Box<dyn Error>.

Refactor en nouser daemon: discover_producer_socket inline (60 LOC) ->
5 LOC delegando en el helper. remote_embed ya no construye su propio
runtime.

Tests: 4 unitarios (id no-nil, id unico por llamada, formateo de Wit
TypeRef, fallback sin input). Build verde para sidecar y nouser-core.
This commit is contained in:
Sergio
2026-05-09 02:30:48 +00:00
parent 2725d6a297
commit 006640057a
6 changed files with 294 additions and 75 deletions
+56
View File
@@ -6,6 +6,62 @@ ratio/diff ver `git show <sha>`.
## 2026-05-09 ## 2026-05-09
### feat(sidecar): API reusable de discovery vía broker
Promueve el patrón ad-hoc `discover_producer_socket` (que vivía
inline en `nouser attract --remote`) a un módulo público
`brahman_sidecar::discovery`. Cualquier consumer puede ahora
preguntar al broker "¿quién provee este TypeRef?" con dos llamadas:
// Construir un consumer Card mínimo (Ente, Oneshot, Virtual)
let card = brahman_sidecar::build_consumer_card(
"mi-cli",
"embed-result", // flow.input.name
"json", // TypeRef::Primitive { name }
);
// Bloqueante (CLIs, std-thread loops):
let socket: PathBuf = brahman_sidecar::await_provider_blocking(
card, Duration::from_secs(3),
)?;
// O async (módulos con runtime tokio propio):
let socket = brahman_sidecar::await_provider(card, timeout).await?;
API:
- `build_consumer_card(label, flow_name, type_name) -> Card`
abstrae la verbosidad del struct-literal repetido en cada caller.
Genera un `id: Ulid::new()` real (no nil → seguro contra
colisiones en el broker).
- `await_provider(card, timeout) -> Result<PathBuf, ConsumerError>`
conecta al init, espera `MatchEvent::Available`, devuelve
`producer_service_socket`, manda Farewell. Ignora eventos
`Lost` durante el await (no aplican al arranque).
- `await_provider_blocking(card, timeout)` arma su propio
runtime `current_thread` para mundos no-async.
- `ConsumerError` con variantes tipadas: `Connect { socket, source }`,
`NoProvider { flow, type_ref, timeout }`, `Client(ClientError)`,
`Runtime(String)`. Adiós al `Box<dyn Error>` de antes.
Refactor en `nouser daemon`:
- `discover_producer_socket` (60 LOC inline en `bin/nouser.rs`) → 5
líneas que delegan en el helper.
- `remote_embed` ya no construye su propio runtime tokio.
Próximo consumer natural: `nouser-explorer`. Hoy renderea
`StatusSnapshot` vía socket admin (introspección pura). El día que
quiera **interactuar** con un Ente — p. ej., disparar un re-embed
desde la UI — usa este helper para resolver el socket del provider
sin hardcodear paths.
Nota sobre identidad: este commit fuerza `Ulid::new()` para los
consumer Cards generados, evitando la trampa documentada del
`Card::default()` que devuelve `Ulid::nil()`. La fijación global de
`Default` queda como cleanup separado (requiere auditar que ningún
caller dependa del determinismo de `nil`).
Tests: 4 unitarios nuevos en `discovery::tests` (id no-nil, id
único por llamada, formateo de TypeRef::Wit, fallback sin input).
Workspace verde.
### feat(nouser+sidecar): watcher con debounce + re-publish al broker ### feat(nouser+sidecar): watcher con debounce + re-publish al broker
Cierra las dos limitaciones del watcher previo: ya no spamea N veces por Cierra las dos limitaciones del watcher previo: ya no spamea N veces por
una sola edición, y el broker ve los cambios estructurales en lugar de una sola edición, y el broker ve los cambios estructurales en lugar de
Generated
+1
View File
@@ -1217,6 +1217,7 @@ dependencies = [
"brahman-card", "brahman-card",
"brahman-card-wit", "brahman-card-wit",
"brahman-handshake", "brahman-handshake",
"thiserror 2.0.18",
"tokio", "tokio",
"tracing", "tracing",
"tracing-subscriber", "tracing-subscriber",
+15 -75
View File
@@ -651,12 +651,12 @@ fn cmd_attract(args: &[String]) -> Cmd {
/// Pipeline completo del modo `--remote`: /// Pipeline completo del modo `--remote`:
/// 1. Si `NOUSER_NOUS_SOCKET` está set, lo usa directo (override /// 1. Si `NOUSER_NOUS_SOCKET` está set, lo usa directo (override
/// explícito, atajo para tests). /// explícito, atajo para tests).
/// 2. Si no, abre Client al brahman-init, anuncia un consumer Card /// 2. Si no, delega en `brahman_sidecar::await_provider_blocking` —
/// con `flow.input = embed-result:json`, espera el primer /// el sidecar se conecta al broker, registra un consumer Card con
/// `MatchEvent::Available`, y usa el `producer_service_socket` /// `flow.input = embed-result:json`, espera el primer
/// del evento. Esto activa la lógica de `priority_contexts`: si /// `MatchEvent::Available` y devuelve el socket. Esto activa la
/// el broker corre bajo `BRAHMAN_BROKER_CONTEXT=test/prod`, el /// lógica de `priority_contexts`: bajo `BRAHMAN_BROKER_CONTEXT=test/prod`,
/// proveedor electo cambia sin que este consumer toque su código. /// el proveedor electo cambia sin que este código toque nada.
/// 3. Con el socket resuelto, dispara la RPC `EmbedFile`. /// 3. Con el socket resuelto, dispara la RPC `EmbedFile`.
/// ///
/// Devuelve `(embedding, model_id)` — el caller necesita ambos para /// Devuelve `(embedding, model_id)` — el caller necesita ambos para
@@ -669,11 +669,15 @@ fn remote_embed(
return embed_via(&sock, file); return embed_via(&sock, file);
} }
let rt = tokio::runtime::Builder::new_current_thread() let consumer = brahman_sidecar::build_consumer_card(
.enable_io() "nouser.attract-cli",
.enable_time() nouser_nous::FLOW_EMBED_RESULT,
.build()?; nouser_nous::FLOW_TYPE_NAME,
let producer_sock = rt.block_on(discover_producer_socket())?; );
let producer_sock = brahman_sidecar::await_provider_blocking(
consumer,
std::time::Duration::from_secs(3),
)?;
embed_via(&producer_sock, file) embed_via(&producer_sock, file)
} }
@@ -720,70 +724,6 @@ fn embed_via(
Err(format!("nouser-nous: {}", err.error).into()) Err(format!("nouser-nous: {}", err.error).into())
} }
/// Conecta al brahman-init, anuncia un consumer Card y espera el
/// primer `MatchEvent::Available`. Devuelve el `producer_service_socket`
/// que el broker emite. Timeout 3s.
async fn discover_producer_socket() -> Result<std::path::PathBuf, Box<dyn std::error::Error>> {
use brahman_card::{
ulid::Ulid, Card, CardKind, Flow, Flows, Lifecycle, Payload, Priority, Supervision,
TypeRef,
};
use brahman_handshake::client::Client;
use brahman_handshake::messages::MatchEventKind;
let consumer_card = Card {
schema_version: brahman_card::CARD_SCHEMA_VERSION,
id: Ulid::new(),
label: "nouser.attract-cli".into(),
payload: Payload::Virtual,
supervision: Supervision::OneShot,
lifecycle: Lifecycle::Oneshot,
priority: Priority::Normal,
kind: CardKind::Ente,
flow: Flows {
input: vec![Flow {
name: nouser_nous::FLOW_EMBED_RESULT.into(),
ty: TypeRef::Primitive {
name: nouser_nous::FLOW_TYPE_NAME.into(),
},
pin_to: None,
}],
output: vec![],
},
..Default::default()
};
let init_path = brahman_handshake::transport::default_socket_path();
let mut client = Client::connect(&init_path, consumer_card)
.await
.map_err(|e| format!("conectar a brahman-init en {}: {e}", init_path.display()))?;
// El broker empuja MatchEvents tras registrar la sesión. Iteramos
// hasta encontrar Available; ignoramos Lost (no aplica al arranque).
let deadline = std::time::Instant::now() + std::time::Duration::from_secs(3);
let socket = loop {
let remaining = deadline.saturating_duration_since(std::time::Instant::now());
if remaining.is_zero() {
break None;
}
match client.await_event(remaining).await? {
Some(ev) if ev.kind == MatchEventKind::Available => {
break ev.producer_service_socket;
}
Some(_) => continue, // Lost u otros — seguir esperando
None => break None,
}
};
let _ = client.farewell().await; // best-effort cleanup
socket.ok_or_else(|| {
"ningún proveedor con service_socket matcheó el input embed-result \
(¿está corriendo nouser-nous-mock o nouser-nous-real?)"
.into()
})
}
/// Card del propio engine (kind=Ente). Es el "ser" que produce y /// Card del propio engine (kind=Ente). Es el "ser" que produce y
/// administra Mónadas; aparece en brahman-status junto a sus Mónadas. /// administra Mónadas; aparece en brahman-status junto a sus Mónadas.
fn build_engine_card() -> brahman_card::Card { fn build_engine_card() -> brahman_card::Card {
+1
View File
@@ -11,6 +11,7 @@ description = "Brahman — sidecar reusable: thread + tokio runtime que mantiene
[dependencies] [dependencies]
brahman-card = { path = "../../core/brahman-card" } brahman-card = { path = "../../core/brahman-card" }
brahman-handshake = { path = "../../core/brahman-handshake" } brahman-handshake = { path = "../../core/brahman-handshake" }
thiserror = { workspace = true }
tokio = { workspace = true } tokio = { workspace = true }
tracing = { workspace = true } tracing = { workspace = true }
@@ -0,0 +1,216 @@
//! `brahman-sidecar::discovery` — API reusable para que un módulo
//! consumer encuentre proveedores vivos vía broker, sin hardcodear
//! sockets ni reimplementar el patrón a mano.
//!
//! Es la generalización de `discover_producer_socket` del CLI
//! `nouser attract --remote`: declarás el `TypeRef` que querés
//! consumir y el broker te empuja un `MatchEvent::Available` con el
//! `producer_service_socket` del primer proveedor matched.
//!
//! Pipeline:
//! 1. `build_consumer_card(label, flow_name, type_name)` arma una
//! Card mínima (Ente, Oneshot, Virtual) con un input flow.
//! 2. `await_provider(card, timeout)` se conecta al brahman-init,
//! espera hasta `timeout` por `MatchEvent::Available`, devuelve
//! el socket del proveedor electo, y envía Farewell.
//! 3. Para mundos blocking (CLIs, tests, std-thread loops) hay
//! `await_provider_blocking` que arma su propio runtime
//! `current_thread`.
//!
//! Quién elige al proveedor es el broker, no este módulo. Si el
//! broker tiene `priority_contexts` activo, podés cambiar de
//! proveedor sin tocar el consumer; el matching dinámico se respeta.
use std::path::PathBuf;
use std::time::{Duration, Instant};
use brahman_card::{
ulid::Ulid, Card, CardKind, Flow, Flows, Lifecycle, Payload, Priority, Supervision, TypeRef,
CARD_SCHEMA_VERSION,
};
use brahman_handshake::client::{Client, ClientError};
use brahman_handshake::messages::MatchEventKind;
use brahman_handshake::transport;
#[derive(Debug, thiserror::Error)]
pub enum ConsumerError {
#[error("no se pudo conectar al init en {socket}: {source}")]
Connect {
socket: PathBuf,
#[source]
source: ClientError,
},
#[error("error en cliente brahman: {0}")]
Client(#[from] ClientError),
#[error("timeout {timeout:?} sin proveedor disponible para flow '{flow}' (type '{type_ref}')")]
NoProvider {
flow: String,
type_ref: String,
timeout: Duration,
},
#[error("no se pudo crear runtime tokio: {0}")]
Runtime(String),
}
/// Construye una Card mínima de consumer que declara un input flow
/// con el `TypeRef::Primitive { name }` solicitado. Usá esto para
/// el caso común; si necesitás algo más rico (output flows,
/// permissions, references) construí la Card a mano y pasala a
/// [`await_provider`] directamente.
pub fn build_consumer_card(
consumer_label: impl Into<String>,
flow_name: impl Into<String>,
type_name: impl Into<String>,
) -> Card {
Card {
schema_version: CARD_SCHEMA_VERSION,
id: Ulid::new(),
label: consumer_label.into(),
payload: Payload::Virtual,
supervision: Supervision::OneShot,
lifecycle: Lifecycle::Oneshot,
priority: Priority::Normal,
kind: CardKind::Ente,
flow: Flows {
input: vec![Flow {
name: flow_name.into(),
ty: TypeRef::Primitive {
name: type_name.into(),
},
pin_to: None,
}],
output: vec![],
},
..Default::default()
}
}
/// Conecta al brahman-init, registra `consumer_card`, espera el
/// primer `MatchEvent::Available` y devuelve el `producer_service_socket`
/// que el broker emitió. Cierra la sesión con Farewell antes de
/// retornar (best-effort).
///
/// La `consumer_card` debe declarar al menos un `flow.input`; si no,
/// el broker no puede hacer matching y el await siempre dará timeout.
pub async fn await_provider(
consumer_card: Card,
timeout: Duration,
) -> Result<PathBuf, ConsumerError> {
let init_path = transport::default_socket_path();
// Capturamos descriptor para el mensaje de error antes de mover
// la card al cliente.
let (flow_name, type_ref_name) = describe_first_input(&consumer_card);
let mut client = Client::connect(&init_path, consumer_card)
.await
.map_err(|source| ConsumerError::Connect {
socket: init_path.clone(),
source,
})?;
let deadline = Instant::now() + timeout;
let socket = loop {
let remaining = deadline.saturating_duration_since(Instant::now());
if remaining.is_zero() {
break None;
}
match client.await_event(remaining).await? {
Some(ev) if ev.kind == MatchEventKind::Available => {
break ev.producer_service_socket;
}
Some(_) => continue, // Lost u otros: seguir esperando hasta el deadline
None => break None,
}
};
let _ = client.farewell().await; // best-effort cleanup
socket.ok_or(ConsumerError::NoProvider {
flow: flow_name,
type_ref: type_ref_name,
timeout,
})
}
/// Wrapper bloqueante de [`await_provider`]. Crea un runtime tokio
/// `current_thread` efímero y bloquea el thread llamador. Útil para
/// CLIs, tests y módulos std-thread (p. ej. el frontend GPUI antes
/// de tener su propio runtime async).
pub fn await_provider_blocking(
consumer_card: Card,
timeout: Duration,
) -> Result<PathBuf, ConsumerError> {
let rt = tokio::runtime::Builder::new_current_thread()
.enable_io()
.enable_time()
.build()
.map_err(|e| ConsumerError::Runtime(e.to_string()))?;
rt.block_on(await_provider(consumer_card, timeout))
}
fn describe_first_input(card: &Card) -> (String, String) {
match card.flow.input.first() {
Some(flow) => {
let type_name = match &flow.ty {
TypeRef::Primitive { name } => name.clone(),
TypeRef::Wit { package, name, .. } => format!("{package}#{name}"),
};
(flow.name.clone(), type_name)
}
None => ("(sin input)".into(), "(sin tipo)".into()),
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn builder_sets_input_flow_with_primitive_type() {
let c = build_consumer_card("nouser.cli", "embed-result", "json");
assert_eq!(c.label, "nouser.cli");
assert_eq!(c.kind, CardKind::Ente);
assert!(matches!(c.lifecycle, Lifecycle::Oneshot));
assert!(matches!(c.supervision, Supervision::OneShot));
assert_eq!(c.flow.input.len(), 1);
let f = &c.flow.input[0];
assert_eq!(f.name, "embed-result");
match &f.ty {
TypeRef::Primitive { name } => assert_eq!(name, "json"),
_ => panic!("expected primitive type"),
}
assert!(c.flow.output.is_empty());
// El builder asigna un id real (no nil) — fundamental para que
// el broker no colisione con otros consumers.
assert!(c.id != Ulid::nil(), "consumer card id no debe ser nil");
}
#[test]
fn builder_assigns_distinct_ids_per_call() {
let a = build_consumer_card("a", "f", "t");
let b = build_consumer_card("a", "f", "t");
assert_ne!(a.id, b.id, "cada Card debería tener id propio");
}
#[test]
fn describe_falls_back_when_no_input_flow() {
let mut c = build_consumer_card("x", "f", "t");
c.flow.input.clear();
let (flow, ty) = describe_first_input(&c);
assert_eq!(flow, "(sin input)");
assert_eq!(ty, "(sin tipo)");
}
#[test]
fn describe_formats_wit_type() {
let mut c = build_consumer_card("x", "f", "t");
c.flow.input[0].ty = TypeRef::Wit {
package: "brahman:dht".into(),
interface: None,
name: "entity-result".into(),
};
let (_, ty) = describe_first_input(&c);
assert_eq!(ty, "brahman:dht#entity-result");
}
}
+5
View File
@@ -16,6 +16,11 @@
#![forbid(unsafe_code)] #![forbid(unsafe_code)]
#![warn(rust_2018_idioms)] #![warn(rust_2018_idioms)]
pub mod discovery;
pub use discovery::{
await_provider, await_provider_blocking, build_consumer_card, ConsumerError,
};
use std::collections::HashMap; use std::collections::HashMap;
use std::sync::{mpsc, Arc, Mutex}; use std::sync::{mpsc, Arc, Mutex};
use std::thread::JoinHandle; use std::thread::JoinHandle;