diff --git a/CHANGELOG.md b/CHANGELOG.md index 9f74919..f8305f8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,46 @@ ratio/diff ver `git show `. ## 2026-05-09 +### feat(brahman-net): capa P2P compartida — Fase 0 (extracción del swarm libp2p) +Primer paso del plan "el encuentro entre Entes no se restringe a local". +El swarm libp2p que vivía dentro de `minga-p2p::network` (282 LOC) sale +a una crate compartida `brahman-net` para que cualquier protocolo de la +familia (handshake brahman remoto en Fase 1, sync minga, futuros) reuse +una sola malla TCP+Noise+Yamux+Kad+Identify+Stream. + +Diseño: +- `BrahmanNet::{new, with_keypair}` arma el swarm con DHT en modo + Server, Identify auto-poblando el routing table de Kad, y un + `stream::Control` accesible para que cada protocolo registre su + `StreamProtocol` y abra/acepte streams sin acoplarse al swarm. +- API de comandos uniforme: `dial`, `listen`, `add_dht_peer`, + `find_closest_peers`, `start_providing`, `find_providers`. +- Pública: `peer_id` (libp2p) + `control` (stream::Control). +- Re-exporta `Stream` y `StreamProtocol` para que callers no necesiten + importar libp2p directo. + +Migración: +- `minga-p2p::network` reduce de 282 LOC a 22: ahora sólo re-exporta + `BrahmanNet` bajo el alias histórico `LibP2pNode` (zero churn en + `MingaPeer`) y declara la const `SYNC_PROTOCOL = "/minga/sync/1.0.0"` + específica del sub-protocolo Minga. +- Cualquier consumer que necesite armar un nodo P2P puede importar + `brahman_net::BrahmanNet` directo sin pasar por minga. +- Deps de `minga-p2p` ganan `brahman-net`; el resto del grafo + (libp2p, libp2p-stream, futures, tokio-util) sigue igual porque + `MingaPeer` aún los usa para la lógica específica de sync. + +Aclaración semántica anclada por el usuario: **Arje** es el init +(PID 1, runtime, ente-zero/kernel/soma); **Brahman** es el encuentro +entre Entes (handshake/broker/card/sidecar/ahora también net). El +nombre de la crate refleja que la malla pertenece al encuentro, no +al runtime — Arje puede usar la malla, Minga usa la malla, cualquier +futuro módulo (Nakui remoto, p.ej.) la usa, sin acoplarse a Minga. + +Tests: minga-p2p completo verde (58 tests, sin regresión). Behavior +verificado idéntico — sólo se movió código, ningún cambio funcional. +Próximo: Fase 1 (handshake brahman sobre libp2p stream). + ### refactor(explorer+card): independencia jerárquica enforced — cliente con los wire types + fallback al default path Cierra el único debt estructural detectado en el audit de independencia: `nouser-explorer` ya no arrastra `nouser-core` diff --git a/Cargo.lock b/Cargo.lock index 4f1aef7..fe9aedd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1210,6 +1210,19 @@ dependencies = [ "ulid", ] +[[package]] +name = "brahman-net" +version = "0.1.0" +dependencies = [ + "futures", + "libp2p", + "libp2p-stream", + "thiserror 2.0.18", + "tokio", + "tokio-util", + "tracing", +] + [[package]] name = "brahman-sidecar" version = "0.1.0" @@ -5860,6 +5873,7 @@ dependencies = [ name = "minga-p2p" version = "0.1.0" dependencies = [ + "brahman-net", "futures", "libp2p", "libp2p-stream", diff --git a/Cargo.toml b/Cargo.toml index 5530b1f..dcf14fd 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -10,6 +10,7 @@ members = [ "crates/core/brahman-broker", "crates/core/brahman-admin", "crates/shared/brahman-sidecar", + "crates/shared/brahman-net", "crates/core/ente-card", "crates/core/ente-bus", "crates/core/ente-cas", diff --git a/crates/modules/semantic_dht/minga-p2p/Cargo.toml b/crates/modules/semantic_dht/minga-p2p/Cargo.toml index c94dfbe..f268559 100644 --- a/crates/modules/semantic_dht/minga-p2p/Cargo.toml +++ b/crates/modules/semantic_dht/minga-p2p/Cargo.toml @@ -9,6 +9,7 @@ description = "Minga P2P: protocolo de sincronización entre repositorios. Lógi [dependencies] minga-core = { path = "../minga-core" } minga-store = { path = "../minga-store" } +brahman-net = { path = "../../../shared/brahman-net" } serde = { workspace = true } postcard = { workspace = true } thiserror = { workspace = true } diff --git a/crates/modules/semantic_dht/minga-p2p/src/network.rs b/crates/modules/semantic_dht/minga-p2p/src/network.rs index 9aadc83..70c7a10 100644 --- a/crates/modules/semantic_dht/minga-p2p/src/network.rs +++ b/crates/modules/semantic_dht/minga-p2p/src/network.rs @@ -1,282 +1,22 @@ -//! Integración libp2p con behaviour compuesto: streams Minga + -//! Kademlia DHT. +//! Re-export del nodo de la red Brahman especializado para Minga. //! -//! - **TCP + Noise + Yamux**: transporte autenticado y multiplexado. -//! - **`stream::Behaviour`**: streams bidireccionales para el -//! protocolo `/minga/sync/1.0.0`. -//! - **`kad::Behaviour`**: tabla de routing distribuida -//! para descubrimiento. Cada nodo arranca en modo `Server` y -//! responde a queries del DHT. +//! Antes este módulo contenía el swarm libp2p completo. Ahora vive en +//! `brahman-net` (capa P2P compartida con el resto de la familia +//! brahman: `/brahman/handshake/1.0.0`, futuros sub-protocolos). Este +//! módulo se reduce a: //! -//! El swarm corre en una task tokio dedicada que procesa comandos -//! externos (Dial, Listen, AddDhtPeer, FindClosestPeers) y eventos -//! del swarm (NewListenAddr para señalar address resuelto, eventos -//! Kad para completar queries). Los métodos públicos solo envían -//! comandos por canal. +//! - Re-exportar `BrahmanNet` bajo el alias histórico `LibP2pNode` +//! para zero churn en `MingaPeer`. +//! - Declarar la const `SYNC_PROTOCOL` específica de Minga +//! (`/minga/sync/1.0.0`). +//! +//! Cualquier consumer que necesite armar un nodo P2P puede importar +//! `brahman_net::BrahmanNet` directo y registrar sus propios protocolos +//! sin pasar por minga. -use std::collections::HashMap; -use std::time::Duration; +pub use brahman_net::{BrahmanNet as LibP2pNode, DiscoveredPeer, NodeError}; -use futures::StreamExt; -use libp2p::{ - identify, identity, kad, noise, - swarm::{NetworkBehaviour, SwarmEvent}, - tcp, yamux, Multiaddr, PeerId, StreamProtocol, Swarm, SwarmBuilder, -}; -use libp2p_stream as stream; -use tokio::sync::{mpsc, oneshot, Mutex}; +use libp2p::StreamProtocol; +/// Sub-protocolo de sync Minga sobre la malla brahman-net. pub const SYNC_PROTOCOL: StreamProtocol = StreamProtocol::new("/minga/sync/1.0.0"); -const IDENTIFY_PROTOCOL: &str = "/minga/0.1.0"; - -#[derive(NetworkBehaviour)] -struct MingaBehaviour { - stream: stream::Behaviour, - kad: kad::Behaviour, - identify: identify::Behaviour, -} - -#[derive(Debug, thiserror::Error)] -pub enum NodeError { - #[error("transport build failed: {0}")] - Build(String), -} - -#[derive(Debug)] -enum Command { - Dial(Multiaddr), - Listen(Multiaddr), - AddDhtPeer(PeerId, Multiaddr), - FindClosestPeers(PeerId, oneshot::Sender>), - StartProviding(Vec), - GetProviders(Vec, oneshot::Sender>), -} - -/// Peer descubierto vía DHT: identidad + direcciones conocidas. -#[derive(Debug, Clone)] -pub struct DiscoveredPeer { - pub peer_id: PeerId, - pub addrs: Vec, -} - -pub struct LibP2pNode { - pub peer_id: PeerId, - cmd_tx: mpsc::UnboundedSender, - listen_rx: Mutex>, - /// Control para abrir/aceptar streams. - pub control: stream::Control, -} - -impl LibP2pNode { - pub fn new() -> Result { - let id = identity::Keypair::generate_ed25519(); - let peer_id = id.public().to_peer_id(); - - let mut swarm: Swarm = SwarmBuilder::with_existing_identity(id) - .with_tokio() - .with_tcp( - tcp::Config::default(), - noise::Config::new, - yamux::Config::default, - ) - .map_err(|e| NodeError::Build(format!("{e}")))? - .with_behaviour(|key| { - let local = key.public().to_peer_id(); - let mut kad = - kad::Behaviour::new(local, kad::store::MemoryStore::new(local)); - // Modo Server: respondemos a queries del DHT. Por - // defecto kad arranca en Auto, que requiere detectar - // reachability. Para tests en localhost forzamos Server. - kad.set_mode(Some(kad::Mode::Server)); - let identify = identify::Behaviour::new( - identify::Config::new(IDENTIFY_PROTOCOL.to_string(), key.public()) - .with_agent_version(format!("minga/{}", env!("CARGO_PKG_VERSION"))), - ); - MingaBehaviour { - stream: stream::Behaviour::new(), - kad, - identify, - } - }) - .map_err(|e| NodeError::Build(format!("{e}")))? - .with_swarm_config(|c| c.with_idle_connection_timeout(Duration::from_secs(60))) - .build(); - - let control = swarm.behaviour().stream.new_control(); - - let (cmd_tx, mut cmd_rx) = mpsc::unbounded_channel::(); - let (listen_tx, listen_rx) = mpsc::unbounded_channel::(); - - tokio::spawn(async move { - let mut pending_finds: HashMap< - kad::QueryId, - oneshot::Sender>, - > = HashMap::new(); - let mut pending_providers: HashMap< - kad::QueryId, - (Vec, oneshot::Sender>), - > = HashMap::new(); - - loop { - tokio::select! { - Some(cmd) = cmd_rx.recv() => { - match cmd { - Command::Dial(addr) => { - let _ = swarm.dial(addr); - } - Command::Listen(addr) => { - let _ = swarm.listen_on(addr); - } - Command::AddDhtPeer(peer, addr) => { - swarm.behaviour_mut().kad.add_address(&peer, addr); - } - Command::FindClosestPeers(target, tx) => { - let qid = swarm.behaviour_mut().kad.get_closest_peers(target); - pending_finds.insert(qid, tx); - } - Command::StartProviding(key) => { - // Best-effort: si falla (sin peers cercanos para - // replicar), seguirá viviendo en el local store - // y se servirá vía get_providers de quien - // tenga conexión con nosotros. - let _ = swarm.behaviour_mut().kad.start_providing(key.into()); - } - Command::GetProviders(key, tx) => { - let qid = swarm.behaviour_mut().kad.get_providers(key.into()); - pending_providers.insert(qid, (Vec::new(), tx)); - } - } - } - event = swarm.select_next_some() => { - match event { - SwarmEvent::NewListenAddr { address, .. } => { - let _ = listen_tx.send(address); - } - // Identify nos dice las listen-addrs reales del - // peer. Las inyectamos a Kad para poblar el - // routing table sin necesidad de add_dht_peer - // manual — la propagación pasa a ser automática. - SwarmEvent::Behaviour(MingaBehaviourEvent::Identify( - identify::Event::Received { peer_id, info, .. } - )) => { - for addr in info.listen_addrs { - swarm.behaviour_mut().kad.add_address(&peer_id, addr); - } - } - SwarmEvent::Behaviour(MingaBehaviourEvent::Kad( - kad::Event::OutboundQueryProgressed { id, result, step, .. } - )) => { - match result { - kad::QueryResult::GetClosestPeers(Ok(ok)) if step.last => { - if let Some(tx) = pending_finds.remove(&id) { - let infos = ok.peers.into_iter() - .map(|p| DiscoveredPeer { - peer_id: p.peer_id, - addrs: p.addrs, - }) - .collect(); - let _ = tx.send(infos); - } - } - kad::QueryResult::GetClosestPeers(Err(_)) if step.last => { - if let Some(tx) = pending_finds.remove(&id) { - let _ = tx.send(Vec::new()); - } - } - kad::QueryResult::GetProviders(Ok(ok)) => { - if let Some((collected, _)) = - pending_providers.get_mut(&id) - { - if let kad::GetProvidersOk::FoundProviders { - providers, .. - } = ok - { - for p in providers { - if !collected.contains(&p) { - collected.push(p); - } - } - } - } - if step.last { - if let Some((providers, tx)) = - pending_providers.remove(&id) - { - let _ = tx.send(providers); - } - } - } - kad::QueryResult::GetProviders(Err(_)) if step.last => { - if let Some((providers, tx)) = - pending_providers.remove(&id) - { - let _ = tx.send(providers); - } - } - _ => {} - } - } - _ => {} - } - } - } - } - }); - - Ok(Self { - peer_id, - cmd_tx, - listen_rx: Mutex::new(listen_rx), - control, - }) - } - - pub async fn listen(&self, addr: Multiaddr) -> Multiaddr { - self.cmd_tx - .send(Command::Listen(addr)) - .expect("swarm task alive"); - let mut rx = self.listen_rx.lock().await; - rx.recv().await.expect("listen address arrives") - } - - pub fn dial(&self, addr: Multiaddr) { - let _ = self.cmd_tx.send(Command::Dial(addr)); - } - - /// Añade un peer al routing table de Kademlia. Punto de entrada - /// para bootstrap: tras esto, el nodo puede dirigir queries DHT - /// a través de este peer. - pub fn add_dht_peer(&self, peer: PeerId, addr: Multiaddr) { - let _ = self.cmd_tx.send(Command::AddDhtPeer(peer, addr)); - } - - /// Consulta el DHT por los peers más cercanos al `target` PeerId. - /// Devuelve la lista resuelta (vacía si la query falla o si no - /// hay peers conocidos). Bloquea hasta que la query completa. - pub async fn find_closest_peers(&self, target: PeerId) -> Vec { - let (tx, rx) = oneshot::channel(); - let _ = self - .cmd_tx - .send(Command::FindClosestPeers(target, tx)); - rx.await.unwrap_or_default() - } - - /// Anuncia en el DHT que este peer tiene el contenido identificado - /// por `key`. Otros peers pueden luego descubrirlo vía - /// `find_providers(key)`. Best-effort: si la replicación falla - /// inicialmente, el record vive en el store local. - pub fn start_providing(&self, key: &[u8]) { - let _ = self.cmd_tx.send(Command::StartProviding(key.to_vec())); - } - - /// Consulta el DHT por peers que han anunciado proveer `key`. - /// Devuelve la lista de `PeerId`s que se reportan como providers. - /// Lista vacía si nadie anuncia. - pub async fn find_providers(&self, key: &[u8]) -> Vec { - let (tx, rx) = oneshot::channel(); - let _ = self - .cmd_tx - .send(Command::GetProviders(key.to_vec(), tx)); - rx.await.unwrap_or_default() - } -} diff --git a/crates/shared/brahman-net/Cargo.toml b/crates/shared/brahman-net/Cargo.toml new file mode 100644 index 0000000..c4625f5 --- /dev/null +++ b/crates/shared/brahman-net/Cargo.toml @@ -0,0 +1,21 @@ +[package] +name = "brahman-net" +version.workspace = true +edition.workspace = true +rust-version.workspace = true +license.workspace = true +authors.workspace = true +publish.workspace = true +description = "Brahman — capa de transporte P2P compartida (libp2p TCP+Noise+Yamux+Kad+Identify+Stream). Cualquier protocolo (handshake brahman, sync minga, futuros) puede registrar su StreamProtocol y abrir/aceptar streams sobre la malla común." + +[dependencies] +futures = { workspace = true } +libp2p = { workspace = true } +libp2p-stream = { workspace = true } +thiserror = { workspace = true } +tokio = { workspace = true } +tracing = { workspace = true } + +[dev-dependencies] +tokio = { workspace = true } +tokio-util = { workspace = true } diff --git a/crates/shared/brahman-net/src/lib.rs b/crates/shared/brahman-net/src/lib.rs new file mode 100644 index 0000000..f8b5a98 --- /dev/null +++ b/crates/shared/brahman-net/src/lib.rs @@ -0,0 +1,337 @@ +//! `brahman-net` — capa P2P compartida de la red Brahman. +//! +//! Provee un nodo libp2p genérico que cualquier protocolo de la +//! familia (handshake brahman remoto, sync minga, futuros) puede +//! reusar. La idea: una sola malla, múltiples sub-protocolos +//! multiplexados por `StreamProtocol`. +//! +//! ## Stack +//! +//! - **TCP + Noise + Yamux**: transporte autenticado y multiplexado. +//! - **`stream::Behaviour`**: streams bidireccionales por +//! `StreamProtocol`. Cada protocolo (`/brahman/handshake/1.0.0`, +//! `/minga/sync/1.0.0`, …) se registra independientemente vía el +//! `stream::Control` que `BrahmanNet` expone. +//! - **`kad::Behaviour`**: Kademlia DHT en modo Server +//! para discovery (peers cercanos + content providers). +//! - **`identify::Behaviour`**: cada peer anuncia sus listen-addrs +//! reales; las inyectamos automáticamente al routing table de Kad. +//! +//! ## Modelo +//! +//! El swarm corre en una task tokio dedicada. La interfaz pública son: +//! 1. **Comandos** (canal mpsc): `dial`, `listen`, `add_dht_peer`, +//! `find_closest_peers`, `start_providing`, `find_providers`. +//! 2. **`stream::Control`** (acceso directo): para abrir/aceptar +//! streams de un protocolo concreto. Cada protocolo se ocupa de +//! su propia lógica sobre el stream resultante. +//! +//! La separación entre comandos y control permite que la lógica de +//! red (DHT, dial, listen) y la lógica de protocolos (handshake/sync) +//! evolucionen independientes — el protocolo no necesita conocer al +//! swarm, sólo pide streams. +//! +//! ## Identidad +//! +//! Por defecto se genera una keypair Ed25519 efímera. Para identidad +//! persistente (la misma `peer_id` across reboots), pasar la keypair +//! con [`BrahmanNet::with_keypair`]. Esa misma keypair puede ser la +//! base para firmas de Cards (cuando se implemente trust remoto). + +#![forbid(unsafe_code)] +#![warn(rust_2018_idioms)] + +use std::collections::HashMap; +use std::time::Duration; + +use futures::StreamExt; +use libp2p::{ + identify, identity, kad, noise, + swarm::{NetworkBehaviour, SwarmEvent}, + tcp, yamux, Multiaddr, PeerId, Swarm, SwarmBuilder, +}; +use libp2p_stream as stream; +use tokio::sync::{mpsc, oneshot, Mutex}; + +pub use libp2p::{Stream, StreamProtocol}; + +const IDENTIFY_PROTOCOL: &str = "/brahman-net/0.1.0"; +const IDLE_CONNECTION_TIMEOUT: Duration = Duration::from_secs(60); + +#[derive(NetworkBehaviour)] +struct BrahmanBehaviour { + stream: stream::Behaviour, + kad: kad::Behaviour, + identify: identify::Behaviour, +} + +#[derive(Debug, thiserror::Error)] +pub enum NodeError { + #[error("transport build failed: {0}")] + Build(String), +} + +#[derive(Debug)] +enum Command { + Dial(Multiaddr), + Listen(Multiaddr), + AddDhtPeer(PeerId, Multiaddr), + FindClosestPeers(PeerId, oneshot::Sender>), + StartProviding(Vec), + GetProviders(Vec, oneshot::Sender>), +} + +/// Peer descubierto vía DHT: identidad + direcciones conocidas. +#[derive(Debug, Clone)] +pub struct DiscoveredPeer { + pub peer_id: PeerId, + pub addrs: Vec, +} + +/// Nodo Brahman en la malla P2P. Maneja el swarm libp2p y expone +/// API uniforme para listen/dial/DHT/streams. +pub struct BrahmanNet { + /// Identidad libp2p de este nodo. Estable mientras viva la + /// keypair (efímera por default; persistente si pasaste una + /// vía [`with_keypair`]). + pub peer_id: PeerId, + cmd_tx: mpsc::UnboundedSender, + listen_rx: Mutex>, + /// Control para abrir y aceptar streams. Cada protocolo + /// (handshake brahman, sync minga, etc.) llama + /// `control.accept(StreamProtocol::new("/foo/1.0.0"))` para + /// recibir streams entrantes, o `control.open_stream(peer, proto)` + /// para abrirlos. Multiplexado y demultiplexado lo hace libp2p. + pub control: stream::Control, +} + +impl BrahmanNet { + /// Crea un nodo con keypair Ed25519 generada al vuelo (peer_id + /// efímero — cambia en cada arranque). + pub fn new() -> Result { + Self::with_keypair(identity::Keypair::generate_ed25519()) + } + + /// Crea un nodo con una keypair libp2p específica. Usá esto para + /// `peer_id` estable (por ejemplo si tu identidad se persiste a + /// disco, o si la derivás de la identidad criptográfica del + /// módulo). + pub fn with_keypair(keypair: identity::Keypair) -> Result { + let peer_id = keypair.public().to_peer_id(); + + let mut swarm: Swarm = SwarmBuilder::with_existing_identity(keypair) + .with_tokio() + .with_tcp( + tcp::Config::default(), + noise::Config::new, + yamux::Config::default, + ) + .map_err(|e| NodeError::Build(format!("{e}")))? + .with_behaviour(|key| { + let local = key.public().to_peer_id(); + let mut kad = + kad::Behaviour::new(local, kad::store::MemoryStore::new(local)); + // Modo Server: respondemos a queries del DHT. Auto + // requiere detectar reachability; para entornos + // controlados (localhost, redes privadas) Server es + // lo correcto. + kad.set_mode(Some(kad::Mode::Server)); + let identify = identify::Behaviour::new( + identify::Config::new(IDENTIFY_PROTOCOL.to_string(), key.public()) + .with_agent_version(format!("brahman-net/{}", env!("CARGO_PKG_VERSION"))), + ); + BrahmanBehaviour { + stream: stream::Behaviour::new(), + kad, + identify, + } + }) + .map_err(|e| NodeError::Build(format!("{e}")))? + .with_swarm_config(|c| c.with_idle_connection_timeout(IDLE_CONNECTION_TIMEOUT)) + .build(); + + let control = swarm.behaviour().stream.new_control(); + + let (cmd_tx, mut cmd_rx) = mpsc::unbounded_channel::(); + let (listen_tx, listen_rx) = mpsc::unbounded_channel::(); + + tokio::spawn(async move { + let mut pending_finds: HashMap< + kad::QueryId, + oneshot::Sender>, + > = HashMap::new(); + let mut pending_providers: HashMap< + kad::QueryId, + (Vec, oneshot::Sender>), + > = HashMap::new(); + + loop { + tokio::select! { + Some(cmd) = cmd_rx.recv() => { + match cmd { + Command::Dial(addr) => { + let _ = swarm.dial(addr); + } + Command::Listen(addr) => { + let _ = swarm.listen_on(addr); + } + Command::AddDhtPeer(peer, addr) => { + swarm.behaviour_mut().kad.add_address(&peer, addr); + } + Command::FindClosestPeers(target, tx) => { + let qid = swarm.behaviour_mut().kad.get_closest_peers(target); + pending_finds.insert(qid, tx); + } + Command::StartProviding(key) => { + // Best-effort: si falla (sin peers cercanos para + // replicar), seguirá viviendo en el local store + // y se servirá vía get_providers de quien tenga + // conexión con nosotros. + let _ = swarm.behaviour_mut().kad.start_providing(key.into()); + } + Command::GetProviders(key, tx) => { + let qid = swarm.behaviour_mut().kad.get_providers(key.into()); + pending_providers.insert(qid, (Vec::new(), tx)); + } + } + } + event = swarm.select_next_some() => { + match event { + SwarmEvent::NewListenAddr { address, .. } => { + let _ = listen_tx.send(address); + } + // Identify nos dice las listen-addrs reales del + // peer. Las inyectamos a Kad para poblar el + // routing table sin necesidad de add_dht_peer + // manual — la propagación pasa a ser automática. + SwarmEvent::Behaviour(BrahmanBehaviourEvent::Identify( + identify::Event::Received { peer_id, info, .. } + )) => { + for addr in info.listen_addrs { + swarm.behaviour_mut().kad.add_address(&peer_id, addr); + } + } + SwarmEvent::Behaviour(BrahmanBehaviourEvent::Kad( + kad::Event::OutboundQueryProgressed { id, result, step, .. } + )) => { + match result { + kad::QueryResult::GetClosestPeers(Ok(ok)) if step.last => { + if let Some(tx) = pending_finds.remove(&id) { + let infos = ok.peers.into_iter() + .map(|p| DiscoveredPeer { + peer_id: p.peer_id, + addrs: p.addrs, + }) + .collect(); + let _ = tx.send(infos); + } + } + kad::QueryResult::GetClosestPeers(Err(_)) if step.last => { + if let Some(tx) = pending_finds.remove(&id) { + let _ = tx.send(Vec::new()); + } + } + kad::QueryResult::GetProviders(Ok(ok)) => { + if let Some((collected, _)) = + pending_providers.get_mut(&id) + { + if let kad::GetProvidersOk::FoundProviders { + providers, .. + } = ok + { + for p in providers { + if !collected.contains(&p) { + collected.push(p); + } + } + } + } + if step.last { + if let Some((providers, tx)) = + pending_providers.remove(&id) + { + let _ = tx.send(providers); + } + } + } + kad::QueryResult::GetProviders(Err(_)) if step.last => { + if let Some((providers, tx)) = + pending_providers.remove(&id) + { + let _ = tx.send(providers); + } + } + _ => {} + } + } + _ => {} + } + } + } + } + }); + + Ok(Self { + peer_id, + cmd_tx, + listen_rx: Mutex::new(listen_rx), + control, + }) + } + + /// Empieza a escuchar en `addr`. Bloquea hasta que el listener + /// publique su dirección real (Multiaddr resuelta — útil cuando + /// pediste `/ip4/0.0.0.0/tcp/0` y querés saber qué puerto te tocó). + pub async fn listen(&self, addr: Multiaddr) -> Multiaddr { + self.cmd_tx + .send(Command::Listen(addr)) + .expect("swarm task alive"); + let mut rx = self.listen_rx.lock().await; + rx.recv().await.expect("listen address arrives") + } + + /// Inicia conexión con un peer en `addr`. No-op si ya hay + /// conexión. Best-effort — fallos se loggean al swarm pero no se + /// propagan al caller (consistente con libp2p). + pub fn dial(&self, addr: Multiaddr) { + let _ = self.cmd_tx.send(Command::Dial(addr)); + } + + /// Añade un peer al routing table de Kademlia. Punto de entrada + /// para bootstrap: tras esto, el nodo puede dirigir queries DHT + /// a través de este peer. + pub fn add_dht_peer(&self, peer: PeerId, addr: Multiaddr) { + let _ = self.cmd_tx.send(Command::AddDhtPeer(peer, addr)); + } + + /// Consulta el DHT por los peers más cercanos al `target` PeerId. + /// Devuelve la lista resuelta (vacía si la query falla o si no + /// hay peers conocidos). Bloquea hasta que la query completa. + pub async fn find_closest_peers(&self, target: PeerId) -> Vec { + let (tx, rx) = oneshot::channel(); + let _ = self + .cmd_tx + .send(Command::FindClosestPeers(target, tx)); + rx.await.unwrap_or_default() + } + + /// Anuncia en el DHT que este peer tiene el contenido identificado + /// por `key`. Otros peers pueden luego descubrirlo vía + /// [`find_providers`](Self::find_providers). Best-effort: si la + /// replicación falla inicialmente, el record vive en el store + /// local hasta que llegue conexión. + pub fn start_providing(&self, key: &[u8]) { + let _ = self.cmd_tx.send(Command::StartProviding(key.to_vec())); + } + + /// Consulta el DHT por peers que han anunciado proveer `key`. + /// Devuelve la lista de `PeerId`s que se reportan como providers. + /// Lista vacía si nadie anuncia. + pub async fn find_providers(&self, key: &[u8]) -> Vec { + let (tx, rx) = oneshot::channel(); + let _ = self + .cmd_tx + .send(Command::GetProviders(key.to_vec(), tx)); + rx.await.unwrap_or_default() + } +}