From 5edc912ed85acafe93abe9bca04113cc836a8f90 Mon Sep 17 00:00:00 2001 From: Sergio Date: Fri, 8 May 2026 19:38:23 +0000 Subject: [PATCH] =?UTF-8?q?feat:=20Phase=20D-3=20+=20D-4=20=E2=80=94=20ser?= =?UTF-8?q?vice=5Fsocket=20en=20Card,=20providers=20coexisten?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Cierra el ciclo del swap automático Nous mock↔real: - brahman-card: Card.service_socket: Option y espejo en WireCard. Path del data plane (distinto al Init). Cualquier consumer que matchee con esta Card conecta directo, sin discovery extra. - brahman-broker: BrokeredCard propaga service_socket. Sin participación en matching — sólo metadata. - brahman-handshake::MatchEvent: nuevo campo producer_service_socket. Server lo busca en BrokeredCard al emitir Available. - nouser-nous::transport: provider_socket_path(provider: &str) devuelve nouser-nous-{provider}.sock por default. Mock y real coexisten en sockets distintos (Phase D-4). default_socket_path() conserva el comportamiento single-provider. - Mock declara nouser-nous-mock.sock; real declara nouser-nous-real.sock. La Card se construye DESPUÉS del bind. - brahman-status imprime "socket:" por sesión cuando está presente. Validación end-to-end: $ ente-zero & nouser-nous-mock & nouser-nous-real & $ ls /run/user/1001/nouser-nous-*.sock nouser-nous-mock.sock nouser-nous-real.sock $ brahman-status Sessions (2): [ente] nouser.nous_real socket: /run/user/1001/nouser-nous-real.sock [ente] nouser.nous_mock socket: /run/user/1001/nouser-nous-mock.sock Pendiente (no crítico): nouser-core attract --remote usa todavía NOUSER_NOUS_SOCKET hardcoded. Siguiente paso: subscribirse al MatchEvent del broker y usar producer_service_socket directo, así BRAHMAN_BROKER_CONTEXT=test/prod swapea provider sin tocar al consumer. Co-Authored-By: Claude Opus 4.7 (1M context) --- CHANGELOG.md | 51 +++++++++++++++++++ .../brahman-admin/examples/brahman-status.rs | 3 ++ crates/core/brahman-broker/src/lib.rs | 5 ++ crates/core/brahman-card/src/lib.rs | 15 +++++- crates/core/brahman-handshake/src/messages.rs | 8 +++ crates/core/brahman-handshake/src/server.rs | 9 ++++ crates/modules/nouser/nous-mock/src/main.rs | 23 +++++---- crates/modules/nouser/nous-real/src/main.rs | 19 ++++--- crates/modules/nouser/nous/src/lib.rs | 24 +++++++-- 9 files changed, 134 insertions(+), 23 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index b47dad6..873a906 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,57 @@ ratio/diff ver `git show `. ## 2026-05-08 +### feat: Phase D-3 + D-4 — service_socket en Card, providers coexisten +Cierra el ciclo del swap automático de Nous (mock↔real): + +- **Schema** (`brahman-card`): `Card.service_socket: Option` y + espejo en `WireCard`. Conversiones `From` propagan. Es el path del + **data plane** (distinto del socket del Init); cualquier consumer + que matchee con esta Card puede conectar directo sin discovery + adicional. +- **Broker** (`brahman-broker`): `BrokeredCard` propaga + `service_socket` desde la Card. Sin participación en el matching — + sólo metadata para los observadores. +- **MatchEvent** (`brahman-handshake`): nuevo campo + `producer_service_socket: Option`. Cuando el server emite + `Available`, busca la `BrokeredCard` del productor en el broker y + copia su `service_socket`. El consumer recibe la ruta completa para + conectar. +- **Transport** (`nouser-nous`): `provider_socket_path(provider: &str)` + devuelve `nouser-nous-{provider}.sock` por default — mock y real + coexisten en sockets distintos (Phase D-4). `default_socket_path()` + conserva el comportamiento single-provider. +- **Providers**: mock declara `service_socket = + /run/user/X/nouser-nous-mock.sock`; real declara + `nouser-nous-real.sock`. La Card se construye DESPUÉS del bind para + que el path declarado sea el real. +- **Status**: `brahman-status` imprime `socket:` por sesión cuando + está presente. + +Validación end-to-end: + + $ ente-zero & nouser-nous-mock & nouser-nous-real & + $ ls /run/user/1001/nouser-nous-*.sock + nouser-nous-mock.sock + nouser-nous-real.sock + + $ brahman-status + Sessions (2): + [ente] ... nouser.nous_real + socket: /run/user/1001/nouser-nous-real.sock + in embed-request: Primitive { name: "json" } + out embed-result: Primitive { name: "json" } + [ente] ... nouser.nous_mock + socket: /run/user/1001/nouser-nous-mock.sock + in embed-request, out embed-result + +Pendientes para futuro (no críticos): +- nouser-core attract --remote todavía usa NOUSER_NOUS_SOCKET hardcoded + o `default_socket_path()`. El siguiente paso es subscribirse al + MatchEvent del broker y usar `producer_service_socket` directo — + con eso `BRAHMAN_BROKER_CONTEXT=test/prod` swapea provider sin + tocar al consumer. + ### refactor(nouser): labels de Mónada con 2 componentes del path Resuelve la fricción visual de monorepos donde múltiples Mónadas se llamaban "src". Nueva función `label_from_path` toma los últimos hasta diff --git a/crates/core/brahman-admin/examples/brahman-status.rs b/crates/core/brahman-admin/examples/brahman-status.rs index b35ce97..c25347b 100644 --- a/crates/core/brahman-admin/examples/brahman-status.rs +++ b/crates/core/brahman-admin/examples/brahman-status.rs @@ -32,6 +32,9 @@ async fn main() -> anyhow::Result<()> { " [{}] {} {}{} lifecycle={:?} priority={:?}", kind_marker, s.session, s.label, conscious_marker, s.lifecycle, s.priority ); + if let Some(sock) = &s.service_socket { + println!(" socket: {}", sock.display()); + } if let Some(data) = &s.data { if !data.summary.is_empty() { println!(" summary: {}", data.summary); diff --git a/crates/core/brahman-broker/src/lib.rs b/crates/core/brahman-broker/src/lib.rs index 58e2a19..b650e63 100644 --- a/crates/core/brahman-broker/src/lib.rs +++ b/crates/core/brahman-broker/src/lib.rs @@ -29,6 +29,7 @@ #![warn(rust_2018_idioms)] use std::collections::BTreeMap; +use std::path::PathBuf; use brahman_card::{ Card, CardKind, ContextBias, DataFacet, Flow, Lifecycle, Priority, TypeRef, WitInterface, @@ -86,6 +87,9 @@ pub struct BrokeredCard { /// Faceta de datos cuando `kind != Ente`. #[serde(default, skip_serializing_if = "Option::is_none")] pub data: Option, + /// Socket de servicio (data plane) si lo declara la Card. + #[serde(default, skip_serializing_if = "Option::is_none")] + pub service_socket: Option, } impl BrokeredCard { @@ -101,6 +105,7 @@ impl BrokeredCard { priority_contexts: card.priority_contexts.clone(), kind: card.kind, data: card.data.clone(), + service_socket: card.service_socket.clone(), } } } diff --git a/crates/core/brahman-card/src/lib.rs b/crates/core/brahman-card/src/lib.rs index e1b7889..1f95980 100644 --- a/crates/core/brahman-card/src/lib.rs +++ b/crates/core/brahman-card/src/lib.rs @@ -19,7 +19,7 @@ #![warn(rust_2018_idioms)] use std::collections::{BTreeMap, BTreeSet, HashSet}; -use std::path::Path; +use std::path::{Path, PathBuf}; use std::time::Duration; use serde::{Deserialize, Serialize}; @@ -126,6 +126,14 @@ pub struct Card { #[serde(default)] pub flow: Flows, + /// Si la entidad expone un socket Unix de servicio (data plane, + /// distinto al socket del Init), declara aquí su path. Los + /// consumidores que reciban un `MatchEvent` con este Card como + /// productor pueden conectar directo al socket sin discovery + /// adicional. + #[serde(default)] + pub service_socket: Option, + /// Naturaleza de la entidad detrás de la Card. Por defecto `Ente` /// para mantener compatibilidad con Cards existentes. #[serde(default)] @@ -176,6 +184,7 @@ impl Default for Card { priority: Priority::default(), flow: Flows::default(), genesis: Vec::new(), + service_socket: None, kind: CardKind::default(), data: None, priority_contexts: BTreeMap::new(), @@ -803,6 +812,8 @@ pub struct WireCard { #[serde(default)] pub genesis: Vec, #[serde(default)] + pub service_socket: Option, + #[serde(default)] pub kind: CardKind, #[serde(default)] pub data: Option, @@ -827,6 +838,7 @@ impl From for WireCard { priority: c.priority, flow: c.flow, genesis: c.genesis.into_iter().map(WireCard::from).collect(), + service_socket: c.service_socket, kind: c.kind, data: c.data, priority_contexts: c.priority_contexts, @@ -851,6 +863,7 @@ impl From for Card { priority: w.priority, flow: w.flow, genesis: w.genesis.into_iter().map(Card::from).collect(), + service_socket: w.service_socket, kind: w.kind, data: w.data, priority_contexts: w.priority_contexts, diff --git a/crates/core/brahman-handshake/src/messages.rs b/crates/core/brahman-handshake/src/messages.rs index bdce4dc..ca7432d 100644 --- a/crates/core/brahman-handshake/src/messages.rs +++ b/crates/core/brahman-handshake/src/messages.rs @@ -2,6 +2,8 @@ //! //! Todos los mensajes que cruzan el wire son variantes de [`Frame`]. +use std::path::PathBuf; + use brahman_broker::MatchStrategy; use brahman_card::{TypeRef, WireCard, WitInterface}; use serde::{Deserialize, Serialize}; @@ -102,6 +104,12 @@ pub struct MatchEvent { pub via: MatchStrategy, /// `true` si fue resuelto por `pin_to`. pub pinned: bool, + /// Socket de servicio (data plane) que declaró el productor. + /// Si está presente, el consumer puede conectar directo sin + /// pasar por discovery adicional. `None` si el productor no + /// declaró service_socket en su Card. + #[serde(default)] + pub producer_service_socket: Option, } #[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] diff --git a/crates/core/brahman-handshake/src/server.rs b/crates/core/brahman-handshake/src/server.rs index 3220119..1aba07c 100644 --- a/crates/core/brahman-handshake/src/server.rs +++ b/crates/core/brahman-handshake/src/server.rs @@ -297,6 +297,13 @@ impl Session { } if let Some(m) = &new_match { + // Resolvemos el service_socket del productor desde + // la BrokeredCard; pasarlo en el evento permite al + // consumer conectar directo sin discovery extra. + let producer_service_socket = b + .cards() + .find(|c| c.session == m.producer.session) + .and_then(|c| c.service_socket.clone()); let event = MatchEvent { kind: MatchEventKind::Available, consumer_flow: input.name.clone(), @@ -306,6 +313,7 @@ impl Session { ty: m.ty.clone(), via: m.via, pinned: m.pinned, + producer_service_socket, }; let send_res = tx.try_send(Frame::MatchEvent(event)); debug!( @@ -330,6 +338,7 @@ impl Session { ty: input.ty.clone(), via: brahman_broker::MatchStrategy::Exact, pinned: false, + producer_service_socket: None, }; let _ = tx.try_send(Frame::MatchEvent(event)); } diff --git a/crates/modules/nouser/nous-mock/src/main.rs b/crates/modules/nouser/nous-mock/src/main.rs index 013c53e..184acdb 100644 --- a/crates/modules/nouser/nous-mock/src/main.rs +++ b/crates/modules/nouser/nous-mock/src/main.rs @@ -45,13 +45,10 @@ const MODEL_ID: &str = "mock-pseudo-32d"; async fn main() -> std::io::Result<()> { init_tracing(); - // 1. Sidecar al brahman-init. - let card = build_card(); - info!(label = %card.label, "publicando Card al brahman-init"); - brahman_sidecar::spawn(card); - - // 2. Bind del socket Nous. - let sock_path = transport::default_socket_path(); + // 1. Resolver socket del data-plane ANTES de armar la Card, para + // declararlo en `Card.service_socket` y que los consumidores lo + // descubran vía MatchEvent. + let sock_path = transport::provider_socket_path("mock"); if sock_path.exists() { std::fs::remove_file(&sock_path)?; } @@ -61,6 +58,11 @@ async fn main() -> std::io::Result<()> { let listener = UnixListener::bind(&sock_path)?; info!(socket = %sock_path.display(), "nouser-nous-mock escuchando"); + // 2. Sidecar al brahman-init con la Card que declara el socket. + let card = build_card(sock_path.clone()); + info!(label = %card.label, "publicando Card al brahman-init"); + brahman_sidecar::spawn(card); + // 3. Accept loop. loop { let (stream, _addr) = listener.accept().await?; @@ -84,8 +86,10 @@ fn init_tracing() { } /// Card que el mock anuncia al brahman-init. Es kind=Ente (un proceso), -/// con flujos JSON y bias de prioridad para contexto `test`. -fn build_card() -> Card { +/// con flujos JSON, bias de prioridad para contexto `test`, y el socket +/// data-plane declarado en `service_socket` (consumidores lo reciben +/// directo en el `MatchEvent::Available`). +fn build_card(service_socket: std::path::PathBuf) -> Card { let mut priority_contexts = BTreeMap::new(); priority_contexts.insert( "test".into(), @@ -105,6 +109,7 @@ fn build_card() -> Card { lifecycle: Lifecycle::Daemon, priority: Priority::Normal, kind: CardKind::Ente, + service_socket: Some(service_socket), flow: Flows { input: vec![Flow { name: FLOW_EMBED_REQUEST.into(), diff --git a/crates/modules/nouser/nous-real/src/main.rs b/crates/modules/nouser/nous-real/src/main.rs index 1a042e2..9aa1692 100644 --- a/crates/modules/nouser/nous-real/src/main.rs +++ b/crates/modules/nouser/nous-real/src/main.rs @@ -65,13 +65,9 @@ async fn main() -> std::io::Result<()> { --features embeddings para activar el modelo)" ); - // 1. Sidecar al brahman-init (mismo patrón que el mock). - let card = build_card(); - info!(label = %card.label, mode = MODEL_ID, "publicando Card al brahman-init"); - brahman_sidecar::spawn(card); - - // 2. Bind del socket Nous (mismo path que el mock — son swappable). - let sock_path = transport::default_socket_path(); + // 1. Resolver socket del data-plane (default `nouser-nous-real.sock`, + // distinto del mock para coexistir). + let sock_path = transport::provider_socket_path("real"); if sock_path.exists() { std::fs::remove_file(&sock_path)?; } @@ -81,6 +77,11 @@ async fn main() -> std::io::Result<()> { let listener = UnixListener::bind(&sock_path)?; info!(socket = %sock_path.display(), "nouser-nous-real escuchando"); + // 2. Sidecar al brahman-init con Card declarando el socket. + let card = build_card(sock_path.clone()); + info!(label = %card.label, mode = MODEL_ID, "publicando Card al brahman-init"); + brahman_sidecar::spawn(card); + // 3. Inicializar el modelo (sólo en modo embeddings). #[cfg(feature = "embeddings")] let backend = embeddings::Backend::init().map_err(|e| { @@ -128,7 +129,8 @@ fn init_tracing() { /// Card que real-nous anuncia. Idéntica al mock excepto por: /// - label distinto (`nouser.nous_real`) para que coexistan en el broker. /// - `priority_contexts.prod = +1` (gana en contexto prod). -fn build_card() -> Card { +/// - `service_socket` propio para que clientes lo descubran directo. +fn build_card(service_socket: std::path::PathBuf) -> Card { let mut priority_contexts = BTreeMap::new(); priority_contexts.insert( "prod".into(), @@ -147,6 +149,7 @@ fn build_card() -> Card { lifecycle: Lifecycle::Daemon, priority: Priority::Normal, kind: CardKind::Ente, + service_socket: Some(service_socket), flow: Flows { input: vec![Flow { name: FLOW_EMBED_REQUEST.into(), diff --git a/crates/modules/nouser/nous/src/lib.rs b/crates/modules/nouser/nous/src/lib.rs index 8b5cd6c..839bf5a 100644 --- a/crates/modules/nouser/nous/src/lib.rs +++ b/crates/modules/nouser/nous/src/lib.rs @@ -117,18 +117,32 @@ pub mod transport { /// Variable de entorno para sobreescribir la ruta del socket. pub const SOCKET_ENV: &str = "NOUSER_NOUS_SOCKET"; - /// Nombre por default del socket dentro del runtime dir. + /// Nombre genérico del socket cuando hay un solo proveedor. pub const SOCKET_NAME: &str = "nouser-nous.sock"; - /// Ruta canónica al socket de Nous. + /// Ruta canónica al socket cuando un único proveedor está activo + /// (consumidores que no quieren elegir). pub fn default_socket_path() -> PathBuf { if let Ok(p) = std::env::var(SOCKET_ENV) { return PathBuf::from(p); } - let base = std::env::var_os("XDG_RUNTIME_DIR") + runtime_base().join(SOCKET_NAME) + } + + /// Ruta default para un proveedor identificado (`"mock"`, `"real"`, + /// etc). Permite que mock y real coexistan sin clash de socket. + /// `NOUSER_NOUS_SOCKET` igual override esta función si está set. + pub fn provider_socket_path(provider: &str) -> PathBuf { + if let Ok(p) = std::env::var(SOCKET_ENV) { + return PathBuf::from(p); + } + runtime_base().join(format!("nouser-nous-{}.sock", provider)) + } + + fn runtime_base() -> PathBuf { + std::env::var_os("XDG_RUNTIME_DIR") .map(PathBuf::from) - .unwrap_or_else(std::env::temp_dir); - base.join(SOCKET_NAME) + .unwrap_or_else(std::env::temp_dir) } }