//! Servidor de handshake. Listener Unix socket → sesiones por conexión. use std::collections::HashMap; use std::path::{Path, PathBuf}; use std::sync::Arc; use std::time::{SystemTime, UNIX_EPOCH}; use brahman_card::{ResolvedCard, CARD_SCHEMA_VERSION}; use tokio::net::{UnixListener, UnixStream}; use tokio::sync::Mutex; use tracing::{debug, warn}; use ulid::Ulid; use crate::codec::{read_frame, write_frame}; use crate::messages::{Farewell, Frame, HandshakeError, Hello, HelloAck, Ping, Pong, SessionId}; /// Tabla de sesiones vivas indexada por `SessionId`. pub type SessionRegistry = Arc>>; /// Configuración del servidor. #[derive(Debug, Clone)] pub struct ServerConfig { /// `true` si el Init está atado al servidor (se reporta en `HelloAck`). pub init_attached: bool, } impl Default for ServerConfig { fn default() -> Self { Self { init_attached: false, } } } /// Servidor de handshake escuchando en un Unix socket. pub struct Server { listener: UnixListener, socket_path: PathBuf, sessions: SessionRegistry, config: ServerConfig, } impl Server { /// Crea el listener en `path`. Si el archivo existe, lo elimina (asume /// que es un socket stale de una sesión previa). pub fn bind(path: impl Into, config: ServerConfig) -> std::io::Result { let socket_path = path.into(); if socket_path.exists() { std::fs::remove_file(&socket_path)?; } if let Some(parent) = socket_path.parent() { if !parent.as_os_str().is_empty() { std::fs::create_dir_all(parent)?; } } let listener = UnixListener::bind(&socket_path)?; Ok(Self { listener, socket_path, sessions: Arc::new(Mutex::new(HashMap::new())), config, }) } /// Devuelve la ruta del socket (útil para clientes en el mismo proceso). pub fn socket_path(&self) -> &Path { &self.socket_path } /// Vista compartida del registro de sesiones — útil para el Init/Admin /// para inspeccionar quién está conectado. pub fn sessions(&self) -> SessionRegistry { self.sessions.clone() } /// Acepta UNA conexión, devuelve la `Session` lista para `handle()`. /// No corre el handler — eso es responsabilidad del llamante. pub async fn accept_one(&self) -> std::io::Result { let (stream, _addr) = self.listener.accept().await?; Ok(Session { stream, sessions: self.sessions.clone(), config: self.config.clone(), }) } /// Loop de aceptación: cada conexión se despacha en una task separada. /// Vive hasta que el listener falle o el caller drop el future. pub async fn run(self) -> std::io::Result<()> { loop { let session = self.accept_one().await?; tokio::spawn(async move { if let Err(e) = session.handle().await { warn!(error = %e, "session terminó con error"); } }); } } } impl Drop for Server { fn drop(&mut self) { // Limpieza best-effort del socket. Si falla, log y seguir. if let Err(e) = std::fs::remove_file(&self.socket_path) { if e.kind() != std::io::ErrorKind::NotFound { warn!(path = %self.socket_path.display(), error = %e, "no se pudo limpiar socket"); } } } } /// Conexión individual aceptada por el servidor. pub struct Session { stream: UnixStream, sessions: SessionRegistry, config: ServerConfig, } impl Session { /// Procesa la conexión hasta `Farewell` o EOF: handshake + loop de pings. pub async fn handle(mut self) -> std::io::Result<()> { let session_id = match self.do_handshake().await? { Some(id) => id, None => return Ok(()), // hello rechazado, conexión cerrada }; loop { let frame = match read_frame(&mut self.stream).await { Ok(f) => f, Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => { debug!(session = %session_id, "cliente cerró conexión sin Farewell"); self.sessions.lock().await.remove(&session_id); return Ok(()); } Err(e) => return Err(e), }; match frame { Frame::Ping(Ping { session }) if session == session_id => { let pong = Pong { timestamp_ms: now_ms(), }; write_frame(&mut self.stream, &Frame::Pong(pong)).await?; } Frame::Ping(_) => { write_frame( &mut self.stream, &Frame::Error(HandshakeError::Unauthorized( "session-id no coincide".into(), )), ) .await?; } Frame::Farewell(Farewell { session }) => { if session == session_id { self.sessions.lock().await.remove(&session_id); } return Ok(()); } _ => { // Frame inesperado en estado post-handshake. write_frame( &mut self.stream, &Frame::Error(HandshakeError::Rejected( "frame inesperado tras handshake".into(), )), ) .await?; } } } } /// Lee el Hello, valida, registra la sesión y emite HelloAck. /// Devuelve `Some(session_id)` si el handshake fue exitoso. async fn do_handshake(&mut self) -> std::io::Result> { let frame = read_frame(&mut self.stream).await?; let hello = match frame { Frame::Hello(h) => h, _ => { write_frame( &mut self.stream, &Frame::Error(HandshakeError::Rejected( "primer frame debe ser Hello".into(), )), ) .await?; return Ok(None); } }; if let Some(err) = self.validate_hello(&hello) { write_frame(&mut self.stream, &Frame::Error(err)).await?; return Ok(None); } let resolved = ResolvedCard::from_agnostic(hello.card); let session_id = Ulid::new(); self.sessions .lock() .await .insert(session_id, resolved); let ack = HelloAck { server_version: crate::HANDSHAKE_VERSION.to_string(), protocol_version: brahman_card::PROTOCOL_VERSION.to_string(), session: session_id, init_attached: self.config.init_attached, }; write_frame(&mut self.stream, &Frame::HelloAck(ack)).await?; debug!(session = %session_id, "handshake completado"); Ok(Some(session_id)) } /// Validaciones que el servidor aplica al Hello del cliente. fn validate_hello(&self, hello: &Hello) -> Option { if hello.schema_version != CARD_SCHEMA_VERSION { return Some(HandshakeError::SchemaMismatch { client: hello.schema_version, server: CARD_SCHEMA_VERSION, }); } if hello.protocol_version != brahman_card::PROTOCOL_VERSION { return Some(HandshakeError::ProtocolMismatch(format!( "cliente={}, servidor={}", hello.protocol_version, brahman_card::PROTOCOL_VERSION ))); } if let Err(e) = hello.card.validate() { return Some(HandshakeError::InvalidCard(e.to_string())); } None } } fn now_ms() -> u64 { SystemTime::now() .duration_since(UNIX_EPOCH) .map(|d| d.as_millis() as u64) .unwrap_or(0) }