Files
brahman/crates/core/brahman-handshake/src/client.rs
T
Sergio f19ca723b6 feat(card): WireCard + extensions — forward-compat sin romper postcard
Restaura el campo extensions de Card que había caído al adoptar postcard
(serde_json::Value usa secuencias/maps de longitud dinámica). La
solución es separar dos formas:

- Card (la rica): para JSON/TOML. Tiene extensions: BTreeMap<String,
  serde_json::Value> con #[serde(flatten, skip_serializing_if = is_empty)].
  Los campos desconocidos del archivo sobreviven el roundtrip.
- WireCard (la slim): para postcard. Mismo schema sin extensions y con
  genesis: Vec<WireCard> recursivo. Postcard-friendly por construcción.

Conversiones From<Card> for WireCard (descarta extensions) y
From<WireCard> for Card (extensiones quedan vacías post-wire). El
contrato es explícito: extensions son anotaciones locales que sobreviven
file I/O pero NO cruzan al Init.

brahman-handshake::Hello.card cambia de Card a WireCard. Client hace
card.into() al enviar; Server hace hello.card.into() para volver a
Card antes de validar/registrar.

Tests:
- 3 nuevos en brahman-card: extensions_preserved_in_json_roundtrip,
  wire_card_roundtrip_strips_extensions, wire_card_postcard_friendly
  (verifica que postcard::to_allocvec(&wire) NO falla — caso que
  rompía con Card.extensions populadas).
- 1 ajuste en handshake/tests/handshake.rs (struct-literal de Hello
  ahora con card: sample_card(...).into()).
- brahman-card: postcard como dev-dep.

Tests acumulados: 35 (card 11, broker 11, handshake codec+transport 2 +
integ 7, card-wit 4, admin 0). 0 errores, 0 warnings (vienen del
commit anterior 9420eae).

CHANGELOG.md actualizado con esta entrada y con el commit 9420eae
("probando" del usuario, limpieza de 17 warnings dead-code).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-08 17:33:15 +00:00

162 lines
5.7 KiB
Rust

