//! Servidor de handshake. Listener Unix socket → sesiones por conexión. use std::collections::HashMap; use std::path::{Path, PathBuf}; use std::sync::Arc; use std::time::{SystemTime, UNIX_EPOCH}; use brahman_broker::{Broker, Endpoint}; use brahman_card::{Card, ResolvedCard, WitInterface, CARD_SCHEMA_VERSION}; use tokio::net::{UnixListener, UnixStream}; use tokio::sync::{mpsc, Mutex}; use tracing::{debug, warn}; use ulid::Ulid; use crate::codec::{read_frame, write_frame}; use crate::messages::{ Farewell, Frame, HandshakeError, Hello, HelloAck, MatchEvent, MatchEventKind, Ping, Pong, SessionId, }; /// Tabla de sesiones vivas indexada por `SessionId`. pub type SessionRegistry = Arc>>; /// Broker compartido (opcional) que el servidor mantiene en sincronía con /// el ciclo de vida de las sesiones. pub type SharedBroker = Arc>; /// Tabla de canales push por sesión: el server inyecta frames hacia el /// cliente (p. ej. `MatchEvent`) sin requerir que el cliente haga request. type SessionTxTable = Arc>>>; /// Por sesión, último match conocido por nombre de input. Se usa para /// emitir diffs (Available/Lost) en lugar del estado completo. type LastMatches = Arc>>>; /// Capacidad del canal push por sesión. Si se llena (cliente lento), los /// eventos extra se descartan — el cliente puede re-consultar el estado. const PUSH_CHANNEL_CAPACITY: usize = 32; /// Configuración del servidor. #[derive(Debug, Clone, Default)] pub struct ServerConfig { /// `true` si el Init está atado al servidor (se reporta en `HelloAck`). pub init_attached: bool, /// Broker compartido. Si está presente, el servidor llama /// `register` tras un Hello aceptado y `unregister` al cerrar la /// sesión (Farewell o EOF). Si es `None`, el broker no se usa. pub broker: Option, } /// Servidor de handshake escuchando en un Unix socket. pub struct Server { listener: UnixListener, socket_path: PathBuf, sessions: SessionRegistry, push_table: SessionTxTable, last_matches: LastMatches, config: ServerConfig, } impl Server { /// Crea el listener en `path`. Si el archivo existe, lo elimina (asume /// que es un socket stale de una sesión previa). pub fn bind(path: impl Into, config: ServerConfig) -> std::io::Result { let socket_path = path.into(); if socket_path.exists() { std::fs::remove_file(&socket_path)?; } if let Some(parent) = socket_path.parent() { if !parent.as_os_str().is_empty() { std::fs::create_dir_all(parent)?; } } let listener = UnixListener::bind(&socket_path)?; Ok(Self { listener, socket_path, sessions: Arc::new(Mutex::new(HashMap::new())), push_table: Arc::new(Mutex::new(HashMap::new())), last_matches: Arc::new(Mutex::new(HashMap::new())), config, }) } /// Devuelve la ruta del socket (útil para clientes en el mismo proceso). pub fn socket_path(&self) -> &Path { &self.socket_path } /// Vista compartida del registro de sesiones — útil para el Init/Admin /// para inspeccionar quién está conectado. pub fn sessions(&self) -> SessionRegistry { self.sessions.clone() } /// Acepta UNA conexión, devuelve la `Session` lista para `handle()`. /// No corre el handler — eso es responsabilidad del llamante. pub async fn accept_one(&self) -> std::io::Result { let (stream, _addr) = self.listener.accept().await?; Ok(Session { stream, sessions: self.sessions.clone(), push_table: self.push_table.clone(), last_matches: self.last_matches.clone(), config: self.config.clone(), }) } /// Loop de aceptación: cada conexión se despacha en una task separada. /// Vive hasta que el listener falle o el caller drop el future. pub async fn run(self) -> std::io::Result<()> { loop { let session = self.accept_one().await?; tokio::spawn(async move { if let Err(e) = session.handle().await { warn!(error = %e, "session terminó con error"); } }); } } } impl Drop for Server { fn drop(&mut self) { // Limpieza best-effort del socket. Si falla, log y seguir. if let Err(e) = std::fs::remove_file(&self.socket_path) { if e.kind() != std::io::ErrorKind::NotFound { warn!(path = %self.socket_path.display(), error = %e, "no se pudo limpiar socket"); } } } } /// Conexión individual aceptada por el servidor. pub struct Session { stream: UnixStream, sessions: SessionRegistry, push_table: SessionTxTable, last_matches: LastMatches, config: ServerConfig, } impl Session { /// Procesa la conexión hasta `Farewell` o EOF: handshake + loop de pings. /// Garantiza cleanup (sessions + broker) sin importar la rama de salida. pub async fn handle(mut self) -> std::io::Result<()> { let session_id = match self.do_handshake().await? { Some(id) => id, None => return Ok(()), // Hello rechazado, no se registró nada }; let result = self.run_post_handshake(session_id).await; self.cleanup(session_id).await; result } async fn run_post_handshake(&mut self, session_id: SessionId) -> std::io::Result<()> { // Canal por donde el server inyecta frames push (MatchEvent, etc.). let (tx, mut rx) = mpsc::channel::(PUSH_CHANNEL_CAPACITY); self.push_table.lock().await.insert(session_id, tx); // Tras registrar el canal, recomputar matches y emitir diffs a // todas las sesiones afectadas (incluida ésta, si tiene inputs). self.broadcast_match_diffs().await; loop { tokio::select! { // Frame entrante del cliente. res = read_frame(&mut self.stream) => { match res { Ok(frame) => { if !self.handle_inbound_frame(session_id, frame).await? { return Ok(()); } } Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => { debug!(session = %session_id, "cliente cerró sin Farewell"); return Ok(()); } Err(e) => return Err(e), } } // Frame push desde el server (MatchEvent). Some(frame) = rx.recv() => { write_frame(&mut self.stream, &frame).await?; } } } } /// Maneja un frame entrante. Devuelve `Ok(false)` si la sesión debe /// cerrarse limpiamente (Farewell con session-id correcto). async fn handle_inbound_frame( &mut self, session_id: SessionId, frame: Frame, ) -> std::io::Result { match frame { Frame::Ping(Ping { session }) if session == session_id => { let pong = Pong { timestamp_ms: now_ms(), }; write_frame(&mut self.stream, &Frame::Pong(pong)).await?; Ok(true) } Frame::Ping(_) => { write_frame( &mut self.stream, &Frame::Error(HandshakeError::Unauthorized( "session-id no coincide".into(), )), ) .await?; Ok(true) } Frame::Farewell(Farewell { session }) if session == session_id => Ok(false), Frame::Farewell(_) => { write_frame( &mut self.stream, &Frame::Error(HandshakeError::Unauthorized( "session-id no coincide".into(), )), ) .await?; Ok(true) } _ => { write_frame( &mut self.stream, &Frame::Error(HandshakeError::Rejected( "frame inesperado tras handshake".into(), )), ) .await?; Ok(true) } } } /// Limpieza atómica de TRES vistas: registro de sesiones + broker + /// canal push. Se ejecuta tanto si la sesión cierra por Farewell, EOF, /// o error. Tras desregistrar, emite diffs a las sesiones que perdieron /// el match contra ésta. async fn cleanup(&self, session_id: SessionId) { self.sessions.lock().await.remove(&session_id); self.push_table.lock().await.remove(&session_id); self.last_matches.lock().await.remove(&session_id); if let Some(broker) = &self.config.broker { broker.lock().await.unregister(session_id); } self.broadcast_match_diffs().await; } /// Recomputa los matches para todas las sesiones registradas y empuja /// `MatchEvent::Available` / `MatchEvent::Lost` por las que cambiaron /// respecto al último estado conocido. /// /// Se llama tras cada `register_session` y `cleanup`. Las inserciones /// al canal usan `try_send` (non-blocking); si el cliente está lento /// y se llena el buffer, los eventos extra se pierden — es ephemeral /// y el cliente puede re-consultar el estado vía `brahman-status`. async fn broadcast_match_diffs(&self) { let broker = match &self.config.broker { Some(b) => b, None => return, }; let b = broker.lock().await; let push_table = self.push_table.lock().await; let mut last = self.last_matches.lock().await; debug!( target: "brahman_handshake::broadcast", cards = b.len(), push_subscribers = push_table.len(), "broadcast_match_diffs" ); // Snapshot de cards para no tener que sostener el lock del broker. let cards: Vec<_> = b.cards().cloned().collect(); for cons in &cards { let cons_session = cons.session; let tx = match push_table.get(&cons_session) { Some(tx) => tx, None => continue, // todavía no tiene canal push }; let cons_last = last.entry(cons_session).or_default(); for input in &cons.inputs { let new_match = b.find_producer_for(cons_session, &input.name); let new_endpoint = new_match.as_ref().map(|m| m.producer.clone()); let old_endpoint = cons_last.get(&input.name).cloned(); if old_endpoint == new_endpoint { continue; } if let Some(m) = &new_match { // Resolvemos el service_socket del productor desde // la BrokeredCard; pasarlo en el evento permite al // consumer conectar directo sin discovery extra. let producer_service_socket = b .cards() .find(|c| c.session == m.producer.session) .and_then(|c| c.service_socket.clone()); let event = MatchEvent { kind: MatchEventKind::Available, consumer_flow: input.name.clone(), producer_session: m.producer.session, producer_label: m.producer_label.clone(), producer_flow: m.producer.flow_name.clone(), ty: m.ty.clone(), via: m.via, pinned: m.pinned, producer_service_socket, }; let send_res = tx.try_send(Frame::MatchEvent(event)); debug!( target: "brahman_handshake::broadcast", consumer = %cons_session, flow = %input.name, producer = %m.producer_label, result = ?send_res.as_ref().map(|_| "ok").unwrap_or_else(|e| match e { tokio::sync::mpsc::error::TrySendError::Full(_) => "full", tokio::sync::mpsc::error::TrySendError::Closed(_) => "closed", }), "Available pushed" ); } else { // Tenía match, ahora no. let event = MatchEvent { kind: MatchEventKind::Lost, consumer_flow: input.name.clone(), producer_session: Ulid::nil(), producer_label: String::new(), producer_flow: String::new(), ty: input.ty.clone(), via: brahman_broker::MatchStrategy::Exact, pinned: false, producer_service_socket: None, }; let _ = tx.try_send(Frame::MatchEvent(event)); } if let Some(ep) = new_endpoint { cons_last.insert(input.name.clone(), ep); } else { cons_last.remove(&input.name); } } } } /// Lee el Hello, valida, registra la sesión y emite HelloAck. /// Devuelve `Some(session_id)` si el handshake fue exitoso. async fn do_handshake(&mut self) -> std::io::Result> { let frame = read_frame(&mut self.stream).await?; let hello = match frame { Frame::Hello(h) => h, _ => { write_frame( &mut self.stream, &Frame::Error(HandshakeError::Rejected( "primer frame debe ser Hello".into(), )), ) .await?; return Ok(None); } }; if let Some(err) = self.validate_hello(&hello) { write_frame(&mut self.stream, &Frame::Error(err)).await?; return Ok(None); } let session_id = Ulid::new(); // WireCard → Card: extensiones quedan vacías post-wire (es el contrato). let card: Card = hello.card.into(); self.register_session(session_id, card, hello.wit).await; let ack = HelloAck { server_version: crate::HANDSHAKE_VERSION.to_string(), protocol_version: brahman_card::PROTOCOL_VERSION.to_string(), session: session_id, init_attached: self.config.init_attached, }; write_frame(&mut self.stream, &Frame::HelloAck(ack)).await?; debug!(session = %session_id, "handshake completado"); Ok(Some(session_id)) } /// Indexa la sesión: ResolvedCard en sessions + Card en broker (si hay). /// Si `wit` está presente, el módulo se registra como "consciente". async fn register_session( &self, session_id: SessionId, card: Card, wit: Option, ) { if let Some(broker) = &self.config.broker { broker .lock() .await .register(session_id, &card, wit.clone()); } let resolved = match wit { Some(w) => ResolvedCard::from_conscious(card, w), None => ResolvedCard::from_agnostic(card), }; self.sessions.lock().await.insert(session_id, resolved); } /// Validaciones que el servidor aplica al Hello del cliente. fn validate_hello(&self, hello: &Hello) -> Option { if hello.schema_version != CARD_SCHEMA_VERSION { return Some(HandshakeError::SchemaMismatch { client: hello.schema_version, server: CARD_SCHEMA_VERSION, }); } if hello.protocol_version != brahman_card::PROTOCOL_VERSION { return Some(HandshakeError::ProtocolMismatch(format!( "cliente={}, servidor={}", hello.protocol_version, brahman_card::PROTOCOL_VERSION ))); } // Validamos contra Card (la rica) — convertir es barato y centraliza // la lógica de validación en un solo lugar. let as_card: Card = Card::from(hello.card.clone()); if let Err(e) = as_card.validate() { return Some(HandshakeError::InvalidCard(e.to_string())); } None } } fn now_ms() -> u64 { SystemTime::now() .duration_since(UNIX_EPOCH) .map(|d| d.as_millis() as u64) .unwrap_or(0) }