feat(handshake): notificación push de matches del broker al cliente

El servidor empuja MatchEvent (Available | Lost) a los consumers cuando
sus inputs cambian de match — sea porque un productor llegó, porque
otro mejor lo desplazó, o porque desapareció.

Mecánica:

- Frame::MatchEvent con MatchEventKind { Available, Lost } y los datos
  del match (consumer_flow, producer_session/label/flow, ty, via, pinned).
- Server: SessionTxTable (Arc<Mutex<HashMap<SessionId, mpsc::Sender>>>)
  + LastMatches (último match conocido por consumer/input). En cada
  register/unregister, broadcast_match_diffs recomputa con el broker
  y emite SOLO los diffs respecto al estado anterior.
- Session::run_post_handshake usa tokio::select! para multiplexar
  read_frame del cliente y rx.recv() de su tx push.
- Cleanup ahora también limpia push_table y last_matches y dispara un
  broadcast (para notificar a quienes pierden el match).
- Client: VecDeque<MatchEvent> bufferea eventos que llegan mezclados
  con respuestas a Ping. API:
    - take_event() — non-blocking, drena buffer
    - await_event(timeout) — bloquea hasta evento o timeout
- ping() ahora drena MatchEvents intermedios hasta encontrar el Pong.

Capacity del canal push por sesión: 32 frames (try_send no-blocking;
si se llena, los eventos extra se descartan — se documenta como
ephemeral, el cliente puede re-consultar via brahman-status).

Test nuevo en brahman-handshake/tests/handshake.rs:
- match_event_pushed_on_producer_arrival: consumer espera, no recibe
  evento → llega productor → recibe Available → productor se va →
  recibe Lost.

Example nuevo: brahman-handshake/examples/subscriber.rs — cliente que
loguea cada MatchEvent en tiempo real. Útil para ver la dinámica del
broker. Pings cada 25s para keepalive.

Demo end-to-end verificada (4 eventos, 3 ya cubren el ciclo completo):

  T+0.3  alpha llega    → Available ← demo.alpha.out
  T+0.8  beta llega     → (sin evento: alpha gana por orden alfabético)
  T+1.3  alpha killed   → Available ← demo.beta.out (re-evaluación)
  T+1.8  beta killed    → Lost ← <none>

El broker emite diff: ningún evento cuando un nuevo productor llega
sin desplazar al ganador actual.

