feat(brahman-handshake): Fase 1 — handshake brahman sobre stream libp2p
Segundo paso del plan "el encuentro entre Entes no se restringe a
local". El protocolo brahman (Hello / HelloAck / Ping / Pong /
MatchEvent / Farewell, frames postcard length-prefixed) ahora tambien
viaja sobre streams libp2p de la malla brahman-net — el mismo Init
acepta sesiones por Unix socket Y por libp2p indistintamente, y un
consumer remoto puede dial-ar al multiaddr y completar handshake.
Cambios:
- Session<S> y Client<S> genericos: ambos dejan de estar atados a
UnixStream y pasan a ser genericos sobre S: AsyncRead + AsyncWrite
+ Unpin + Send + 'static. El path Unix queda como
Client = Client<UnixStream> (default generico). Constructores
nuevos: Server::session_from_stream(stream),
Client::connect_with_stream(stream, card, wit).
- Refactor del post-handshake con split: tokio::select! sobre
&mut self.stream requeria S: Sync indirectamente, y libp2p::Stream
no es Sync. Reemplazado por tokio::io::split(stream) -> reader loop
principal + writer task separada que drena el push channel. Writer
compartido bajo Arc<Mutex<WriteHalf<S>>> para serializar Pong/Error
inline con los MatchEvents pusheados. Cleanup garantizado en todas
las ramas. La logica del post-handshake migra a funciones libres
(run_post_handshake, handle_inbound_frame, cleanup,
broadcast_match_diffs, do_handshake, register_session,
validate_hello).
- Nuevo modulo brahman-handshake::network:
- BRAHMAN_HANDSHAKE_PROTOCOL = "/brahman/handshake/1.0.0"
- LibP2pHandshakeStream = Compat<libp2p::Stream>
- run_libp2p_accept_loop(server, net): accept loop que delega cada
stream entrante a session_from_stream(stream.compat()). Sesiones
libp2p y Unix conviven en el mismo Server — comparten broker,
push table, last_matches.
- connect_libp2p(net, peer, card, wit): abre stream libp2p al peer
y arranca handshake.
- NetworkError tipado.
Deps: brahman-handshake gana brahman-net, futures, tokio-util.
brahman-net re-exporta Multiaddr, PeerId, Stream, StreamProtocol,
Protocol, OpenStreamError para que callers no necesiten dep directa
a libp2p.
Tests: 9 verdes en el path Unix (sin regresion). Nuevo
tests/network_libp2p.rs E2E que arma server con BrahmanNet, hace
listen TCP, monta accept loop; cliente con su propio BrahmanNet
dial-ea al peer_id, completa handshake remoto, ping, farewell.
Verifica que la sesion se registro durante la conversacion y se
removio tras farewell.
This commit is contained in:
@@ -6,6 +6,7 @@ use std::time::Duration;
|
||||
|
||||
use brahman_card::{Card, WitInterface, CARD_SCHEMA_VERSION};
|
||||
use thiserror::Error;
|
||||
use tokio::io::{AsyncRead, AsyncWrite};
|
||||
use tokio::net::UnixStream;
|
||||
|
||||
use crate::codec::{read_frame, write_frame};
|
||||
@@ -34,33 +35,55 @@ pub enum ClientError {
|
||||
/// y tiene su `SessionId`. Los `MatchEvent` recibidos durante operaciones
|
||||
/// request/response se buferean en `pending_events` y se obtienen vía
|
||||
/// [`Client::take_event`] o [`Client::await_event`].
|
||||
///
|
||||
/// Genérico sobre el transport (`AsyncRead + AsyncWrite + Unpin + Send`):
|
||||
/// funciona indistintamente sobre `UnixStream` (path local) o sobre un
|
||||
/// stream libp2p wrapped con `tokio_util::compat` (path remoto, vía
|
||||
/// `brahman_handshake::network`).
|
||||
#[derive(Debug)]
|
||||
pub struct Client {
|
||||
stream: UnixStream,
|
||||
pub struct Client<S = UnixStream> {
|
||||
stream: S,
|
||||
session: SessionId,
|
||||
server_info: HelloAck,
|
||||
pending_events: VecDeque<MatchEvent>,
|
||||
}
|
||||
|
||||
impl Client {
|
||||
/// Conecta como módulo agnóstico (sin WIT). Equivalente a
|
||||
/// `connect_with(path, card, None)`.
|
||||
impl Client<UnixStream> {
|
||||
/// Conecta como módulo agnóstico (sin WIT) sobre Unix socket.
|
||||
/// Equivalente a `connect_with(path, card, None)`.
|
||||
pub async fn connect(path: impl AsRef<Path>, card: Card) -> Result<Self, ClientError> {
|
||||
Self::connect_with(path, card, None).await
|
||||
}
|
||||
|
||||
/// Conecta al socket enviando Hello con la Card dada y opcionalmente
|
||||
/// una `WitInterface` ya extraída. Si `wit` es `Some`, el server
|
||||
/// registra el módulo como "consciente".
|
||||
/// Conecta al socket Unix enviando Hello con la Card dada y
|
||||
/// opcionalmente una `WitInterface` ya extraída. Si `wit` es `Some`,
|
||||
/// el server registra el módulo como "consciente".
|
||||
pub async fn connect_with(
|
||||
path: impl AsRef<Path>,
|
||||
card: Card,
|
||||
wit: Option<WitInterface>,
|
||||
) -> Result<Self, ClientError> {
|
||||
let stream = UnixStream::connect(path).await?;
|
||||
Self::connect_with_stream(stream, card, wit).await
|
||||
}
|
||||
}
|
||||
|
||||
impl<S> Client<S>
|
||||
where
|
||||
S: AsyncRead + AsyncWrite + Unpin + Send,
|
||||
{
|
||||
/// Constructor genérico: arranca el handshake sobre un stream
|
||||
/// arbitrario que ya esté abierto. Es el punto de entrada para
|
||||
/// transports alternativos (libp2p, in-memory para tests, etc.)
|
||||
/// que reusan toda la lógica del handshake post-stream-open.
|
||||
pub async fn connect_with_stream(
|
||||
mut stream: S,
|
||||
card: Card,
|
||||
wit: Option<WitInterface>,
|
||||
) -> Result<Self, ClientError> {
|
||||
card.validate()
|
||||
.map_err(|e| ClientError::InvalidCard(e.to_string()))?;
|
||||
|
||||
let mut stream = UnixStream::connect(path).await?;
|
||||
let hello = Hello {
|
||||
schema_version: CARD_SCHEMA_VERSION,
|
||||
protocol_version: brahman_card::PROTOCOL_VERSION.to_string(),
|
||||
|
||||
@@ -21,6 +21,7 @@ pub mod codec;
|
||||
pub mod messages;
|
||||
pub mod server;
|
||||
pub mod client;
|
||||
pub mod network;
|
||||
pub mod transport;
|
||||
|
||||
pub use brahman_card::PROTOCOL_VERSION;
|
||||
|
||||
@@ -0,0 +1,140 @@
|
||||
//! Backend libp2p del handshake brahman: el mismo protocolo (Hello /
|
||||
//! HelloAck / Ping / Pong / MatchEvent / Farewell, frames postcard
|
||||
//! length-prefixed) ahora también viaja sobre streams libp2p de la
|
||||
//! malla `brahman-net`.
|
||||
//!
|
||||
//! El servidor expone el bucle [`run_libp2p_accept_loop`] que acepta
|
||||
//! streams del protocolo `BRAHMAN_HANDSHAKE_PROTOCOL` y los delega al
|
||||
//! mismo `Server` que ya escucha por Unix socket — la `Session` es
|
||||
//! genérica sobre el transporte, así que ambas vías comparten broker,
|
||||
//! tablas de sesiones, push de MatchEvents, todo.
|
||||
//!
|
||||
//! El cliente se conecta vía [`connect_libp2p`]: abre un stream
|
||||
//! libp2p hacia un `PeerId` ya conocido y arranca el handshake como
|
||||
//! cualquier `Client`.
|
||||
//!
|
||||
//! Identidad: cada nodo libp2p tiene su `PeerId` (ed25519 derivado).
|
||||
//! La identidad del Ente (Card.id ULID + futura firma) viaja en el
|
||||
//! Hello, como en el path Unix. Trust remoto (verificación de firma
|
||||
//! antes de aceptar el Hello) es Fase 3.
|
||||
//!
|
||||
//! Ejemplo (servidor — Arje):
|
||||
//! ```ignore
|
||||
//! let server = Arc::new(Server::bind("/run/brahman-init.sock", config)?);
|
||||
//! let net = Arc::new(BrahmanNet::new()?);
|
||||
//! net.listen("/ip4/0.0.0.0/tcp/4101".parse()?).await;
|
||||
//!
|
||||
//! tokio::spawn(brahman_handshake::network::run_libp2p_accept_loop(
|
||||
//! server.clone(),
|
||||
//! net.clone(),
|
||||
//! ));
|
||||
//! // Server::run sigue escuchando Unix en paralelo.
|
||||
//! ```
|
||||
//!
|
||||
//! Ejemplo (cliente — sidecar de un Ente remoto):
|
||||
//! ```ignore
|
||||
//! let net = BrahmanNet::new()?;
|
||||
//! net.dial(remote_multiaddr);
|
||||
//! let mut client = brahman_handshake::network::connect_libp2p(
|
||||
//! &net, peer_id, my_card, None,
|
||||
//! ).await?;
|
||||
//! client.ping().await?;
|
||||
//! ```
|
||||
|
||||
use std::sync::Arc;
|
||||
|
||||
use brahman_card::{Card, WitInterface};
|
||||
use brahman_net::{BrahmanNet, OpenStreamError, PeerId, Stream, StreamProtocol};
|
||||
use futures::StreamExt;
|
||||
use tokio_util::compat::{Compat, FuturesAsyncReadCompatExt};
|
||||
use tracing::warn;
|
||||
|
||||
use crate::client::{Client, ClientError};
|
||||
use crate::server::Server;
|
||||
|
||||
/// Sub-protocolo del handshake brahman sobre streams libp2p.
|
||||
pub const BRAHMAN_HANDSHAKE_PROTOCOL: StreamProtocol =
|
||||
StreamProtocol::new("/brahman/handshake/1.0.0");
|
||||
|
||||
/// Tipo del stream que ve la lógica del handshake una vez convertido
|
||||
/// del mundo `futures::AsyncRead/Write` (libp2p) al mundo
|
||||
/// `tokio::io::AsyncRead/Write` (resto del crate).
|
||||
pub type LibP2pHandshakeStream = Compat<Stream>;
|
||||
|
||||
/// Errores específicos del backend libp2p.
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
pub enum NetworkError {
|
||||
#[error("abrir stream libp2p: {0}")]
|
||||
OpenStream(#[from] OpenStreamError),
|
||||
|
||||
#[error("handshake: {0}")]
|
||||
Handshake(#[from] ClientError),
|
||||
|
||||
#[error("aceptar stream libp2p: {0}")]
|
||||
AcceptStream(String),
|
||||
}
|
||||
|
||||
/// Loop de aceptación de streams libp2p del protocolo handshake.
|
||||
/// Cada stream entrante se construye como `Session` reutilizando las
|
||||
/// tablas compartidas del `Server`, así que conviven con sesiones
|
||||
/// Unix indistinguibles.
|
||||
///
|
||||
/// Vive hasta que el control libp2p se cierre o el caller drop el
|
||||
/// future. Errores por sesión se loggean (no tumban el loop).
|
||||
pub async fn run_libp2p_accept_loop(
|
||||
server: Arc<Server>,
|
||||
net: Arc<BrahmanNet>,
|
||||
) -> Result<(), NetworkError> {
|
||||
let mut control = net.control.clone();
|
||||
let mut incoming = control
|
||||
.accept(BRAHMAN_HANDSHAKE_PROTOCOL)
|
||||
.map_err(|e| NetworkError::AcceptStream(e.to_string()))?;
|
||||
|
||||
while let Some((peer, stream)) = incoming.next().await {
|
||||
let server = server.clone();
|
||||
// .compat() debe pasar al spawn ADENTRO; si lo hacemos afuera
|
||||
// y capturamos `Compat<Stream>` en la closure, el future
|
||||
// resultante hereda traits que dyn AsyncReadWrite no satisface
|
||||
// (compatibility con thread-safety de tokio::spawn).
|
||||
tokio::spawn(handle_libp2p_session(server, stream, peer));
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn handle_libp2p_session(
|
||||
server: Arc<Server>,
|
||||
stream: Stream,
|
||||
peer: PeerId,
|
||||
) {
|
||||
let session = server.session_from_stream(stream.compat());
|
||||
if let Err(e) = session.handle().await {
|
||||
warn!(
|
||||
target: "brahman_handshake::network",
|
||||
peer = %peer,
|
||||
error = %e,
|
||||
"sesión libp2p terminó con error"
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
/// Conecta como cliente a un Ente remoto vía libp2p y completa el
|
||||
/// handshake. Requiere que `net` ya tenga conexión (o pueda dial-ar)
|
||||
/// al `peer`; típicamente el caller hace `net.dial(multiaddr)` antes.
|
||||
///
|
||||
/// Devuelve un `Client` típico — los métodos `ping`, `await_event`,
|
||||
/// `farewell` funcionan idéntico al path Unix. El stream subyacente
|
||||
/// es libp2p convertido vía `tokio_util::compat`.
|
||||
pub async fn connect_libp2p(
|
||||
net: &BrahmanNet,
|
||||
peer: PeerId,
|
||||
card: Card,
|
||||
wit: Option<WitInterface>,
|
||||
) -> Result<Client<LibP2pHandshakeStream>, NetworkError> {
|
||||
let mut control = net.control.clone();
|
||||
let stream = control
|
||||
.open_stream(peer, BRAHMAN_HANDSHAKE_PROTOCOL)
|
||||
.await?;
|
||||
let client = Client::connect_with_stream(stream.compat(), card, wit).await?;
|
||||
Ok(client)
|
||||
}
|
||||
@@ -7,6 +7,7 @@ use std::time::{SystemTime, UNIX_EPOCH};
|
||||
|
||||
use brahman_broker::{Broker, Endpoint};
|
||||
use brahman_card::{Card, ResolvedCard, WitInterface, CARD_SCHEMA_VERSION};
|
||||
use tokio::io::{split, AsyncRead, AsyncWrite, WriteHalf};
|
||||
use tokio::net::{UnixListener, UnixStream};
|
||||
use tokio::sync::{mpsc, Mutex};
|
||||
use tracing::{debug, warn};
|
||||
@@ -93,17 +94,34 @@ impl Server {
|
||||
self.sessions.clone()
|
||||
}
|
||||
|
||||
/// Acepta UNA conexión, devuelve la `Session` lista para `handle()`.
|
||||
/// Acepta UNA conexión Unix, devuelve la `Session` lista para `handle()`.
|
||||
/// No corre el handler — eso es responsabilidad del llamante.
|
||||
pub async fn accept_one(&self) -> std::io::Result<Session> {
|
||||
pub async fn accept_one(&self) -> std::io::Result<Session<UnixStream>> {
|
||||
let (stream, _addr) = self.listener.accept().await?;
|
||||
Ok(Session {
|
||||
Ok(self.session_from_stream(stream))
|
||||
}
|
||||
|
||||
/// Construye una `Session` a partir de un stream arbitrario que
|
||||
/// implemente `AsyncRead + AsyncWrite + Unpin + Send`. Es el
|
||||
/// punto de entrada para transports alternativos (libp2p en
|
||||
/// `brahman_handshake::network`, in-memory para tests, etc.) que
|
||||
/// quieren reutilizar la lógica del handshake sin venir por el
|
||||
/// listener Unix.
|
||||
///
|
||||
/// Las tablas compartidas (sessions/push/last_matches/broker) se
|
||||
/// clonan, así que sesiones construidas por esta vía conviven
|
||||
/// indistinguibles en el mismo `Server`.
|
||||
pub fn session_from_stream<S>(&self, stream: S) -> Session<S>
|
||||
where
|
||||
S: AsyncRead + AsyncWrite + Unpin + Send + 'static,
|
||||
{
|
||||
Session {
|
||||
stream,
|
||||
sessions: self.sessions.clone(),
|
||||
push_table: self.push_table.clone(),
|
||||
last_matches: self.last_matches.clone(),
|
||||
config: self.config.clone(),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
/// Loop de aceptación: cada conexión se despacha en una task separada.
|
||||
@@ -131,310 +149,391 @@ impl Drop for Server {
|
||||
}
|
||||
}
|
||||
|
||||
/// Conexión individual aceptada por el servidor.
|
||||
pub struct Session {
|
||||
stream: UnixStream,
|
||||
/// Conexión individual aceptada por el servidor. Genérica sobre el
|
||||
/// transport — funciona indistinguiblemente sobre `UnixStream` (modo
|
||||
/// local), libp2p stream wrapped con `tokio_util::compat`, in-memory
|
||||
/// duplex (tests), etc.
|
||||
pub struct Session<S> {
|
||||
stream: S,
|
||||
sessions: SessionRegistry,
|
||||
push_table: SessionTxTable,
|
||||
last_matches: LastMatches,
|
||||
config: ServerConfig,
|
||||
}
|
||||
|
||||
impl Session {
|
||||
/// Procesa la conexión hasta `Farewell` o EOF: handshake + loop de pings.
|
||||
/// Garantiza cleanup (sessions + broker) sin importar la rama de salida.
|
||||
pub async fn handle(mut self) -> std::io::Result<()> {
|
||||
let session_id = match self.do_handshake().await? {
|
||||
impl<S> Session<S>
|
||||
where
|
||||
S: AsyncRead + AsyncWrite + Unpin + Send + 'static,
|
||||
{
|
||||
/// Procesa la conexión hasta `Farewell` o EOF.
|
||||
///
|
||||
/// Estructura: handshake (sobre el stream entero) → split en
|
||||
/// halves (read + write) → reader loop principal + writer task
|
||||
/// que drena el push channel. Garantiza cleanup (sessions + broker
|
||||
/// + canales) sin importar la rama de salida.
|
||||
///
|
||||
/// El split es necesario para soportar streams `!Sync` como
|
||||
/// `libp2p::Stream`: `tokio::select!` sobre `&mut self.stream`
|
||||
/// requeriría `S: Sync`. Con `tokio::io::split` cada mitad va a
|
||||
/// su propia task, eliminando el requirement y permitiendo que
|
||||
/// la misma `Session` sirva indistinta sobre Unix socket o
|
||||
/// stream libp2p remoto.
|
||||
pub async fn handle(self) -> std::io::Result<()> {
|
||||
let Self {
|
||||
mut stream,
|
||||
sessions,
|
||||
push_table,
|
||||
last_matches,
|
||||
config,
|
||||
} = self;
|
||||
|
||||
let session_id = match do_handshake(&mut stream, &config, &sessions).await? {
|
||||
Some(id) => id,
|
||||
None => return Ok(()), // Hello rechazado, no se registró nada
|
||||
};
|
||||
|
||||
let result = self.run_post_handshake(session_id).await;
|
||||
self.cleanup(session_id).await;
|
||||
let result = run_post_handshake(
|
||||
stream,
|
||||
session_id,
|
||||
push_table.clone(),
|
||||
last_matches.clone(),
|
||||
config.clone(),
|
||||
)
|
||||
.await;
|
||||
|
||||
cleanup(
|
||||
session_id,
|
||||
&sessions,
|
||||
&push_table,
|
||||
&last_matches,
|
||||
&config,
|
||||
)
|
||||
.await;
|
||||
|
||||
result
|
||||
}
|
||||
|
||||
async fn run_post_handshake(&mut self, session_id: SessionId) -> std::io::Result<()> {
|
||||
// Canal por donde el server inyecta frames push (MatchEvent, etc.).
|
||||
let (tx, mut rx) = mpsc::channel::<Frame>(PUSH_CHANNEL_CAPACITY);
|
||||
self.push_table.lock().await.insert(session_id, tx);
|
||||
}
|
||||
|
||||
// Tras registrar el canal, recomputar matches y emitir diffs a
|
||||
// todas las sesiones afectadas (incluida ésta, si tiene inputs).
|
||||
self.broadcast_match_diffs().await;
|
||||
// ============================================================================
|
||||
// Free functions (post-refactor): la lógica del post-handshake corre sobre
|
||||
// halves del stream; no necesita más `&mut Session<S>` y por eso vive afuera.
|
||||
// ============================================================================
|
||||
|
||||
loop {
|
||||
tokio::select! {
|
||||
// Frame entrante del cliente.
|
||||
res = read_frame(&mut self.stream) => {
|
||||
match res {
|
||||
Ok(frame) => {
|
||||
if !self.handle_inbound_frame(session_id, frame).await? {
|
||||
return Ok(());
|
||||
}
|
||||
}
|
||||
Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => {
|
||||
debug!(session = %session_id, "cliente cerró sin Farewell");
|
||||
return Ok(());
|
||||
}
|
||||
Err(e) => return Err(e),
|
||||
}
|
||||
}
|
||||
// Frame push desde el server (MatchEvent).
|
||||
Some(frame) = rx.recv() => {
|
||||
write_frame(&mut self.stream, &frame).await?;
|
||||
}
|
||||
async fn run_post_handshake<S>(
|
||||
stream: S,
|
||||
session_id: SessionId,
|
||||
push_table: SessionTxTable,
|
||||
last_matches: LastMatches,
|
||||
config: ServerConfig,
|
||||
) -> std::io::Result<()>
|
||||
where
|
||||
S: AsyncRead + AsyncWrite + Unpin + Send + 'static,
|
||||
{
|
||||
// Canal por donde el server inyecta frames push (MatchEvent, etc.).
|
||||
let (tx, mut rx) = mpsc::channel::<Frame>(PUSH_CHANNEL_CAPACITY);
|
||||
push_table.lock().await.insert(session_id, tx);
|
||||
|
||||
// Tras registrar el canal, recomputar matches y emitir diffs.
|
||||
broadcast_match_diffs(&push_table, &last_matches, &config).await;
|
||||
|
||||
// Split: reader en el hilo principal, writer compartido bajo Mutex
|
||||
// entre la writer task (push channel) y el handler de inbound
|
||||
// (que escribe Pong/Error). Mutex serializa writes; es OK porque
|
||||
// la frecuencia de writes por sesión es baja.
|
||||
let (mut reader, writer) = split(stream);
|
||||
let writer = Arc::new(Mutex::new(writer));
|
||||
|
||||
// Writer task: drena el push channel.
|
||||
let writer_for_push = writer.clone();
|
||||
let writer_task = tokio::spawn(async move {
|
||||
while let Some(frame) = rx.recv().await {
|
||||
let mut w = writer_for_push.lock().await;
|
||||
if write_frame(&mut *w, &frame).await.is_err() {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
/// Maneja un frame entrante. Devuelve `Ok(false)` si la sesión debe
|
||||
/// cerrarse limpiamente (Farewell con session-id correcto).
|
||||
async fn handle_inbound_frame(
|
||||
&mut self,
|
||||
session_id: SessionId,
|
||||
frame: Frame,
|
||||
) -> std::io::Result<bool> {
|
||||
match frame {
|
||||
Frame::Ping(Ping { session }) if session == session_id => {
|
||||
let pong = Pong {
|
||||
timestamp_ms: now_ms(),
|
||||
};
|
||||
write_frame(&mut self.stream, &Frame::Pong(pong)).await?;
|
||||
Ok(true)
|
||||
}
|
||||
Frame::Ping(_) => {
|
||||
write_frame(
|
||||
&mut self.stream,
|
||||
&Frame::Error(HandshakeError::Unauthorized(
|
||||
"session-id no coincide".into(),
|
||||
)),
|
||||
)
|
||||
.await?;
|
||||
Ok(true)
|
||||
}
|
||||
Frame::Farewell(Farewell { session }) if session == session_id => Ok(false),
|
||||
Frame::Farewell(_) => {
|
||||
write_frame(
|
||||
&mut self.stream,
|
||||
&Frame::Error(HandshakeError::Unauthorized(
|
||||
"session-id no coincide".into(),
|
||||
)),
|
||||
)
|
||||
.await?;
|
||||
Ok(true)
|
||||
}
|
||||
_ => {
|
||||
write_frame(
|
||||
&mut self.stream,
|
||||
&Frame::Error(HandshakeError::Rejected(
|
||||
"frame inesperado tras handshake".into(),
|
||||
)),
|
||||
)
|
||||
.await?;
|
||||
Ok(true)
|
||||
// Reader loop principal.
|
||||
let result: std::io::Result<()> = loop {
|
||||
match read_frame(&mut reader).await {
|
||||
Ok(frame) => match handle_inbound_frame(session_id, frame, &writer).await {
|
||||
Ok(true) => continue,
|
||||
Ok(false) => break Ok(()),
|
||||
Err(e) => break Err(e),
|
||||
},
|
||||
Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => {
|
||||
debug!(session = %session_id, "cliente cerró sin Farewell");
|
||||
break Ok(());
|
||||
}
|
||||
Err(e) => break Err(e),
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
/// Limpieza atómica de TRES vistas: registro de sesiones + broker +
|
||||
/// canal push. Se ejecuta tanto si la sesión cierra por Farewell, EOF,
|
||||
/// o error. Tras desregistrar, emite diffs a las sesiones que perdieron
|
||||
/// el match contra ésta.
|
||||
async fn cleanup(&self, session_id: SessionId) {
|
||||
self.sessions.lock().await.remove(&session_id);
|
||||
self.push_table.lock().await.remove(&session_id);
|
||||
self.last_matches.lock().await.remove(&session_id);
|
||||
if let Some(broker) = &self.config.broker {
|
||||
broker.lock().await.unregister(session_id);
|
||||
}
|
||||
self.broadcast_match_diffs().await;
|
||||
}
|
||||
// Cerrar writer: drop nuestro Arc y abortar la task. La task
|
||||
// saldrá igual cuando rx se cierre por drop del último Sender,
|
||||
// pero abortarla es más rápido que esperar a que próximo recv()
|
||||
// observe el cierre.
|
||||
drop(writer);
|
||||
writer_task.abort();
|
||||
let _ = writer_task.await;
|
||||
|
||||
/// Recomputa los matches para todas las sesiones registradas y empuja
|
||||
/// `MatchEvent::Available` / `MatchEvent::Lost` por las que cambiaron
|
||||
/// respecto al último estado conocido.
|
||||
///
|
||||
/// Se llama tras cada `register_session` y `cleanup`. Las inserciones
|
||||
/// al canal usan `try_send` (non-blocking); si el cliente está lento
|
||||
/// y se llena el buffer, los eventos extra se pierden — es ephemeral
|
||||
/// y el cliente puede re-consultar el estado vía `brahman-status`.
|
||||
async fn broadcast_match_diffs(&self) {
|
||||
let broker = match &self.config.broker {
|
||||
Some(b) => b,
|
||||
None => return,
|
||||
};
|
||||
result
|
||||
}
|
||||
|
||||
let b = broker.lock().await;
|
||||
let push_table = self.push_table.lock().await;
|
||||
let mut last = self.last_matches.lock().await;
|
||||
|
||||
debug!(
|
||||
target: "brahman_handshake::broadcast",
|
||||
cards = b.len(),
|
||||
push_subscribers = push_table.len(),
|
||||
"broadcast_match_diffs"
|
||||
);
|
||||
|
||||
// Snapshot de cards para no tener que sostener el lock del broker.
|
||||
let cards: Vec<_> = b.cards().cloned().collect();
|
||||
|
||||
for cons in &cards {
|
||||
let cons_session = cons.session;
|
||||
let tx = match push_table.get(&cons_session) {
|
||||
Some(tx) => tx,
|
||||
None => continue, // todavía no tiene canal push
|
||||
async fn handle_inbound_frame<S>(
|
||||
session_id: SessionId,
|
||||
frame: Frame,
|
||||
writer: &Arc<Mutex<WriteHalf<S>>>,
|
||||
) -> std::io::Result<bool>
|
||||
where
|
||||
S: AsyncRead + AsyncWrite + Unpin + Send + 'static,
|
||||
{
|
||||
match frame {
|
||||
Frame::Ping(Ping { session }) if session == session_id => {
|
||||
let pong = Pong {
|
||||
timestamp_ms: now_ms(),
|
||||
};
|
||||
let cons_last = last.entry(cons_session).or_default();
|
||||
let mut w = writer.lock().await;
|
||||
write_frame(&mut *w, &Frame::Pong(pong)).await?;
|
||||
Ok(true)
|
||||
}
|
||||
Frame::Ping(_) => {
|
||||
let mut w = writer.lock().await;
|
||||
write_frame(
|
||||
&mut *w,
|
||||
&Frame::Error(HandshakeError::Unauthorized(
|
||||
"session-id no coincide".into(),
|
||||
)),
|
||||
)
|
||||
.await?;
|
||||
Ok(true)
|
||||
}
|
||||
Frame::Farewell(Farewell { session }) if session == session_id => Ok(false),
|
||||
Frame::Farewell(_) => {
|
||||
let mut w = writer.lock().await;
|
||||
write_frame(
|
||||
&mut *w,
|
||||
&Frame::Error(HandshakeError::Unauthorized(
|
||||
"session-id no coincide".into(),
|
||||
)),
|
||||
)
|
||||
.await?;
|
||||
Ok(true)
|
||||
}
|
||||
_ => {
|
||||
let mut w = writer.lock().await;
|
||||
write_frame(
|
||||
&mut *w,
|
||||
&Frame::Error(HandshakeError::Rejected(
|
||||
"frame inesperado tras handshake".into(),
|
||||
)),
|
||||
)
|
||||
.await?;
|
||||
Ok(true)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
for input in &cons.inputs {
|
||||
let new_match = b.find_producer_for(cons_session, &input.name);
|
||||
let new_endpoint = new_match.as_ref().map(|m| m.producer.clone());
|
||||
let old_endpoint = cons_last.get(&input.name).cloned();
|
||||
/// Limpieza atómica de TRES vistas: registro de sesiones + broker +
|
||||
/// canal push. Se ejecuta tanto si la sesión cierra por Farewell, EOF,
|
||||
/// o error. Tras desregistrar, emite diffs a las sesiones que perdieron
|
||||
/// el match contra ésta.
|
||||
async fn cleanup(
|
||||
session_id: SessionId,
|
||||
sessions: &SessionRegistry,
|
||||
push_table: &SessionTxTable,
|
||||
last_matches: &LastMatches,
|
||||
config: &ServerConfig,
|
||||
) {
|
||||
sessions.lock().await.remove(&session_id);
|
||||
push_table.lock().await.remove(&session_id);
|
||||
last_matches.lock().await.remove(&session_id);
|
||||
if let Some(broker) = &config.broker {
|
||||
broker.lock().await.unregister(session_id);
|
||||
}
|
||||
broadcast_match_diffs(push_table, last_matches, config).await;
|
||||
}
|
||||
|
||||
if old_endpoint == new_endpoint {
|
||||
continue;
|
||||
}
|
||||
/// Recomputa los matches para todas las sesiones registradas y empuja
|
||||
/// `MatchEvent::Available` / `MatchEvent::Lost` por las que cambiaron
|
||||
/// respecto al último estado conocido.
|
||||
async fn broadcast_match_diffs(
|
||||
push_table: &SessionTxTable,
|
||||
last_matches: &LastMatches,
|
||||
config: &ServerConfig,
|
||||
) {
|
||||
let broker = match &config.broker {
|
||||
Some(b) => b,
|
||||
None => return,
|
||||
};
|
||||
|
||||
if let Some(m) = &new_match {
|
||||
// Resolvemos el service_socket del productor desde
|
||||
// la BrokeredCard; pasarlo en el evento permite al
|
||||
// consumer conectar directo sin discovery extra.
|
||||
let producer_service_socket = b
|
||||
.cards()
|
||||
.find(|c| c.session == m.producer.session)
|
||||
.and_then(|c| c.service_socket.clone());
|
||||
let event = MatchEvent {
|
||||
kind: MatchEventKind::Available,
|
||||
consumer_flow: input.name.clone(),
|
||||
producer_session: m.producer.session,
|
||||
producer_label: m.producer_label.clone(),
|
||||
producer_flow: m.producer.flow_name.clone(),
|
||||
ty: m.ty.clone(),
|
||||
via: m.via,
|
||||
pinned: m.pinned,
|
||||
producer_service_socket,
|
||||
};
|
||||
let send_res = tx.try_send(Frame::MatchEvent(event));
|
||||
debug!(
|
||||
target: "brahman_handshake::broadcast",
|
||||
consumer = %cons_session,
|
||||
flow = %input.name,
|
||||
producer = %m.producer_label,
|
||||
result = ?send_res.as_ref().map(|_| "ok").unwrap_or_else(|e| match e {
|
||||
tokio::sync::mpsc::error::TrySendError::Full(_) => "full",
|
||||
tokio::sync::mpsc::error::TrySendError::Closed(_) => "closed",
|
||||
}),
|
||||
"Available pushed"
|
||||
);
|
||||
} else {
|
||||
// Tenía match, ahora no.
|
||||
let event = MatchEvent {
|
||||
kind: MatchEventKind::Lost,
|
||||
consumer_flow: input.name.clone(),
|
||||
producer_session: Ulid::nil(),
|
||||
producer_label: String::new(),
|
||||
producer_flow: String::new(),
|
||||
ty: input.ty.clone(),
|
||||
via: brahman_broker::MatchStrategy::Exact,
|
||||
pinned: false,
|
||||
producer_service_socket: None,
|
||||
};
|
||||
let _ = tx.try_send(Frame::MatchEvent(event));
|
||||
}
|
||||
let b = broker.lock().await;
|
||||
let push_table = push_table.lock().await;
|
||||
let mut last = last_matches.lock().await;
|
||||
|
||||
if let Some(ep) = new_endpoint {
|
||||
cons_last.insert(input.name.clone(), ep);
|
||||
} else {
|
||||
cons_last.remove(&input.name);
|
||||
}
|
||||
debug!(
|
||||
target: "brahman_handshake::broadcast",
|
||||
cards = b.len(),
|
||||
push_subscribers = push_table.len(),
|
||||
"broadcast_match_diffs"
|
||||
);
|
||||
|
||||
let cards: Vec<_> = b.cards().cloned().collect();
|
||||
|
||||
for cons in &cards {
|
||||
let cons_session = cons.session;
|
||||
let tx = match push_table.get(&cons_session) {
|
||||
Some(tx) => tx,
|
||||
None => continue,
|
||||
};
|
||||
let cons_last = last.entry(cons_session).or_default();
|
||||
|
||||
for input in &cons.inputs {
|
||||
let new_match = b.find_producer_for(cons_session, &input.name);
|
||||
let new_endpoint = new_match.as_ref().map(|m| m.producer.clone());
|
||||
let old_endpoint = cons_last.get(&input.name).cloned();
|
||||
|
||||
if old_endpoint == new_endpoint {
|
||||
continue;
|
||||
}
|
||||
|
||||
if let Some(m) = &new_match {
|
||||
let producer_service_socket = b
|
||||
.cards()
|
||||
.find(|c| c.session == m.producer.session)
|
||||
.and_then(|c| c.service_socket.clone());
|
||||
let event = MatchEvent {
|
||||
kind: MatchEventKind::Available,
|
||||
consumer_flow: input.name.clone(),
|
||||
producer_session: m.producer.session,
|
||||
producer_label: m.producer_label.clone(),
|
||||
producer_flow: m.producer.flow_name.clone(),
|
||||
ty: m.ty.clone(),
|
||||
via: m.via,
|
||||
pinned: m.pinned,
|
||||
producer_service_socket,
|
||||
};
|
||||
let send_res = tx.try_send(Frame::MatchEvent(event));
|
||||
debug!(
|
||||
target: "brahman_handshake::broadcast",
|
||||
consumer = %cons_session,
|
||||
flow = %input.name,
|
||||
producer = %m.producer_label,
|
||||
result = ?send_res.as_ref().map(|_| "ok").unwrap_or_else(|e| match e {
|
||||
tokio::sync::mpsc::error::TrySendError::Full(_) => "full",
|
||||
tokio::sync::mpsc::error::TrySendError::Closed(_) => "closed",
|
||||
}),
|
||||
"Available pushed"
|
||||
);
|
||||
} else {
|
||||
let event = MatchEvent {
|
||||
kind: MatchEventKind::Lost,
|
||||
consumer_flow: input.name.clone(),
|
||||
producer_session: Ulid::nil(),
|
||||
producer_label: String::new(),
|
||||
producer_flow: String::new(),
|
||||
ty: input.ty.clone(),
|
||||
via: brahman_broker::MatchStrategy::Exact,
|
||||
pinned: false,
|
||||
producer_service_socket: None,
|
||||
};
|
||||
let _ = tx.try_send(Frame::MatchEvent(event));
|
||||
}
|
||||
|
||||
if let Some(ep) = new_endpoint {
|
||||
cons_last.insert(input.name.clone(), ep);
|
||||
} else {
|
||||
cons_last.remove(&input.name);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Lee el Hello, valida, registra la sesión y emite HelloAck.
|
||||
/// Devuelve `Some(session_id)` si el handshake fue exitoso.
|
||||
async fn do_handshake(&mut self) -> std::io::Result<Option<SessionId>> {
|
||||
let frame = read_frame(&mut self.stream).await?;
|
||||
let hello = match frame {
|
||||
Frame::Hello(h) => h,
|
||||
_ => {
|
||||
write_frame(
|
||||
&mut self.stream,
|
||||
&Frame::Error(HandshakeError::Rejected(
|
||||
"primer frame debe ser Hello".into(),
|
||||
)),
|
||||
)
|
||||
.await?;
|
||||
return Ok(None);
|
||||
}
|
||||
};
|
||||
|
||||
if let Some(err) = self.validate_hello(&hello) {
|
||||
write_frame(&mut self.stream, &Frame::Error(err)).await?;
|
||||
/// Lee el Hello, valida, registra la sesión y emite HelloAck.
|
||||
async fn do_handshake<S>(
|
||||
stream: &mut S,
|
||||
config: &ServerConfig,
|
||||
sessions: &SessionRegistry,
|
||||
) -> std::io::Result<Option<SessionId>>
|
||||
where
|
||||
S: AsyncRead + AsyncWrite + Unpin,
|
||||
{
|
||||
let frame = read_frame(stream).await?;
|
||||
let hello = match frame {
|
||||
Frame::Hello(h) => h,
|
||||
_ => {
|
||||
write_frame(
|
||||
stream,
|
||||
&Frame::Error(HandshakeError::Rejected(
|
||||
"primer frame debe ser Hello".into(),
|
||||
)),
|
||||
)
|
||||
.await?;
|
||||
return Ok(None);
|
||||
}
|
||||
};
|
||||
|
||||
let session_id = Ulid::new();
|
||||
// WireCard → Card: extensiones quedan vacías post-wire (es el contrato).
|
||||
let card: Card = hello.card.into();
|
||||
self.register_session(session_id, card, hello.wit).await;
|
||||
|
||||
let ack = HelloAck {
|
||||
server_version: crate::HANDSHAKE_VERSION.to_string(),
|
||||
protocol_version: brahman_card::PROTOCOL_VERSION.to_string(),
|
||||
session: session_id,
|
||||
init_attached: self.config.init_attached,
|
||||
};
|
||||
write_frame(&mut self.stream, &Frame::HelloAck(ack)).await?;
|
||||
debug!(session = %session_id, "handshake completado");
|
||||
Ok(Some(session_id))
|
||||
if let Some(err) = validate_hello(&hello) {
|
||||
write_frame(stream, &Frame::Error(err)).await?;
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
/// Indexa la sesión: ResolvedCard en sessions + Card en broker (si hay).
|
||||
/// Si `wit` está presente, el módulo se registra como "consciente".
|
||||
async fn register_session(
|
||||
&self,
|
||||
session_id: SessionId,
|
||||
card: Card,
|
||||
wit: Option<WitInterface>,
|
||||
) {
|
||||
if let Some(broker) = &self.config.broker {
|
||||
broker
|
||||
.lock()
|
||||
.await
|
||||
.register(session_id, &card, wit.clone());
|
||||
}
|
||||
let resolved = match wit {
|
||||
Some(w) => ResolvedCard::from_conscious(card, w),
|
||||
None => ResolvedCard::from_agnostic(card),
|
||||
};
|
||||
self.sessions.lock().await.insert(session_id, resolved);
|
||||
}
|
||||
let session_id = Ulid::new();
|
||||
let card: Card = hello.card.into();
|
||||
register_session(session_id, card, hello.wit, config, sessions).await;
|
||||
|
||||
/// Validaciones que el servidor aplica al Hello del cliente.
|
||||
fn validate_hello(&self, hello: &Hello) -> Option<HandshakeError> {
|
||||
if hello.schema_version != CARD_SCHEMA_VERSION {
|
||||
return Some(HandshakeError::SchemaMismatch {
|
||||
client: hello.schema_version,
|
||||
server: CARD_SCHEMA_VERSION,
|
||||
});
|
||||
}
|
||||
if hello.protocol_version != brahman_card::PROTOCOL_VERSION {
|
||||
return Some(HandshakeError::ProtocolMismatch(format!(
|
||||
"cliente={}, servidor={}",
|
||||
hello.protocol_version,
|
||||
brahman_card::PROTOCOL_VERSION
|
||||
)));
|
||||
}
|
||||
// Validamos contra Card (la rica) — convertir es barato y centraliza
|
||||
// la lógica de validación en un solo lugar.
|
||||
let as_card: Card = Card::from(hello.card.clone());
|
||||
if let Err(e) = as_card.validate() {
|
||||
return Some(HandshakeError::InvalidCard(e.to_string()));
|
||||
}
|
||||
None
|
||||
let ack = HelloAck {
|
||||
server_version: crate::HANDSHAKE_VERSION.to_string(),
|
||||
protocol_version: brahman_card::PROTOCOL_VERSION.to_string(),
|
||||
session: session_id,
|
||||
init_attached: config.init_attached,
|
||||
};
|
||||
write_frame(stream, &Frame::HelloAck(ack)).await?;
|
||||
debug!(session = %session_id, "handshake completado");
|
||||
Ok(Some(session_id))
|
||||
}
|
||||
|
||||
async fn register_session(
|
||||
session_id: SessionId,
|
||||
card: Card,
|
||||
wit: Option<WitInterface>,
|
||||
config: &ServerConfig,
|
||||
sessions: &SessionRegistry,
|
||||
) {
|
||||
if let Some(broker) = &config.broker {
|
||||
broker
|
||||
.lock()
|
||||
.await
|
||||
.register(session_id, &card, wit.clone());
|
||||
}
|
||||
let resolved = match wit {
|
||||
Some(w) => ResolvedCard::from_conscious(card, w),
|
||||
None => ResolvedCard::from_agnostic(card),
|
||||
};
|
||||
sessions.lock().await.insert(session_id, resolved);
|
||||
}
|
||||
|
||||
fn validate_hello(hello: &Hello) -> Option<HandshakeError> {
|
||||
if hello.schema_version != CARD_SCHEMA_VERSION {
|
||||
return Some(HandshakeError::SchemaMismatch {
|
||||
client: hello.schema_version,
|
||||
server: CARD_SCHEMA_VERSION,
|
||||
});
|
||||
}
|
||||
if hello.protocol_version != brahman_card::PROTOCOL_VERSION {
|
||||
return Some(HandshakeError::ProtocolMismatch(format!(
|
||||
"cliente={}, servidor={}",
|
||||
hello.protocol_version,
|
||||
brahman_card::PROTOCOL_VERSION
|
||||
)));
|
||||
}
|
||||
let as_card: Card = Card::from(hello.card.clone());
|
||||
if let Err(e) = as_card.validate() {
|
||||
return Some(HandshakeError::InvalidCard(e.to_string()));
|
||||
}
|
||||
None
|
||||
}
|
||||
|
||||
fn now_ms() -> u64 {
|
||||
|
||||
Reference in New Issue
Block a user