From 07d77a335f2adcd4dca2a6c685dfa6342294e61e Mon Sep 17 00:00:00 2001 From: Sergio Date: Fri, 8 May 2026 14:54:45 +0000 Subject: [PATCH] feat(handshake): integra el broker con el ciclo de sesiones MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ServerConfig acepta un Option>> compartido. Cuando está presente, el servidor lo mantiene en sincronía con las sesiones: - Tras un Hello aceptado, register_session indexa la Card en el broker ANTES de insertar en el SessionRegistry y de emitir HelloAck. - Al cerrar la sesión (Farewell, EOF, o error en run_post_handshake), un cleanup() unificado llama unregister en el broker y remove en el SessionRegistry. Garantizado por refactor de Session::handle a do_handshake → run_post_handshake → cleanup. Tests nuevos en handshake.rs: - broker_registers_and_unregisters_with_session: confirma el ciclo register → farewell → unregister. - broker_matches_two_live_modules: dos clientes (productor + consumidor) conectados; el broker resuelve find_producer_for(consumer.session, "in") → producer "dht". Tras farewell del productor, el match desaparece. Fix colateral: brahman-card::TypeRef pasa de internally-tagged (#[serde(tag = "kind")]) a externally-tagged (default). Postcard no soporta internally-tagged en formatos no self-describing — sin este cambio el wire de Hello con Cards que tengan flujos no codificaba. JSON cambia de {"kind":"primitive","name":"x"} a {"primitive":{"name":"x"}}. Documentado en el doc-comment de TypeRef. 26/26 tests verdes (broker 11 + card 8 + handshake codec 1 + integ 6). Co-Authored-By: Claude Opus 4.7 (1M context) --- Cargo.lock | 1 + crates/core/brahman-card/src/lib.rs | 15 +- crates/core/brahman-handshake/Cargo.toml | 1 + crates/core/brahman-handshake/src/server.rs | 69 ++++--- .../core/brahman-handshake/tests/handshake.rs | 169 +++++++++++++++++- 5 files changed, 228 insertions(+), 27 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 3ab86cf..79a2e16 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1164,6 +1164,7 @@ dependencies = [ name = "brahman-handshake" version = "0.1.0" dependencies = [ + "brahman-broker", "brahman-card", "postcard", "serde", diff --git a/crates/core/brahman-card/src/lib.rs b/crates/core/brahman-card/src/lib.rs index de43442..65d058b 100644 --- a/crates/core/brahman-card/src/lib.rs +++ b/crates/core/brahman-card/src/lib.rs @@ -398,8 +398,17 @@ pub struct Flow { } /// Referencia a un tipo, discriminada para distinguir primitivas de tipos WIT. +/// +/// **Wire format (JSON / TOML / postcard):** externally-tagged. Ejemplo JSON: +/// ```json +/// { "primitive": { "name": "string" } } +/// { "wit": { "package": "brahman:dht", "name": "entity-result" } } +/// ``` +/// Se eligió externally-tagged por compatibilidad con postcard, que no +/// soporta `#[serde(tag = "...")]` (internally-tagged) en formatos no +/// self-describing. #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] -#[serde(tag = "kind", rename_all = "lowercase")] +#[serde(rename_all = "lowercase")] pub enum TypeRef { /// Tipo primitivo del runtime. Primitive { name: String }, @@ -674,11 +683,11 @@ mod tests { "priority": "high", "flow": { "input": [ - { "name": "search-query", "type": { "kind": "primitive", "name": "string" } } + { "name": "search-query", "type": { "primitive": { "name": "string" } } } ], "output": [ { "name": "dht-results", - "type": { "kind": "wit", "package": "brahman:dht", "name": "entity-result" } } + "type": { "wit": { "package": "brahman:dht", "name": "entity-result" } } } ] }, "genesis": [] diff --git a/crates/core/brahman-handshake/Cargo.toml b/crates/core/brahman-handshake/Cargo.toml index d4eeb27..e1361da 100644 --- a/crates/core/brahman-handshake/Cargo.toml +++ b/crates/core/brahman-handshake/Cargo.toml @@ -10,6 +10,7 @@ description = "Brahman — handshake runtime Init↔módulo sobre Unix socket (p [dependencies] brahman-card = { path = "../brahman-card" } +brahman-broker = { path = "../brahman-broker" } serde = { workspace = true } postcard = { workspace = true } tokio = { workspace = true } diff --git a/crates/core/brahman-handshake/src/server.rs b/crates/core/brahman-handshake/src/server.rs index 8f6f776..dffd9c9 100644 --- a/crates/core/brahman-handshake/src/server.rs +++ b/crates/core/brahman-handshake/src/server.rs @@ -5,7 +5,8 @@ use std::path::{Path, PathBuf}; use std::sync::Arc; use std::time::{SystemTime, UNIX_EPOCH}; -use brahman_card::{ResolvedCard, CARD_SCHEMA_VERSION}; +use brahman_broker::Broker; +use brahman_card::{Card, ResolvedCard, CARD_SCHEMA_VERSION}; use tokio::net::{UnixListener, UnixStream}; use tokio::sync::Mutex; use tracing::{debug, warn}; @@ -17,19 +18,19 @@ use crate::messages::{Farewell, Frame, HandshakeError, Hello, HelloAck, Ping, Po /// Tabla de sesiones vivas indexada por `SessionId`. pub type SessionRegistry = Arc>>; +/// Broker compartido (opcional) que el servidor mantiene en sincronía con +/// el ciclo de vida de las sesiones. +pub type SharedBroker = Arc>; + /// Configuración del servidor. -#[derive(Debug, Clone)] +#[derive(Debug, Clone, Default)] pub struct ServerConfig { /// `true` si el Init está atado al servidor (se reporta en `HelloAck`). pub init_attached: bool, -} - -impl Default for ServerConfig { - fn default() -> Self { - Self { - init_attached: false, - } - } + /// Broker compartido. Si está presente, el servidor llama + /// `register` tras un Hello aceptado y `unregister` al cerrar la + /// sesión (Farewell o EOF). Si es `None`, el broker no se usa. + pub broker: Option, } /// Servidor de handshake escuchando en un Unix socket. @@ -118,18 +119,24 @@ pub struct Session { impl Session { /// Procesa la conexión hasta `Farewell` o EOF: handshake + loop de pings. + /// Garantiza cleanup (sessions + broker) sin importar la rama de salida. pub async fn handle(mut self) -> std::io::Result<()> { let session_id = match self.do_handshake().await? { Some(id) => id, - None => return Ok(()), // hello rechazado, conexión cerrada + None => return Ok(()), // Hello rechazado, no se registró nada }; + let result = self.run_post_handshake(session_id).await; + self.cleanup(session_id).await; + result + } + + async fn run_post_handshake(&mut self, session_id: SessionId) -> std::io::Result<()> { loop { let frame = match read_frame(&mut self.stream).await { Ok(f) => f, Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => { debug!(session = %session_id, "cliente cerró conexión sin Farewell"); - self.sessions.lock().await.remove(&session_id); return Ok(()); } Err(e) => return Err(e), @@ -150,12 +157,18 @@ impl Session { ) .await?; } - Frame::Farewell(Farewell { session }) => { - if session == session_id { - self.sessions.lock().await.remove(&session_id); - } + Frame::Farewell(Farewell { session }) if session == session_id => { return Ok(()); } + Frame::Farewell(_) => { + write_frame( + &mut self.stream, + &Frame::Error(HandshakeError::Unauthorized( + "session-id no coincide".into(), + )), + ) + .await?; + } _ => { // Frame inesperado en estado post-handshake. write_frame( @@ -170,6 +183,15 @@ impl Session { } } + /// Limpieza atómica de las dos vistas: registro de sesiones + broker. + /// Se ejecuta tanto si la sesión cierra por Farewell, EOF, o error. + async fn cleanup(&self, session_id: SessionId) { + self.sessions.lock().await.remove(&session_id); + if let Some(broker) = &self.config.broker { + broker.lock().await.unregister(session_id); + } + } + /// Lee el Hello, valida, registra la sesión y emite HelloAck. /// Devuelve `Some(session_id)` si el handshake fue exitoso. async fn do_handshake(&mut self) -> std::io::Result> { @@ -193,12 +215,8 @@ impl Session { return Ok(None); } - let resolved = ResolvedCard::from_agnostic(hello.card); let session_id = Ulid::new(); - self.sessions - .lock() - .await - .insert(session_id, resolved); + self.register_session(session_id, hello.card).await; let ack = HelloAck { server_version: crate::HANDSHAKE_VERSION.to_string(), @@ -211,6 +229,15 @@ impl Session { Ok(Some(session_id)) } + /// Indexa la sesión: ResolvedCard en sessions + Card en broker (si hay). + async fn register_session(&self, session_id: SessionId, card: Card) { + if let Some(broker) = &self.config.broker { + broker.lock().await.register(session_id, &card); + } + let resolved = ResolvedCard::from_agnostic(card); + self.sessions.lock().await.insert(session_id, resolved); + } + /// Validaciones que el servidor aplica al Hello del cliente. fn validate_hello(&self, hello: &Hello) -> Option { if hello.schema_version != CARD_SCHEMA_VERSION { diff --git a/crates/core/brahman-handshake/tests/handshake.rs b/crates/core/brahman-handshake/tests/handshake.rs index 7562b5c..5462bc1 100644 --- a/crates/core/brahman-handshake/tests/handshake.rs +++ b/crates/core/brahman-handshake/tests/handshake.rs @@ -2,11 +2,13 @@ //! ejercita el round-trip completo del protocolo. use std::collections::BTreeSet; +use std::sync::Arc; use std::time::Duration; +use brahman_broker::{Broker, BrokerConfig}; use brahman_card::{ - Card, CgroupSpec, NamespaceSet, Payload, ResourceLimits, SomaSpec, Supervision, - CARD_SCHEMA_VERSION, + Card, CgroupSpec, Flow, Flows, NamespaceSet, Payload, ResourceLimits, SomaSpec, Supervision, + TypeRef, CARD_SCHEMA_VERSION, }; use brahman_handshake::{ client::{Client, ClientError}, @@ -15,6 +17,7 @@ use brahman_handshake::{ server::{Server, ServerConfig}, }; use tokio::net::UnixStream; +use tokio::sync::Mutex; use ulid::Ulid; fn sample_card(label: &str) -> Card { @@ -55,7 +58,7 @@ fn sock_path(name: &str) -> std::path::PathBuf { #[tokio::test] async fn full_handshake_roundtrip() { let path = sock_path("happy"); - let server = Server::bind(&path, ServerConfig { init_attached: true }).unwrap(); + let server = Server::bind(&path, ServerConfig { init_attached: true, broker: None }).unwrap(); let session_handle = tokio::spawn({ async move { @@ -135,6 +138,166 @@ async fn server_rejects_protocol_mismatch() { .unwrap(); } +// ===================================================================== +// Integración handshake ↔ broker +// ===================================================================== + +fn card_with_flows(label: &str, input: Vec, output: Vec) -> Card { + Card { + schema_version: CARD_SCHEMA_VERSION, + id: Ulid::new(), + label: label.into(), + soma: SomaSpec { + cgroup: CgroupSpec { + path: "ente.slice/test".into(), + cpu_weight: None, + io_weight: None, + }, + namespaces: NamespaceSet::default(), + rlimits: ResourceLimits::default(), + cpu_affinity: None, + }, + payload: Payload::Virtual, + supervision: Supervision::OneShot, + flow: Flows { input, output }, + ..Default::default() + } +} + +fn flow(name: &str, ty: TypeRef) -> Flow { + Flow { + name: name.into(), + ty, + pin_to: None, + } +} + +/// Espera hasta que `broker.len() >= n` o timeout. +async fn wait_for_broker_len(broker: &Arc>, n: usize) { + for _ in 0..50 { + if broker.lock().await.len() >= n { + return; + } + tokio::time::sleep(Duration::from_millis(10)).await; + } + panic!("broker no alcanzó {n} entradas en 500ms"); +} + +#[tokio::test] +async fn broker_registers_and_unregisters_with_session() { + let path = sock_path("broker-lifecycle"); + let broker = Arc::new(Mutex::new(Broker::new(BrokerConfig::default()))); + let server = Server::bind( + &path, + ServerConfig { + init_attached: false, + broker: Some(broker.clone()), + }, + ) + .unwrap(); + + let session_handle = tokio::spawn(async move { + let session = server.accept_one().await.unwrap(); + session.handle().await.unwrap(); + }); + + let mut client = Client::connect(&path, sample_card("alpha")).await.unwrap(); + let session_id = client.session(); + + // Tras el handshake, la Card debe estar registrada en el broker. + wait_for_broker_len(&broker, 1).await; + { + let b = broker.lock().await; + assert_eq!(b.len(), 1); + assert!(b.sessions().any(|s| s == session_id)); + } + + client.farewell().await.unwrap(); + tokio::time::timeout(Duration::from_secs(2), session_handle) + .await + .expect("server colgó tras farewell") + .unwrap(); + + // Tras el cleanup, el broker queda vacío. + { + let b = broker.lock().await; + assert_eq!(b.len(), 0); + } +} + +#[tokio::test] +async fn broker_matches_two_live_modules() { + let path = sock_path("broker-match"); + let broker = Arc::new(Mutex::new(Broker::new(BrokerConfig::default()))); + let server = Server::bind( + &path, + ServerConfig { + init_attached: false, + broker: Some(broker.clone()), + }, + ) + .unwrap(); + + // Server loop: usa la API run() para manejar accept+spawn. + let server_handle = tokio::spawn(async move { + let _ = server.run().await; + }); + + // Productor: emite "out" tipo string. + let producer_card = card_with_flows( + "dht", + vec![], + vec![flow( + "out", + TypeRef::Primitive { + name: "string".into(), + }, + )], + ); + let mut producer = Client::connect(&path, producer_card).await.unwrap(); + wait_for_broker_len(&broker, 1).await; + + // Consumidor: pide "in" tipo string. + let consumer_card = card_with_flows( + "ui", + vec![flow( + "in", + TypeRef::Primitive { + name: "string".into(), + }, + )], + vec![], + ); + let mut consumer = Client::connect(&path, consumer_card).await.unwrap(); + wait_for_broker_len(&broker, 2).await; + + // El broker debe encontrar el match consumer.in ← producer.out. + let m = { + let b = broker.lock().await; + b.find_producer_for(consumer.session(), "in") + } + .expect("broker no encontró match"); + assert_eq!(m.consumer_label, "ui"); + assert_eq!(m.producer_label, "dht"); + assert_eq!(m.producer.flow_name, "out"); + + // Cuando el productor se va, el match desaparece. + producer.farewell().await.unwrap(); + for _ in 0..50 { + if broker.lock().await.len() < 2 { + break; + } + tokio::time::sleep(Duration::from_millis(10)).await; + } + { + let b = broker.lock().await; + assert!(b.find_producer_for(consumer.session(), "in").is_none()); + } + + consumer.farewell().await.unwrap(); + server_handle.abort(); +} + #[tokio::test] async fn ping_before_hello_rejected() { let path = sock_path("ping-no-hello");