//! Cliente de handshake. Conecta a un Unix socket y mantiene la sesión.
use std::collections::VecDeque;
use std::path::Path;
use std::time::Duration;
use brahman_card::{Card, WitInterface, CARD_SCHEMA_VERSION};
use thiserror::Error;
use tokio::net::UnixStream;
use crate::codec::{read_frame, write_frame};
use crate::messages::{Farewell, Frame, HandshakeError, Hello, HelloAck, MatchEvent, Ping, SessionId};
/// Errores del cliente.
#[derive(Debug, Error)]
pub enum ClientError {
#[error("E/S: {0}")]
Io(#[from] std::io::Error),
/// El servidor respondió con un error explícito.
#[error("servidor: {0}")]
Server(#[source] HandshakeError),
/// El servidor envió un frame que no esperábamos en este punto del protocolo.
#[error("frame inesperado: {got}")]
UnexpectedFrame { got: &'static str },
/// La Card que el cliente intentó enviar no pasa su propia validación.
#[error("card inválida pre-envío: {0}")]
InvalidCard(String),
}
/// Cliente conectado y autenticado. Tras `connect` ya completó el handshake
/// y tiene su `SessionId`. Los `MatchEvent` recibidos durante operaciones
/// request/response se buferean en `pending_events` y se obtienen vía
/// [`Client::take_event`] o [`Client::await_event`].
#[derive(Debug)]
pub struct Client {
stream: UnixStream,
session: SessionId,
server_info: HelloAck,
pending_events: VecDeque<MatchEvent>,
}
impl Client {
/// Conecta como módulo agnóstico (sin WIT). Equivalente a
/// `connect_with(path, card, None)`.
pub async fn connect(path: impl AsRef<Path>, card: Card) -> Result<Self, ClientError> {
Self::connect_with(path, card, None).await
}
/// Conecta al socket enviando Hello con la Card dada y opcionalmente
/// una `WitInterface` ya extraída. Si `wit` es `Some`, el server
/// registra el módulo como "consciente".
pub async fn connect_with(
path: impl AsRef<Path>,
card: Card,
wit: Option<WitInterface>,
) -> Result<Self, ClientError> {
card.validate()
.map_err(|e| ClientError::InvalidCard(e.to_string()))?;
let mut stream = UnixStream::connect(path).await?;
let hello = Hello {
schema_version: CARD_SCHEMA_VERSION,
protocol_version: brahman_card::PROTOCOL_VERSION.to_string(),
card: card.into(), // Card → WireCard: descarta extensions
wit,
};
write_frame(&mut stream, &Frame::Hello(hello)).await?;
let frame = read_frame(&mut stream).await?;
let ack = match frame {
Frame::HelloAck(a) => a,
Frame::Error(e) => return Err(ClientError::Server(e)),
Frame::Hello(_) => return Err(ClientError::UnexpectedFrame { got: "Hello" }),
Frame::Ping(_) => return Err(ClientError::UnexpectedFrame { got: "Ping" }),
Frame::Pong(_) => return Err(ClientError::UnexpectedFrame { got: "Pong" }),
Frame::Farewell(_) => return Err(ClientError::UnexpectedFrame { got: "Farewell" }),
Frame::MatchEvent(_) => {
return Err(ClientError::UnexpectedFrame {
got: "MatchEvent (pre-handshake)",
});
}
};
Ok(Self {
stream,
session: ack.session,
server_info: ack,
pending_events: VecDeque::new(),
})
}
/// `SessionId` asignado por el servidor.
pub fn session(&self) -> SessionId {
self.session
}
/// Información del servidor recibida en el handshake.
pub fn server_info(&self) -> &HelloAck {
&self.server_info
}
/// Envía un Ping y devuelve el timestamp del servidor. Los frames
/// `MatchEvent` que lleguen mezclados se buferean en `pending_events`.
pub async fn ping(&mut self) -> Result<u64, ClientError> {
write_frame(
&mut self.stream,
&Frame::Ping(Ping {
session: self.session,
}),
)
.await?;
loop {
match read_frame(&mut self.stream).await? {
Frame::Pong(p) => return Ok(p.timestamp_ms),
Frame::MatchEvent(ev) => self.pending_events.push_back(ev),
Frame::Error(e) => return Err(ClientError::Server(e)),
_ => return Err(ClientError::UnexpectedFrame { got: "non-pong" }),
}
}
}
/// Saca un evento pendiente del buffer, sin bloquear ni leer del wire.
pub fn take_event(&mut self) -> Option<MatchEvent> {
self.pending_events.pop_front()
}
/// Espera un `MatchEvent` con timeout. Drena primero el buffer; si
/// está vacío, lee del wire hasta el timeout. Otros frames recibidos
/// (Pong huérfano, Error) cortan la espera con error.
pub async fn await_event(
&mut self,
timeout: Duration,
) -> Result<Option<MatchEvent>, ClientError> {
if let Some(ev) = self.pending_events.pop_front() {
return Ok(Some(ev));
}
match tokio::time::timeout(timeout, read_frame(&mut self.stream)).await {
Err(_) => Ok(None),
Ok(Err(e)) => Err(ClientError::Io(e)),
Ok(Ok(Frame::MatchEvent(ev))) => Ok(Some(ev)),
Ok(Ok(Frame::Error(e))) => Err(ClientError::Server(e)),
Ok(Ok(_)) => Err(ClientError::UnexpectedFrame {
got: "non-event en await_event",
}),
}
}
/// Cierre cooperativo. Consume el cliente.
pub async fn farewell(mut self) -> Result<(), ClientError> {
write_frame(
&mut self.stream,
&Frame::Farewell(Farewell {
session: self.session,
}),
)
.await?;
Ok(())
}
}