feat(brahman-handshake): ListSessions endpoint + cliente + UI broker-explorer
Iter 20. Nuevo flujo end-to-end para observabilidad: cualquier módulo
conectado puede pedir al broker la lista de sesiones activas y mostrar
labels + flows in/out por cada una.
brahman-handshake/messages:
- Frame::ListSessions(ListSessions{session}) → Frame::SessionList(SessionList{entries}).
- SessionEntry: session, label, schema_version, outputs, inputs, conscious.
brahman-handshake/server:
- run_post_handshake pasa SessionRegistry a handle_inbound_frame.
- build_session_list helper proyecta el snapshot bajo lock.
- Validación session_id mismatched → Unauthorized.
brahman-handshake/client:
- Client::list_sessions() async, drena MatchEvents intermedios al
pending_events buffer, mismo patrón que ping().
brahman-sidecar/discovery:
- list_sessions / list_sessions_blocking arman Card observer mínima,
piden, Farewell.
brahman-broker-explorer:
- Poll-tick agrega list_sessions_blocking cuando broker está UP*.
- stat_card "Sesiones activas" con count + items ordenados por Ulid:
label · in:[flows] out:[flows] (wit)?.
Test list_sessions_returns_currently_registered: 3 clientes
conectados, observer pide list, verifica labels + schema_version
+ conscious=false. 24 handshake tests + sidecar + broker-explorer
verde.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -162,6 +162,16 @@ where
|
||||
got: "MatchEvent (pre-handshake)",
|
||||
});
|
||||
}
|
||||
Frame::ListSessions(_) => {
|
||||
return Err(ClientError::UnexpectedFrame {
|
||||
got: "ListSessions (pre-handshake)",
|
||||
});
|
||||
}
|
||||
Frame::SessionList(_) => {
|
||||
return Err(ClientError::UnexpectedFrame {
|
||||
got: "SessionList (pre-handshake)",
|
||||
});
|
||||
}
|
||||
};
|
||||
Ok(Self {
|
||||
stream,
|
||||
@@ -227,6 +237,32 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
/// Pide al servidor el listado de sesiones activas. Pensado para
|
||||
/// observadores (broker-explorer, CLIs de diagnóstico). Como
|
||||
/// `ping`, los `MatchEvent` que lleguen intercalados se bufean
|
||||
/// en `pending_events` y no rompen la respuesta.
|
||||
pub async fn list_sessions(&mut self) -> Result<crate::messages::SessionList, ClientError> {
|
||||
write_frame(
|
||||
&mut self.stream,
|
||||
&Frame::ListSessions(crate::messages::ListSessions {
|
||||
session: self.session,
|
||||
}),
|
||||
)
|
||||
.await?;
|
||||
loop {
|
||||
match read_frame(&mut self.stream).await? {
|
||||
Frame::SessionList(list) => return Ok(list),
|
||||
Frame::MatchEvent(ev) => self.pending_events.push_back(ev),
|
||||
Frame::Error(e) => return Err(ClientError::Server(e)),
|
||||
_ => {
|
||||
return Err(ClientError::UnexpectedFrame {
|
||||
got: "non-session-list",
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Cierre cooperativo. Consume el cliente.
|
||||
pub async fn farewell(mut self) -> Result<(), ClientError> {
|
||||
write_frame(
|
||||
|
||||
@@ -154,12 +154,53 @@ pub enum MatchEventKind {
|
||||
Lost,
|
||||
}
|
||||
|
||||
/// Pedido de listado de sesiones activas registradas en el broker. La
|
||||
/// `session` es el id propio del que pregunta — el server lo valida
|
||||
/// contra la sesión actual de la conexión, mismo patrón que `Ping`.
|
||||
///
|
||||
/// Pensado para herramientas de observabilidad (broker-explorer y
|
||||
/// CLIs de diagnóstico). No expone secrets: sólo metadata pública
|
||||
/// que el módulo ya anunció en su `Hello`.
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct ListSessions {
|
||||
pub session: SessionId,
|
||||
}
|
||||
|
||||
/// Una entrada en la respuesta a `ListSessions`. Slim por diseño —
|
||||
/// el observer arma la UI con esto sin tener que abrir conexiones
|
||||
/// adicionales por sesión.
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct SessionEntry {
|
||||
pub session: SessionId,
|
||||
/// Label declarado en `WireCard.label` — el "nombre humano" del
|
||||
/// módulo.
|
||||
pub label: String,
|
||||
/// Versión del schema de Card que el módulo declaró.
|
||||
pub schema_version: u16,
|
||||
/// Nombres de los `flow.output` que la Card declara producir.
|
||||
pub outputs: Vec<String>,
|
||||
/// Nombres de los `flow.input` que la Card declara consumir.
|
||||
pub inputs: Vec<String>,
|
||||
/// `true` si el módulo se anunció como "consciente" (trajo
|
||||
/// `WitInterface` extraída en el Hello).
|
||||
pub conscious: bool,
|
||||
}
|
||||
|
||||
/// Respuesta a `ListSessions`. El orden no está garantizado — los
|
||||
/// clientes que necesiten estabilidad pueden ordenar por `session`
|
||||
/// (Ulid es ordenable temporal).
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct SessionList {
|
||||
pub entries: Vec<SessionEntry>,
|
||||
}
|
||||
|
||||
/// Frame único de wire — discriminada por variante. Cada conexión es un
|
||||
/// stream de frames.
|
||||
///
|
||||
/// Direcciones:
|
||||
/// - Cliente → Server: `Hello`, `Ping`, `Farewell`.
|
||||
/// - Server → Cliente: `HelloAck`, `Pong`, `Error`, `MatchEvent`.
|
||||
/// - Cliente → Server: `Hello`, `Ping`, `Farewell`, `ListSessions`.
|
||||
/// - Server → Cliente: `HelloAck`, `Pong`, `Error`, `MatchEvent`,
|
||||
/// `SessionList`.
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub enum Frame {
|
||||
Hello(Hello),
|
||||
@@ -169,4 +210,6 @@ pub enum Frame {
|
||||
Farewell(Farewell),
|
||||
Error(HandshakeError),
|
||||
MatchEvent(MatchEvent),
|
||||
ListSessions(ListSessions),
|
||||
SessionList(SessionList),
|
||||
}
|
||||
|
||||
@@ -254,6 +254,7 @@ where
|
||||
let result = run_post_handshake(
|
||||
stream,
|
||||
session_id,
|
||||
sessions.clone(),
|
||||
push_table.clone(),
|
||||
last_matches.clone(),
|
||||
config.clone(),
|
||||
@@ -282,6 +283,7 @@ where
|
||||
async fn run_post_handshake<S>(
|
||||
stream: S,
|
||||
session_id: SessionId,
|
||||
sessions: SessionRegistry,
|
||||
push_table: SessionTxTable,
|
||||
last_matches: LastMatches,
|
||||
config: ServerConfig,
|
||||
@@ -317,11 +319,13 @@ where
|
||||
// Reader loop principal.
|
||||
let result: std::io::Result<()> = loop {
|
||||
match read_frame(&mut reader).await {
|
||||
Ok(frame) => match handle_inbound_frame(session_id, frame, &writer).await {
|
||||
Ok(true) => continue,
|
||||
Ok(false) => break Ok(()),
|
||||
Err(e) => break Err(e),
|
||||
},
|
||||
Ok(frame) => {
|
||||
match handle_inbound_frame(session_id, frame, &writer, &sessions).await {
|
||||
Ok(true) => continue,
|
||||
Ok(false) => break Ok(()),
|
||||
Err(e) => break Err(e),
|
||||
}
|
||||
}
|
||||
Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => {
|
||||
debug!(session = %session_id, "cliente cerró sin Farewell");
|
||||
break Ok(());
|
||||
@@ -345,6 +349,7 @@ async fn handle_inbound_frame<S>(
|
||||
session_id: SessionId,
|
||||
frame: Frame,
|
||||
writer: &Arc<Mutex<WriteHalf<S>>>,
|
||||
sessions: &SessionRegistry,
|
||||
) -> std::io::Result<bool>
|
||||
where
|
||||
S: AsyncRead + AsyncWrite + Unpin + Send + 'static,
|
||||
@@ -381,6 +386,25 @@ where
|
||||
.await?;
|
||||
Ok(true)
|
||||
}
|
||||
Frame::ListSessions(crate::messages::ListSessions { session })
|
||||
if session == session_id =>
|
||||
{
|
||||
let list = build_session_list(sessions).await;
|
||||
let mut w = writer.lock().await;
|
||||
write_frame(&mut *w, &Frame::SessionList(list)).await?;
|
||||
Ok(true)
|
||||
}
|
||||
Frame::ListSessions(_) => {
|
||||
let mut w = writer.lock().await;
|
||||
write_frame(
|
||||
&mut *w,
|
||||
&Frame::Error(HandshakeError::Unauthorized(
|
||||
"session-id no coincide".into(),
|
||||
)),
|
||||
)
|
||||
.await?;
|
||||
Ok(true)
|
||||
}
|
||||
_ => {
|
||||
let mut w = writer.lock().await;
|
||||
write_frame(
|
||||
@@ -395,6 +419,37 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
/// Snapshot read-only de la `SessionRegistry` proyectado a la forma
|
||||
/// de wire para el frame `SessionList`. Suelta el lock antes de
|
||||
/// retornar para que el writer del frame no contenga el mutex.
|
||||
async fn build_session_list(sessions: &SessionRegistry) -> crate::messages::SessionList {
|
||||
let table = sessions.lock().await;
|
||||
let entries = table
|
||||
.iter()
|
||||
.map(|(id, resolved)| crate::messages::SessionEntry {
|
||||
session: *id,
|
||||
label: resolved.card.label.clone(),
|
||||
schema_version: resolved.card.schema_version,
|
||||
outputs: resolved
|
||||
.card
|
||||
.flow
|
||||
.output
|
||||
.iter()
|
||||
.map(|f| f.name.clone())
|
||||
.collect(),
|
||||
inputs: resolved
|
||||
.card
|
||||
.flow
|
||||
.input
|
||||
.iter()
|
||||
.map(|f| f.name.clone())
|
||||
.collect(),
|
||||
conscious: resolved.wit.is_some(),
|
||||
})
|
||||
.collect();
|
||||
crate::messages::SessionList { entries }
|
||||
}
|
||||
|
||||
/// Limpieza atómica de las vistas registradas + (si net activo) retiro
|
||||
/// de anuncios DHT de los outputs de la Card. Se ejecuta tanto si la
|
||||
/// sesión cierra por Farewell, EOF, o error. Tras desregistrar, emite
|
||||
|
||||
@@ -89,6 +89,71 @@ async fn full_handshake_roundtrip() {
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn list_sessions_returns_currently_registered() {
|
||||
// Levantamos un server con broker (requerido para que el registro
|
||||
// pase por el path real) y conectamos 3 clientes. El último pide
|
||||
// ListSessions y debe ver a los 2 anteriores + a sí mismo.
|
||||
let path = sock_path("listsess");
|
||||
let broker = Arc::new(Mutex::new(Broker::new(BrokerConfig::default())));
|
||||
let server = Server::bind(
|
||||
&path,
|
||||
ServerConfig {
|
||||
init_attached: true,
|
||||
broker: Some(broker),
|
||||
net: None,
|
||||
policy: None,
|
||||
},
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
// Una task accept loop genérica para los 3 clientes.
|
||||
let server_handle = tokio::spawn(async move {
|
||||
for _ in 0..3 {
|
||||
let session = server.accept_one().await.unwrap();
|
||||
tokio::spawn(async move {
|
||||
let _ = session.handle().await;
|
||||
});
|
||||
}
|
||||
// Mantener el server vivo para que las sesiones puedan
|
||||
// mantenerse abiertas mientras el observer pregunta.
|
||||
std::future::pending::<()>().await;
|
||||
});
|
||||
|
||||
let mut alpha = Client::connect(&path, sample_card("producer-alpha"))
|
||||
.await
|
||||
.unwrap();
|
||||
let mut beta = Client::connect(&path, sample_card("producer-beta"))
|
||||
.await
|
||||
.unwrap();
|
||||
// observer es el que va a preguntar.
|
||||
let mut observer = Client::connect(&path, sample_card("observer"))
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let list = observer.list_sessions().await.unwrap();
|
||||
assert_eq!(list.entries.len(), 3, "deberían verse 3 sesiones activas");
|
||||
|
||||
let labels: BTreeSet<&str> = list.entries.iter().map(|e| e.label.as_str()).collect();
|
||||
assert!(labels.contains("producer-alpha"));
|
||||
assert!(labels.contains("producer-beta"));
|
||||
assert!(labels.contains("observer"));
|
||||
|
||||
// schema_version + conscious sanity en la propia entry del observer.
|
||||
let me = list
|
||||
.entries
|
||||
.iter()
|
||||
.find(|e| e.label == "observer")
|
||||
.unwrap();
|
||||
assert_eq!(me.schema_version, brahman_card::CARD_SCHEMA_VERSION);
|
||||
assert!(!me.conscious, "observer no envió WIT — debería ser agnostic");
|
||||
|
||||
alpha.farewell().await.unwrap();
|
||||
beta.farewell().await.unwrap();
|
||||
observer.farewell().await.unwrap();
|
||||
server_handle.abort();
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn rejects_invalid_card_client_side() {
|
||||
let path = sock_path("invalid");
|
||||
|
||||
Reference in New Issue
Block a user