From 8a83a26de0a393b0d4e6cbe1c22c4bdf986bbe58 Mon Sep 17 00:00:00 2001 From: Sergio Date: Fri, 8 May 2026 15:43:41 +0000 Subject: [PATCH] =?UTF-8?q?feat(handshake):=20notificaci=C3=B3n=20push=20d?= =?UTF-8?q?e=20matches=20del=20broker=20al=20cliente?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit El servidor empuja MatchEvent (Available | Lost) a los consumers cuando sus inputs cambian de match — sea porque un productor llegó, porque otro mejor lo desplazó, o porque desapareció. Mecánica: - Frame::MatchEvent con MatchEventKind { Available, Lost } y los datos del match (consumer_flow, producer_session/label/flow, ty, via, pinned). - Server: SessionTxTable (Arc>>) + LastMatches (último match conocido por consumer/input). En cada register/unregister, broadcast_match_diffs recomputa con el broker y emite SOLO los diffs respecto al estado anterior. - Session::run_post_handshake usa tokio::select! para multiplexar read_frame del cliente y rx.recv() de su tx push. - Cleanup ahora también limpia push_table y last_matches y dispara un broadcast (para notificar a quienes pierden el match). - Client: VecDeque bufferea eventos que llegan mezclados con respuestas a Ping. API: - take_event() — non-blocking, drena buffer - await_event(timeout) — bloquea hasta evento o timeout - ping() ahora drena MatchEvents intermedios hasta encontrar el Pong. Capacity del canal push por sesión: 32 frames (try_send no-blocking; si se llena, los eventos extra se descartan — se documenta como ephemeral, el cliente puede re-consultar via brahman-status). Test nuevo en brahman-handshake/tests/handshake.rs: - match_event_pushed_on_producer_arrival: consumer espera, no recibe evento → llega productor → recibe Available → productor se va → recibe Lost. Example nuevo: brahman-handshake/examples/subscriber.rs — cliente que loguea cada MatchEvent en tiempo real. Útil para ver la dinámica del broker. Pings cada 25s para keepalive. Demo end-to-end verificada (4 eventos, 3 ya cubren el ciclo completo): T+0.3 alpha llega → Available ← demo.alpha.out T+0.8 beta llega → (sin evento: alpha gana por orden alfabético) T+1.3 alpha killed → Available ← demo.beta.out (re-evaluación) T+1.8 beta killed → Lost ← El broker emite diff: ningún evento cuando un nuevo productor llega sin desplazar al ganador actual. Tests: 28/28 (handshake integ 6→7). cargo check --workspace: 0 errores. Co-Authored-By: Claude Opus 4.7 (1M context) --- crates/core/brahman-handshake/Cargo.toml | 4 + .../brahman-handshake/examples/subscriber.rs | 83 ++++++ crates/core/brahman-handshake/src/client.rs | 55 +++- crates/core/brahman-handshake/src/messages.rs | 38 +++ crates/core/brahman-handshake/src/server.rs | 249 ++++++++++++++---- .../core/brahman-handshake/tests/handshake.rs | 76 ++++++ 6 files changed, 449 insertions(+), 56 deletions(-) create mode 100644 crates/core/brahman-handshake/examples/subscriber.rs diff --git a/crates/core/brahman-handshake/Cargo.toml b/crates/core/brahman-handshake/Cargo.toml index 1770386..0674620 100644 --- a/crates/core/brahman-handshake/Cargo.toml +++ b/crates/core/brahman-handshake/Cargo.toml @@ -26,3 +26,7 @@ anyhow = { workspace = true } [[example]] name = "probe" path = "examples/probe.rs" + +[[example]] +name = "subscriber" +path = "examples/subscriber.rs" diff --git a/crates/core/brahman-handshake/examples/subscriber.rs b/crates/core/brahman-handshake/examples/subscriber.rs new file mode 100644 index 0000000..b16aa62 --- /dev/null +++ b/crates/core/brahman-handshake/examples/subscriber.rs @@ -0,0 +1,83 @@ +//! `subscriber` — cliente brahman que loguea cada `MatchEvent` recibido. +//! +//! Declara una Card con un input `in` de tipo `json`. Cada vez que el +//! broker matchea (o desmatch) ese input contra un productor, imprime +//! una línea. Útil para visualizar la dinámica del broker en vivo. +//! +//! Uso: +//! ```sh +//! cargo run -p brahman-handshake --example subscriber [label] +//! ``` + +use std::collections::BTreeSet; +use std::time::Duration; + +use brahman_card::{ + ulid::Ulid, Card, Flow, Flows, Lifecycle, Payload, Priority, Supervision, TypeRef, + CARD_SCHEMA_VERSION, +}; +use brahman_handshake::{client::Client, transport}; + +#[tokio::main(flavor = "current_thread")] +async fn main() -> anyhow::Result<()> { + let label = std::env::args() + .nth(1) + .unwrap_or_else(|| "subscriber".into()); + + let card = Card { + schema_version: CARD_SCHEMA_VERSION, + id: Ulid::new(), + label: label.clone(), + provides: BTreeSet::new(), + requires: BTreeSet::new(), + payload: Payload::Virtual, + supervision: Supervision::OneShot, + lifecycle: Lifecycle::Daemon, + priority: Priority::Normal, + flow: Flows { + input: vec![Flow { + name: "in".into(), + ty: TypeRef::Primitive { + name: "json".into(), + }, + pin_to: None, + }], + output: vec![], + }, + ..Default::default() + }; + + let path = transport::default_socket_path(); + eprintln!("[{label}] connecting to {}", path.display()); + let mut client = Client::connect(&path, card).await?; + eprintln!( + "[{label}] attached: session={} init={}", + client.session(), + client.server_info().init_attached + ); + + // Loop: espera hasta 25s por un MatchEvent. Si timeout, ping para + // mantener la conexión viva. + loop { + match client.await_event(Duration::from_secs(25)).await? { + Some(ev) => { + eprintln!( + "[{label}] {:?} {} ← {}.{} via={:?}{}", + ev.kind, + ev.consumer_flow, + if ev.producer_label.is_empty() { + "" + } else { + &ev.producer_label + }, + ev.producer_flow, + ev.via, + if ev.pinned { " 📌" } else { "" } + ); + } + None => { + let _ts = client.ping().await?; + } + } + } +} diff --git a/crates/core/brahman-handshake/src/client.rs b/crates/core/brahman-handshake/src/client.rs index 4992b7b..6050dd6 100644 --- a/crates/core/brahman-handshake/src/client.rs +++ b/crates/core/brahman-handshake/src/client.rs @@ -1,13 +1,15 @@ //! Cliente de handshake. Conecta a un Unix socket y mantiene la sesión. +use std::collections::VecDeque; use std::path::Path; +use std::time::Duration; use brahman_card::{Card, CARD_SCHEMA_VERSION}; use thiserror::Error; use tokio::net::UnixStream; use crate::codec::{read_frame, write_frame}; -use crate::messages::{Farewell, Frame, HandshakeError, Hello, HelloAck, Ping, SessionId}; +use crate::messages::{Farewell, Frame, HandshakeError, Hello, HelloAck, MatchEvent, Ping, SessionId}; /// Errores del cliente. #[derive(Debug, Error)] @@ -29,12 +31,15 @@ pub enum ClientError { } /// Cliente conectado y autenticado. Tras `connect` ya completó el handshake -/// y tiene su `SessionId`. +/// y tiene su `SessionId`. Los `MatchEvent` recibidos durante operaciones +/// request/response se buferean en `pending_events` y se obtienen vía +/// [`Client::take_event`] o [`Client::await_event`]. #[derive(Debug)] pub struct Client { stream: UnixStream, session: SessionId, server_info: HelloAck, + pending_events: VecDeque, } impl Client { @@ -60,11 +65,17 @@ impl Client { Frame::Ping(_) => return Err(ClientError::UnexpectedFrame { got: "Ping" }), Frame::Pong(_) => return Err(ClientError::UnexpectedFrame { got: "Pong" }), Frame::Farewell(_) => return Err(ClientError::UnexpectedFrame { got: "Farewell" }), + Frame::MatchEvent(_) => { + return Err(ClientError::UnexpectedFrame { + got: "MatchEvent (pre-handshake)", + }); + } }; Ok(Self { stream, session: ack.session, server_info: ack, + pending_events: VecDeque::new(), }) } @@ -78,7 +89,8 @@ impl Client { &self.server_info } - /// Envía un Ping y devuelve el timestamp del servidor. + /// Envía un Ping y devuelve el timestamp del servidor. Los frames + /// `MatchEvent` que lleguen mezclados se buferean en `pending_events`. pub async fn ping(&mut self) -> Result { write_frame( &mut self.stream, @@ -87,10 +99,39 @@ impl Client { }), ) .await?; - match read_frame(&mut self.stream).await? { - Frame::Pong(p) => Ok(p.timestamp_ms), - Frame::Error(e) => Err(ClientError::Server(e)), - _ => Err(ClientError::UnexpectedFrame { got: "non-pong" }), + loop { + match read_frame(&mut self.stream).await? { + Frame::Pong(p) => return Ok(p.timestamp_ms), + Frame::MatchEvent(ev) => self.pending_events.push_back(ev), + Frame::Error(e) => return Err(ClientError::Server(e)), + _ => return Err(ClientError::UnexpectedFrame { got: "non-pong" }), + } + } + } + + /// Saca un evento pendiente del buffer, sin bloquear ni leer del wire. + pub fn take_event(&mut self) -> Option { + self.pending_events.pop_front() + } + + /// Espera un `MatchEvent` con timeout. Drena primero el buffer; si + /// está vacío, lee del wire hasta el timeout. Otros frames recibidos + /// (Pong huérfano, Error) cortan la espera con error. + pub async fn await_event( + &mut self, + timeout: Duration, + ) -> Result, ClientError> { + if let Some(ev) = self.pending_events.pop_front() { + return Ok(Some(ev)); + } + match tokio::time::timeout(timeout, read_frame(&mut self.stream)).await { + Err(_) => Ok(None), + Ok(Err(e)) => Err(ClientError::Io(e)), + Ok(Ok(Frame::MatchEvent(ev))) => Ok(Some(ev)), + Ok(Ok(Frame::Error(e))) => Err(ClientError::Server(e)), + Ok(Ok(_)) => Err(ClientError::UnexpectedFrame { + got: "non-event en await_event", + }), } } diff --git a/crates/core/brahman-handshake/src/messages.rs b/crates/core/brahman-handshake/src/messages.rs index 2b64e66..2ace614 100644 --- a/crates/core/brahman-handshake/src/messages.rs +++ b/crates/core/brahman-handshake/src/messages.rs @@ -2,6 +2,8 @@ //! //! Todos los mensajes que cruzan el wire son variantes de [`Frame`]. +use brahman_broker::MatchStrategy; +use brahman_card::TypeRef; use serde::{Deserialize, Serialize}; use ulid::Ulid; @@ -71,8 +73,43 @@ pub enum HandshakeError { Internal(String), } +/// Notificación push del server al consumer: un match disponible o perdido. +/// +/// El server emite `Available` cuando un productor empieza a satisfacer un +/// `flow.input` del consumer (ya sea porque el productor acaba de +/// registrarse, o porque cambió el match anterior). Emite `Lost` cuando +/// el productor previo dejó de satisfacer el input (desregistro o +/// cambio de match). +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct MatchEvent { + pub kind: MatchEventKind, + /// Nombre del input del consumer al que aplica el evento. + pub consumer_flow: String, + /// Sesión y label del productor (en `Lost` puede ser nil/vacío). + pub producer_session: SessionId, + pub producer_label: String, + pub producer_flow: String, + /// Tipo del flujo matcheado. + pub ty: TypeRef, + /// Estrategia que ganó (relevante en `Available`). + pub via: MatchStrategy, + /// `true` si fue resuelto por `pin_to`. + pub pinned: bool, +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] +#[serde(rename_all = "kebab-case")] +pub enum MatchEventKind { + Available, + Lost, +} + /// Frame único de wire — discriminada por variante. Cada conexión es un /// stream de frames. +/// +/// Direcciones: +/// - Cliente → Server: `Hello`, `Ping`, `Farewell`. +/// - Server → Cliente: `HelloAck`, `Pong`, `Error`, `MatchEvent`. #[derive(Debug, Clone, Serialize, Deserialize)] pub enum Frame { Hello(Hello), @@ -81,4 +118,5 @@ pub enum Frame { Pong(Pong), Farewell(Farewell), Error(HandshakeError), + MatchEvent(MatchEvent), } diff --git a/crates/core/brahman-handshake/src/server.rs b/crates/core/brahman-handshake/src/server.rs index dffd9c9..1a84916 100644 --- a/crates/core/brahman-handshake/src/server.rs +++ b/crates/core/brahman-handshake/src/server.rs @@ -5,15 +5,18 @@ use std::path::{Path, PathBuf}; use std::sync::Arc; use std::time::{SystemTime, UNIX_EPOCH}; -use brahman_broker::Broker; +use brahman_broker::{Broker, Endpoint}; use brahman_card::{Card, ResolvedCard, CARD_SCHEMA_VERSION}; use tokio::net::{UnixListener, UnixStream}; -use tokio::sync::Mutex; +use tokio::sync::{mpsc, Mutex}; use tracing::{debug, warn}; use ulid::Ulid; use crate::codec::{read_frame, write_frame}; -use crate::messages::{Farewell, Frame, HandshakeError, Hello, HelloAck, Ping, Pong, SessionId}; +use crate::messages::{ + Farewell, Frame, HandshakeError, Hello, HelloAck, MatchEvent, MatchEventKind, Ping, Pong, + SessionId, +}; /// Tabla de sesiones vivas indexada por `SessionId`. pub type SessionRegistry = Arc>>; @@ -22,6 +25,18 @@ pub type SessionRegistry = Arc>>; /// el ciclo de vida de las sesiones. pub type SharedBroker = Arc>; +/// Tabla de canales push por sesión: el server inyecta frames hacia el +/// cliente (p. ej. `MatchEvent`) sin requerir que el cliente haga request. +type SessionTxTable = Arc>>>; + +/// Por sesión, último match conocido por nombre de input. Se usa para +/// emitir diffs (Available/Lost) en lugar del estado completo. +type LastMatches = Arc>>>; + +/// Capacidad del canal push por sesión. Si se llena (cliente lento), los +/// eventos extra se descartan — el cliente puede re-consultar el estado. +const PUSH_CHANNEL_CAPACITY: usize = 32; + /// Configuración del servidor. #[derive(Debug, Clone, Default)] pub struct ServerConfig { @@ -38,6 +53,8 @@ pub struct Server { listener: UnixListener, socket_path: PathBuf, sessions: SessionRegistry, + push_table: SessionTxTable, + last_matches: LastMatches, config: ServerConfig, } @@ -59,6 +76,8 @@ impl Server { listener, socket_path, sessions: Arc::new(Mutex::new(HashMap::new())), + push_table: Arc::new(Mutex::new(HashMap::new())), + last_matches: Arc::new(Mutex::new(HashMap::new())), config, }) } @@ -81,6 +100,8 @@ impl Server { Ok(Session { stream, sessions: self.sessions.clone(), + push_table: self.push_table.clone(), + last_matches: self.last_matches.clone(), config: self.config.clone(), }) } @@ -114,6 +135,8 @@ impl Drop for Server { pub struct Session { stream: UnixStream, sessions: SessionRegistry, + push_table: SessionTxTable, + last_matches: LastMatches, config: ServerConfig, } @@ -132,64 +155,192 @@ impl Session { } async fn run_post_handshake(&mut self, session_id: SessionId) -> std::io::Result<()> { + // Canal por donde el server inyecta frames push (MatchEvent, etc.). + let (tx, mut rx) = mpsc::channel::(PUSH_CHANNEL_CAPACITY); + self.push_table.lock().await.insert(session_id, tx); + + // Tras registrar el canal, recomputar matches y emitir diffs a + // todas las sesiones afectadas (incluida ésta, si tiene inputs). + self.broadcast_match_diffs().await; + loop { - let frame = match read_frame(&mut self.stream).await { - Ok(f) => f, - Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => { - debug!(session = %session_id, "cliente cerró conexión sin Farewell"); - return Ok(()); + tokio::select! { + // Frame entrante del cliente. + res = read_frame(&mut self.stream) => { + match res { + Ok(frame) => { + if !self.handle_inbound_frame(session_id, frame).await? { + return Ok(()); + } + } + Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => { + debug!(session = %session_id, "cliente cerró sin Farewell"); + return Ok(()); + } + Err(e) => return Err(e), + } } - Err(e) => return Err(e), - }; - match frame { - Frame::Ping(Ping { session }) if session == session_id => { - let pong = Pong { - timestamp_ms: now_ms(), - }; - write_frame(&mut self.stream, &Frame::Pong(pong)).await?; - } - Frame::Ping(_) => { - write_frame( - &mut self.stream, - &Frame::Error(HandshakeError::Unauthorized( - "session-id no coincide".into(), - )), - ) - .await?; - } - Frame::Farewell(Farewell { session }) if session == session_id => { - return Ok(()); - } - Frame::Farewell(_) => { - write_frame( - &mut self.stream, - &Frame::Error(HandshakeError::Unauthorized( - "session-id no coincide".into(), - )), - ) - .await?; - } - _ => { - // Frame inesperado en estado post-handshake. - write_frame( - &mut self.stream, - &Frame::Error(HandshakeError::Rejected( - "frame inesperado tras handshake".into(), - )), - ) - .await?; + // Frame push desde el server (MatchEvent). + Some(frame) = rx.recv() => { + write_frame(&mut self.stream, &frame).await?; } } } } - /// Limpieza atómica de las dos vistas: registro de sesiones + broker. - /// Se ejecuta tanto si la sesión cierra por Farewell, EOF, o error. + /// Maneja un frame entrante. Devuelve `Ok(false)` si la sesión debe + /// cerrarse limpiamente (Farewell con session-id correcto). + async fn handle_inbound_frame( + &mut self, + session_id: SessionId, + frame: Frame, + ) -> std::io::Result { + match frame { + Frame::Ping(Ping { session }) if session == session_id => { + let pong = Pong { + timestamp_ms: now_ms(), + }; + write_frame(&mut self.stream, &Frame::Pong(pong)).await?; + Ok(true) + } + Frame::Ping(_) => { + write_frame( + &mut self.stream, + &Frame::Error(HandshakeError::Unauthorized( + "session-id no coincide".into(), + )), + ) + .await?; + Ok(true) + } + Frame::Farewell(Farewell { session }) if session == session_id => Ok(false), + Frame::Farewell(_) => { + write_frame( + &mut self.stream, + &Frame::Error(HandshakeError::Unauthorized( + "session-id no coincide".into(), + )), + ) + .await?; + Ok(true) + } + _ => { + write_frame( + &mut self.stream, + &Frame::Error(HandshakeError::Rejected( + "frame inesperado tras handshake".into(), + )), + ) + .await?; + Ok(true) + } + } + } + + /// Limpieza atómica de TRES vistas: registro de sesiones + broker + + /// canal push. Se ejecuta tanto si la sesión cierra por Farewell, EOF, + /// o error. Tras desregistrar, emite diffs a las sesiones que perdieron + /// el match contra ésta. async fn cleanup(&self, session_id: SessionId) { self.sessions.lock().await.remove(&session_id); + self.push_table.lock().await.remove(&session_id); + self.last_matches.lock().await.remove(&session_id); if let Some(broker) = &self.config.broker { broker.lock().await.unregister(session_id); } + self.broadcast_match_diffs().await; + } + + /// Recomputa los matches para todas las sesiones registradas y empuja + /// `MatchEvent::Available` / `MatchEvent::Lost` por las que cambiaron + /// respecto al último estado conocido. + /// + /// Se llama tras cada `register_session` y `cleanup`. Las inserciones + /// al canal usan `try_send` (non-blocking); si el cliente está lento + /// y se llena el buffer, los eventos extra se pierden — es ephemeral + /// y el cliente puede re-consultar el estado vía `brahman-status`. + async fn broadcast_match_diffs(&self) { + let broker = match &self.config.broker { + Some(b) => b, + None => return, + }; + + let b = broker.lock().await; + let push_table = self.push_table.lock().await; + let mut last = self.last_matches.lock().await; + + debug!( + target: "brahman_handshake::broadcast", + cards = b.len(), + push_subscribers = push_table.len(), + "broadcast_match_diffs" + ); + + // Snapshot de cards para no tener que sostener el lock del broker. + let cards: Vec<_> = b.cards().cloned().collect(); + + for cons in &cards { + let cons_session = cons.session; + let tx = match push_table.get(&cons_session) { + Some(tx) => tx, + None => continue, // todavía no tiene canal push + }; + let cons_last = last.entry(cons_session).or_default(); + + for input in &cons.inputs { + let new_match = b.find_producer_for(cons_session, &input.name); + let new_endpoint = new_match.as_ref().map(|m| m.producer.clone()); + let old_endpoint = cons_last.get(&input.name).cloned(); + + if old_endpoint == new_endpoint { + continue; + } + + if let Some(m) = &new_match { + let event = MatchEvent { + kind: MatchEventKind::Available, + consumer_flow: input.name.clone(), + producer_session: m.producer.session, + producer_label: m.producer_label.clone(), + producer_flow: m.producer.flow_name.clone(), + ty: m.ty.clone(), + via: m.via, + pinned: m.pinned, + }; + let send_res = tx.try_send(Frame::MatchEvent(event)); + debug!( + target: "brahman_handshake::broadcast", + consumer = %cons_session, + flow = %input.name, + producer = %m.producer_label, + result = ?send_res.as_ref().map(|_| "ok").unwrap_or_else(|e| match e { + tokio::sync::mpsc::error::TrySendError::Full(_) => "full", + tokio::sync::mpsc::error::TrySendError::Closed(_) => "closed", + }), + "Available pushed" + ); + } else { + // Tenía match, ahora no. + let event = MatchEvent { + kind: MatchEventKind::Lost, + consumer_flow: input.name.clone(), + producer_session: Ulid::nil(), + producer_label: String::new(), + producer_flow: String::new(), + ty: input.ty.clone(), + via: brahman_broker::MatchStrategy::Exact, + pinned: false, + }; + let _ = tx.try_send(Frame::MatchEvent(event)); + } + + if let Some(ep) = new_endpoint { + cons_last.insert(input.name.clone(), ep); + } else { + cons_last.remove(&input.name); + } + } + } } /// Lee el Hello, valida, registra la sesión y emite HelloAck. diff --git a/crates/core/brahman-handshake/tests/handshake.rs b/crates/core/brahman-handshake/tests/handshake.rs index 5462bc1..88adc91 100644 --- a/crates/core/brahman-handshake/tests/handshake.rs +++ b/crates/core/brahman-handshake/tests/handshake.rs @@ -298,6 +298,82 @@ async fn broker_matches_two_live_modules() { server_handle.abort(); } +#[tokio::test] +async fn match_event_pushed_on_producer_arrival() { + use brahman_handshake::messages::MatchEventKind; + + let path = sock_path("push-match"); + let broker = Arc::new(Mutex::new(Broker::new(BrokerConfig::default()))); + let server = Server::bind( + &path, + ServerConfig { + init_attached: false, + broker: Some(broker.clone()), + }, + ) + .unwrap(); + + let server_handle = tokio::spawn(async move { + let _ = server.run().await; + }); + + // El consumidor llega primero — sin productor, no hay match aún. + let consumer_card = card_with_flows( + "ui", + vec![flow( + "in", + TypeRef::Primitive { + name: "json".into(), + }, + )], + vec![], + ); + let mut consumer = Client::connect(&path, consumer_card).await.unwrap(); + + // No debería haber evento todavía. + let no_event = consumer + .await_event(Duration::from_millis(100)) + .await + .unwrap(); + assert!(no_event.is_none(), "evento inesperado: {no_event:?}"); + + // Llega el productor → consumer debe recibir Available. + let producer_card = card_with_flows( + "dht", + vec![], + vec![flow( + "out", + TypeRef::Primitive { + name: "json".into(), + }, + )], + ); + let mut producer = Client::connect(&path, producer_card).await.unwrap(); + + let ev = consumer + .await_event(Duration::from_secs(2)) + .await + .unwrap() + .expect("Available no llegó"); + assert_eq!(ev.kind, MatchEventKind::Available); + assert_eq!(ev.consumer_flow, "in"); + assert_eq!(ev.producer_label, "dht"); + assert_eq!(ev.producer_flow, "out"); + + // El productor se va → consumer debe recibir Lost. + producer.farewell().await.unwrap(); + let ev = consumer + .await_event(Duration::from_secs(2)) + .await + .unwrap() + .expect("Lost no llegó"); + assert_eq!(ev.kind, MatchEventKind::Lost); + assert_eq!(ev.consumer_flow, "in"); + + consumer.farewell().await.unwrap(); + server_handle.abort(); +} + #[tokio::test] async fn ping_before_hello_rejected() { let path = sock_path("ping-no-hello");