feat(handshake): integra el broker con el ciclo de sesiones
ServerConfig acepta un Option<Arc<Mutex<Broker>>> compartido. Cuando está
presente, el servidor lo mantiene en sincronía con las sesiones:
- Tras un Hello aceptado, register_session indexa la Card en el broker
ANTES de insertar en el SessionRegistry y de emitir HelloAck.
- Al cerrar la sesión (Farewell, EOF, o error en run_post_handshake), un
cleanup() unificado llama unregister en el broker y remove en el
SessionRegistry. Garantizado por refactor de Session::handle a
do_handshake → run_post_handshake → cleanup.
Tests nuevos en handshake.rs:
- broker_registers_and_unregisters_with_session: confirma el ciclo
register → farewell → unregister.
- broker_matches_two_live_modules: dos clientes (productor + consumidor)
conectados; el broker resuelve find_producer_for(consumer.session, "in")
→ producer "dht". Tras farewell del productor, el match desaparece.
Fix colateral: brahman-card::TypeRef pasa de internally-tagged
(#[serde(tag = "kind")]) a externally-tagged (default). Postcard no
soporta internally-tagged en formatos no self-describing — sin este
cambio el wire de Hello con Cards que tengan flujos no codificaba.
JSON cambia de {"kind":"primitive","name":"x"} a
{"primitive":{"name":"x"}}. Documentado en el doc-comment de TypeRef.
26/26 tests verdes (broker 11 + card 8 + handshake codec 1 + integ 6).
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -5,7 +5,8 @@ use std::path::{Path, PathBuf};
|
||||
use std::sync::Arc;
|
||||
use std::time::{SystemTime, UNIX_EPOCH};
|
||||
|
||||
use brahman_card::{ResolvedCard, CARD_SCHEMA_VERSION};
|
||||
use brahman_broker::Broker;
|
||||
use brahman_card::{Card, ResolvedCard, CARD_SCHEMA_VERSION};
|
||||
use tokio::net::{UnixListener, UnixStream};
|
||||
use tokio::sync::Mutex;
|
||||
use tracing::{debug, warn};
|
||||
@@ -17,19 +18,19 @@ use crate::messages::{Farewell, Frame, HandshakeError, Hello, HelloAck, Ping, Po
|
||||
/// Tabla de sesiones vivas indexada por `SessionId`.
|
||||
pub type SessionRegistry = Arc<Mutex<HashMap<SessionId, ResolvedCard>>>;
|
||||
|
||||
/// Broker compartido (opcional) que el servidor mantiene en sincronía con
|
||||
/// el ciclo de vida de las sesiones.
|
||||
pub type SharedBroker = Arc<Mutex<Broker>>;
|
||||
|
||||
/// Configuración del servidor.
|
||||
#[derive(Debug, Clone)]
|
||||
#[derive(Debug, Clone, Default)]
|
||||
pub struct ServerConfig {
|
||||
/// `true` si el Init está atado al servidor (se reporta en `HelloAck`).
|
||||
pub init_attached: bool,
|
||||
}
|
||||
|
||||
impl Default for ServerConfig {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
init_attached: false,
|
||||
}
|
||||
}
|
||||
/// Broker compartido. Si está presente, el servidor llama
|
||||
/// `register` tras un Hello aceptado y `unregister` al cerrar la
|
||||
/// sesión (Farewell o EOF). Si es `None`, el broker no se usa.
|
||||
pub broker: Option<SharedBroker>,
|
||||
}
|
||||
|
||||
/// Servidor de handshake escuchando en un Unix socket.
|
||||
@@ -118,18 +119,24 @@ pub struct Session {
|
||||
|
||||
impl Session {
|
||||
/// Procesa la conexión hasta `Farewell` o EOF: handshake + loop de pings.
|
||||
/// Garantiza cleanup (sessions + broker) sin importar la rama de salida.
|
||||
pub async fn handle(mut self) -> std::io::Result<()> {
|
||||
let session_id = match self.do_handshake().await? {
|
||||
Some(id) => id,
|
||||
None => return Ok(()), // hello rechazado, conexión cerrada
|
||||
None => return Ok(()), // Hello rechazado, no se registró nada
|
||||
};
|
||||
|
||||
let result = self.run_post_handshake(session_id).await;
|
||||
self.cleanup(session_id).await;
|
||||
result
|
||||
}
|
||||
|
||||
async fn run_post_handshake(&mut self, session_id: SessionId) -> std::io::Result<()> {
|
||||
loop {
|
||||
let frame = match read_frame(&mut self.stream).await {
|
||||
Ok(f) => f,
|
||||
Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => {
|
||||
debug!(session = %session_id, "cliente cerró conexión sin Farewell");
|
||||
self.sessions.lock().await.remove(&session_id);
|
||||
return Ok(());
|
||||
}
|
||||
Err(e) => return Err(e),
|
||||
@@ -150,12 +157,18 @@ impl Session {
|
||||
)
|
||||
.await?;
|
||||
}
|
||||
Frame::Farewell(Farewell { session }) => {
|
||||
if session == session_id {
|
||||
self.sessions.lock().await.remove(&session_id);
|
||||
}
|
||||
Frame::Farewell(Farewell { session }) if session == session_id => {
|
||||
return Ok(());
|
||||
}
|
||||
Frame::Farewell(_) => {
|
||||
write_frame(
|
||||
&mut self.stream,
|
||||
&Frame::Error(HandshakeError::Unauthorized(
|
||||
"session-id no coincide".into(),
|
||||
)),
|
||||
)
|
||||
.await?;
|
||||
}
|
||||
_ => {
|
||||
// Frame inesperado en estado post-handshake.
|
||||
write_frame(
|
||||
@@ -170,6 +183,15 @@ impl Session {
|
||||
}
|
||||
}
|
||||
|
||||
/// Limpieza atómica de las dos vistas: registro de sesiones + broker.
|
||||
/// Se ejecuta tanto si la sesión cierra por Farewell, EOF, o error.
|
||||
async fn cleanup(&self, session_id: SessionId) {
|
||||
self.sessions.lock().await.remove(&session_id);
|
||||
if let Some(broker) = &self.config.broker {
|
||||
broker.lock().await.unregister(session_id);
|
||||
}
|
||||
}
|
||||
|
||||
/// Lee el Hello, valida, registra la sesión y emite HelloAck.
|
||||
/// Devuelve `Some(session_id)` si el handshake fue exitoso.
|
||||
async fn do_handshake(&mut self) -> std::io::Result<Option<SessionId>> {
|
||||
@@ -193,12 +215,8 @@ impl Session {
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
let resolved = ResolvedCard::from_agnostic(hello.card);
|
||||
let session_id = Ulid::new();
|
||||
self.sessions
|
||||
.lock()
|
||||
.await
|
||||
.insert(session_id, resolved);
|
||||
self.register_session(session_id, hello.card).await;
|
||||
|
||||
let ack = HelloAck {
|
||||
server_version: crate::HANDSHAKE_VERSION.to_string(),
|
||||
@@ -211,6 +229,15 @@ impl Session {
|
||||
Ok(Some(session_id))
|
||||
}
|
||||
|
||||
/// Indexa la sesión: ResolvedCard en sessions + Card en broker (si hay).
|
||||
async fn register_session(&self, session_id: SessionId, card: Card) {
|
||||
if let Some(broker) = &self.config.broker {
|
||||
broker.lock().await.register(session_id, &card);
|
||||
}
|
||||
let resolved = ResolvedCard::from_agnostic(card);
|
||||
self.sessions.lock().await.insert(session_id, resolved);
|
||||
}
|
||||
|
||||
/// Validaciones que el servidor aplica al Hello del cliente.
|
||||
fn validate_hello(&self, hello: &Hello) -> Option<HandshakeError> {
|
||||
if hello.schema_version != CARD_SCHEMA_VERSION {
|
||||
|
||||
Reference in New Issue
Block a user