diff --git a/CHANGELOG.md b/CHANGELOG.md index f8305f8..e95a324 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,61 @@ ratio/diff ver `git show `. ## 2026-05-09 +### feat(brahman-handshake): Fase 1 — handshake brahman sobre stream libp2p +Segundo paso del plan "el encuentro entre Entes no se restringe a +local". El protocolo brahman (Hello / HelloAck / Ping / Pong / +MatchEvent / Farewell, frames postcard length-prefixed) ahora también +viaja sobre streams libp2p de la malla `brahman-net` — el mismo Init +acepta sesiones por Unix socket Y por libp2p indistintamente, y un +consumer remoto puede dial-ar al multiaddr y completar handshake. + +Cambios: +- **`Session` y `Client` genéricos**: ambos dejan de estar atados + a `UnixStream` y pasan a ser genéricos sobre `S: AsyncRead + + AsyncWrite + Unpin + Send + 'static`. El path Unix queda como + `Client = Client` (default genérico). Constructores + nuevos: `Server::session_from_stream(stream)`, + `Client::connect_with_stream(stream, card, wit)`. +- **Refactor del post-handshake con split**: `tokio::select!` sobre + `&mut self.stream` requería `S: Sync` indirectamente, y + `libp2p::Stream` no es Sync. Reemplazado por + `tokio::io::split(stream)` → reader loop principal + writer task + separada que drena el push channel. Writer compartido bajo + `Arc>>` para serializar Pong/Error inline con + los MatchEvents pusheados. Cleanup garantizado en todas las ramas. + La lógica del post-handshake migra a funciones libres + (`run_post_handshake`, `handle_inbound_frame`, `cleanup`, + `broadcast_match_diffs`, `do_handshake`, `register_session`, + `validate_hello`). +- **Nuevo módulo `brahman-handshake::network`**: + - `BRAHMAN_HANDSHAKE_PROTOCOL = "/brahman/handshake/1.0.0"`. + - `LibP2pHandshakeStream = Compat` (alias del + stream una vez convertido al mundo `tokio::io`). + - `run_libp2p_accept_loop(server, net)`: bucle accept sobre el + protocolo que delega cada stream entrante a una `Session` + construida vía `server.session_from_stream(stream.compat())`. + Sesiones libp2p y Unix conviven en el mismo `Server` — + comparten broker, push table, last_matches. + - `connect_libp2p(net, peer, card, wit)`: abre stream libp2p al + `peer` y arranca handshake. + - `NetworkError` tipado (`OpenStream`, `Handshake`, `AcceptStream`). + +Deps: `brahman-handshake` gana `brahman-net`, `futures`, `tokio-util`. +`brahman-net` re-exporta `Multiaddr`, `PeerId`, `Stream`, +`StreamProtocol`, `Protocol`, `OpenStreamError` para que callers no +necesiten dep directa a libp2p. + +Tests: +- 9 tests unit + integration verdes (sin regresión en el path Unix). +- Nuevo `tests/network_libp2p.rs`: test E2E que arma server con + Unix socket + BrahmanNet, hace listen TCP, monta el accept loop; + cliente con su propio BrahmanNet dial-ea al peer_id, completa + handshake remoto, pinguea, farewell. Verifica que la sesión se + registró durante la conversación y se removió tras farewell. + +Próximo: Fase 2 (discovery remoto vía DHT — anunciar Cards bajo +flow type, broker query local + remoto). + ### feat(brahman-net): capa P2P compartida — Fase 0 (extracción del swarm libp2p) Primer paso del plan "el encuentro entre Entes no se restringe a local". El swarm libp2p que vivía dentro de `minga-p2p::network` (282 LOC) sale diff --git a/Cargo.lock b/Cargo.lock index fe9aedd..15aeb83 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1201,11 +1201,14 @@ dependencies = [ "anyhow", "brahman-broker", "brahman-card", + "brahman-net", + "futures", "postcard", "serde", "tempfile", "thiserror 2.0.18", "tokio", + "tokio-util", "tracing", "ulid", ] diff --git a/crates/core/brahman-handshake/Cargo.toml b/crates/core/brahman-handshake/Cargo.toml index 0674620..b931377 100644 --- a/crates/core/brahman-handshake/Cargo.toml +++ b/crates/core/brahman-handshake/Cargo.toml @@ -6,14 +6,17 @@ rust-version.workspace = true license.workspace = true authors.workspace = true publish.workspace = true -description = "Brahman — handshake runtime Init↔módulo sobre Unix socket (postcard frames)." +description = "Brahman — handshake runtime Init↔módulo. Local sobre Unix socket; remoto sobre stream libp2p (brahman-net)." [dependencies] brahman-card = { path = "../brahman-card" } brahman-broker = { path = "../brahman-broker" } +brahman-net = { path = "../../shared/brahman-net" } +futures = { workspace = true } serde = { workspace = true } postcard = { workspace = true } tokio = { workspace = true } +tokio-util = { workspace = true } thiserror = { workspace = true } ulid = { workspace = true } tracing = { workspace = true } diff --git a/crates/core/brahman-handshake/src/client.rs b/crates/core/brahman-handshake/src/client.rs index 367e222..8b08d6f 100644 --- a/crates/core/brahman-handshake/src/client.rs +++ b/crates/core/brahman-handshake/src/client.rs @@ -6,6 +6,7 @@ use std::time::Duration; use brahman_card::{Card, WitInterface, CARD_SCHEMA_VERSION}; use thiserror::Error; +use tokio::io::{AsyncRead, AsyncWrite}; use tokio::net::UnixStream; use crate::codec::{read_frame, write_frame}; @@ -34,33 +35,55 @@ pub enum ClientError { /// y tiene su `SessionId`. Los `MatchEvent` recibidos durante operaciones /// request/response se buferean en `pending_events` y se obtienen vía /// [`Client::take_event`] o [`Client::await_event`]. +/// +/// Genérico sobre el transport (`AsyncRead + AsyncWrite + Unpin + Send`): +/// funciona indistintamente sobre `UnixStream` (path local) o sobre un +/// stream libp2p wrapped con `tokio_util::compat` (path remoto, vía +/// `brahman_handshake::network`). #[derive(Debug)] -pub struct Client { - stream: UnixStream, +pub struct Client { + stream: S, session: SessionId, server_info: HelloAck, pending_events: VecDeque, } -impl Client { - /// Conecta como módulo agnóstico (sin WIT). Equivalente a - /// `connect_with(path, card, None)`. +impl Client { + /// Conecta como módulo agnóstico (sin WIT) sobre Unix socket. + /// Equivalente a `connect_with(path, card, None)`. pub async fn connect(path: impl AsRef, card: Card) -> Result { Self::connect_with(path, card, None).await } - /// Conecta al socket enviando Hello con la Card dada y opcionalmente - /// una `WitInterface` ya extraída. Si `wit` es `Some`, el server - /// registra el módulo como "consciente". + /// Conecta al socket Unix enviando Hello con la Card dada y + /// opcionalmente una `WitInterface` ya extraída. Si `wit` es `Some`, + /// el server registra el módulo como "consciente". pub async fn connect_with( path: impl AsRef, card: Card, wit: Option, + ) -> Result { + let stream = UnixStream::connect(path).await?; + Self::connect_with_stream(stream, card, wit).await + } +} + +impl Client +where + S: AsyncRead + AsyncWrite + Unpin + Send, +{ + /// Constructor genérico: arranca el handshake sobre un stream + /// arbitrario que ya esté abierto. Es el punto de entrada para + /// transports alternativos (libp2p, in-memory para tests, etc.) + /// que reusan toda la lógica del handshake post-stream-open. + pub async fn connect_with_stream( + mut stream: S, + card: Card, + wit: Option, ) -> Result { card.validate() .map_err(|e| ClientError::InvalidCard(e.to_string()))?; - let mut stream = UnixStream::connect(path).await?; let hello = Hello { schema_version: CARD_SCHEMA_VERSION, protocol_version: brahman_card::PROTOCOL_VERSION.to_string(), diff --git a/crates/core/brahman-handshake/src/lib.rs b/crates/core/brahman-handshake/src/lib.rs index 8486b4d..a16188e 100644 --- a/crates/core/brahman-handshake/src/lib.rs +++ b/crates/core/brahman-handshake/src/lib.rs @@ -21,6 +21,7 @@ pub mod codec; pub mod messages; pub mod server; pub mod client; +pub mod network; pub mod transport; pub use brahman_card::PROTOCOL_VERSION; diff --git a/crates/core/brahman-handshake/src/network.rs b/crates/core/brahman-handshake/src/network.rs new file mode 100644 index 0000000..d032062 --- /dev/null +++ b/crates/core/brahman-handshake/src/network.rs @@ -0,0 +1,140 @@ +//! Backend libp2p del handshake brahman: el mismo protocolo (Hello / +//! HelloAck / Ping / Pong / MatchEvent / Farewell, frames postcard +//! length-prefixed) ahora también viaja sobre streams libp2p de la +//! malla `brahman-net`. +//! +//! El servidor expone el bucle [`run_libp2p_accept_loop`] que acepta +//! streams del protocolo `BRAHMAN_HANDSHAKE_PROTOCOL` y los delega al +//! mismo `Server` que ya escucha por Unix socket — la `Session` es +//! genérica sobre el transporte, así que ambas vías comparten broker, +//! tablas de sesiones, push de MatchEvents, todo. +//! +//! El cliente se conecta vía [`connect_libp2p`]: abre un stream +//! libp2p hacia un `PeerId` ya conocido y arranca el handshake como +//! cualquier `Client`. +//! +//! Identidad: cada nodo libp2p tiene su `PeerId` (ed25519 derivado). +//! La identidad del Ente (Card.id ULID + futura firma) viaja en el +//! Hello, como en el path Unix. Trust remoto (verificación de firma +//! antes de aceptar el Hello) es Fase 3. +//! +//! Ejemplo (servidor — Arje): +//! ```ignore +//! let server = Arc::new(Server::bind("/run/brahman-init.sock", config)?); +//! let net = Arc::new(BrahmanNet::new()?); +//! net.listen("/ip4/0.0.0.0/tcp/4101".parse()?).await; +//! +//! tokio::spawn(brahman_handshake::network::run_libp2p_accept_loop( +//! server.clone(), +//! net.clone(), +//! )); +//! // Server::run sigue escuchando Unix en paralelo. +//! ``` +//! +//! Ejemplo (cliente — sidecar de un Ente remoto): +//! ```ignore +//! let net = BrahmanNet::new()?; +//! net.dial(remote_multiaddr); +//! let mut client = brahman_handshake::network::connect_libp2p( +//! &net, peer_id, my_card, None, +//! ).await?; +//! client.ping().await?; +//! ``` + +use std::sync::Arc; + +use brahman_card::{Card, WitInterface}; +use brahman_net::{BrahmanNet, OpenStreamError, PeerId, Stream, StreamProtocol}; +use futures::StreamExt; +use tokio_util::compat::{Compat, FuturesAsyncReadCompatExt}; +use tracing::warn; + +use crate::client::{Client, ClientError}; +use crate::server::Server; + +/// Sub-protocolo del handshake brahman sobre streams libp2p. +pub const BRAHMAN_HANDSHAKE_PROTOCOL: StreamProtocol = + StreamProtocol::new("/brahman/handshake/1.0.0"); + +/// Tipo del stream que ve la lógica del handshake una vez convertido +/// del mundo `futures::AsyncRead/Write` (libp2p) al mundo +/// `tokio::io::AsyncRead/Write` (resto del crate). +pub type LibP2pHandshakeStream = Compat; + +/// Errores específicos del backend libp2p. +#[derive(Debug, thiserror::Error)] +pub enum NetworkError { + #[error("abrir stream libp2p: {0}")] + OpenStream(#[from] OpenStreamError), + + #[error("handshake: {0}")] + Handshake(#[from] ClientError), + + #[error("aceptar stream libp2p: {0}")] + AcceptStream(String), +} + +/// Loop de aceptación de streams libp2p del protocolo handshake. +/// Cada stream entrante se construye como `Session` reutilizando las +/// tablas compartidas del `Server`, así que conviven con sesiones +/// Unix indistinguibles. +/// +/// Vive hasta que el control libp2p se cierre o el caller drop el +/// future. Errores por sesión se loggean (no tumban el loop). +pub async fn run_libp2p_accept_loop( + server: Arc, + net: Arc, +) -> Result<(), NetworkError> { + let mut control = net.control.clone(); + let mut incoming = control + .accept(BRAHMAN_HANDSHAKE_PROTOCOL) + .map_err(|e| NetworkError::AcceptStream(e.to_string()))?; + + while let Some((peer, stream)) = incoming.next().await { + let server = server.clone(); + // .compat() debe pasar al spawn ADENTRO; si lo hacemos afuera + // y capturamos `Compat` en la closure, el future + // resultante hereda traits que dyn AsyncReadWrite no satisface + // (compatibility con thread-safety de tokio::spawn). + tokio::spawn(handle_libp2p_session(server, stream, peer)); + } + + Ok(()) +} + +async fn handle_libp2p_session( + server: Arc, + stream: Stream, + peer: PeerId, +) { + let session = server.session_from_stream(stream.compat()); + if let Err(e) = session.handle().await { + warn!( + target: "brahman_handshake::network", + peer = %peer, + error = %e, + "sesión libp2p terminó con error" + ); + } +} + +/// Conecta como cliente a un Ente remoto vía libp2p y completa el +/// handshake. Requiere que `net` ya tenga conexión (o pueda dial-ar) +/// al `peer`; típicamente el caller hace `net.dial(multiaddr)` antes. +/// +/// Devuelve un `Client` típico — los métodos `ping`, `await_event`, +/// `farewell` funcionan idéntico al path Unix. El stream subyacente +/// es libp2p convertido vía `tokio_util::compat`. +pub async fn connect_libp2p( + net: &BrahmanNet, + peer: PeerId, + card: Card, + wit: Option, +) -> Result, NetworkError> { + let mut control = net.control.clone(); + let stream = control + .open_stream(peer, BRAHMAN_HANDSHAKE_PROTOCOL) + .await?; + let client = Client::connect_with_stream(stream.compat(), card, wit).await?; + Ok(client) +} diff --git a/crates/core/brahman-handshake/src/server.rs b/crates/core/brahman-handshake/src/server.rs index 1aba07c..11511f6 100644 --- a/crates/core/brahman-handshake/src/server.rs +++ b/crates/core/brahman-handshake/src/server.rs @@ -7,6 +7,7 @@ use std::time::{SystemTime, UNIX_EPOCH}; use brahman_broker::{Broker, Endpoint}; use brahman_card::{Card, ResolvedCard, WitInterface, CARD_SCHEMA_VERSION}; +use tokio::io::{split, AsyncRead, AsyncWrite, WriteHalf}; use tokio::net::{UnixListener, UnixStream}; use tokio::sync::{mpsc, Mutex}; use tracing::{debug, warn}; @@ -93,17 +94,34 @@ impl Server { self.sessions.clone() } - /// Acepta UNA conexión, devuelve la `Session` lista para `handle()`. + /// Acepta UNA conexión Unix, devuelve la `Session` lista para `handle()`. /// No corre el handler — eso es responsabilidad del llamante. - pub async fn accept_one(&self) -> std::io::Result { + pub async fn accept_one(&self) -> std::io::Result> { let (stream, _addr) = self.listener.accept().await?; - Ok(Session { + Ok(self.session_from_stream(stream)) + } + + /// Construye una `Session` a partir de un stream arbitrario que + /// implemente `AsyncRead + AsyncWrite + Unpin + Send`. Es el + /// punto de entrada para transports alternativos (libp2p en + /// `brahman_handshake::network`, in-memory para tests, etc.) que + /// quieren reutilizar la lógica del handshake sin venir por el + /// listener Unix. + /// + /// Las tablas compartidas (sessions/push/last_matches/broker) se + /// clonan, así que sesiones construidas por esta vía conviven + /// indistinguibles en el mismo `Server`. + pub fn session_from_stream(&self, stream: S) -> Session + where + S: AsyncRead + AsyncWrite + Unpin + Send + 'static, + { + Session { stream, sessions: self.sessions.clone(), push_table: self.push_table.clone(), last_matches: self.last_matches.clone(), config: self.config.clone(), - }) + } } /// Loop de aceptación: cada conexión se despacha en una task separada. @@ -131,310 +149,391 @@ impl Drop for Server { } } -/// Conexión individual aceptada por el servidor. -pub struct Session { - stream: UnixStream, +/// Conexión individual aceptada por el servidor. Genérica sobre el +/// transport — funciona indistinguiblemente sobre `UnixStream` (modo +/// local), libp2p stream wrapped con `tokio_util::compat`, in-memory +/// duplex (tests), etc. +pub struct Session { + stream: S, sessions: SessionRegistry, push_table: SessionTxTable, last_matches: LastMatches, config: ServerConfig, } -impl Session { - /// Procesa la conexión hasta `Farewell` o EOF: handshake + loop de pings. - /// Garantiza cleanup (sessions + broker) sin importar la rama de salida. - pub async fn handle(mut self) -> std::io::Result<()> { - let session_id = match self.do_handshake().await? { +impl Session +where + S: AsyncRead + AsyncWrite + Unpin + Send + 'static, +{ + /// Procesa la conexión hasta `Farewell` o EOF. + /// + /// Estructura: handshake (sobre el stream entero) → split en + /// halves (read + write) → reader loop principal + writer task + /// que drena el push channel. Garantiza cleanup (sessions + broker + /// + canales) sin importar la rama de salida. + /// + /// El split es necesario para soportar streams `!Sync` como + /// `libp2p::Stream`: `tokio::select!` sobre `&mut self.stream` + /// requeriría `S: Sync`. Con `tokio::io::split` cada mitad va a + /// su propia task, eliminando el requirement y permitiendo que + /// la misma `Session` sirva indistinta sobre Unix socket o + /// stream libp2p remoto. + pub async fn handle(self) -> std::io::Result<()> { + let Self { + mut stream, + sessions, + push_table, + last_matches, + config, + } = self; + + let session_id = match do_handshake(&mut stream, &config, &sessions).await? { Some(id) => id, None => return Ok(()), // Hello rechazado, no se registró nada }; - let result = self.run_post_handshake(session_id).await; - self.cleanup(session_id).await; + let result = run_post_handshake( + stream, + session_id, + push_table.clone(), + last_matches.clone(), + config.clone(), + ) + .await; + + cleanup( + session_id, + &sessions, + &push_table, + &last_matches, + &config, + ) + .await; + result } - async fn run_post_handshake(&mut self, session_id: SessionId) -> std::io::Result<()> { - // Canal por donde el server inyecta frames push (MatchEvent, etc.). - let (tx, mut rx) = mpsc::channel::(PUSH_CHANNEL_CAPACITY); - self.push_table.lock().await.insert(session_id, tx); +} - // Tras registrar el canal, recomputar matches y emitir diffs a - // todas las sesiones afectadas (incluida ésta, si tiene inputs). - self.broadcast_match_diffs().await; +// ============================================================================ +// Free functions (post-refactor): la lógica del post-handshake corre sobre +// halves del stream; no necesita más `&mut Session` y por eso vive afuera. +// ============================================================================ - loop { - tokio::select! { - // Frame entrante del cliente. - res = read_frame(&mut self.stream) => { - match res { - Ok(frame) => { - if !self.handle_inbound_frame(session_id, frame).await? { - return Ok(()); - } - } - Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => { - debug!(session = %session_id, "cliente cerró sin Farewell"); - return Ok(()); - } - Err(e) => return Err(e), - } - } - // Frame push desde el server (MatchEvent). - Some(frame) = rx.recv() => { - write_frame(&mut self.stream, &frame).await?; - } +async fn run_post_handshake( + stream: S, + session_id: SessionId, + push_table: SessionTxTable, + last_matches: LastMatches, + config: ServerConfig, +) -> std::io::Result<()> +where + S: AsyncRead + AsyncWrite + Unpin + Send + 'static, +{ + // Canal por donde el server inyecta frames push (MatchEvent, etc.). + let (tx, mut rx) = mpsc::channel::(PUSH_CHANNEL_CAPACITY); + push_table.lock().await.insert(session_id, tx); + + // Tras registrar el canal, recomputar matches y emitir diffs. + broadcast_match_diffs(&push_table, &last_matches, &config).await; + + // Split: reader en el hilo principal, writer compartido bajo Mutex + // entre la writer task (push channel) y el handler de inbound + // (que escribe Pong/Error). Mutex serializa writes; es OK porque + // la frecuencia de writes por sesión es baja. + let (mut reader, writer) = split(stream); + let writer = Arc::new(Mutex::new(writer)); + + // Writer task: drena el push channel. + let writer_for_push = writer.clone(); + let writer_task = tokio::spawn(async move { + while let Some(frame) = rx.recv().await { + let mut w = writer_for_push.lock().await; + if write_frame(&mut *w, &frame).await.is_err() { + break; } } - } + }); - /// Maneja un frame entrante. Devuelve `Ok(false)` si la sesión debe - /// cerrarse limpiamente (Farewell con session-id correcto). - async fn handle_inbound_frame( - &mut self, - session_id: SessionId, - frame: Frame, - ) -> std::io::Result { - match frame { - Frame::Ping(Ping { session }) if session == session_id => { - let pong = Pong { - timestamp_ms: now_ms(), - }; - write_frame(&mut self.stream, &Frame::Pong(pong)).await?; - Ok(true) - } - Frame::Ping(_) => { - write_frame( - &mut self.stream, - &Frame::Error(HandshakeError::Unauthorized( - "session-id no coincide".into(), - )), - ) - .await?; - Ok(true) - } - Frame::Farewell(Farewell { session }) if session == session_id => Ok(false), - Frame::Farewell(_) => { - write_frame( - &mut self.stream, - &Frame::Error(HandshakeError::Unauthorized( - "session-id no coincide".into(), - )), - ) - .await?; - Ok(true) - } - _ => { - write_frame( - &mut self.stream, - &Frame::Error(HandshakeError::Rejected( - "frame inesperado tras handshake".into(), - )), - ) - .await?; - Ok(true) + // Reader loop principal. + let result: std::io::Result<()> = loop { + match read_frame(&mut reader).await { + Ok(frame) => match handle_inbound_frame(session_id, frame, &writer).await { + Ok(true) => continue, + Ok(false) => break Ok(()), + Err(e) => break Err(e), + }, + Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => { + debug!(session = %session_id, "cliente cerró sin Farewell"); + break Ok(()); } + Err(e) => break Err(e), } - } + }; - /// Limpieza atómica de TRES vistas: registro de sesiones + broker + - /// canal push. Se ejecuta tanto si la sesión cierra por Farewell, EOF, - /// o error. Tras desregistrar, emite diffs a las sesiones que perdieron - /// el match contra ésta. - async fn cleanup(&self, session_id: SessionId) { - self.sessions.lock().await.remove(&session_id); - self.push_table.lock().await.remove(&session_id); - self.last_matches.lock().await.remove(&session_id); - if let Some(broker) = &self.config.broker { - broker.lock().await.unregister(session_id); - } - self.broadcast_match_diffs().await; - } + // Cerrar writer: drop nuestro Arc y abortar la task. La task + // saldrá igual cuando rx se cierre por drop del último Sender, + // pero abortarla es más rápido que esperar a que próximo recv() + // observe el cierre. + drop(writer); + writer_task.abort(); + let _ = writer_task.await; - /// Recomputa los matches para todas las sesiones registradas y empuja - /// `MatchEvent::Available` / `MatchEvent::Lost` por las que cambiaron - /// respecto al último estado conocido. - /// - /// Se llama tras cada `register_session` y `cleanup`. Las inserciones - /// al canal usan `try_send` (non-blocking); si el cliente está lento - /// y se llena el buffer, los eventos extra se pierden — es ephemeral - /// y el cliente puede re-consultar el estado vía `brahman-status`. - async fn broadcast_match_diffs(&self) { - let broker = match &self.config.broker { - Some(b) => b, - None => return, - }; + result +} - let b = broker.lock().await; - let push_table = self.push_table.lock().await; - let mut last = self.last_matches.lock().await; - - debug!( - target: "brahman_handshake::broadcast", - cards = b.len(), - push_subscribers = push_table.len(), - "broadcast_match_diffs" - ); - - // Snapshot de cards para no tener que sostener el lock del broker. - let cards: Vec<_> = b.cards().cloned().collect(); - - for cons in &cards { - let cons_session = cons.session; - let tx = match push_table.get(&cons_session) { - Some(tx) => tx, - None => continue, // todavía no tiene canal push +async fn handle_inbound_frame( + session_id: SessionId, + frame: Frame, + writer: &Arc>>, +) -> std::io::Result +where + S: AsyncRead + AsyncWrite + Unpin + Send + 'static, +{ + match frame { + Frame::Ping(Ping { session }) if session == session_id => { + let pong = Pong { + timestamp_ms: now_ms(), }; - let cons_last = last.entry(cons_session).or_default(); + let mut w = writer.lock().await; + write_frame(&mut *w, &Frame::Pong(pong)).await?; + Ok(true) + } + Frame::Ping(_) => { + let mut w = writer.lock().await; + write_frame( + &mut *w, + &Frame::Error(HandshakeError::Unauthorized( + "session-id no coincide".into(), + )), + ) + .await?; + Ok(true) + } + Frame::Farewell(Farewell { session }) if session == session_id => Ok(false), + Frame::Farewell(_) => { + let mut w = writer.lock().await; + write_frame( + &mut *w, + &Frame::Error(HandshakeError::Unauthorized( + "session-id no coincide".into(), + )), + ) + .await?; + Ok(true) + } + _ => { + let mut w = writer.lock().await; + write_frame( + &mut *w, + &Frame::Error(HandshakeError::Rejected( + "frame inesperado tras handshake".into(), + )), + ) + .await?; + Ok(true) + } + } +} - for input in &cons.inputs { - let new_match = b.find_producer_for(cons_session, &input.name); - let new_endpoint = new_match.as_ref().map(|m| m.producer.clone()); - let old_endpoint = cons_last.get(&input.name).cloned(); +/// Limpieza atómica de TRES vistas: registro de sesiones + broker + +/// canal push. Se ejecuta tanto si la sesión cierra por Farewell, EOF, +/// o error. Tras desregistrar, emite diffs a las sesiones que perdieron +/// el match contra ésta. +async fn cleanup( + session_id: SessionId, + sessions: &SessionRegistry, + push_table: &SessionTxTable, + last_matches: &LastMatches, + config: &ServerConfig, +) { + sessions.lock().await.remove(&session_id); + push_table.lock().await.remove(&session_id); + last_matches.lock().await.remove(&session_id); + if let Some(broker) = &config.broker { + broker.lock().await.unregister(session_id); + } + broadcast_match_diffs(push_table, last_matches, config).await; +} - if old_endpoint == new_endpoint { - continue; - } +/// Recomputa los matches para todas las sesiones registradas y empuja +/// `MatchEvent::Available` / `MatchEvent::Lost` por las que cambiaron +/// respecto al último estado conocido. +async fn broadcast_match_diffs( + push_table: &SessionTxTable, + last_matches: &LastMatches, + config: &ServerConfig, +) { + let broker = match &config.broker { + Some(b) => b, + None => return, + }; - if let Some(m) = &new_match { - // Resolvemos el service_socket del productor desde - // la BrokeredCard; pasarlo en el evento permite al - // consumer conectar directo sin discovery extra. - let producer_service_socket = b - .cards() - .find(|c| c.session == m.producer.session) - .and_then(|c| c.service_socket.clone()); - let event = MatchEvent { - kind: MatchEventKind::Available, - consumer_flow: input.name.clone(), - producer_session: m.producer.session, - producer_label: m.producer_label.clone(), - producer_flow: m.producer.flow_name.clone(), - ty: m.ty.clone(), - via: m.via, - pinned: m.pinned, - producer_service_socket, - }; - let send_res = tx.try_send(Frame::MatchEvent(event)); - debug!( - target: "brahman_handshake::broadcast", - consumer = %cons_session, - flow = %input.name, - producer = %m.producer_label, - result = ?send_res.as_ref().map(|_| "ok").unwrap_or_else(|e| match e { - tokio::sync::mpsc::error::TrySendError::Full(_) => "full", - tokio::sync::mpsc::error::TrySendError::Closed(_) => "closed", - }), - "Available pushed" - ); - } else { - // Tenía match, ahora no. - let event = MatchEvent { - kind: MatchEventKind::Lost, - consumer_flow: input.name.clone(), - producer_session: Ulid::nil(), - producer_label: String::new(), - producer_flow: String::new(), - ty: input.ty.clone(), - via: brahman_broker::MatchStrategy::Exact, - pinned: false, - producer_service_socket: None, - }; - let _ = tx.try_send(Frame::MatchEvent(event)); - } + let b = broker.lock().await; + let push_table = push_table.lock().await; + let mut last = last_matches.lock().await; - if let Some(ep) = new_endpoint { - cons_last.insert(input.name.clone(), ep); - } else { - cons_last.remove(&input.name); - } + debug!( + target: "brahman_handshake::broadcast", + cards = b.len(), + push_subscribers = push_table.len(), + "broadcast_match_diffs" + ); + + let cards: Vec<_> = b.cards().cloned().collect(); + + for cons in &cards { + let cons_session = cons.session; + let tx = match push_table.get(&cons_session) { + Some(tx) => tx, + None => continue, + }; + let cons_last = last.entry(cons_session).or_default(); + + for input in &cons.inputs { + let new_match = b.find_producer_for(cons_session, &input.name); + let new_endpoint = new_match.as_ref().map(|m| m.producer.clone()); + let old_endpoint = cons_last.get(&input.name).cloned(); + + if old_endpoint == new_endpoint { + continue; + } + + if let Some(m) = &new_match { + let producer_service_socket = b + .cards() + .find(|c| c.session == m.producer.session) + .and_then(|c| c.service_socket.clone()); + let event = MatchEvent { + kind: MatchEventKind::Available, + consumer_flow: input.name.clone(), + producer_session: m.producer.session, + producer_label: m.producer_label.clone(), + producer_flow: m.producer.flow_name.clone(), + ty: m.ty.clone(), + via: m.via, + pinned: m.pinned, + producer_service_socket, + }; + let send_res = tx.try_send(Frame::MatchEvent(event)); + debug!( + target: "brahman_handshake::broadcast", + consumer = %cons_session, + flow = %input.name, + producer = %m.producer_label, + result = ?send_res.as_ref().map(|_| "ok").unwrap_or_else(|e| match e { + tokio::sync::mpsc::error::TrySendError::Full(_) => "full", + tokio::sync::mpsc::error::TrySendError::Closed(_) => "closed", + }), + "Available pushed" + ); + } else { + let event = MatchEvent { + kind: MatchEventKind::Lost, + consumer_flow: input.name.clone(), + producer_session: Ulid::nil(), + producer_label: String::new(), + producer_flow: String::new(), + ty: input.ty.clone(), + via: brahman_broker::MatchStrategy::Exact, + pinned: false, + producer_service_socket: None, + }; + let _ = tx.try_send(Frame::MatchEvent(event)); + } + + if let Some(ep) = new_endpoint { + cons_last.insert(input.name.clone(), ep); + } else { + cons_last.remove(&input.name); } } } +} - /// Lee el Hello, valida, registra la sesión y emite HelloAck. - /// Devuelve `Some(session_id)` si el handshake fue exitoso. - async fn do_handshake(&mut self) -> std::io::Result> { - let frame = read_frame(&mut self.stream).await?; - let hello = match frame { - Frame::Hello(h) => h, - _ => { - write_frame( - &mut self.stream, - &Frame::Error(HandshakeError::Rejected( - "primer frame debe ser Hello".into(), - )), - ) - .await?; - return Ok(None); - } - }; - - if let Some(err) = self.validate_hello(&hello) { - write_frame(&mut self.stream, &Frame::Error(err)).await?; +/// Lee el Hello, valida, registra la sesión y emite HelloAck. +async fn do_handshake( + stream: &mut S, + config: &ServerConfig, + sessions: &SessionRegistry, +) -> std::io::Result> +where + S: AsyncRead + AsyncWrite + Unpin, +{ + let frame = read_frame(stream).await?; + let hello = match frame { + Frame::Hello(h) => h, + _ => { + write_frame( + stream, + &Frame::Error(HandshakeError::Rejected( + "primer frame debe ser Hello".into(), + )), + ) + .await?; return Ok(None); } + }; - let session_id = Ulid::new(); - // WireCard → Card: extensiones quedan vacías post-wire (es el contrato). - let card: Card = hello.card.into(); - self.register_session(session_id, card, hello.wit).await; - - let ack = HelloAck { - server_version: crate::HANDSHAKE_VERSION.to_string(), - protocol_version: brahman_card::PROTOCOL_VERSION.to_string(), - session: session_id, - init_attached: self.config.init_attached, - }; - write_frame(&mut self.stream, &Frame::HelloAck(ack)).await?; - debug!(session = %session_id, "handshake completado"); - Ok(Some(session_id)) + if let Some(err) = validate_hello(&hello) { + write_frame(stream, &Frame::Error(err)).await?; + return Ok(None); } - /// Indexa la sesión: ResolvedCard en sessions + Card en broker (si hay). - /// Si `wit` está presente, el módulo se registra como "consciente". - async fn register_session( - &self, - session_id: SessionId, - card: Card, - wit: Option, - ) { - if let Some(broker) = &self.config.broker { - broker - .lock() - .await - .register(session_id, &card, wit.clone()); - } - let resolved = match wit { - Some(w) => ResolvedCard::from_conscious(card, w), - None => ResolvedCard::from_agnostic(card), - }; - self.sessions.lock().await.insert(session_id, resolved); - } + let session_id = Ulid::new(); + let card: Card = hello.card.into(); + register_session(session_id, card, hello.wit, config, sessions).await; - /// Validaciones que el servidor aplica al Hello del cliente. - fn validate_hello(&self, hello: &Hello) -> Option { - if hello.schema_version != CARD_SCHEMA_VERSION { - return Some(HandshakeError::SchemaMismatch { - client: hello.schema_version, - server: CARD_SCHEMA_VERSION, - }); - } - if hello.protocol_version != brahman_card::PROTOCOL_VERSION { - return Some(HandshakeError::ProtocolMismatch(format!( - "cliente={}, servidor={}", - hello.protocol_version, - brahman_card::PROTOCOL_VERSION - ))); - } - // Validamos contra Card (la rica) — convertir es barato y centraliza - // la lógica de validación en un solo lugar. - let as_card: Card = Card::from(hello.card.clone()); - if let Err(e) = as_card.validate() { - return Some(HandshakeError::InvalidCard(e.to_string())); - } - None + let ack = HelloAck { + server_version: crate::HANDSHAKE_VERSION.to_string(), + protocol_version: brahman_card::PROTOCOL_VERSION.to_string(), + session: session_id, + init_attached: config.init_attached, + }; + write_frame(stream, &Frame::HelloAck(ack)).await?; + debug!(session = %session_id, "handshake completado"); + Ok(Some(session_id)) +} + +async fn register_session( + session_id: SessionId, + card: Card, + wit: Option, + config: &ServerConfig, + sessions: &SessionRegistry, +) { + if let Some(broker) = &config.broker { + broker + .lock() + .await + .register(session_id, &card, wit.clone()); } + let resolved = match wit { + Some(w) => ResolvedCard::from_conscious(card, w), + None => ResolvedCard::from_agnostic(card), + }; + sessions.lock().await.insert(session_id, resolved); +} + +fn validate_hello(hello: &Hello) -> Option { + if hello.schema_version != CARD_SCHEMA_VERSION { + return Some(HandshakeError::SchemaMismatch { + client: hello.schema_version, + server: CARD_SCHEMA_VERSION, + }); + } + if hello.protocol_version != brahman_card::PROTOCOL_VERSION { + return Some(HandshakeError::ProtocolMismatch(format!( + "cliente={}, servidor={}", + hello.protocol_version, + brahman_card::PROTOCOL_VERSION + ))); + } + let as_card: Card = Card::from(hello.card.clone()); + if let Err(e) = as_card.validate() { + return Some(HandshakeError::InvalidCard(e.to_string())); + } + None } fn now_ms() -> u64 { diff --git a/crates/core/brahman-handshake/tests/network_libp2p.rs b/crates/core/brahman-handshake/tests/network_libp2p.rs new file mode 100644 index 0000000..5b94812 --- /dev/null +++ b/crates/core/brahman-handshake/tests/network_libp2p.rs @@ -0,0 +1,117 @@ +//! Test E2E: handshake brahman remoto sobre libp2p stream. +//! +//! Pipeline: +//! 1. Server: bind Unix socket (necesario aunque no lo use el cliente); +//! crear `BrahmanNet` y escuchar en `/ip4/127.0.0.1/tcp/0`; +//! montar `run_libp2p_accept_loop`. +//! 2. Client: crear su propio `BrahmanNet`; dial al multiaddr del +//! server; `connect_libp2p` con su Card; `ping`; `farewell`. +//! 3. Verificar: el server registró la sesión; sessions.len() == 1 +//! durante la sesión, == 0 después del farewell. + +use std::collections::BTreeSet; +use std::sync::Arc; +use std::time::Duration; + +use brahman_broker::{Broker, BrokerConfig}; +use brahman_card::{ + ulid::Ulid, Card, CardKind, Lifecycle, Payload, Priority, Supervision, + CARD_SCHEMA_VERSION, +}; +use brahman_handshake::network::{connect_libp2p, run_libp2p_accept_loop}; +use brahman_handshake::server::{Server, ServerConfig}; +use brahman_net::{BrahmanNet, Multiaddr, PeerId, Protocol}; +use tempfile::TempDir; +use tokio::sync::Mutex; + +fn sample_card(label: &str) -> Card { + Card { + schema_version: CARD_SCHEMA_VERSION, + id: Ulid::new(), + label: label.into(), + provides: BTreeSet::new(), + requires: BTreeSet::new(), + permissions: Default::default(), + soma: Default::default(), + payload: Payload::Virtual, + supervision: Supervision::OneShot, + lifecycle: Lifecycle::default(), + priority: Priority::default(), + kind: CardKind::Ente, + ..Default::default() + } +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 4)] +async fn libp2p_handshake_roundtrip() { + // ---- Server side ---- + let tmp = TempDir::new().unwrap(); + let unix_socket = tmp.path().join("brahman-init.sock"); + + let broker = Arc::new(Mutex::new(Broker::new(BrokerConfig::default()))); + let server = Arc::new( + Server::bind( + &unix_socket, + ServerConfig { + init_attached: true, + broker: Some(broker.clone()), + }, + ) + .unwrap(), + ); + let sessions = server.sessions(); + + let server_net = Arc::new(BrahmanNet::new().unwrap()); + let server_peer_id = server_net.peer_id; + + // Listen on a random TCP port. + let listen_addr: Multiaddr = "/ip4/127.0.0.1/tcp/0".parse().unwrap(); + let actual_addr = server_net.listen(listen_addr).await; + // Inject the libp2p PeerId into the multiaddr so the client knows + // who to dial. + let mut full_addr = actual_addr.clone(); + full_addr.push(Protocol::P2p(server_peer_id)); + + // Spawn the libp2p accept loop. + tokio::spawn(run_libp2p_accept_loop(server.clone(), server_net.clone())); + + // ---- Client side ---- + let client_net = BrahmanNet::new().unwrap(); + client_net.dial(full_addr.clone()); + + // Pequeña espera para que el dial conecte. En un entorno real el + // caller usaría un mecanismo de barrier, pero para tests un sleep + // corto es suficiente y deterministic en localhost. + tokio::time::sleep(Duration::from_millis(200)).await; + + let card = sample_card("test.remote_ente"); + let mut client = connect_libp2p(&client_net, server_peer_id, card, None) + .await + .expect("handshake remoto debería completar"); + + // Verificación: el server vio la sesión. + { + let s = sessions.lock().await; + assert_eq!(s.len(), 1, "una sesión registrada"); + let resolved = s.values().next().unwrap(); + assert_eq!(resolved.card.label, "test.remote_ente"); + } + + // Ping roundtrip. + let ts = client.ping().await.expect("ping debería responder"); + assert!(ts > 0, "timestamp del Pong > 0"); + + // Farewell limpio. + client.farewell().await.expect("farewell debería completar"); + + // Tras el farewell, el cleanup remueve la sesión. + // Damos un tick para que el handler procese el frame. + tokio::time::sleep(Duration::from_millis(100)).await; + { + let s = sessions.lock().await; + assert_eq!(s.len(), 0, "sesión removida tras farewell"); + } + + // peer_id no usado aquí, pero validamos que la API existe. + let _ = PeerId::random(); +} diff --git a/crates/shared/brahman-net/src/lib.rs b/crates/shared/brahman-net/src/lib.rs index f8b5a98..0d38819 100644 --- a/crates/shared/brahman-net/src/lib.rs +++ b/crates/shared/brahman-net/src/lib.rs @@ -48,12 +48,13 @@ use futures::StreamExt; use libp2p::{ identify, identity, kad, noise, swarm::{NetworkBehaviour, SwarmEvent}, - tcp, yamux, Multiaddr, PeerId, Swarm, SwarmBuilder, + tcp, yamux, Swarm, SwarmBuilder, }; use libp2p_stream as stream; use tokio::sync::{mpsc, oneshot, Mutex}; -pub use libp2p::{Stream, StreamProtocol}; +pub use libp2p::{multiaddr::Protocol, Multiaddr, PeerId, Stream, StreamProtocol}; +pub use libp2p_stream::OpenStreamError; const IDENTIFY_PROTOCOL: &str = "/brahman-net/0.1.0"; const IDLE_CONNECTION_TIMEOUT: Duration = Duration::from_secs(60);