Tests: 28/28 (handshake integ 6→7). cargo check --workspace: 0 errores.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Sergio
2026-05-08 15:43:41 +00:00
parent 70a7a0d46d
commit 8a83a26de0
6 changed files with 449 additions and 56 deletions
+200 -49
View File
@@ -5,15 +5,18 @@ use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::{SystemTime, UNIX_EPOCH};
use brahman_broker::Broker;
use brahman_broker::{Broker, Endpoint};
use brahman_card::{Card, ResolvedCard, CARD_SCHEMA_VERSION};
use tokio::net::{UnixListener, UnixStream};
use tokio::sync::Mutex;
use tokio::sync::{mpsc, Mutex};
use tracing::{debug, warn};
use ulid::Ulid;
use crate::codec::{read_frame, write_frame};
use crate::messages::{Farewell, Frame, HandshakeError, Hello, HelloAck, Ping, Pong, SessionId};
use crate::messages::{
Farewell, Frame, HandshakeError, Hello, HelloAck, MatchEvent, MatchEventKind, Ping, Pong,
SessionId,
};
/// Tabla de sesiones vivas indexada por `SessionId`.
pub type SessionRegistry = Arc<Mutex<HashMap<SessionId, ResolvedCard>>>;
@@ -22,6 +25,18 @@ pub type SessionRegistry = Arc<Mutex<HashMap<SessionId, ResolvedCard>>>;
/// el ciclo de vida de las sesiones.
pub type SharedBroker = Arc<Mutex<Broker>>;
/// Tabla de canales push por sesión: el server inyecta frames hacia el
/// cliente (p. ej. `MatchEvent`) sin requerir que el cliente haga request.
type SessionTxTable = Arc<Mutex<HashMap<SessionId, mpsc::Sender<Frame>>>>;
/// Por sesión, último match conocido por nombre de input. Se usa para
/// emitir diffs (Available/Lost) en lugar del estado completo.
type LastMatches = Arc<Mutex<HashMap<SessionId, HashMap<String, Endpoint>>>>;
/// Capacidad del canal push por sesión. Si se llena (cliente lento), los
/// eventos extra se descartan — el cliente puede re-consultar el estado.
const PUSH_CHANNEL_CAPACITY: usize = 32;
/// Configuración del servidor.
#[derive(Debug, Clone, Default)]
pub struct ServerConfig {
@@ -38,6 +53,8 @@ pub struct Server {
listener: UnixListener,
socket_path: PathBuf,
sessions: SessionRegistry,
push_table: SessionTxTable,
last_matches: LastMatches,
config: ServerConfig,
}
@@ -59,6 +76,8 @@ impl Server {
listener,
socket_path,
sessions: Arc::new(Mutex::new(HashMap::new())),
push_table: Arc::new(Mutex::new(HashMap::new())),
last_matches: Arc::new(Mutex::new(HashMap::new())),
config,
})
}
@@ -81,6 +100,8 @@ impl Server {
Ok(Session {
stream,
sessions: self.sessions.clone(),
push_table: self.push_table.clone(),
last_matches: self.last_matches.clone(),
config: self.config.clone(),
})
}
@@ -114,6 +135,8 @@ impl Drop for Server {
pub struct Session {
stream: UnixStream,
sessions: SessionRegistry,
push_table: SessionTxTable,
last_matches: LastMatches,
config: ServerConfig,
}
@@ -132,64 +155,192 @@ impl Session {
}
async fn run_post_handshake(&mut self, session_id: SessionId) -> std::io::Result<()> {
// Canal por donde el server inyecta frames push (MatchEvent, etc.).
let (tx, mut rx) = mpsc::channel::<Frame>(PUSH_CHANNEL_CAPACITY);
self.push_table.lock().await.insert(session_id, tx);
// Tras registrar el canal, recomputar matches y emitir diffs a
// todas las sesiones afectadas (incluida ésta, si tiene inputs).
self.broadcast_match_diffs().await;
loop {
let frame = match read_frame(&mut self.stream).await {
Ok(f) => f,
Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => {
debug!(session = %session_id, "cliente cerró conexión sin Farewell");
return Ok(());
tokio::select! {
// Frame entrante del cliente.
res = read_frame(&mut self.stream) => {
match res {
Ok(frame) => {
if !self.handle_inbound_frame(session_id, frame).await? {
return Ok(());
}
}
Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => {
debug!(session = %session_id, "cliente cerró sin Farewell");
return Ok(());
}
Err(e) => return Err(e),
}
}
Err(e) => return Err(e),
};
match frame {
Frame::Ping(Ping { session }) if session == session_id => {
let pong = Pong {
timestamp_ms: now_ms(),
};
write_frame(&mut self.stream, &Frame::Pong(pong)).await?;
}
Frame::Ping(_) => {
write_frame(
&mut self.stream,
&Frame::Error(HandshakeError::Unauthorized(
"session-id no coincide".into(),
)),
)
.await?;
}
Frame::Farewell(Farewell { session }) if session == session_id => {
return Ok(());
}
Frame::Farewell(_) => {
write_frame(
&mut self.stream,
&Frame::Error(HandshakeError::Unauthorized(
"session-id no coincide".into(),
)),
)
.await?;
}
_ => {
// Frame inesperado en estado post-handshake.
write_frame(
&mut self.stream,
&Frame::Error(HandshakeError::Rejected(
"frame inesperado tras handshake".into(),
)),
)
.await?;
// Frame push desde el server (MatchEvent).
Some(frame) = rx.recv() => {
write_frame(&mut self.stream, &frame).await?;
}
}
}
}
/// Limpieza atómica de las dos vistas: registro de sesiones + broker.
/// Se ejecuta tanto si la sesión cierra por Farewell, EOF, o error.
/// Maneja un frame entrante. Devuelve `Ok(false)` si la sesión debe
/// cerrarse limpiamente (Farewell con session-id correcto).
async fn handle_inbound_frame(
&mut self,
session_id: SessionId,
frame: Frame,
) -> std::io::Result<bool> {
match frame {
Frame::Ping(Ping { session }) if session == session_id => {
let pong = Pong {
timestamp_ms: now_ms(),
};
write_frame(&mut self.stream, &Frame::Pong(pong)).await?;
Ok(true)
}
Frame::Ping(_) => {
write_frame(
&mut self.stream,
&Frame::Error(HandshakeError::Unauthorized(
"session-id no coincide".into(),
)),
)
.await?;
Ok(true)
}
Frame::Farewell(Farewell { session }) if session == session_id => Ok(false),
Frame::Farewell(_) => {
write_frame(
&mut self.stream,
&Frame::Error(HandshakeError::Unauthorized(
"session-id no coincide".into(),
)),
)
.await?;
Ok(true)
}
_ => {
write_frame(
&mut self.stream,
&Frame::Error(HandshakeError::Rejected(
"frame inesperado tras handshake".into(),
)),
)
.await?;
Ok(true)
}
}
}
/// Limpieza atómica de TRES vistas: registro de sesiones + broker +
/// canal push. Se ejecuta tanto si la sesión cierra por Farewell, EOF,
/// o error. Tras desregistrar, emite diffs a las sesiones que perdieron
/// el match contra ésta.
async fn cleanup(&self, session_id: SessionId) {
self.sessions.lock().await.remove(&session_id);
self.push_table.lock().await.remove(&session_id);
self.last_matches.lock().await.remove(&session_id);
if let Some(broker) = &self.config.broker {
broker.lock().await.unregister(session_id);
}
self.broadcast_match_diffs().await;
}
/// Recomputa los matches para todas las sesiones registradas y empuja
/// `MatchEvent::Available` / `MatchEvent::Lost` por las que cambiaron
/// respecto al último estado conocido.
///
/// Se llama tras cada `register_session` y `cleanup`. Las inserciones
/// al canal usan `try_send` (non-blocking); si el cliente está lento
/// y se llena el buffer, los eventos extra se pierden — es ephemeral
/// y el cliente puede re-consultar el estado vía `brahman-status`.
async fn broadcast_match_diffs(&self) {
let broker = match &self.config.broker {
Some(b) => b,
None => return,
};
let b = broker.lock().await;
let push_table = self.push_table.lock().await;
let mut last = self.last_matches.lock().await;
debug!(
target: "brahman_handshake::broadcast",
cards = b.len(),
push_subscribers = push_table.len(),
"broadcast_match_diffs"
);
// Snapshot de cards para no tener que sostener el lock del broker.
let cards: Vec<_> = b.cards().cloned().collect();
for cons in &cards {
let cons_session = cons.session;
let tx = match push_table.get(&cons_session) {
Some(tx) => tx,
None => continue, // todavía no tiene canal push
};
let cons_last = last.entry(cons_session).or_default();
for input in &cons.inputs {
let new_match = b.find_producer_for(cons_session, &input.name);
let new_endpoint = new_match.as_ref().map(|m| m.producer.clone());
let old_endpoint = cons_last.get(&input.name).cloned();
if old_endpoint == new_endpoint {
continue;
}
if let Some(m) = &new_match {
let event = MatchEvent {
kind: MatchEventKind::Available,
consumer_flow: input.name.clone(),
producer_session: m.producer.session,
producer_label: m.producer_label.clone(),
producer_flow: m.producer.flow_name.clone(),
ty: m.ty.clone(),
via: m.via,
pinned: m.pinned,
};
let send_res = tx.try_send(Frame::MatchEvent(event));
debug!(
target: "brahman_handshake::broadcast",
consumer = %cons_session,
flow = %input.name,
producer = %m.producer_label,
result = ?send_res.as_ref().map(|_| "ok").unwrap_or_else(|e| match e {
tokio::sync::mpsc::error::TrySendError::Full(_) => "full",
tokio::sync::mpsc::error::TrySendError::Closed(_) => "closed",
}),
"Available pushed"
);
} else {
// Tenía match, ahora no.
let event = MatchEvent {
kind: MatchEventKind::Lost,
consumer_flow: input.name.clone(),
producer_session: Ulid::nil(),
producer_label: String::new(),
producer_flow: String::new(),
ty: input.ty.clone(),
via: brahman_broker::MatchStrategy::Exact,
pinned: false,
};
let _ = tx.try_send(Frame::MatchEvent(event));
}
if let Some(ep) = new_endpoint {
cons_last.insert(input.name.clone(), ep);
} else {
cons_last.remove(&input.name);
}
}
}
}
/// Lee el Hello, valida, registra la sesión y emite HelloAck.