feat(brahman-handshake): ListMatches endpoint + timeline en broker-explorer
Iter 21. Cierra el loop iniciado en iter 20: ahora se ven sesiones
+ matches actuales + cómo cambian a través del tiempo.
brahman-handshake/messages:
- Frame::ListMatches → Frame::MatchList(Vec<brahman_broker::Match>).
brahman-handshake/server:
- run_post_handshake pasa Option<&SharedBroker> a handle_inbound_frame.
- Sin broker configurado → MatchList vacía (no error).
brahman-handshake/client + brahman-sidecar:
- Client::list_matches() análogo a list_sessions, drena MatchEvents.
- list_matches / list_matches_blocking, mismo patrón.
brahman-broker-explorer:
- Poll-tick agrega list_matches_blocking además de list_sessions.
- last_match_keys: HashSet<MatchKey> para diff entre ticks.
- timeline: VecDeque<TimelineEntry> cap 50.
- diff_matches (free fn): Available para keys nuevas, Lost para
desaparecidas. Primer tick marca todo Available (boot UX).
- Render: stat_card "Timeline" con HH:MM:SS {+/-} formato compacto.
5 tests broker-explorer (3 nuevos del diff). Stack verde.
Decisión: timeline polled cada POLL_INTERVAL=5s, no push. MatchEvents
del broker son consumer-céntricos (cada session ve sólo SUS matches);
"system-wide timeline" requeriría broker subscribe-all (mucho scope).
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -6,6 +6,60 @@ ratio/diff ver `git show <sha>`.
|
||||
|
||||
## 2026-05-10
|
||||
|
||||
### feat(brahman-handshake): ListMatches endpoint + timeline en broker-explorer
|
||||
Iter 21. Cierra el loop de observabilidad iniciado en iter 20: ahora
|
||||
se ven no sólo las sesiones registradas sino también qué matches
|
||||
consumer↔producer está computando el broker en cada momento, y la
|
||||
historia de cómo cambian.
|
||||
|
||||
`brahman-handshake/messages.rs`:
|
||||
- **`Frame::ListMatches(ListMatches{session})`**: pedido (mismo
|
||||
patrón de validación session-id).
|
||||
- **`Frame::MatchList(MatchList{matches: Vec<brahman_broker::Match>})`**:
|
||||
respuesta. Cada `Match` ya es serializable y lleva `consumer`,
|
||||
`consumer_label`, `producer`, `producer_label`, `ty`, `via`, `pinned`.
|
||||
|
||||
`brahman-handshake/server.rs`:
|
||||
- `run_post_handshake` ahora pasa también `broker_for_match: Option<&SharedBroker>`
|
||||
al `handle_inbound_frame`.
|
||||
- Si el server tiene broker configurado, `ListMatches` responde con
|
||||
`broker.all_matches()`. Si no (server sin broker), responde
|
||||
`MatchList { matches: vec![] }` — refleja "no hay matching activo",
|
||||
no es un error.
|
||||
|
||||
`brahman-handshake/client.rs`: `Client::list_matches()` análogo a
|
||||
`list_sessions()`, drena `MatchEvent`s intermedios al buffer.
|
||||
|
||||
`brahman-sidecar/discovery.rs`: `list_matches` y `list_matches_blocking`
|
||||
con la misma forma de Card observer minimalista.
|
||||
|
||||
`brahman-broker-explorer`:
|
||||
- Cada poll-tick ahora pide TANTO `list_sessions` COMO `list_matches`.
|
||||
- `Explorer.last_match_keys: HashSet<MatchKey>` mantiene el estado
|
||||
del último snapshot. La key es `(consumer.session, consumer.flow,
|
||||
producer.session, producer.flow)`.
|
||||
- `Explorer.timeline: VecDeque<TimelineEntry>` con cap `TIMELINE_CAP=50`.
|
||||
- Función pura `diff_matches(last_keys, list) -> (entries, new_keys)`:
|
||||
emite `Available` para keys nuevas y `Lost` para keys desaparecidas.
|
||||
Primer tick (last_keys vacío) marca todo como Available — cubre
|
||||
el boot sin que la UI quede vacía.
|
||||
- Render: `stat_card` "Timeline de matches" con count + 20 entries
|
||||
formateadas como `HH:MM:SS {+/-} consumer.flow ← producer.flow [via]`.
|
||||
Más reciente arriba.
|
||||
|
||||
Tests broker-explorer: 5 totales.
|
||||
- `diff_matches_first_snapshot_marks_everything_available`
|
||||
- `diff_matches_emits_lost_when_match_disappears`
|
||||
- `diff_matches_no_change_emits_nothing`
|
||||
- `pending_is_default_state_at_boot` (existente)
|
||||
- `poll_and_probe_constants_are_sane` (existente)
|
||||
|
||||
Decisión: timeline polled (cada `POLL_INTERVAL=5s`), no push.
|
||||
Razón: los `MatchEvent` push del broker son consumer-céntricos
|
||||
(cada session sólo ve sus propios matches). Para "system-wide
|
||||
timeline" haría falta una API broker-level "subscribe a todos" —
|
||||
mucho más scope. Polling cada 5s es suficiente para observabilidad.
|
||||
|
||||
### feat(brahman-handshake): ListSessions endpoint + cliente + UI broker-explorer
|
||||
Iter 20. Nuevo flujo end-to-end para observabilidad: cualquier
|
||||
módulo conectado puede preguntar al broker la lista de sesiones
|
||||
|
||||
Generated
+3
@@ -1239,9 +1239,12 @@ dependencies = [
|
||||
name = "brahman-broker-explorer"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"brahman-broker",
|
||||
"brahman-card",
|
||||
"brahman-handshake",
|
||||
"brahman-sidecar",
|
||||
"gpui",
|
||||
"ulid",
|
||||
"yahweh-launcher",
|
||||
"yahweh-theme",
|
||||
"yahweh-widget-app-header",
|
||||
|
||||
@@ -6,8 +6,11 @@ license.workspace = true
|
||||
description = "Probe GUI del broker brahman: conecta cada N segundos vía await_provider_blocking con un Card observer agnóstico, reporta 3 estados (down / up sin provider / up con provider)."
|
||||
|
||||
[dependencies]
|
||||
brahman-broker = { path = "../../core/brahman-broker" }
|
||||
brahman-card = { path = "../../core/brahman-card" }
|
||||
brahman-handshake = { path = "../../core/brahman-handshake" }
|
||||
brahman-sidecar = { path = "../../shared/brahman-sidecar" }
|
||||
ulid = { workspace = true }
|
||||
yahweh-theme = { path = "../../modules/ui_engine/libs/theme" }
|
||||
yahweh-launcher = { path = "../../modules/ui_engine/libs/launcher" }
|
||||
yahweh-widget-banner = { path = "../../modules/ui_engine/widgets/banner" }
|
||||
|
||||
@@ -26,11 +26,15 @@
|
||||
use std::path::PathBuf;
|
||||
use std::time::{Duration, Instant};
|
||||
|
||||
use std::collections::HashSet;
|
||||
|
||||
use brahman_handshake::messages::SessionList;
|
||||
use brahman_handshake::transport;
|
||||
use brahman_sidecar::{
|
||||
await_provider_blocking, build_consumer_card, list_sessions_blocking, ConsumerError,
|
||||
await_provider_blocking, build_consumer_card, list_matches_blocking, list_sessions_blocking,
|
||||
ConsumerError,
|
||||
};
|
||||
use ulid::Ulid;
|
||||
use gpui::{
|
||||
div, prelude::*, px, Context, IntoElement, Render, SharedString, Window,
|
||||
};
|
||||
@@ -43,6 +47,11 @@ use yahweh_widget_stat_card::stat_card;
|
||||
const POLL_INTERVAL: Duration = Duration::from_secs(5);
|
||||
const PROBE_TIMEOUT: Duration = Duration::from_secs(1);
|
||||
|
||||
/// Cap del buffer del timeline. Mantenemos las últimas N entries —
|
||||
/// más viejo se descarta. 50 cubre ~4 minutos de actividad densa
|
||||
/// sin saturar el panel; subir si hace falta historia más larga.
|
||||
const TIMELINE_CAP: usize = 50;
|
||||
|
||||
fn main() {
|
||||
launch_app("Brahman Broker — Probe", (720., 480.), Explorer::new);
|
||||
}
|
||||
@@ -73,6 +82,38 @@ struct Explorer {
|
||||
/// Última `SessionList` recibida del broker (None = aún sin pedir
|
||||
/// o último intento falló).
|
||||
sessions: Option<SessionList>,
|
||||
/// Set de matches presentes en el último snapshot del broker.
|
||||
/// Identificado por `(consumer.session, consumer.flow,
|
||||
/// producer.session, producer.flow)` para que la diff entre
|
||||
/// ticks distinga "nuevo match" vs "match perdido". Un cambio
|
||||
/// de producer (otro session/flow para mismo consumer) cuenta
|
||||
/// como Lost del previo + Available del nuevo.
|
||||
last_match_keys: HashSet<MatchKey>,
|
||||
/// Timeline FIFO: los más nuevos al frente. Cada entry tiene un
|
||||
/// timestamp local + el evento sintético (Available/Lost) que
|
||||
/// surgió de la diff del tick.
|
||||
timeline: std::collections::VecDeque<TimelineEntry>,
|
||||
}
|
||||
|
||||
/// Key estable para un match. Tupla porque (consumer, producer)
|
||||
/// determina el match unívocamente; los campos derivados (`label`,
|
||||
/// `via`, `pinned`) viajan en la entry pero no en la key.
|
||||
type MatchKey = (Ulid, String, Ulid, String);
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
struct TimelineEntry {
|
||||
/// Cuándo lo observó el explorer. Es tiempo local de wall-clock,
|
||||
/// no del broker — el broker no timestampa los matches.
|
||||
at: std::time::SystemTime,
|
||||
/// Available = nuevo en este tick. Lost = estaba en el tick
|
||||
/// anterior y desapareció.
|
||||
kind: brahman_handshake::messages::MatchEventKind,
|
||||
consumer_label: String,
|
||||
consumer_flow: String,
|
||||
producer_label: String,
|
||||
producer_flow: String,
|
||||
via: brahman_broker::MatchStrategy,
|
||||
pinned: bool,
|
||||
}
|
||||
|
||||
impl Explorer {
|
||||
@@ -117,21 +158,32 @@ impl Explorer {
|
||||
};
|
||||
|
||||
// Si el broker está reachable (UP*), aprovechar el
|
||||
// round-trip para pedir la lista de sesiones. Si está
|
||||
// DOWN, ni intentar — la lista serviría de nada con
|
||||
// connect failed igual.
|
||||
let sessions_snapshot = match &new_state {
|
||||
ProbeState::Down { .. } | ProbeState::Pending => None,
|
||||
_ => bg
|
||||
// round-trip para pedir la lista de sesiones + matches.
|
||||
// Si está DOWN, ni intentar — la lista serviría de nada
|
||||
// con connect failed igual.
|
||||
let (sessions_snapshot, matches_snapshot) = match &new_state {
|
||||
ProbeState::Down { .. } | ProbeState::Pending => (None, None),
|
||||
_ => {
|
||||
let s = bg
|
||||
.spawn(async move {
|
||||
list_sessions_blocking("brahman-broker-explorer").ok()
|
||||
})
|
||||
.await,
|
||||
.await;
|
||||
let m = bg
|
||||
.spawn(async move {
|
||||
list_matches_blocking("brahman-broker-explorer").ok()
|
||||
})
|
||||
.await;
|
||||
(s, m)
|
||||
}
|
||||
};
|
||||
|
||||
let _ = this.update(cx, |me, cx| {
|
||||
me.state = new_state;
|
||||
me.sessions = sessions_snapshot;
|
||||
if let Some(matches) = matches_snapshot {
|
||||
me.diff_matches_into_timeline(&matches);
|
||||
}
|
||||
me.last_probe_ms = elapsed;
|
||||
me.last_probe_at = Some(Instant::now());
|
||||
cx.notify();
|
||||
@@ -150,6 +202,35 @@ impl Explorer {
|
||||
last_probe_ms: 0,
|
||||
last_probe_at: None,
|
||||
sessions: None,
|
||||
last_match_keys: HashSet::new(),
|
||||
timeline: std::collections::VecDeque::new(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Diffea el snapshot recibido contra el último set de keys.
|
||||
/// Genera entries `Available` para keys nuevas y `Lost` para
|
||||
/// keys que estaban antes y no están ahora. Cada entry se
|
||||
/// prepende al timeline; el cap se aplica desde la cola.
|
||||
///
|
||||
/// El primer tick del explorer (cuando `last_match_keys` está
|
||||
/// vacío) hace que TODOS los matches actuales aparezcan como
|
||||
/// `Available` — es el comportamiento querido (UI muestra el
|
||||
/// estado al boot sin que parezca "no pasa nada").
|
||||
fn diff_matches_into_timeline(
|
||||
&mut self,
|
||||
list: &brahman_handshake::messages::MatchList,
|
||||
) {
|
||||
let (new_entries, new_keys) = diff_matches(&self.last_match_keys, list);
|
||||
for entry in new_entries {
|
||||
self.push_timeline(entry);
|
||||
}
|
||||
self.last_match_keys = new_keys;
|
||||
}
|
||||
|
||||
fn push_timeline(&mut self, entry: TimelineEntry) {
|
||||
self.timeline.push_front(entry);
|
||||
while self.timeline.len() > TIMELINE_CAP {
|
||||
self.timeline.pop_back();
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -234,6 +315,19 @@ impl Render for Explorer {
|
||||
Some(_) => "labels visibles + flows in/out · (wit) = consciente".into(),
|
||||
};
|
||||
|
||||
let timeline_items: Vec<String> = self
|
||||
.timeline
|
||||
.iter()
|
||||
.take(20)
|
||||
.map(|e| format_timeline_entry(e))
|
||||
.collect();
|
||||
let timeline_value = self.timeline.len().to_string();
|
||||
let timeline_descr = if self.timeline.is_empty() {
|
||||
"esperando primer match…".to_string()
|
||||
} else {
|
||||
"↑ más reciente · ↓ más viejo · cap 50 entries".to_string()
|
||||
};
|
||||
|
||||
let body = div()
|
||||
.flex()
|
||||
.flex_col()
|
||||
@@ -251,6 +345,16 @@ impl Render for Explorer {
|
||||
text,
|
||||
text_dim,
|
||||
&sessions_items,
|
||||
))
|
||||
.child(stat_card(
|
||||
cx,
|
||||
"Timeline de matches",
|
||||
timeline_value,
|
||||
&timeline_descr,
|
||||
accent_partial,
|
||||
text,
|
||||
text_dim,
|
||||
&timeline_items,
|
||||
));
|
||||
|
||||
div()
|
||||
@@ -311,6 +415,106 @@ fn state_card(
|
||||
stat_card(cx, "Estado", value, &description, accent, text, text_dim, &[])
|
||||
}
|
||||
|
||||
/// Diff puro entre snapshots de matches. Devuelve la lista de
|
||||
/// entries nuevas (Available + Lost) en orden Available-primero, y
|
||||
/// el set actualizado de keys. Extraído como free fn para que sea
|
||||
/// testeable sin instanciar `Explorer`.
|
||||
///
|
||||
/// El primer tick (last_keys vacío) marca todos los matches como
|
||||
/// Available. Esto es deliberado: la UI muestra el estado al boot
|
||||
/// como "todo recién apareció" en vez de quedarse vacía.
|
||||
fn diff_matches(
|
||||
last_keys: &HashSet<MatchKey>,
|
||||
list: &brahman_handshake::messages::MatchList,
|
||||
) -> (Vec<TimelineEntry>, HashSet<MatchKey>) {
|
||||
use brahman_handshake::messages::MatchEventKind;
|
||||
let now = std::time::SystemTime::now();
|
||||
let current_keys: HashSet<MatchKey> = list
|
||||
.matches
|
||||
.iter()
|
||||
.map(|m| {
|
||||
(
|
||||
m.consumer.session,
|
||||
m.consumer.flow_name.clone(),
|
||||
m.producer.session,
|
||||
m.producer.flow_name.clone(),
|
||||
)
|
||||
})
|
||||
.collect();
|
||||
|
||||
let mut entries = Vec::new();
|
||||
for m in &list.matches {
|
||||
let key = (
|
||||
m.consumer.session,
|
||||
m.consumer.flow_name.clone(),
|
||||
m.producer.session,
|
||||
m.producer.flow_name.clone(),
|
||||
);
|
||||
if !last_keys.contains(&key) {
|
||||
entries.push(TimelineEntry {
|
||||
at: now,
|
||||
kind: MatchEventKind::Available,
|
||||
consumer_label: m.consumer_label.clone(),
|
||||
consumer_flow: m.consumer.flow_name.clone(),
|
||||
producer_label: m.producer_label.clone(),
|
||||
producer_flow: m.producer.flow_name.clone(),
|
||||
via: m.via,
|
||||
pinned: m.pinned,
|
||||
});
|
||||
}
|
||||
}
|
||||
for key in last_keys.iter() {
|
||||
if !current_keys.contains(key) {
|
||||
entries.push(TimelineEntry {
|
||||
at: now,
|
||||
kind: MatchEventKind::Lost,
|
||||
consumer_label: String::new(),
|
||||
consumer_flow: key.1.clone(),
|
||||
producer_label: String::new(),
|
||||
producer_flow: key.3.clone(),
|
||||
via: brahman_broker::MatchStrategy::Exact,
|
||||
pinned: false,
|
||||
});
|
||||
}
|
||||
}
|
||||
(entries, current_keys)
|
||||
}
|
||||
|
||||
/// Renderiza una entry del timeline en una sola línea: `HH:MM:SS
|
||||
/// {kind} consumer.flow ← producer.flow [via]`. Compact por diseño
|
||||
/// — el panel es vertical y las líneas largas se cortan.
|
||||
fn format_timeline_entry(e: &TimelineEntry) -> String {
|
||||
use brahman_handshake::messages::MatchEventKind;
|
||||
// Wall-clock local en HH:MM:SS — sin zoneinfo (chrono es heavy).
|
||||
// Aproximación: total_seconds % 86400 → hora del día UTC.
|
||||
let secs_today = e
|
||||
.at
|
||||
.duration_since(std::time::UNIX_EPOCH)
|
||||
.map(|d| d.as_secs() % 86_400)
|
||||
.unwrap_or(0);
|
||||
let h = secs_today / 3600;
|
||||
let m = (secs_today % 3600) / 60;
|
||||
let s = secs_today % 60;
|
||||
let kind = match e.kind {
|
||||
MatchEventKind::Available => "+",
|
||||
MatchEventKind::Lost => "-",
|
||||
};
|
||||
let pinned = if e.pinned { " (pinned)" } else { "" };
|
||||
match e.kind {
|
||||
MatchEventKind::Available => format!(
|
||||
"{:02}:{:02}:{:02} {} {}.{} ← {}.{} [{:?}]{}",
|
||||
h, m, s, kind,
|
||||
e.consumer_label, e.consumer_flow,
|
||||
e.producer_label, e.producer_flow,
|
||||
e.via, pinned,
|
||||
),
|
||||
MatchEventKind::Lost => format!(
|
||||
"{:02}:{:02}:{:02} {} ?.{} ← ?.{} (lost)",
|
||||
h, m, s, kind, e.consumer_flow, e.producer_flow,
|
||||
),
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
@@ -332,4 +536,87 @@ mod tests {
|
||||
// El intervalo no debería ser tan corto que sature al broker.
|
||||
assert!(POLL_INTERVAL >= Duration::from_secs(2));
|
||||
}
|
||||
|
||||
fn synth_match(
|
||||
consumer_label: &str,
|
||||
consumer_flow: &str,
|
||||
producer_label: &str,
|
||||
producer_flow: &str,
|
||||
) -> brahman_broker::Match {
|
||||
use brahman_broker::{Endpoint, Match, MatchStrategy};
|
||||
use brahman_card::TypeRef;
|
||||
Match {
|
||||
consumer: Endpoint {
|
||||
session: Ulid::new(),
|
||||
flow_name: consumer_flow.into(),
|
||||
},
|
||||
consumer_label: consumer_label.into(),
|
||||
producer: Endpoint {
|
||||
session: Ulid::new(),
|
||||
flow_name: producer_flow.into(),
|
||||
},
|
||||
producer_label: producer_label.into(),
|
||||
ty: TypeRef::Primitive { name: "json".into() },
|
||||
via: MatchStrategy::Exact,
|
||||
pinned: false,
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn diff_matches_first_snapshot_marks_everything_available() {
|
||||
use brahman_handshake::messages::{MatchEventKind, MatchList};
|
||||
let list = MatchList {
|
||||
matches: vec![
|
||||
synth_match("a", "x", "b", "x"),
|
||||
synth_match("c", "y", "d", "y"),
|
||||
],
|
||||
};
|
||||
let last = HashSet::new();
|
||||
let (entries, keys) = diff_matches(&last, &list);
|
||||
assert_eq!(entries.len(), 2);
|
||||
assert!(entries
|
||||
.iter()
|
||||
.all(|e| matches!(e.kind, MatchEventKind::Available)));
|
||||
assert_eq!(keys.len(), 2);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn diff_matches_emits_lost_when_match_disappears() {
|
||||
use brahman_handshake::messages::{MatchEventKind, MatchList};
|
||||
let m = synth_match("a", "x", "b", "x");
|
||||
let prev_key = (
|
||||
m.consumer.session,
|
||||
m.consumer.flow_name.clone(),
|
||||
m.producer.session,
|
||||
m.producer.flow_name.clone(),
|
||||
);
|
||||
let last: HashSet<_> = std::iter::once(prev_key.clone()).collect();
|
||||
let list = MatchList { matches: vec![] };
|
||||
let (entries, keys) = diff_matches(&last, &list);
|
||||
assert_eq!(entries.len(), 1);
|
||||
assert!(matches!(entries[0].kind, MatchEventKind::Lost));
|
||||
assert_eq!(entries[0].consumer_flow, "x");
|
||||
assert_eq!(entries[0].producer_flow, "x");
|
||||
assert!(keys.is_empty());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn diff_matches_no_change_emits_nothing() {
|
||||
use brahman_handshake::messages::MatchList;
|
||||
let m = synth_match("a", "x", "b", "x");
|
||||
let key = (
|
||||
m.consumer.session,
|
||||
m.consumer.flow_name.clone(),
|
||||
m.producer.session,
|
||||
m.producer.flow_name.clone(),
|
||||
);
|
||||
let last: HashSet<_> = std::iter::once(key.clone()).collect();
|
||||
let list = MatchList {
|
||||
matches: vec![m.clone()],
|
||||
};
|
||||
let (entries, keys) = diff_matches(&last, &list);
|
||||
assert!(entries.is_empty(), "match unchanged → no events");
|
||||
assert_eq!(keys.len(), 1);
|
||||
assert!(keys.contains(&key));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -172,6 +172,16 @@ where
|
||||
got: "SessionList (pre-handshake)",
|
||||
});
|
||||
}
|
||||
Frame::ListMatches(_) => {
|
||||
return Err(ClientError::UnexpectedFrame {
|
||||
got: "ListMatches (pre-handshake)",
|
||||
});
|
||||
}
|
||||
Frame::MatchList(_) => {
|
||||
return Err(ClientError::UnexpectedFrame {
|
||||
got: "MatchList (pre-handshake)",
|
||||
});
|
||||
}
|
||||
};
|
||||
Ok(Self {
|
||||
stream,
|
||||
@@ -263,6 +273,31 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
/// Pide al servidor el listado de matches actuales del broker
|
||||
/// (consumer↔producer pares con tipo y estrategia). Mismo patrón
|
||||
/// de drenado de `MatchEvent`s intermedios.
|
||||
pub async fn list_matches(&mut self) -> Result<crate::messages::MatchList, ClientError> {
|
||||
write_frame(
|
||||
&mut self.stream,
|
||||
&Frame::ListMatches(crate::messages::ListMatches {
|
||||
session: self.session,
|
||||
}),
|
||||
)
|
||||
.await?;
|
||||
loop {
|
||||
match read_frame(&mut self.stream).await? {
|
||||
Frame::MatchList(list) => return Ok(list),
|
||||
Frame::MatchEvent(ev) => self.pending_events.push_back(ev),
|
||||
Frame::Error(e) => return Err(ClientError::Server(e)),
|
||||
_ => {
|
||||
return Err(ClientError::UnexpectedFrame {
|
||||
got: "non-match-list",
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Cierre cooperativo. Consume el cliente.
|
||||
pub async fn farewell(mut self) -> Result<(), ClientError> {
|
||||
write_frame(
|
||||
|
||||
@@ -194,13 +194,30 @@ pub struct SessionList {
|
||||
pub entries: Vec<SessionEntry>,
|
||||
}
|
||||
|
||||
/// Pedido del listado de matches actuales del broker. La `session`
|
||||
/// se valida igual que `ListSessions`. Si el server no tiene broker
|
||||
/// configurado, devuelve la lista vacía (no es un error — refleja
|
||||
/// que no hay matching activo).
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct ListMatches {
|
||||
pub session: SessionId,
|
||||
}
|
||||
|
||||
/// Respuesta a `ListMatches` con el snapshot de matches consumidor↔productor
|
||||
/// actualmente computados por el broker.
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct MatchList {
|
||||
pub matches: Vec<brahman_broker::Match>,
|
||||
}
|
||||
|
||||
/// Frame único de wire — discriminada por variante. Cada conexión es un
|
||||
/// stream de frames.
|
||||
///
|
||||
/// Direcciones:
|
||||
/// - Cliente → Server: `Hello`, `Ping`, `Farewell`, `ListSessions`.
|
||||
/// - Cliente → Server: `Hello`, `Ping`, `Farewell`, `ListSessions`,
|
||||
/// `ListMatches`.
|
||||
/// - Server → Cliente: `HelloAck`, `Pong`, `Error`, `MatchEvent`,
|
||||
/// `SessionList`.
|
||||
/// `SessionList`, `MatchList`.
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub enum Frame {
|
||||
Hello(Hello),
|
||||
@@ -212,4 +229,6 @@ pub enum Frame {
|
||||
MatchEvent(MatchEvent),
|
||||
ListSessions(ListSessions),
|
||||
SessionList(SessionList),
|
||||
ListMatches(ListMatches),
|
||||
MatchList(MatchList),
|
||||
}
|
||||
|
||||
@@ -317,10 +317,19 @@ where
|
||||
});
|
||||
|
||||
// Reader loop principal.
|
||||
let broker_for_loop = config.broker.clone();
|
||||
let result: std::io::Result<()> = loop {
|
||||
match read_frame(&mut reader).await {
|
||||
Ok(frame) => {
|
||||
match handle_inbound_frame(session_id, frame, &writer, &sessions).await {
|
||||
match handle_inbound_frame(
|
||||
session_id,
|
||||
frame,
|
||||
&writer,
|
||||
&sessions,
|
||||
broker_for_loop.as_ref(),
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(true) => continue,
|
||||
Ok(false) => break Ok(()),
|
||||
Err(e) => break Err(e),
|
||||
@@ -350,6 +359,7 @@ async fn handle_inbound_frame<S>(
|
||||
frame: Frame,
|
||||
writer: &Arc<Mutex<WriteHalf<S>>>,
|
||||
sessions: &SessionRegistry,
|
||||
broker_for_match: Option<&SharedBroker>,
|
||||
) -> std::io::Result<bool>
|
||||
where
|
||||
S: AsyncRead + AsyncWrite + Unpin + Send + 'static,
|
||||
@@ -405,6 +415,32 @@ where
|
||||
.await?;
|
||||
Ok(true)
|
||||
}
|
||||
Frame::ListMatches(crate::messages::ListMatches { session })
|
||||
if session == session_id =>
|
||||
{
|
||||
let matches = match &broker_for_match {
|
||||
Some(b) => b.lock().await.all_matches(),
|
||||
None => Vec::new(),
|
||||
};
|
||||
let mut w = writer.lock().await;
|
||||
write_frame(
|
||||
&mut *w,
|
||||
&Frame::MatchList(crate::messages::MatchList { matches }),
|
||||
)
|
||||
.await?;
|
||||
Ok(true)
|
||||
}
|
||||
Frame::ListMatches(_) => {
|
||||
let mut w = writer.lock().await;
|
||||
write_frame(
|
||||
&mut *w,
|
||||
&Frame::Error(HandshakeError::Unauthorized(
|
||||
"session-id no coincide".into(),
|
||||
)),
|
||||
)
|
||||
.await?;
|
||||
Ok(true)
|
||||
}
|
||||
_ => {
|
||||
let mut w = writer.lock().await;
|
||||
write_frame(
|
||||
|
||||
@@ -197,6 +197,52 @@ pub fn list_sessions_blocking(
|
||||
rt.block_on(list_sessions(label))
|
||||
}
|
||||
|
||||
/// Análogo a `list_sessions` pero pide los matches activos del
|
||||
/// broker. La Card observer es la misma forma minimalista (sin
|
||||
/// flow.input/output) — el endpoint no requiere participar en
|
||||
/// matching.
|
||||
pub async fn list_matches(
|
||||
observer_label: impl Into<String>,
|
||||
) -> Result<brahman_handshake::messages::MatchList, ConsumerError> {
|
||||
let init_path = transport::default_socket_path();
|
||||
let card = Card {
|
||||
payload: Payload::Virtual,
|
||||
supervision: Supervision::OneShot,
|
||||
lifecycle: Lifecycle::Oneshot,
|
||||
priority: Priority::Normal,
|
||||
kind: CardKind::Ente,
|
||||
flow: Flows {
|
||||
input: vec![],
|
||||
output: vec![],
|
||||
},
|
||||
..Card::new(observer_label)
|
||||
};
|
||||
|
||||
let mut client = Client::connect(&init_path, card)
|
||||
.await
|
||||
.map_err(|source| ConsumerError::Connect {
|
||||
socket: init_path.clone(),
|
||||
source,
|
||||
})?;
|
||||
|
||||
let list = client.list_matches().await?;
|
||||
let _ = client.farewell().await;
|
||||
Ok(list)
|
||||
}
|
||||
|
||||
/// Wrapper bloqueante de [`list_matches`].
|
||||
pub fn list_matches_blocking(
|
||||
observer_label: impl Into<String>,
|
||||
) -> Result<brahman_handshake::messages::MatchList, ConsumerError> {
|
||||
let label = observer_label.into();
|
||||
let rt = tokio::runtime::Builder::new_current_thread()
|
||||
.enable_io()
|
||||
.enable_time()
|
||||
.build()
|
||||
.map_err(|e| ConsumerError::Runtime(e.to_string()))?;
|
||||
rt.block_on(list_matches(label))
|
||||
}
|
||||
|
||||
fn describe_first_input(card: &Card) -> (String, String) {
|
||||
match card.flow.input.first() {
|
||||
Some(flow) => {
|
||||
|
||||
@@ -18,8 +18,8 @@
|
||||
|
||||
pub mod discovery;
|
||||
pub use discovery::{
|
||||
await_provider, await_provider_blocking, build_consumer_card, list_sessions,
|
||||
list_sessions_blocking, ConsumerError,
|
||||
await_provider, await_provider_blocking, build_consumer_card, list_matches,
|
||||
list_matches_blocking, list_sessions, list_sessions_blocking, ConsumerError,
|
||||
};
|
||||
|
||||
use std::collections::HashMap;
|
||||
|
||||
Reference in New Issue
Block a user