diff --git a/Cargo.lock b/Cargo.lock index 5f24f23..3e20a2e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1150,6 +1150,20 @@ dependencies = [ "ulid", ] +[[package]] +name = "brahman-handshake" +version = "0.1.0" +dependencies = [ + "brahman-card", + "postcard", + "serde", + "tempfile", + "thiserror 2.0.18", + "tokio", + "tracing", + "ulid", +] + [[package]] name = "bs58" version = "0.5.1" diff --git a/Cargo.toml b/Cargo.toml index 671bdaa..94bc8e9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -5,6 +5,7 @@ members = [ # core/ — Init y compat (arje absorbido) # ============================================================ "crates/core/brahman-card", + "crates/core/brahman-handshake", "crates/core/ente-card", "crates/core/ente-bus", "crates/core/ente-cas", diff --git a/crates/core/brahman-card/src/lib.rs b/crates/core/brahman-card/src/lib.rs index 0897075..8c7d40e 100644 --- a/crates/core/brahman-card/src/lib.rs +++ b/crates/core/brahman-card/src/lib.rs @@ -18,12 +18,11 @@ #![forbid(unsafe_code)] #![warn(rust_2018_idioms)] -use std::collections::{BTreeSet, HashMap, HashSet}; +use std::collections::{BTreeSet, HashSet}; use std::path::Path; use std::time::Duration; use serde::{Deserialize, Serialize}; -use serde_json::Value; use thiserror::Error; use ulid::Ulid; @@ -125,10 +124,6 @@ pub struct Card { /// Hijas a instanciar inmediatamente al encarnar esta Card. #[serde(default)] pub genesis: Vec, - - /// Campos desconocidos preservados intactos (forward-compat). - #[serde(flatten, default)] - pub extensions: HashMap, } impl Default for Card { @@ -151,7 +146,6 @@ impl Default for Card { priority: Priority::default(), flow: Flows::default(), genesis: Vec::new(), - extensions: HashMap::new(), } } } @@ -221,8 +215,6 @@ pub struct Permissions { /// Capacidad de spawnear sub-procesos. Implica `TrustLevel::System`. #[serde(default)] pub processes: bool, - #[serde(flatten, default)] - pub extra: HashMap, } #[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize)] diff --git a/crates/core/brahman-handshake/Cargo.toml b/crates/core/brahman-handshake/Cargo.toml new file mode 100644 index 0000000..d4eeb27 --- /dev/null +++ b/crates/core/brahman-handshake/Cargo.toml @@ -0,0 +1,22 @@ +[package] +name = "brahman-handshake" +version.workspace = true +edition.workspace = true +rust-version.workspace = true +license.workspace = true +authors.workspace = true +publish.workspace = true +description = "Brahman — handshake runtime Init↔módulo sobre Unix socket (postcard frames)." + +[dependencies] +brahman-card = { path = "../brahman-card" } +serde = { workspace = true } +postcard = { workspace = true } +tokio = { workspace = true } +thiserror = { workspace = true } +ulid = { workspace = true } +tracing = { workspace = true } + +[dev-dependencies] +tempfile = { workspace = true } +tokio = { workspace = true } diff --git a/crates/core/brahman-handshake/src/client.rs b/crates/core/brahman-handshake/src/client.rs new file mode 100644 index 0000000..4992b7b --- /dev/null +++ b/crates/core/brahman-handshake/src/client.rs @@ -0,0 +1,108 @@ +//! Cliente de handshake. Conecta a un Unix socket y mantiene la sesión. + +use std::path::Path; + +use brahman_card::{Card, CARD_SCHEMA_VERSION}; +use thiserror::Error; +use tokio::net::UnixStream; + +use crate::codec::{read_frame, write_frame}; +use crate::messages::{Farewell, Frame, HandshakeError, Hello, HelloAck, Ping, SessionId}; + +/// Errores del cliente. +#[derive(Debug, Error)] +pub enum ClientError { + #[error("E/S: {0}")] + Io(#[from] std::io::Error), + + /// El servidor respondió con un error explícito. + #[error("servidor: {0}")] + Server(#[source] HandshakeError), + + /// El servidor envió un frame que no esperábamos en este punto del protocolo. + #[error("frame inesperado: {got}")] + UnexpectedFrame { got: &'static str }, + + /// La Card que el cliente intentó enviar no pasa su propia validación. + #[error("card inválida pre-envío: {0}")] + InvalidCard(String), +} + +/// Cliente conectado y autenticado. Tras `connect` ya completó el handshake +/// y tiene su `SessionId`. +#[derive(Debug)] +pub struct Client { + stream: UnixStream, + session: SessionId, + server_info: HelloAck, +} + +impl Client { + /// Conecta al socket, envía Hello con la Card dada y procesa la respuesta. + pub async fn connect(path: impl AsRef, card: Card) -> Result { + // Pre-validamos para fallar local antes de hablar con el servidor. + card.validate() + .map_err(|e| ClientError::InvalidCard(e.to_string()))?; + + let mut stream = UnixStream::connect(path).await?; + let hello = Hello { + schema_version: CARD_SCHEMA_VERSION, + protocol_version: brahman_card::PROTOCOL_VERSION.to_string(), + card, + }; + write_frame(&mut stream, &Frame::Hello(hello)).await?; + + let frame = read_frame(&mut stream).await?; + let ack = match frame { + Frame::HelloAck(a) => a, + Frame::Error(e) => return Err(ClientError::Server(e)), + Frame::Hello(_) => return Err(ClientError::UnexpectedFrame { got: "Hello" }), + Frame::Ping(_) => return Err(ClientError::UnexpectedFrame { got: "Ping" }), + Frame::Pong(_) => return Err(ClientError::UnexpectedFrame { got: "Pong" }), + Frame::Farewell(_) => return Err(ClientError::UnexpectedFrame { got: "Farewell" }), + }; + Ok(Self { + stream, + session: ack.session, + server_info: ack, + }) + } + + /// `SessionId` asignado por el servidor. + pub fn session(&self) -> SessionId { + self.session + } + + /// Información del servidor recibida en el handshake. + pub fn server_info(&self) -> &HelloAck { + &self.server_info + } + + /// Envía un Ping y devuelve el timestamp del servidor. + pub async fn ping(&mut self) -> Result { + write_frame( + &mut self.stream, + &Frame::Ping(Ping { + session: self.session, + }), + ) + .await?; + match read_frame(&mut self.stream).await? { + Frame::Pong(p) => Ok(p.timestamp_ms), + Frame::Error(e) => Err(ClientError::Server(e)), + _ => Err(ClientError::UnexpectedFrame { got: "non-pong" }), + } + } + + /// Cierre cooperativo. Consume el cliente. + pub async fn farewell(mut self) -> Result<(), ClientError> { + write_frame( + &mut self.stream, + &Frame::Farewell(Farewell { + session: self.session, + }), + ) + .await?; + Ok(()) + } +} diff --git a/crates/core/brahman-handshake/src/codec.rs b/crates/core/brahman-handshake/src/codec.rs new file mode 100644 index 0000000..adfb038 --- /dev/null +++ b/crates/core/brahman-handshake/src/codec.rs @@ -0,0 +1,72 @@ +//! Codec de wire: frames length-prefixed con cuerpo postcard. +//! +//! Cada frame en el stream tiene la forma: +//! ```text +//! [4 bytes LE: longitud N] [N bytes: postcard(Frame)] +//! ``` +//! +//! El `MAX_FRAME_BYTES` evita que un cliente malicioso/buggy reserve memoria +//! arbitraria al anunciar un length absurdo. + +use std::io::{Error, ErrorKind, Result}; + +use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}; + +use crate::messages::Frame; + +/// Tamaño máximo de un frame antes de que el reader rechace la conexión. +/// 4 MiB cubre cualquier Card razonable con margen amplio. +pub const MAX_FRAME_BYTES: usize = 4 * 1024 * 1024; + +/// Escribe un frame al stream. +pub async fn write_frame(w: &mut W, frame: &Frame) -> Result<()> { + let bytes = postcard::to_allocvec(frame) + .map_err(|e| Error::new(ErrorKind::InvalidData, format!("postcard encode: {e}")))?; + if bytes.len() > MAX_FRAME_BYTES { + return Err(Error::new( + ErrorKind::InvalidData, + format!("frame demasiado grande: {} bytes", bytes.len()), + )); + } + let len = bytes.len() as u32; + w.write_all(&len.to_le_bytes()).await?; + w.write_all(&bytes).await?; + w.flush().await?; + Ok(()) +} + +/// Lee un frame del stream. +pub async fn read_frame(r: &mut R) -> Result { + let mut len_buf = [0u8; 4]; + r.read_exact(&mut len_buf).await?; + let len = u32::from_le_bytes(len_buf) as usize; + if len > MAX_FRAME_BYTES { + return Err(Error::new( + ErrorKind::InvalidData, + format!("frame anunciado demasiado grande: {len} bytes"), + )); + } + let mut buf = vec![0u8; len]; + r.read_exact(&mut buf).await?; + postcard::from_bytes(&buf) + .map_err(|e| Error::new(ErrorKind::InvalidData, format!("postcard decode: {e}"))) +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::messages::{Frame, HandshakeError}; + + #[tokio::test] + async fn frame_roundtrip() { + let frame = Frame::Error(HandshakeError::Rejected("test".into())); + let mut buf = Vec::new(); + write_frame(&mut buf, &frame).await.unwrap(); + let mut cursor = std::io::Cursor::new(buf); + let decoded = read_frame(&mut cursor).await.unwrap(); + match decoded { + Frame::Error(HandshakeError::Rejected(s)) => assert_eq!(s, "test"), + _ => panic!("variant mismatch"), + } + } +} diff --git a/crates/core/brahman-handshake/src/lib.rs b/crates/core/brahman-handshake/src/lib.rs new file mode 100644 index 0000000..ca0545e --- /dev/null +++ b/crates/core/brahman-handshake/src/lib.rs @@ -0,0 +1,28 @@ +//! `brahman-handshake` — protocolo runtime Init↔módulo sobre Unix socket. +//! +//! Implementa la versión concreta de `shared_wit/protocol.wit` (handshake + +//! lifecycle): un servidor que vive en el Init (o un Admin proxy) y clientes +//! que son los módulos Brahman. Cada conexión arranca con un `Hello` que +//! lleva una [`brahman_card::Card`]; el servidor valida la Card, deriva el +//! [`TrustLevel`], emite un `HelloAck` con `session-id` ULID, y a partir de +//! ahí acepta `Ping`/`Farewell`. +//! +//! Wire format: frames length-prefixed (4 bytes LE) con cuerpo +//! [`postcard`]-codificado. Compacto, rápido y reversible. +//! +//! Esto NO es la implementación WIT/WASM (que generaría wit-bindgen). Es la +//! implementación nativa Rust↔Rust que cubre el caso común antes de que los +//! módulos WASM consuman el mismo contrato vía ABI generada. + +#![forbid(unsafe_code)] +#![warn(rust_2018_idioms)] + +pub mod codec; +pub mod messages; +pub mod server; +pub mod client; + +pub use brahman_card::PROTOCOL_VERSION; + +/// Versión del crate de handshake (independiente de `PROTOCOL_VERSION`). +pub const HANDSHAKE_VERSION: &str = env!("CARGO_PKG_VERSION"); diff --git a/crates/core/brahman-handshake/src/messages.rs b/crates/core/brahman-handshake/src/messages.rs new file mode 100644 index 0000000..2b64e66 --- /dev/null +++ b/crates/core/brahman-handshake/src/messages.rs @@ -0,0 +1,84 @@ +//! Mensajes del protocolo de handshake. +//! +//! Todos los mensajes que cruzan el wire son variantes de [`Frame`]. + +use serde::{Deserialize, Serialize}; +use ulid::Ulid; + +/// Identificador de sesión emitido por el servidor en `HelloAck`. +pub type SessionId = Ulid; + +/// Saludo inicial del módulo. Lleva la Card completa para que el servidor +/// la valide e indexe. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct Hello { + /// Versión del schema de Card que el cliente sigue. + pub schema_version: u16, + /// Versión del protocolo handshake del cliente. + pub protocol_version: String, + /// Tarjeta de Presentación. + pub card: brahman_card::Card, +} + +/// Respuesta del servidor a un `Hello` aceptado. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct HelloAck { + /// Versión del crate del servidor. + pub server_version: String, + /// Versión del protocolo soportada por el servidor. + pub protocol_version: String, + /// Identificador de sesión asignado. + pub session: SessionId, + /// `true` si el Init está vinculado al servidor; `false` si el servidor + /// corre standalone (modo degradado). + pub init_attached: bool, +} + +/// Latido del cliente. El servidor responde con [`Pong`] llevando su reloj. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct Ping { + pub session: SessionId, +} + +/// Respuesta a un `Ping` con timestamp del servidor (ms desde UNIX_EPOCH). +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct Pong { + pub timestamp_ms: u64, +} + +/// Cierre cooperativo de la sesión por parte del cliente. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct Farewell { + pub session: SessionId, +} + +/// Errores del protocolo emitidos por el servidor. +#[derive(Debug, Clone, Serialize, Deserialize, thiserror::Error)] +pub enum HandshakeError { + #[error("protocolo incompatible: {0}")] + ProtocolMismatch(String), + #[error("card inválida: {0}")] + InvalidCard(String), + #[error("schema de card incompatible: cliente={client}, servidor={server}")] + SchemaMismatch { client: u16, server: u16 }, + #[error("sin autorización: {0}")] + Unauthorized(String), + #[error("capacidad requerida no satisfecha: {0}")] + CapabilityUnmet(String), + #[error("rechazado: {0}")] + Rejected(String), + #[error("error interno: {0}")] + Internal(String), +} + +/// Frame único de wire — discriminada por variante. Cada conexión es un +/// stream de frames. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub enum Frame { + Hello(Hello), + HelloAck(HelloAck), + Ping(Ping), + Pong(Pong), + Farewell(Farewell), + Error(HandshakeError), +} diff --git a/crates/core/brahman-handshake/src/server.rs b/crates/core/brahman-handshake/src/server.rs new file mode 100644 index 0000000..8f6f776 --- /dev/null +++ b/crates/core/brahman-handshake/src/server.rs @@ -0,0 +1,241 @@ +//! Servidor de handshake. Listener Unix socket → sesiones por conexión. + +use std::collections::HashMap; +use std::path::{Path, PathBuf}; +use std::sync::Arc; +use std::time::{SystemTime, UNIX_EPOCH}; + +use brahman_card::{ResolvedCard, CARD_SCHEMA_VERSION}; +use tokio::net::{UnixListener, UnixStream}; +use tokio::sync::Mutex; +use tracing::{debug, warn}; +use ulid::Ulid; + +use crate::codec::{read_frame, write_frame}; +use crate::messages::{Farewell, Frame, HandshakeError, Hello, HelloAck, Ping, Pong, SessionId}; + +/// Tabla de sesiones vivas indexada por `SessionId`. +pub type SessionRegistry = Arc>>; + +/// Configuración del servidor. +#[derive(Debug, Clone)] +pub struct ServerConfig { + /// `true` si el Init está atado al servidor (se reporta en `HelloAck`). + pub init_attached: bool, +} + +impl Default for ServerConfig { + fn default() -> Self { + Self { + init_attached: false, + } + } +} + +/// Servidor de handshake escuchando en un Unix socket. +pub struct Server { + listener: UnixListener, + socket_path: PathBuf, + sessions: SessionRegistry, + config: ServerConfig, +} + +impl Server { + /// Crea el listener en `path`. Si el archivo existe, lo elimina (asume + /// que es un socket stale de una sesión previa). + pub fn bind(path: impl Into, config: ServerConfig) -> std::io::Result { + let socket_path = path.into(); + if socket_path.exists() { + std::fs::remove_file(&socket_path)?; + } + if let Some(parent) = socket_path.parent() { + if !parent.as_os_str().is_empty() { + std::fs::create_dir_all(parent)?; + } + } + let listener = UnixListener::bind(&socket_path)?; + Ok(Self { + listener, + socket_path, + sessions: Arc::new(Mutex::new(HashMap::new())), + config, + }) + } + + /// Devuelve la ruta del socket (útil para clientes en el mismo proceso). + pub fn socket_path(&self) -> &Path { + &self.socket_path + } + + /// Vista compartida del registro de sesiones — útil para el Init/Admin + /// para inspeccionar quién está conectado. + pub fn sessions(&self) -> SessionRegistry { + self.sessions.clone() + } + + /// Acepta UNA conexión, devuelve la `Session` lista para `handle()`. + /// No corre el handler — eso es responsabilidad del llamante. + pub async fn accept_one(&self) -> std::io::Result { + let (stream, _addr) = self.listener.accept().await?; + Ok(Session { + stream, + sessions: self.sessions.clone(), + config: self.config.clone(), + }) + } + + /// Loop de aceptación: cada conexión se despacha en una task separada. + /// Vive hasta que el listener falle o el caller drop el future. + pub async fn run(self) -> std::io::Result<()> { + loop { + let session = self.accept_one().await?; + tokio::spawn(async move { + if let Err(e) = session.handle().await { + warn!(error = %e, "session terminó con error"); + } + }); + } + } +} + +impl Drop for Server { + fn drop(&mut self) { + // Limpieza best-effort del socket. Si falla, log y seguir. + if let Err(e) = std::fs::remove_file(&self.socket_path) { + if e.kind() != std::io::ErrorKind::NotFound { + warn!(path = %self.socket_path.display(), error = %e, "no se pudo limpiar socket"); + } + } + } +} + +/// Conexión individual aceptada por el servidor. +pub struct Session { + stream: UnixStream, + sessions: SessionRegistry, + config: ServerConfig, +} + +impl Session { + /// Procesa la conexión hasta `Farewell` o EOF: handshake + loop de pings. + pub async fn handle(mut self) -> std::io::Result<()> { + let session_id = match self.do_handshake().await? { + Some(id) => id, + None => return Ok(()), // hello rechazado, conexión cerrada + }; + + loop { + let frame = match read_frame(&mut self.stream).await { + Ok(f) => f, + Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => { + debug!(session = %session_id, "cliente cerró conexión sin Farewell"); + self.sessions.lock().await.remove(&session_id); + return Ok(()); + } + Err(e) => return Err(e), + }; + match frame { + Frame::Ping(Ping { session }) if session == session_id => { + let pong = Pong { + timestamp_ms: now_ms(), + }; + write_frame(&mut self.stream, &Frame::Pong(pong)).await?; + } + Frame::Ping(_) => { + write_frame( + &mut self.stream, + &Frame::Error(HandshakeError::Unauthorized( + "session-id no coincide".into(), + )), + ) + .await?; + } + Frame::Farewell(Farewell { session }) => { + if session == session_id { + self.sessions.lock().await.remove(&session_id); + } + return Ok(()); + } + _ => { + // Frame inesperado en estado post-handshake. + write_frame( + &mut self.stream, + &Frame::Error(HandshakeError::Rejected( + "frame inesperado tras handshake".into(), + )), + ) + .await?; + } + } + } + } + + /// Lee el Hello, valida, registra la sesión y emite HelloAck. + /// Devuelve `Some(session_id)` si el handshake fue exitoso. + async fn do_handshake(&mut self) -> std::io::Result> { + let frame = read_frame(&mut self.stream).await?; + let hello = match frame { + Frame::Hello(h) => h, + _ => { + write_frame( + &mut self.stream, + &Frame::Error(HandshakeError::Rejected( + "primer frame debe ser Hello".into(), + )), + ) + .await?; + return Ok(None); + } + }; + + if let Some(err) = self.validate_hello(&hello) { + write_frame(&mut self.stream, &Frame::Error(err)).await?; + return Ok(None); + } + + let resolved = ResolvedCard::from_agnostic(hello.card); + let session_id = Ulid::new(); + self.sessions + .lock() + .await + .insert(session_id, resolved); + + let ack = HelloAck { + server_version: crate::HANDSHAKE_VERSION.to_string(), + protocol_version: brahman_card::PROTOCOL_VERSION.to_string(), + session: session_id, + init_attached: self.config.init_attached, + }; + write_frame(&mut self.stream, &Frame::HelloAck(ack)).await?; + debug!(session = %session_id, "handshake completado"); + Ok(Some(session_id)) + } + + /// Validaciones que el servidor aplica al Hello del cliente. + fn validate_hello(&self, hello: &Hello) -> Option { + if hello.schema_version != CARD_SCHEMA_VERSION { + return Some(HandshakeError::SchemaMismatch { + client: hello.schema_version, + server: CARD_SCHEMA_VERSION, + }); + } + if hello.protocol_version != brahman_card::PROTOCOL_VERSION { + return Some(HandshakeError::ProtocolMismatch(format!( + "cliente={}, servidor={}", + hello.protocol_version, + brahman_card::PROTOCOL_VERSION + ))); + } + if let Err(e) = hello.card.validate() { + return Some(HandshakeError::InvalidCard(e.to_string())); + } + None + } +} + +fn now_ms() -> u64 { + SystemTime::now() + .duration_since(UNIX_EPOCH) + .map(|d| d.as_millis() as u64) + .unwrap_or(0) +} diff --git a/crates/core/brahman-handshake/tests/handshake.rs b/crates/core/brahman-handshake/tests/handshake.rs new file mode 100644 index 0000000..7562b5c --- /dev/null +++ b/crates/core/brahman-handshake/tests/handshake.rs @@ -0,0 +1,167 @@ +//! Tests de integración: levanta server + client en el mismo proceso, +//! ejercita el round-trip completo del protocolo. + +use std::collections::BTreeSet; +use std::time::Duration; + +use brahman_card::{ + Card, CgroupSpec, NamespaceSet, Payload, ResourceLimits, SomaSpec, Supervision, + CARD_SCHEMA_VERSION, +}; +use brahman_handshake::{ + client::{Client, ClientError}, + codec::{read_frame, write_frame}, + messages::{Frame, HandshakeError, Hello, Ping}, + server::{Server, ServerConfig}, +}; +use tokio::net::UnixStream; +use ulid::Ulid; + +fn sample_card(label: &str) -> Card { + Card { + schema_version: CARD_SCHEMA_VERSION, + id: Ulid::new(), + lineage: None, + label: label.into(), + provides: BTreeSet::new(), + requires: BTreeSet::new(), + soma: SomaSpec { + cgroup: CgroupSpec { + path: "ente.slice/test".into(), + cpu_weight: None, + io_weight: None, + }, + namespaces: NamespaceSet::default(), + rlimits: ResourceLimits::default(), + cpu_affinity: None, + }, + payload: Payload::Virtual, + supervision: Supervision::OneShot, + ..Default::default() + } +} + +/// Genera una ruta de socket única bajo TMPDIR. No la creamos — +/// el server la creará al hacer bind. +fn sock_path(name: &str) -> std::path::PathBuf { + std::env::temp_dir().join(format!( + "brahman-test-{}-{}-{}.sock", + name, + std::process::id(), + Ulid::new() + )) +} + +#[tokio::test] +async fn full_handshake_roundtrip() { + let path = sock_path("happy"); + let server = Server::bind(&path, ServerConfig { init_attached: true }).unwrap(); + + let session_handle = tokio::spawn({ + async move { + let session = server.accept_one().await.unwrap(); + session.handle().await.unwrap(); + } + }); + + let mut client = Client::connect(&path, sample_card("alpha")).await.unwrap(); + assert!(client.server_info().init_attached); + assert_eq!( + client.server_info().protocol_version, + brahman_card::PROTOCOL_VERSION + ); + + let mut last = 0u64; + for _ in 0..3 { + let ts = client.ping().await.unwrap(); + assert!(ts >= last); + last = ts; + tokio::time::sleep(Duration::from_millis(2)).await; + } + client.farewell().await.unwrap(); + + tokio::time::timeout(Duration::from_secs(2), session_handle) + .await + .expect("server hung after farewell") + .unwrap(); +} + +#[tokio::test] +async fn rejects_invalid_card_client_side() { + let path = sock_path("invalid"); + let server = Server::bind(&path, ServerConfig::default()).unwrap(); + let session_handle = tokio::spawn(async move { + // No esperamos que el server complete: el cliente corta antes. + let _ = tokio::time::timeout(Duration::from_secs(1), async move { + let session = server.accept_one().await.unwrap(); + session.handle().await.unwrap(); + }) + .await; + }); + + let mut bad = sample_card("placeholder"); + bad.label = String::new(); + let err = Client::connect(&path, bad).await.unwrap_err(); + assert!(matches!(err, ClientError::InvalidCard(_))); + + session_handle.abort(); +} + +#[tokio::test] +async fn server_rejects_protocol_mismatch() { + let path = sock_path("mismatch"); + let server = Server::bind(&path, ServerConfig::default()).unwrap(); + let session_handle = tokio::spawn(async move { + let session = server.accept_one().await.unwrap(); + session.handle().await.unwrap(); + }); + + let mut stream = UnixStream::connect(&path).await.unwrap(); + let hello = Hello { + schema_version: CARD_SCHEMA_VERSION, + protocol_version: "999.0.0".into(), + card: sample_card("future-module"), + }; + write_frame(&mut stream, &Frame::Hello(hello)).await.unwrap(); + + match read_frame(&mut stream).await.unwrap() { + Frame::Error(HandshakeError::ProtocolMismatch(_)) => {} + other => panic!("esperado ProtocolMismatch, got {other:?}"), + } + + tokio::time::timeout(Duration::from_secs(2), session_handle) + .await + .expect("server hung after rejecting") + .unwrap(); +} + +#[tokio::test] +async fn ping_before_hello_rejected() { + let path = sock_path("ping-no-hello"); + let server = Server::bind(&path, ServerConfig::default()).unwrap(); + let session_handle = tokio::spawn(async move { + let session = server.accept_one().await.unwrap(); + session.handle().await.unwrap(); + }); + + // Conectamos y mandamos un Ping sin haber saludado. + let mut stream = UnixStream::connect(&path).await.unwrap(); + write_frame( + &mut stream, + &Frame::Ping(Ping { + session: Ulid::new(), + }), + ) + .await + .unwrap(); + + match read_frame(&mut stream).await.unwrap() { + Frame::Error(HandshakeError::Rejected(_)) => {} + other => panic!("esperado Rejected, got {other:?}"), + } + + tokio::time::timeout(Duration::from_secs(2), session_handle) + .await + .expect("server hung after rejecting") + .unwrap(); +}