Files
Sergio 8a83a26de0 feat(handshake): notificación push de matches del broker al cliente
El servidor empuja MatchEvent (Available | Lost) a los consumers cuando
sus inputs cambian de match — sea porque un productor llegó, porque
otro mejor lo desplazó, o porque desapareció.

Mecánica:

- Frame::MatchEvent con MatchEventKind { Available, Lost } y los datos
  del match (consumer_flow, producer_session/label/flow, ty, via, pinned).
- Server: SessionTxTable (Arc<Mutex<HashMap<SessionId, mpsc::Sender>>>)
  + LastMatches (último match conocido por consumer/input). En cada
  register/unregister, broadcast_match_diffs recomputa con el broker
  y emite SOLO los diffs respecto al estado anterior.
- Session::run_post_handshake usa tokio::select! para multiplexar
  read_frame del cliente y rx.recv() de su tx push.
- Cleanup ahora también limpia push_table y last_matches y dispara un
  broadcast (para notificar a quienes pierden el match).
- Client: VecDeque<MatchEvent> bufferea eventos que llegan mezclados
  con respuestas a Ping. API:
    - take_event() — non-blocking, drena buffer
    - await_event(timeout) — bloquea hasta evento o timeout
- ping() ahora drena MatchEvents intermedios hasta encontrar el Pong.

Capacity del canal push por sesión: 32 frames (try_send no-blocking;
si se llena, los eventos extra se descartan — se documenta como
ephemeral, el cliente puede re-consultar via brahman-status).

Test nuevo en brahman-handshake/tests/handshake.rs:
- match_event_pushed_on_producer_arrival: consumer espera, no recibe
  evento → llega productor → recibe Available → productor se va →
  recibe Lost.

Example nuevo: brahman-handshake/examples/subscriber.rs — cliente que
loguea cada MatchEvent en tiempo real. Útil para ver la dinámica del
broker. Pings cada 25s para keepalive.

Demo end-to-end verificada (4 eventos, 3 ya cubren el ciclo completo):

  T+0.3  alpha llega    → Available ← demo.alpha.out
  T+0.8  beta llega     → (sin evento: alpha gana por orden alfabético)
  T+1.3  alpha killed   → Available ← demo.beta.out (re-evaluación)
  T+1.8  beta killed    → Lost ← <none>

El broker emite diff: ningún evento cuando un nuevo productor llega
sin desplazar al ganador actual.

Tests: 28/28 (handshake integ 6→7). cargo check --workspace: 0 errores.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-08 15:43:41 +00:00

84 lines
2.5 KiB
Rust

//! `subscriber` — cliente brahman que loguea cada `MatchEvent` recibido.
//!
//! Declara una Card con un input `in` de tipo `json`. Cada vez que el
//! broker matchea (o desmatch) ese input contra un productor, imprime
//! una línea. Útil para visualizar la dinámica del broker en vivo.
//!
//! Uso:
//! ```sh
//! cargo run -p brahman-handshake --example subscriber [label]
//! ```
use std::collections::BTreeSet;
use std::time::Duration;
use brahman_card::{
ulid::Ulid, Card, Flow, Flows, Lifecycle, Payload, Priority, Supervision, TypeRef,
CARD_SCHEMA_VERSION,
};
use brahman_handshake::{client::Client, transport};
#[tokio::main(flavor = "current_thread")]
async fn main() -> anyhow::Result<()> {
let label = std::env::args()
.nth(1)
.unwrap_or_else(|| "subscriber".into());
let card = Card {
schema_version: CARD_SCHEMA_VERSION,
id: Ulid::new(),
label: label.clone(),
provides: BTreeSet::new(),
requires: BTreeSet::new(),
payload: Payload::Virtual,
supervision: Supervision::OneShot,
lifecycle: Lifecycle::Daemon,
priority: Priority::Normal,
flow: Flows {
input: vec![Flow {
name: "in".into(),
ty: TypeRef::Primitive {
name: "json".into(),
},
pin_to: None,
}],
output: vec![],
},
..Default::default()
};
let path = transport::default_socket_path();
eprintln!("[{label}] connecting to {}", path.display());
let mut client = Client::connect(&path, card).await?;
eprintln!(
"[{label}] attached: session={} init={}",
client.session(),
client.server_info().init_attached
);
// Loop: espera hasta 25s por un MatchEvent. Si timeout, ping para
// mantener la conexión viva.
loop {
match client.await_event(Duration::from_secs(25)).await? {
Some(ev) => {
eprintln!(
"[{label}] {:?} {}{}.{} via={:?}{}",
ev.kind,
ev.consumer_flow,
if ev.producer_label.is_empty() {
"<none>"
} else {
&ev.producer_label
},
ev.producer_flow,
ev.via,
if ev.pinned { " 📌" } else { "" }
);
}
None => {
let _ts = client.ping().await?;
}
}
}
}