diff --git a/CHANGELOG.md b/CHANGELOG.md index eb29652..64f76ee 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,56 @@ ratio/diff ver `git show `. ## 2026-05-10 +### feat(brahman-handshake): ListSessions endpoint + cliente + UI broker-explorer +Iter 20. Nuevo flujo end-to-end para observabilidad: cualquier +módulo conectado puede preguntar al broker la lista de sesiones +activas y mostrar labels + flows in/out por cada una. + +`brahman-handshake/messages.rs`: +- **`Frame::ListSessions(ListSessions { session })`**: request del + cliente (server valida que `session` coincida con la sesión vigente, + mismo patrón que Ping/Farewell). +- **`Frame::SessionList(SessionList { entries })`**: respuesta. + Cada `SessionEntry` lleva: `session`, `label`, `schema_version`, + `outputs` (nombres de flow.output), `inputs` (nombres de + flow.input), `conscious` (`true` si la Card vino con WIT). + +`brahman-handshake/server.rs`: +- `run_post_handshake` ahora pasa `SessionRegistry` a + `handle_inbound_frame` (necesario para consultar el snapshot de + sesiones en respuesta a `ListSessions`). +- Helper `build_session_list(sessions)` que toma el snapshot bajo + el lock, lo proyecta a `SessionList`, y suelta el lock antes de + escribir el frame al wire. +- Validación `session_id` mismatched → `HandshakeError::Unauthorized`. + +`brahman-handshake/client.rs`: +- `Client::list_sessions()` async: envía el request, drena + `MatchEvent`s intermedios al `pending_events` buffer (mismo patrón + que `ping`), retorna el `SessionList`. + +`brahman-sidecar/discovery.rs`: +- `pub async fn list_sessions(observer_label)` y + `pub fn list_sessions_blocking(observer_label)`: arman una Card + observer mínima (sin flow.input/output), conectan, piden la lista, + Farewell. Para CLIs y módulos std-thread. + +`brahman-broker-explorer`: +- Cada poll-tick (cuando el broker está UP*) ahora también pide + `list_sessions_blocking` y guarda el snapshot en `Explorer.sessions`. +- Render extiende el body con un `stat_card` "Sesiones activas" que + muestra el count + lista ordenada por `session` (Ulid temporal), + cada item: `label · in:[flows] out:[flows] (wit?)`. + +Tests: +- `list_sessions_returns_currently_registered`: levanta server con + broker, conecta 3 clientes (alpha, beta, observer), observer pide + `list_sessions`, verifica los 3 labels presentes y que la entry + del observer reporte `conscious=false` y el `schema_version` + esperado. +- Stack: handshake suite (24 tests), sidecar (3+8 unit + integ), + broker-explorer (4 tests). Todo verde. + ### feat(yahweh-launcher): F3 — extracción del shell standard de explorers Iter 19. Patrón con 4 consumers idénticos (nakui-explorer, nouser-explorer, minga-explorer, brahman-broker-explorer) declaraban diff --git a/crates/apps/brahman-broker-explorer/src/main.rs b/crates/apps/brahman-broker-explorer/src/main.rs index 17e6763..f777415 100644 --- a/crates/apps/brahman-broker-explorer/src/main.rs +++ b/crates/apps/brahman-broker-explorer/src/main.rs @@ -26,8 +26,11 @@ use std::path::PathBuf; use std::time::{Duration, Instant}; +use brahman_handshake::messages::SessionList; use brahman_handshake::transport; -use brahman_sidecar::{await_provider_blocking, build_consumer_card, ConsumerError}; +use brahman_sidecar::{ + await_provider_blocking, build_consumer_card, list_sessions_blocking, ConsumerError, +}; use gpui::{ div, prelude::*, px, Context, IntoElement, Render, SharedString, Window, }; @@ -67,6 +70,9 @@ struct Explorer { state: ProbeState, last_probe_ms: u64, last_probe_at: Option, + /// Última `SessionList` recibida del broker (None = aún sin pedir + /// o último intento falló). + sessions: Option, } impl Explorer { @@ -110,8 +116,22 @@ impl Explorer { }, }; + // Si el broker está reachable (UP*), aprovechar el + // round-trip para pedir la lista de sesiones. Si está + // DOWN, ni intentar — la lista serviría de nada con + // connect failed igual. + let sessions_snapshot = match &new_state { + ProbeState::Down { .. } | ProbeState::Pending => None, + _ => bg + .spawn(async move { + list_sessions_blocking("brahman-broker-explorer").ok() + }) + .await, + }; + let _ = this.update(cx, |me, cx| { me.state = new_state; + me.sessions = sessions_snapshot; me.last_probe_ms = elapsed; me.last_probe_at = Some(Instant::now()); cx.notify(); @@ -129,6 +149,7 @@ impl Explorer { state: ProbeState::Pending, last_probe_ms: 0, last_probe_at: None, + sessions: None, } } } @@ -178,6 +199,41 @@ impl Render for Explorer { )), }; + let sessions_items: Vec = self + .sessions + .as_ref() + .map(|list| { + let mut entries: Vec<_> = list.entries.iter().collect(); + // Orden estable por session id (Ulid es ordenable + // temporal); útil para que la UI no se reordene + // entre ticks aunque el HashMap del server sí. + entries.sort_by_key(|e| e.session); + entries + .iter() + .map(|e| { + format!( + "{} · in:[{}] out:[{}]{}", + e.label, + e.inputs.join(","), + e.outputs.join(","), + if e.conscious { " (wit)" } else { "" } + ) + }) + .collect() + }) + .unwrap_or_default(); + + let sessions_count_value = self + .sessions + .as_ref() + .map(|l| l.entries.len().to_string()) + .unwrap_or_else(|| "—".into()); + let sessions_descr = match &self.sessions { + None => "lista no disponible (broker DOWN o pendiente)".to_string(), + Some(l) if l.entries.is_empty() => "sin sesiones registradas en el broker".into(), + Some(_) => "labels visibles + flows in/out · (wit) = consciente".into(), + }; + let body = div() .flex() .flex_col() @@ -185,7 +241,17 @@ impl Render for Explorer { .px(px(16.)) .py(px(16.)) .child(state_card(cx, &self.state, text, text_dim, accent_up, - accent_partial, accent_down, accent_pending)); + accent_partial, accent_down, accent_pending)) + .child(stat_card( + cx, + "Sesiones activas", + sessions_count_value, + &sessions_descr, + accent_up, + text, + text_dim, + &sessions_items, + )); div() .flex() diff --git a/crates/core/brahman-handshake/src/client.rs b/crates/core/brahman-handshake/src/client.rs index 8977f37..efe27de 100644 --- a/crates/core/brahman-handshake/src/client.rs +++ b/crates/core/brahman-handshake/src/client.rs @@ -162,6 +162,16 @@ where got: "MatchEvent (pre-handshake)", }); } + Frame::ListSessions(_) => { + return Err(ClientError::UnexpectedFrame { + got: "ListSessions (pre-handshake)", + }); + } + Frame::SessionList(_) => { + return Err(ClientError::UnexpectedFrame { + got: "SessionList (pre-handshake)", + }); + } }; Ok(Self { stream, @@ -227,6 +237,32 @@ where } } + /// Pide al servidor el listado de sesiones activas. Pensado para + /// observadores (broker-explorer, CLIs de diagnóstico). Como + /// `ping`, los `MatchEvent` que lleguen intercalados se bufean + /// en `pending_events` y no rompen la respuesta. + pub async fn list_sessions(&mut self) -> Result { + write_frame( + &mut self.stream, + &Frame::ListSessions(crate::messages::ListSessions { + session: self.session, + }), + ) + .await?; + loop { + match read_frame(&mut self.stream).await? { + Frame::SessionList(list) => return Ok(list), + Frame::MatchEvent(ev) => self.pending_events.push_back(ev), + Frame::Error(e) => return Err(ClientError::Server(e)), + _ => { + return Err(ClientError::UnexpectedFrame { + got: "non-session-list", + }); + } + } + } + } + /// Cierre cooperativo. Consume el cliente. pub async fn farewell(mut self) -> Result<(), ClientError> { write_frame( diff --git a/crates/core/brahman-handshake/src/messages.rs b/crates/core/brahman-handshake/src/messages.rs index 05e31b1..5b8bcd0 100644 --- a/crates/core/brahman-handshake/src/messages.rs +++ b/crates/core/brahman-handshake/src/messages.rs @@ -154,12 +154,53 @@ pub enum MatchEventKind { Lost, } +/// Pedido de listado de sesiones activas registradas en el broker. La +/// `session` es el id propio del que pregunta — el server lo valida +/// contra la sesión actual de la conexión, mismo patrón que `Ping`. +/// +/// Pensado para herramientas de observabilidad (broker-explorer y +/// CLIs de diagnóstico). No expone secrets: sólo metadata pública +/// que el módulo ya anunció en su `Hello`. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ListSessions { + pub session: SessionId, +} + +/// Una entrada en la respuesta a `ListSessions`. Slim por diseño — +/// el observer arma la UI con esto sin tener que abrir conexiones +/// adicionales por sesión. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct SessionEntry { + pub session: SessionId, + /// Label declarado en `WireCard.label` — el "nombre humano" del + /// módulo. + pub label: String, + /// Versión del schema de Card que el módulo declaró. + pub schema_version: u16, + /// Nombres de los `flow.output` que la Card declara producir. + pub outputs: Vec, + /// Nombres de los `flow.input` que la Card declara consumir. + pub inputs: Vec, + /// `true` si el módulo se anunció como "consciente" (trajo + /// `WitInterface` extraída en el Hello). + pub conscious: bool, +} + +/// Respuesta a `ListSessions`. El orden no está garantizado — los +/// clientes que necesiten estabilidad pueden ordenar por `session` +/// (Ulid es ordenable temporal). +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct SessionList { + pub entries: Vec, +} + /// Frame único de wire — discriminada por variante. Cada conexión es un /// stream de frames. /// /// Direcciones: -/// - Cliente → Server: `Hello`, `Ping`, `Farewell`. -/// - Server → Cliente: `HelloAck`, `Pong`, `Error`, `MatchEvent`. +/// - Cliente → Server: `Hello`, `Ping`, `Farewell`, `ListSessions`. +/// - Server → Cliente: `HelloAck`, `Pong`, `Error`, `MatchEvent`, +/// `SessionList`. #[derive(Debug, Clone, Serialize, Deserialize)] pub enum Frame { Hello(Hello), @@ -169,4 +210,6 @@ pub enum Frame { Farewell(Farewell), Error(HandshakeError), MatchEvent(MatchEvent), + ListSessions(ListSessions), + SessionList(SessionList), } diff --git a/crates/core/brahman-handshake/src/server.rs b/crates/core/brahman-handshake/src/server.rs index a04d68c..9e681aa 100644 --- a/crates/core/brahman-handshake/src/server.rs +++ b/crates/core/brahman-handshake/src/server.rs @@ -254,6 +254,7 @@ where let result = run_post_handshake( stream, session_id, + sessions.clone(), push_table.clone(), last_matches.clone(), config.clone(), @@ -282,6 +283,7 @@ where async fn run_post_handshake( stream: S, session_id: SessionId, + sessions: SessionRegistry, push_table: SessionTxTable, last_matches: LastMatches, config: ServerConfig, @@ -317,11 +319,13 @@ where // Reader loop principal. let result: std::io::Result<()> = loop { match read_frame(&mut reader).await { - Ok(frame) => match handle_inbound_frame(session_id, frame, &writer).await { - Ok(true) => continue, - Ok(false) => break Ok(()), - Err(e) => break Err(e), - }, + Ok(frame) => { + match handle_inbound_frame(session_id, frame, &writer, &sessions).await { + Ok(true) => continue, + Ok(false) => break Ok(()), + Err(e) => break Err(e), + } + } Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => { debug!(session = %session_id, "cliente cerró sin Farewell"); break Ok(()); @@ -345,6 +349,7 @@ async fn handle_inbound_frame( session_id: SessionId, frame: Frame, writer: &Arc>>, + sessions: &SessionRegistry, ) -> std::io::Result where S: AsyncRead + AsyncWrite + Unpin + Send + 'static, @@ -381,6 +386,25 @@ where .await?; Ok(true) } + Frame::ListSessions(crate::messages::ListSessions { session }) + if session == session_id => + { + let list = build_session_list(sessions).await; + let mut w = writer.lock().await; + write_frame(&mut *w, &Frame::SessionList(list)).await?; + Ok(true) + } + Frame::ListSessions(_) => { + let mut w = writer.lock().await; + write_frame( + &mut *w, + &Frame::Error(HandshakeError::Unauthorized( + "session-id no coincide".into(), + )), + ) + .await?; + Ok(true) + } _ => { let mut w = writer.lock().await; write_frame( @@ -395,6 +419,37 @@ where } } +/// Snapshot read-only de la `SessionRegistry` proyectado a la forma +/// de wire para el frame `SessionList`. Suelta el lock antes de +/// retornar para que el writer del frame no contenga el mutex. +async fn build_session_list(sessions: &SessionRegistry) -> crate::messages::SessionList { + let table = sessions.lock().await; + let entries = table + .iter() + .map(|(id, resolved)| crate::messages::SessionEntry { + session: *id, + label: resolved.card.label.clone(), + schema_version: resolved.card.schema_version, + outputs: resolved + .card + .flow + .output + .iter() + .map(|f| f.name.clone()) + .collect(), + inputs: resolved + .card + .flow + .input + .iter() + .map(|f| f.name.clone()) + .collect(), + conscious: resolved.wit.is_some(), + }) + .collect(); + crate::messages::SessionList { entries } +} + /// Limpieza atómica de las vistas registradas + (si net activo) retiro /// de anuncios DHT de los outputs de la Card. Se ejecuta tanto si la /// sesión cierra por Farewell, EOF, o error. Tras desregistrar, emite diff --git a/crates/core/brahman-handshake/tests/handshake.rs b/crates/core/brahman-handshake/tests/handshake.rs index ba173de..ebe8cd1 100644 --- a/crates/core/brahman-handshake/tests/handshake.rs +++ b/crates/core/brahman-handshake/tests/handshake.rs @@ -89,6 +89,71 @@ async fn full_handshake_roundtrip() { .unwrap(); } +#[tokio::test] +async fn list_sessions_returns_currently_registered() { + // Levantamos un server con broker (requerido para que el registro + // pase por el path real) y conectamos 3 clientes. El último pide + // ListSessions y debe ver a los 2 anteriores + a sí mismo. + let path = sock_path("listsess"); + let broker = Arc::new(Mutex::new(Broker::new(BrokerConfig::default()))); + let server = Server::bind( + &path, + ServerConfig { + init_attached: true, + broker: Some(broker), + net: None, + policy: None, + }, + ) + .unwrap(); + + // Una task accept loop genérica para los 3 clientes. + let server_handle = tokio::spawn(async move { + for _ in 0..3 { + let session = server.accept_one().await.unwrap(); + tokio::spawn(async move { + let _ = session.handle().await; + }); + } + // Mantener el server vivo para que las sesiones puedan + // mantenerse abiertas mientras el observer pregunta. + std::future::pending::<()>().await; + }); + + let mut alpha = Client::connect(&path, sample_card("producer-alpha")) + .await + .unwrap(); + let mut beta = Client::connect(&path, sample_card("producer-beta")) + .await + .unwrap(); + // observer es el que va a preguntar. + let mut observer = Client::connect(&path, sample_card("observer")) + .await + .unwrap(); + + let list = observer.list_sessions().await.unwrap(); + assert_eq!(list.entries.len(), 3, "deberían verse 3 sesiones activas"); + + let labels: BTreeSet<&str> = list.entries.iter().map(|e| e.label.as_str()).collect(); + assert!(labels.contains("producer-alpha")); + assert!(labels.contains("producer-beta")); + assert!(labels.contains("observer")); + + // schema_version + conscious sanity en la propia entry del observer. + let me = list + .entries + .iter() + .find(|e| e.label == "observer") + .unwrap(); + assert_eq!(me.schema_version, brahman_card::CARD_SCHEMA_VERSION); + assert!(!me.conscious, "observer no envió WIT — debería ser agnostic"); + + alpha.farewell().await.unwrap(); + beta.farewell().await.unwrap(); + observer.farewell().await.unwrap(); + server_handle.abort(); +} + #[tokio::test] async fn rejects_invalid_card_client_side() { let path = sock_path("invalid"); diff --git a/crates/shared/brahman-sidecar/src/discovery.rs b/crates/shared/brahman-sidecar/src/discovery.rs index 2f3da92..24ab3f9 100644 --- a/crates/shared/brahman-sidecar/src/discovery.rs +++ b/crates/shared/brahman-sidecar/src/discovery.rs @@ -142,9 +142,61 @@ pub fn await_provider_blocking( .enable_time() .build() .map_err(|e| ConsumerError::Runtime(e.to_string()))?; + rt.block_on(await_provider(consumer_card, timeout)) } +/// Conecta al brahman-init con una Card observer (sin inputs ni +/// outputs) y pide la lista de sesiones activas. Útil para +/// herramientas de observabilidad (broker-explorer, CLIs). +/// +/// El observer se identifica con `observer_label`. La sesión se +/// cierra con Farewell antes de retornar (best-effort). +pub async fn list_sessions( + observer_label: impl Into, +) -> Result { + let init_path = transport::default_socket_path(); + // Card mínima sin flow.input/output: el observer no participa en + // matching, sólo establece sesión para poder consultar. + let card = Card { + payload: Payload::Virtual, + supervision: Supervision::OneShot, + lifecycle: Lifecycle::Oneshot, + priority: Priority::Normal, + kind: CardKind::Ente, + flow: Flows { + input: vec![], + output: vec![], + }, + ..Card::new(observer_label) + }; + + let mut client = Client::connect(&init_path, card) + .await + .map_err(|source| ConsumerError::Connect { + socket: init_path.clone(), + source, + })?; + + let list = client.list_sessions().await?; + let _ = client.farewell().await; + Ok(list) +} + +/// Wrapper bloqueante de [`list_sessions`]. Idéntico patrón a +/// `await_provider_blocking`: runtime current_thread efímero. +pub fn list_sessions_blocking( + observer_label: impl Into, +) -> Result { + let label = observer_label.into(); + let rt = tokio::runtime::Builder::new_current_thread() + .enable_io() + .enable_time() + .build() + .map_err(|e| ConsumerError::Runtime(e.to_string()))?; + rt.block_on(list_sessions(label)) +} + fn describe_first_input(card: &Card) -> (String, String) { match card.flow.input.first() { Some(flow) => { diff --git a/crates/shared/brahman-sidecar/src/lib.rs b/crates/shared/brahman-sidecar/src/lib.rs index 2ea7f67..988de1a 100644 --- a/crates/shared/brahman-sidecar/src/lib.rs +++ b/crates/shared/brahman-sidecar/src/lib.rs @@ -18,7 +18,8 @@ pub mod discovery; pub use discovery::{ - await_provider, await_provider_blocking, build_consumer_card, ConsumerError, + await_provider, await_provider_blocking, build_consumer_card, list_sessions, + list_sessions_blocking, ConsumerError, }; use std::collections::HashMap;