From a97f6b98f39e1ab9d4982ce0fda907a19c1ffa83 Mon Sep 17 00:00:00 2001 From: Sergio Date: Sun, 10 May 2026 15:53:38 +0000 Subject: [PATCH] feat(brahman-handshake): ListMatches endpoint + timeline en broker-explorer MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Iter 21. Cierra el loop iniciado en iter 20: ahora se ven sesiones + matches actuales + cómo cambian a través del tiempo. brahman-handshake/messages: - Frame::ListMatches → Frame::MatchList(Vec). brahman-handshake/server: - run_post_handshake pasa Option<&SharedBroker> a handle_inbound_frame. - Sin broker configurado → MatchList vacía (no error). brahman-handshake/client + brahman-sidecar: - Client::list_matches() análogo a list_sessions, drena MatchEvents. - list_matches / list_matches_blocking, mismo patrón. brahman-broker-explorer: - Poll-tick agrega list_matches_blocking además de list_sessions. - last_match_keys: HashSet para diff entre ticks. - timeline: VecDeque cap 50. - diff_matches (free fn): Available para keys nuevas, Lost para desaparecidas. Primer tick marca todo Available (boot UX). - Render: stat_card "Timeline" con HH:MM:SS {+/-} formato compacto. 5 tests broker-explorer (3 nuevos del diff). Stack verde. Decisión: timeline polled cada POLL_INTERVAL=5s, no push. MatchEvents del broker son consumer-céntricos (cada session ve sólo SUS matches); "system-wide timeline" requeriría broker subscribe-all (mucho scope). Co-Authored-By: Claude Opus 4.7 (1M context) --- CHANGELOG.md | 54 +++ Cargo.lock | 3 + .../apps/brahman-broker-explorer/Cargo.toml | 3 + .../apps/brahman-broker-explorer/src/main.rs | 309 +++++++++++++++++- crates/core/brahman-handshake/src/client.rs | 35 ++ crates/core/brahman-handshake/src/messages.rs | 23 +- crates/core/brahman-handshake/src/server.rs | 38 ++- .../shared/brahman-sidecar/src/discovery.rs | 46 +++ crates/shared/brahman-sidecar/src/lib.rs | 4 +- 9 files changed, 499 insertions(+), 16 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 64f76ee..b2bd633 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,60 @@ ratio/diff ver `git show `. ## 2026-05-10 +### feat(brahman-handshake): ListMatches endpoint + timeline en broker-explorer +Iter 21. Cierra el loop de observabilidad iniciado en iter 20: ahora +se ven no sólo las sesiones registradas sino también qué matches +consumer↔producer está computando el broker en cada momento, y la +historia de cómo cambian. + +`brahman-handshake/messages.rs`: +- **`Frame::ListMatches(ListMatches{session})`**: pedido (mismo + patrón de validación session-id). +- **`Frame::MatchList(MatchList{matches: Vec})`**: + respuesta. Cada `Match` ya es serializable y lleva `consumer`, + `consumer_label`, `producer`, `producer_label`, `ty`, `via`, `pinned`. + +`brahman-handshake/server.rs`: +- `run_post_handshake` ahora pasa también `broker_for_match: Option<&SharedBroker>` + al `handle_inbound_frame`. +- Si el server tiene broker configurado, `ListMatches` responde con + `broker.all_matches()`. Si no (server sin broker), responde + `MatchList { matches: vec![] }` — refleja "no hay matching activo", + no es un error. + +`brahman-handshake/client.rs`: `Client::list_matches()` análogo a +`list_sessions()`, drena `MatchEvent`s intermedios al buffer. + +`brahman-sidecar/discovery.rs`: `list_matches` y `list_matches_blocking` +con la misma forma de Card observer minimalista. + +`brahman-broker-explorer`: +- Cada poll-tick ahora pide TANTO `list_sessions` COMO `list_matches`. +- `Explorer.last_match_keys: HashSet` mantiene el estado + del último snapshot. La key es `(consumer.session, consumer.flow, + producer.session, producer.flow)`. +- `Explorer.timeline: VecDeque` con cap `TIMELINE_CAP=50`. +- Función pura `diff_matches(last_keys, list) -> (entries, new_keys)`: + emite `Available` para keys nuevas y `Lost` para keys desaparecidas. + Primer tick (last_keys vacío) marca todo como Available — cubre + el boot sin que la UI quede vacía. +- Render: `stat_card` "Timeline de matches" con count + 20 entries + formateadas como `HH:MM:SS {+/-} consumer.flow ← producer.flow [via]`. + Más reciente arriba. + +Tests broker-explorer: 5 totales. +- `diff_matches_first_snapshot_marks_everything_available` +- `diff_matches_emits_lost_when_match_disappears` +- `diff_matches_no_change_emits_nothing` +- `pending_is_default_state_at_boot` (existente) +- `poll_and_probe_constants_are_sane` (existente) + +Decisión: timeline polled (cada `POLL_INTERVAL=5s`), no push. +Razón: los `MatchEvent` push del broker son consumer-céntricos +(cada session sólo ve sus propios matches). Para "system-wide +timeline" haría falta una API broker-level "subscribe a todos" — +mucho más scope. Polling cada 5s es suficiente para observabilidad. + ### feat(brahman-handshake): ListSessions endpoint + cliente + UI broker-explorer Iter 20. Nuevo flujo end-to-end para observabilidad: cualquier módulo conectado puede preguntar al broker la lista de sesiones diff --git a/Cargo.lock b/Cargo.lock index 23e415a..39f68e3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1239,9 +1239,12 @@ dependencies = [ name = "brahman-broker-explorer" version = "0.1.0" dependencies = [ + "brahman-broker", + "brahman-card", "brahman-handshake", "brahman-sidecar", "gpui", + "ulid", "yahweh-launcher", "yahweh-theme", "yahweh-widget-app-header", diff --git a/crates/apps/brahman-broker-explorer/Cargo.toml b/crates/apps/brahman-broker-explorer/Cargo.toml index 1f6baa3..584ecaf 100644 --- a/crates/apps/brahman-broker-explorer/Cargo.toml +++ b/crates/apps/brahman-broker-explorer/Cargo.toml @@ -6,8 +6,11 @@ license.workspace = true description = "Probe GUI del broker brahman: conecta cada N segundos vía await_provider_blocking con un Card observer agnóstico, reporta 3 estados (down / up sin provider / up con provider)." [dependencies] +brahman-broker = { path = "../../core/brahman-broker" } +brahman-card = { path = "../../core/brahman-card" } brahman-handshake = { path = "../../core/brahman-handshake" } brahman-sidecar = { path = "../../shared/brahman-sidecar" } +ulid = { workspace = true } yahweh-theme = { path = "../../modules/ui_engine/libs/theme" } yahweh-launcher = { path = "../../modules/ui_engine/libs/launcher" } yahweh-widget-banner = { path = "../../modules/ui_engine/widgets/banner" } diff --git a/crates/apps/brahman-broker-explorer/src/main.rs b/crates/apps/brahman-broker-explorer/src/main.rs index f777415..5e14333 100644 --- a/crates/apps/brahman-broker-explorer/src/main.rs +++ b/crates/apps/brahman-broker-explorer/src/main.rs @@ -26,11 +26,15 @@ use std::path::PathBuf; use std::time::{Duration, Instant}; +use std::collections::HashSet; + use brahman_handshake::messages::SessionList; use brahman_handshake::transport; use brahman_sidecar::{ - await_provider_blocking, build_consumer_card, list_sessions_blocking, ConsumerError, + await_provider_blocking, build_consumer_card, list_matches_blocking, list_sessions_blocking, + ConsumerError, }; +use ulid::Ulid; use gpui::{ div, prelude::*, px, Context, IntoElement, Render, SharedString, Window, }; @@ -43,6 +47,11 @@ use yahweh_widget_stat_card::stat_card; const POLL_INTERVAL: Duration = Duration::from_secs(5); const PROBE_TIMEOUT: Duration = Duration::from_secs(1); +/// Cap del buffer del timeline. Mantenemos las últimas N entries — +/// más viejo se descarta. 50 cubre ~4 minutos de actividad densa +/// sin saturar el panel; subir si hace falta historia más larga. +const TIMELINE_CAP: usize = 50; + fn main() { launch_app("Brahman Broker — Probe", (720., 480.), Explorer::new); } @@ -73,6 +82,38 @@ struct Explorer { /// Última `SessionList` recibida del broker (None = aún sin pedir /// o último intento falló). sessions: Option, + /// Set de matches presentes en el último snapshot del broker. + /// Identificado por `(consumer.session, consumer.flow, + /// producer.session, producer.flow)` para que la diff entre + /// ticks distinga "nuevo match" vs "match perdido". Un cambio + /// de producer (otro session/flow para mismo consumer) cuenta + /// como Lost del previo + Available del nuevo. + last_match_keys: HashSet, + /// Timeline FIFO: los más nuevos al frente. Cada entry tiene un + /// timestamp local + el evento sintético (Available/Lost) que + /// surgió de la diff del tick. + timeline: std::collections::VecDeque, +} + +/// Key estable para un match. Tupla porque (consumer, producer) +/// determina el match unívocamente; los campos derivados (`label`, +/// `via`, `pinned`) viajan en la entry pero no en la key. +type MatchKey = (Ulid, String, Ulid, String); + +#[derive(Clone, Debug)] +struct TimelineEntry { + /// Cuándo lo observó el explorer. Es tiempo local de wall-clock, + /// no del broker — el broker no timestampa los matches. + at: std::time::SystemTime, + /// Available = nuevo en este tick. Lost = estaba en el tick + /// anterior y desapareció. + kind: brahman_handshake::messages::MatchEventKind, + consumer_label: String, + consumer_flow: String, + producer_label: String, + producer_flow: String, + via: brahman_broker::MatchStrategy, + pinned: bool, } impl Explorer { @@ -117,21 +158,32 @@ impl Explorer { }; // Si el broker está reachable (UP*), aprovechar el - // round-trip para pedir la lista de sesiones. Si está - // DOWN, ni intentar — la lista serviría de nada con - // connect failed igual. - let sessions_snapshot = match &new_state { - ProbeState::Down { .. } | ProbeState::Pending => None, - _ => bg - .spawn(async move { - list_sessions_blocking("brahman-broker-explorer").ok() - }) - .await, + // round-trip para pedir la lista de sesiones + matches. + // Si está DOWN, ni intentar — la lista serviría de nada + // con connect failed igual. + let (sessions_snapshot, matches_snapshot) = match &new_state { + ProbeState::Down { .. } | ProbeState::Pending => (None, None), + _ => { + let s = bg + .spawn(async move { + list_sessions_blocking("brahman-broker-explorer").ok() + }) + .await; + let m = bg + .spawn(async move { + list_matches_blocking("brahman-broker-explorer").ok() + }) + .await; + (s, m) + } }; let _ = this.update(cx, |me, cx| { me.state = new_state; me.sessions = sessions_snapshot; + if let Some(matches) = matches_snapshot { + me.diff_matches_into_timeline(&matches); + } me.last_probe_ms = elapsed; me.last_probe_at = Some(Instant::now()); cx.notify(); @@ -150,6 +202,35 @@ impl Explorer { last_probe_ms: 0, last_probe_at: None, sessions: None, + last_match_keys: HashSet::new(), + timeline: std::collections::VecDeque::new(), + } + } + + /// Diffea el snapshot recibido contra el último set de keys. + /// Genera entries `Available` para keys nuevas y `Lost` para + /// keys que estaban antes y no están ahora. Cada entry se + /// prepende al timeline; el cap se aplica desde la cola. + /// + /// El primer tick del explorer (cuando `last_match_keys` está + /// vacío) hace que TODOS los matches actuales aparezcan como + /// `Available` — es el comportamiento querido (UI muestra el + /// estado al boot sin que parezca "no pasa nada"). + fn diff_matches_into_timeline( + &mut self, + list: &brahman_handshake::messages::MatchList, + ) { + let (new_entries, new_keys) = diff_matches(&self.last_match_keys, list); + for entry in new_entries { + self.push_timeline(entry); + } + self.last_match_keys = new_keys; + } + + fn push_timeline(&mut self, entry: TimelineEntry) { + self.timeline.push_front(entry); + while self.timeline.len() > TIMELINE_CAP { + self.timeline.pop_back(); } } } @@ -234,6 +315,19 @@ impl Render for Explorer { Some(_) => "labels visibles + flows in/out · (wit) = consciente".into(), }; + let timeline_items: Vec = self + .timeline + .iter() + .take(20) + .map(|e| format_timeline_entry(e)) + .collect(); + let timeline_value = self.timeline.len().to_string(); + let timeline_descr = if self.timeline.is_empty() { + "esperando primer match…".to_string() + } else { + "↑ más reciente · ↓ más viejo · cap 50 entries".to_string() + }; + let body = div() .flex() .flex_col() @@ -251,6 +345,16 @@ impl Render for Explorer { text, text_dim, &sessions_items, + )) + .child(stat_card( + cx, + "Timeline de matches", + timeline_value, + &timeline_descr, + accent_partial, + text, + text_dim, + &timeline_items, )); div() @@ -311,6 +415,106 @@ fn state_card( stat_card(cx, "Estado", value, &description, accent, text, text_dim, &[]) } +/// Diff puro entre snapshots de matches. Devuelve la lista de +/// entries nuevas (Available + Lost) en orden Available-primero, y +/// el set actualizado de keys. Extraído como free fn para que sea +/// testeable sin instanciar `Explorer`. +/// +/// El primer tick (last_keys vacío) marca todos los matches como +/// Available. Esto es deliberado: la UI muestra el estado al boot +/// como "todo recién apareció" en vez de quedarse vacía. +fn diff_matches( + last_keys: &HashSet, + list: &brahman_handshake::messages::MatchList, +) -> (Vec, HashSet) { + use brahman_handshake::messages::MatchEventKind; + let now = std::time::SystemTime::now(); + let current_keys: HashSet = list + .matches + .iter() + .map(|m| { + ( + m.consumer.session, + m.consumer.flow_name.clone(), + m.producer.session, + m.producer.flow_name.clone(), + ) + }) + .collect(); + + let mut entries = Vec::new(); + for m in &list.matches { + let key = ( + m.consumer.session, + m.consumer.flow_name.clone(), + m.producer.session, + m.producer.flow_name.clone(), + ); + if !last_keys.contains(&key) { + entries.push(TimelineEntry { + at: now, + kind: MatchEventKind::Available, + consumer_label: m.consumer_label.clone(), + consumer_flow: m.consumer.flow_name.clone(), + producer_label: m.producer_label.clone(), + producer_flow: m.producer.flow_name.clone(), + via: m.via, + pinned: m.pinned, + }); + } + } + for key in last_keys.iter() { + if !current_keys.contains(key) { + entries.push(TimelineEntry { + at: now, + kind: MatchEventKind::Lost, + consumer_label: String::new(), + consumer_flow: key.1.clone(), + producer_label: String::new(), + producer_flow: key.3.clone(), + via: brahman_broker::MatchStrategy::Exact, + pinned: false, + }); + } + } + (entries, current_keys) +} + +/// Renderiza una entry del timeline en una sola línea: `HH:MM:SS +/// {kind} consumer.flow ← producer.flow [via]`. Compact por diseño +/// — el panel es vertical y las líneas largas se cortan. +fn format_timeline_entry(e: &TimelineEntry) -> String { + use brahman_handshake::messages::MatchEventKind; + // Wall-clock local en HH:MM:SS — sin zoneinfo (chrono es heavy). + // Aproximación: total_seconds % 86400 → hora del día UTC. + let secs_today = e + .at + .duration_since(std::time::UNIX_EPOCH) + .map(|d| d.as_secs() % 86_400) + .unwrap_or(0); + let h = secs_today / 3600; + let m = (secs_today % 3600) / 60; + let s = secs_today % 60; + let kind = match e.kind { + MatchEventKind::Available => "+", + MatchEventKind::Lost => "-", + }; + let pinned = if e.pinned { " (pinned)" } else { "" }; + match e.kind { + MatchEventKind::Available => format!( + "{:02}:{:02}:{:02} {} {}.{} ← {}.{} [{:?}]{}", + h, m, s, kind, + e.consumer_label, e.consumer_flow, + e.producer_label, e.producer_flow, + e.via, pinned, + ), + MatchEventKind::Lost => format!( + "{:02}:{:02}:{:02} {} ?.{} ← ?.{} (lost)", + h, m, s, kind, e.consumer_flow, e.producer_flow, + ), + } +} + #[cfg(test)] mod tests { use super::*; @@ -332,4 +536,87 @@ mod tests { // El intervalo no debería ser tan corto que sature al broker. assert!(POLL_INTERVAL >= Duration::from_secs(2)); } + + fn synth_match( + consumer_label: &str, + consumer_flow: &str, + producer_label: &str, + producer_flow: &str, + ) -> brahman_broker::Match { + use brahman_broker::{Endpoint, Match, MatchStrategy}; + use brahman_card::TypeRef; + Match { + consumer: Endpoint { + session: Ulid::new(), + flow_name: consumer_flow.into(), + }, + consumer_label: consumer_label.into(), + producer: Endpoint { + session: Ulid::new(), + flow_name: producer_flow.into(), + }, + producer_label: producer_label.into(), + ty: TypeRef::Primitive { name: "json".into() }, + via: MatchStrategy::Exact, + pinned: false, + } + } + + #[test] + fn diff_matches_first_snapshot_marks_everything_available() { + use brahman_handshake::messages::{MatchEventKind, MatchList}; + let list = MatchList { + matches: vec![ + synth_match("a", "x", "b", "x"), + synth_match("c", "y", "d", "y"), + ], + }; + let last = HashSet::new(); + let (entries, keys) = diff_matches(&last, &list); + assert_eq!(entries.len(), 2); + assert!(entries + .iter() + .all(|e| matches!(e.kind, MatchEventKind::Available))); + assert_eq!(keys.len(), 2); + } + + #[test] + fn diff_matches_emits_lost_when_match_disappears() { + use brahman_handshake::messages::{MatchEventKind, MatchList}; + let m = synth_match("a", "x", "b", "x"); + let prev_key = ( + m.consumer.session, + m.consumer.flow_name.clone(), + m.producer.session, + m.producer.flow_name.clone(), + ); + let last: HashSet<_> = std::iter::once(prev_key.clone()).collect(); + let list = MatchList { matches: vec![] }; + let (entries, keys) = diff_matches(&last, &list); + assert_eq!(entries.len(), 1); + assert!(matches!(entries[0].kind, MatchEventKind::Lost)); + assert_eq!(entries[0].consumer_flow, "x"); + assert_eq!(entries[0].producer_flow, "x"); + assert!(keys.is_empty()); + } + + #[test] + fn diff_matches_no_change_emits_nothing() { + use brahman_handshake::messages::MatchList; + let m = synth_match("a", "x", "b", "x"); + let key = ( + m.consumer.session, + m.consumer.flow_name.clone(), + m.producer.session, + m.producer.flow_name.clone(), + ); + let last: HashSet<_> = std::iter::once(key.clone()).collect(); + let list = MatchList { + matches: vec![m.clone()], + }; + let (entries, keys) = diff_matches(&last, &list); + assert!(entries.is_empty(), "match unchanged → no events"); + assert_eq!(keys.len(), 1); + assert!(keys.contains(&key)); + } } diff --git a/crates/core/brahman-handshake/src/client.rs b/crates/core/brahman-handshake/src/client.rs index efe27de..9052dae 100644 --- a/crates/core/brahman-handshake/src/client.rs +++ b/crates/core/brahman-handshake/src/client.rs @@ -172,6 +172,16 @@ where got: "SessionList (pre-handshake)", }); } + Frame::ListMatches(_) => { + return Err(ClientError::UnexpectedFrame { + got: "ListMatches (pre-handshake)", + }); + } + Frame::MatchList(_) => { + return Err(ClientError::UnexpectedFrame { + got: "MatchList (pre-handshake)", + }); + } }; Ok(Self { stream, @@ -263,6 +273,31 @@ where } } + /// Pide al servidor el listado de matches actuales del broker + /// (consumer↔producer pares con tipo y estrategia). Mismo patrón + /// de drenado de `MatchEvent`s intermedios. + pub async fn list_matches(&mut self) -> Result { + write_frame( + &mut self.stream, + &Frame::ListMatches(crate::messages::ListMatches { + session: self.session, + }), + ) + .await?; + loop { + match read_frame(&mut self.stream).await? { + Frame::MatchList(list) => return Ok(list), + Frame::MatchEvent(ev) => self.pending_events.push_back(ev), + Frame::Error(e) => return Err(ClientError::Server(e)), + _ => { + return Err(ClientError::UnexpectedFrame { + got: "non-match-list", + }); + } + } + } + } + /// Cierre cooperativo. Consume el cliente. pub async fn farewell(mut self) -> Result<(), ClientError> { write_frame( diff --git a/crates/core/brahman-handshake/src/messages.rs b/crates/core/brahman-handshake/src/messages.rs index 5b8bcd0..50eb7ab 100644 --- a/crates/core/brahman-handshake/src/messages.rs +++ b/crates/core/brahman-handshake/src/messages.rs @@ -194,13 +194,30 @@ pub struct SessionList { pub entries: Vec, } +/// Pedido del listado de matches actuales del broker. La `session` +/// se valida igual que `ListSessions`. Si el server no tiene broker +/// configurado, devuelve la lista vacía (no es un error — refleja +/// que no hay matching activo). +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ListMatches { + pub session: SessionId, +} + +/// Respuesta a `ListMatches` con el snapshot de matches consumidor↔productor +/// actualmente computados por el broker. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct MatchList { + pub matches: Vec, +} + /// Frame único de wire — discriminada por variante. Cada conexión es un /// stream de frames. /// /// Direcciones: -/// - Cliente → Server: `Hello`, `Ping`, `Farewell`, `ListSessions`. +/// - Cliente → Server: `Hello`, `Ping`, `Farewell`, `ListSessions`, +/// `ListMatches`. /// - Server → Cliente: `HelloAck`, `Pong`, `Error`, `MatchEvent`, -/// `SessionList`. +/// `SessionList`, `MatchList`. #[derive(Debug, Clone, Serialize, Deserialize)] pub enum Frame { Hello(Hello), @@ -212,4 +229,6 @@ pub enum Frame { MatchEvent(MatchEvent), ListSessions(ListSessions), SessionList(SessionList), + ListMatches(ListMatches), + MatchList(MatchList), } diff --git a/crates/core/brahman-handshake/src/server.rs b/crates/core/brahman-handshake/src/server.rs index 9e681aa..a52093a 100644 --- a/crates/core/brahman-handshake/src/server.rs +++ b/crates/core/brahman-handshake/src/server.rs @@ -317,10 +317,19 @@ where }); // Reader loop principal. + let broker_for_loop = config.broker.clone(); let result: std::io::Result<()> = loop { match read_frame(&mut reader).await { Ok(frame) => { - match handle_inbound_frame(session_id, frame, &writer, &sessions).await { + match handle_inbound_frame( + session_id, + frame, + &writer, + &sessions, + broker_for_loop.as_ref(), + ) + .await + { Ok(true) => continue, Ok(false) => break Ok(()), Err(e) => break Err(e), @@ -350,6 +359,7 @@ async fn handle_inbound_frame( frame: Frame, writer: &Arc>>, sessions: &SessionRegistry, + broker_for_match: Option<&SharedBroker>, ) -> std::io::Result where S: AsyncRead + AsyncWrite + Unpin + Send + 'static, @@ -405,6 +415,32 @@ where .await?; Ok(true) } + Frame::ListMatches(crate::messages::ListMatches { session }) + if session == session_id => + { + let matches = match &broker_for_match { + Some(b) => b.lock().await.all_matches(), + None => Vec::new(), + }; + let mut w = writer.lock().await; + write_frame( + &mut *w, + &Frame::MatchList(crate::messages::MatchList { matches }), + ) + .await?; + Ok(true) + } + Frame::ListMatches(_) => { + let mut w = writer.lock().await; + write_frame( + &mut *w, + &Frame::Error(HandshakeError::Unauthorized( + "session-id no coincide".into(), + )), + ) + .await?; + Ok(true) + } _ => { let mut w = writer.lock().await; write_frame( diff --git a/crates/shared/brahman-sidecar/src/discovery.rs b/crates/shared/brahman-sidecar/src/discovery.rs index 24ab3f9..000a604 100644 --- a/crates/shared/brahman-sidecar/src/discovery.rs +++ b/crates/shared/brahman-sidecar/src/discovery.rs @@ -197,6 +197,52 @@ pub fn list_sessions_blocking( rt.block_on(list_sessions(label)) } +/// Análogo a `list_sessions` pero pide los matches activos del +/// broker. La Card observer es la misma forma minimalista (sin +/// flow.input/output) — el endpoint no requiere participar en +/// matching. +pub async fn list_matches( + observer_label: impl Into, +) -> Result { + let init_path = transport::default_socket_path(); + let card = Card { + payload: Payload::Virtual, + supervision: Supervision::OneShot, + lifecycle: Lifecycle::Oneshot, + priority: Priority::Normal, + kind: CardKind::Ente, + flow: Flows { + input: vec![], + output: vec![], + }, + ..Card::new(observer_label) + }; + + let mut client = Client::connect(&init_path, card) + .await + .map_err(|source| ConsumerError::Connect { + socket: init_path.clone(), + source, + })?; + + let list = client.list_matches().await?; + let _ = client.farewell().await; + Ok(list) +} + +/// Wrapper bloqueante de [`list_matches`]. +pub fn list_matches_blocking( + observer_label: impl Into, +) -> Result { + let label = observer_label.into(); + let rt = tokio::runtime::Builder::new_current_thread() + .enable_io() + .enable_time() + .build() + .map_err(|e| ConsumerError::Runtime(e.to_string()))?; + rt.block_on(list_matches(label)) +} + fn describe_first_input(card: &Card) -> (String, String) { match card.flow.input.first() { Some(flow) => { diff --git a/crates/shared/brahman-sidecar/src/lib.rs b/crates/shared/brahman-sidecar/src/lib.rs index 988de1a..aef5c66 100644 --- a/crates/shared/brahman-sidecar/src/lib.rs +++ b/crates/shared/brahman-sidecar/src/lib.rs @@ -18,8 +18,8 @@ pub mod discovery; pub use discovery::{ - await_provider, await_provider_blocking, build_consumer_card, list_sessions, - list_sessions_blocking, ConsumerError, + await_provider, await_provider_blocking, build_consumer_card, list_matches, + list_matches_blocking, list_sessions, list_sessions_blocking, ConsumerError, }; use std::collections::HashMap;