feat(brahman-net): capa P2P compartida — Fase 0 (extracción del swarm)

Primer paso del plan "el encuentro entre Entes no se restringe a local".
El swarm libp2p que vivía dentro de minga-p2p::network (282 LOC) sale
a una crate compartida brahman-net para que cualquier protocolo de la
familia (handshake brahman remoto en Fase 1, sync minga, futuros) reuse
una sola malla TCP+Noise+Yamux+Kad+Identify+Stream.

BrahmanNet expone:
- new() / with_keypair() para identidad efimera o persistente
- API de comandos uniforme: dial, listen, add_dht_peer,
  find_closest_peers, start_providing, find_providers
- Publica peer_id (libp2p) y control (stream::Control) — cada
  protocolo registra su StreamProtocol sin acoplarse al swarm
- Re-exporta Stream y StreamProtocol para evitar dep directa a libp2p

minga-p2p::network reduce de 282 LOC a 22: re-export del nuevo
BrahmanNet bajo el alias historico LibP2pNode (zero churn en
MingaPeer) y la const SYNC_PROTOCOL = "/minga/sync/1.0.0" especifica
del sub-protocolo de sync Minga.

Aclaracion semantica anclada por el usuario: Arje es el init (PID 1),
Brahman es el encuentro entre Entes. El nombre brahman-net refleja
que la malla pertenece al encuentro, no al runtime — Minga es un
cliente de la malla, no su dueño.

Tests: minga-p2p completo verde (58 tests, sin regresion). Behavior
identico — solo se movio codigo, ningun cambio funcional.
This commit is contained in:
Sergio
2026-05-09 12:29:16 +00:00
parent 6f993f4268
commit ad0d475a2e
7 changed files with 430 additions and 276 deletions
+40
View File
@@ -6,6 +6,46 @@ ratio/diff ver `git show <sha>`.
## 2026-05-09
### feat(brahman-net): capa P2P compartida — Fase 0 (extracción del swarm libp2p)
Primer paso del plan "el encuentro entre Entes no se restringe a local".
El swarm libp2p que vivía dentro de `minga-p2p::network` (282 LOC) sale
a una crate compartida `brahman-net` para que cualquier protocolo de la
familia (handshake brahman remoto en Fase 1, sync minga, futuros) reuse
una sola malla TCP+Noise+Yamux+Kad+Identify+Stream.
Diseño:
- `BrahmanNet::{new, with_keypair}` arma el swarm con DHT en modo
Server, Identify auto-poblando el routing table de Kad, y un
`stream::Control` accesible para que cada protocolo registre su
`StreamProtocol` y abra/acepte streams sin acoplarse al swarm.
- API de comandos uniforme: `dial`, `listen`, `add_dht_peer`,
`find_closest_peers`, `start_providing`, `find_providers`.
- Pública: `peer_id` (libp2p) + `control` (stream::Control).
- Re-exporta `Stream` y `StreamProtocol` para que callers no necesiten
importar libp2p directo.
Migración:
- `minga-p2p::network` reduce de 282 LOC a 22: ahora sólo re-exporta
`BrahmanNet` bajo el alias histórico `LibP2pNode` (zero churn en
`MingaPeer`) y declara la const `SYNC_PROTOCOL = "/minga/sync/1.0.0"`
específica del sub-protocolo Minga.
- Cualquier consumer que necesite armar un nodo P2P puede importar
`brahman_net::BrahmanNet` directo sin pasar por minga.
- Deps de `minga-p2p` ganan `brahman-net`; el resto del grafo
(libp2p, libp2p-stream, futures, tokio-util) sigue igual porque
`MingaPeer` aún los usa para la lógica específica de sync.
Aclaración semántica anclada por el usuario: **Arje** es el init
(PID 1, runtime, ente-zero/kernel/soma); **Brahman** es el encuentro
entre Entes (handshake/broker/card/sidecar/ahora también net). El
nombre de la crate refleja que la malla pertenece al encuentro, no
al runtime — Arje puede usar la malla, Minga usa la malla, cualquier
futuro módulo (Nakui remoto, p.ej.) la usa, sin acoplarse a Minga.
Tests: minga-p2p completo verde (58 tests, sin regresión). Behavior
verificado idéntico — sólo se movió código, ningún cambio funcional.
Próximo: Fase 1 (handshake brahman sobre libp2p stream).
### refactor(explorer+card): independencia jerárquica enforced — cliente con los wire types + fallback al default path
Cierra el único debt estructural detectado en el audit de
independencia: `nouser-explorer` ya no arrastra `nouser-core`
Generated
+14
View File
@@ -1210,6 +1210,19 @@ dependencies = [
"ulid",
]
[[package]]
name = "brahman-net"
version = "0.1.0"
dependencies = [
"futures",
"libp2p",
"libp2p-stream",
"thiserror 2.0.18",
"tokio",
"tokio-util",
"tracing",
]
[[package]]
name = "brahman-sidecar"
version = "0.1.0"
@@ -5860,6 +5873,7 @@ dependencies = [
name = "minga-p2p"
version = "0.1.0"
dependencies = [
"brahman-net",
"futures",
"libp2p",
"libp2p-stream",
+1
View File
@@ -10,6 +10,7 @@ members = [
"crates/core/brahman-broker",
"crates/core/brahman-admin",
"crates/shared/brahman-sidecar",
"crates/shared/brahman-net",
"crates/core/ente-card",
"crates/core/ente-bus",
"crates/core/ente-cas",
@@ -9,6 +9,7 @@ description = "Minga P2P: protocolo de sincronización entre repositorios. Lógi
[dependencies]
minga-core = { path = "../minga-core" }
minga-store = { path = "../minga-store" }
brahman-net = { path = "../../../shared/brahman-net" }
serde = { workspace = true }
postcard = { workspace = true }
thiserror = { workspace = true }
@@ -1,282 +1,22 @@
//! Integración libp2p con behaviour compuesto: streams Minga +
//! Kademlia DHT.
//! Re-export del nodo de la red Brahman especializado para Minga.
//!
//! - **TCP + Noise + Yamux**: transporte autenticado y multiplexado.
//! - **`stream::Behaviour`**: streams bidireccionales para el
//! protocolo `/minga/sync/1.0.0`.
//! - **`kad::Behaviour<MemoryStore>`**: tabla de routing distribuida
//! para descubrimiento. Cada nodo arranca en modo `Server` y
//! responde a queries del DHT.
//! Antes este módulo contenía el swarm libp2p completo. Ahora vive en
//! `brahman-net` (capa P2P compartida con el resto de la familia
//! brahman: `/brahman/handshake/1.0.0`, futuros sub-protocolos). Este
//! módulo se reduce a:
//!
//! El swarm corre en una task tokio dedicada que procesa comandos
//! externos (Dial, Listen, AddDhtPeer, FindClosestPeers) y eventos
//! del swarm (NewListenAddr para señalar address resuelto, eventos
//! Kad para completar queries). Los métodos públicos solo envían
//! comandos por canal.
//! - Re-exportar `BrahmanNet` bajo el alias histórico `LibP2pNode`
//! para zero churn en `MingaPeer`.
//! - Declarar la const `SYNC_PROTOCOL` específica de Minga
//! (`/minga/sync/1.0.0`).
//!
//! Cualquier consumer que necesite armar un nodo P2P puede importar
//! `brahman_net::BrahmanNet` directo y registrar sus propios protocolos
//! sin pasar por minga.
use std::collections::HashMap;
use std::time::Duration;
pub use brahman_net::{BrahmanNet as LibP2pNode, DiscoveredPeer, NodeError};
use futures::StreamExt;
use libp2p::{
identify, identity, kad, noise,
swarm::{NetworkBehaviour, SwarmEvent},
tcp, yamux, Multiaddr, PeerId, StreamProtocol, Swarm, SwarmBuilder,
};
use libp2p_stream as stream;
use tokio::sync::{mpsc, oneshot, Mutex};
use libp2p::StreamProtocol;
/// Sub-protocolo de sync Minga sobre la malla brahman-net.
pub const SYNC_PROTOCOL: StreamProtocol = StreamProtocol::new("/minga/sync/1.0.0");
const IDENTIFY_PROTOCOL: &str = "/minga/0.1.0";
#[derive(NetworkBehaviour)]
struct MingaBehaviour {
stream: stream::Behaviour,
kad: kad::Behaviour<kad::store::MemoryStore>,
identify: identify::Behaviour,
}
#[derive(Debug, thiserror::Error)]
pub enum NodeError {
#[error("transport build failed: {0}")]
Build(String),
}
#[derive(Debug)]
enum Command {
Dial(Multiaddr),
Listen(Multiaddr),
AddDhtPeer(PeerId, Multiaddr),
FindClosestPeers(PeerId, oneshot::Sender<Vec<DiscoveredPeer>>),
StartProviding(Vec<u8>),
GetProviders(Vec<u8>, oneshot::Sender<Vec<PeerId>>),
}
/// Peer descubierto vía DHT: identidad + direcciones conocidas.
#[derive(Debug, Clone)]
pub struct DiscoveredPeer {
pub peer_id: PeerId,
pub addrs: Vec<Multiaddr>,
}
pub struct LibP2pNode {
pub peer_id: PeerId,
cmd_tx: mpsc::UnboundedSender<Command>,
listen_rx: Mutex<mpsc::UnboundedReceiver<Multiaddr>>,
/// Control para abrir/aceptar streams.
pub control: stream::Control,
}
impl LibP2pNode {
pub fn new() -> Result<Self, NodeError> {
let id = identity::Keypair::generate_ed25519();
let peer_id = id.public().to_peer_id();
let mut swarm: Swarm<MingaBehaviour> = SwarmBuilder::with_existing_identity(id)
.with_tokio()
.with_tcp(
tcp::Config::default(),
noise::Config::new,
yamux::Config::default,
)
.map_err(|e| NodeError::Build(format!("{e}")))?
.with_behaviour(|key| {
let local = key.public().to_peer_id();
let mut kad =
kad::Behaviour::new(local, kad::store::MemoryStore::new(local));
// Modo Server: respondemos a queries del DHT. Por
// defecto kad arranca en Auto, que requiere detectar
// reachability. Para tests en localhost forzamos Server.
kad.set_mode(Some(kad::Mode::Server));
let identify = identify::Behaviour::new(
identify::Config::new(IDENTIFY_PROTOCOL.to_string(), key.public())
.with_agent_version(format!("minga/{}", env!("CARGO_PKG_VERSION"))),
);
MingaBehaviour {
stream: stream::Behaviour::new(),
kad,
identify,
}
})
.map_err(|e| NodeError::Build(format!("{e}")))?
.with_swarm_config(|c| c.with_idle_connection_timeout(Duration::from_secs(60)))
.build();
let control = swarm.behaviour().stream.new_control();
let (cmd_tx, mut cmd_rx) = mpsc::unbounded_channel::<Command>();
let (listen_tx, listen_rx) = mpsc::unbounded_channel::<Multiaddr>();
tokio::spawn(async move {
let mut pending_finds: HashMap<
kad::QueryId,
oneshot::Sender<Vec<DiscoveredPeer>>,
> = HashMap::new();
let mut pending_providers: HashMap<
kad::QueryId,
(Vec<PeerId>, oneshot::Sender<Vec<PeerId>>),
> = HashMap::new();
loop {
tokio::select! {
Some(cmd) = cmd_rx.recv() => {
match cmd {
Command::Dial(addr) => {
let _ = swarm.dial(addr);
}
Command::Listen(addr) => {
let _ = swarm.listen_on(addr);
}
Command::AddDhtPeer(peer, addr) => {
swarm.behaviour_mut().kad.add_address(&peer, addr);
}
Command::FindClosestPeers(target, tx) => {
let qid = swarm.behaviour_mut().kad.get_closest_peers(target);
pending_finds.insert(qid, tx);
}
Command::StartProviding(key) => {
// Best-effort: si falla (sin peers cercanos para
// replicar), seguirá viviendo en el local store
// y se servirá vía get_providers de quien
// tenga conexión con nosotros.
let _ = swarm.behaviour_mut().kad.start_providing(key.into());
}
Command::GetProviders(key, tx) => {
let qid = swarm.behaviour_mut().kad.get_providers(key.into());
pending_providers.insert(qid, (Vec::new(), tx));
}
}
}
event = swarm.select_next_some() => {
match event {
SwarmEvent::NewListenAddr { address, .. } => {
let _ = listen_tx.send(address);
}
// Identify nos dice las listen-addrs reales del
// peer. Las inyectamos a Kad para poblar el
// routing table sin necesidad de add_dht_peer
// manual — la propagación pasa a ser automática.
SwarmEvent::Behaviour(MingaBehaviourEvent::Identify(
identify::Event::Received { peer_id, info, .. }
)) => {
for addr in info.listen_addrs {
swarm.behaviour_mut().kad.add_address(&peer_id, addr);
}
}
SwarmEvent::Behaviour(MingaBehaviourEvent::Kad(
kad::Event::OutboundQueryProgressed { id, result, step, .. }
)) => {
match result {
kad::QueryResult::GetClosestPeers(Ok(ok)) if step.last => {
if let Some(tx) = pending_finds.remove(&id) {
let infos = ok.peers.into_iter()
.map(|p| DiscoveredPeer {
peer_id: p.peer_id,
addrs: p.addrs,
})
.collect();
let _ = tx.send(infos);
}
}
kad::QueryResult::GetClosestPeers(Err(_)) if step.last => {
if let Some(tx) = pending_finds.remove(&id) {
let _ = tx.send(Vec::new());
}
}
kad::QueryResult::GetProviders(Ok(ok)) => {
if let Some((collected, _)) =
pending_providers.get_mut(&id)
{
if let kad::GetProvidersOk::FoundProviders {
providers, ..
} = ok
{
for p in providers {
if !collected.contains(&p) {
collected.push(p);
}
}
}
}
if step.last {
if let Some((providers, tx)) =
pending_providers.remove(&id)
{
let _ = tx.send(providers);
}
}
}
kad::QueryResult::GetProviders(Err(_)) if step.last => {
if let Some((providers, tx)) =
pending_providers.remove(&id)
{
let _ = tx.send(providers);
}
}
_ => {}
}
}
_ => {}
}
}
}
}
});
Ok(Self {
peer_id,
cmd_tx,
listen_rx: Mutex::new(listen_rx),
control,
})
}
pub async fn listen(&self, addr: Multiaddr) -> Multiaddr {
self.cmd_tx
.send(Command::Listen(addr))
.expect("swarm task alive");
let mut rx = self.listen_rx.lock().await;
rx.recv().await.expect("listen address arrives")
}
pub fn dial(&self, addr: Multiaddr) {
let _ = self.cmd_tx.send(Command::Dial(addr));
}
/// Añade un peer al routing table de Kademlia. Punto de entrada
/// para bootstrap: tras esto, el nodo puede dirigir queries DHT
/// a través de este peer.
pub fn add_dht_peer(&self, peer: PeerId, addr: Multiaddr) {
let _ = self.cmd_tx.send(Command::AddDhtPeer(peer, addr));
}
/// Consulta el DHT por los peers más cercanos al `target` PeerId.
/// Devuelve la lista resuelta (vacía si la query falla o si no
/// hay peers conocidos). Bloquea hasta que la query completa.
pub async fn find_closest_peers(&self, target: PeerId) -> Vec<DiscoveredPeer> {
let (tx, rx) = oneshot::channel();
let _ = self
.cmd_tx
.send(Command::FindClosestPeers(target, tx));
rx.await.unwrap_or_default()
}
/// Anuncia en el DHT que este peer tiene el contenido identificado
/// por `key`. Otros peers pueden luego descubrirlo vía
/// `find_providers(key)`. Best-effort: si la replicación falla
/// inicialmente, el record vive en el store local.
pub fn start_providing(&self, key: &[u8]) {
let _ = self.cmd_tx.send(Command::StartProviding(key.to_vec()));
}
/// Consulta el DHT por peers que han anunciado proveer `key`.
/// Devuelve la lista de `PeerId`s que se reportan como providers.
/// Lista vacía si nadie anuncia.
pub async fn find_providers(&self, key: &[u8]) -> Vec<PeerId> {
let (tx, rx) = oneshot::channel();
let _ = self
.cmd_tx
.send(Command::GetProviders(key.to_vec(), tx));
rx.await.unwrap_or_default()
}
}
+21
View File
@@ -0,0 +1,21 @@
[package]
name = "brahman-net"
version.workspace = true
edition.workspace = true
rust-version.workspace = true
license.workspace = true
authors.workspace = true
publish.workspace = true
description = "Brahman — capa de transporte P2P compartida (libp2p TCP+Noise+Yamux+Kad+Identify+Stream). Cualquier protocolo (handshake brahman, sync minga, futuros) puede registrar su StreamProtocol y abrir/aceptar streams sobre la malla común."
[dependencies]
futures = { workspace = true }
libp2p = { workspace = true }
libp2p-stream = { workspace = true }
thiserror = { workspace = true }
tokio = { workspace = true }
tracing = { workspace = true }
[dev-dependencies]
tokio = { workspace = true }
tokio-util = { workspace = true }
+337
View File
@@ -0,0 +1,337 @@
//! `brahman-net` — capa P2P compartida de la red Brahman.
//!
//! Provee un nodo libp2p genérico que cualquier protocolo de la
//! familia (handshake brahman remoto, sync minga, futuros) puede
//! reusar. La idea: una sola malla, múltiples sub-protocolos
//! multiplexados por `StreamProtocol`.
//!
//! ## Stack
//!
//! - **TCP + Noise + Yamux**: transporte autenticado y multiplexado.
//! - **`stream::Behaviour`**: streams bidireccionales por
//! `StreamProtocol`. Cada protocolo (`/brahman/handshake/1.0.0`,
//! `/minga/sync/1.0.0`, …) se registra independientemente vía el
//! `stream::Control` que `BrahmanNet` expone.
//! - **`kad::Behaviour<MemoryStore>`**: Kademlia DHT en modo Server
//! para discovery (peers cercanos + content providers).
//! - **`identify::Behaviour`**: cada peer anuncia sus listen-addrs
//! reales; las inyectamos automáticamente al routing table de Kad.
//!
//! ## Modelo
//!
//! El swarm corre en una task tokio dedicada. La interfaz pública son:
//! 1. **Comandos** (canal mpsc): `dial`, `listen`, `add_dht_peer`,
//! `find_closest_peers`, `start_providing`, `find_providers`.
//! 2. **`stream::Control`** (acceso directo): para abrir/aceptar
//! streams de un protocolo concreto. Cada protocolo se ocupa de
//! su propia lógica sobre el stream resultante.
//!
//! La separación entre comandos y control permite que la lógica de
//! red (DHT, dial, listen) y la lógica de protocolos (handshake/sync)
//! evolucionen independientes — el protocolo no necesita conocer al
//! swarm, sólo pide streams.
//!
//! ## Identidad
//!
//! Por defecto se genera una keypair Ed25519 efímera. Para identidad
//! persistente (la misma `peer_id` across reboots), pasar la keypair
//! con [`BrahmanNet::with_keypair`]. Esa misma keypair puede ser la
//! base para firmas de Cards (cuando se implemente trust remoto).
#![forbid(unsafe_code)]
#![warn(rust_2018_idioms)]
use std::collections::HashMap;
use std::time::Duration;
use futures::StreamExt;
use libp2p::{
identify, identity, kad, noise,
swarm::{NetworkBehaviour, SwarmEvent},
tcp, yamux, Multiaddr, PeerId, Swarm, SwarmBuilder,
};
use libp2p_stream as stream;
use tokio::sync::{mpsc, oneshot, Mutex};
pub use libp2p::{Stream, StreamProtocol};
const IDENTIFY_PROTOCOL: &str = "/brahman-net/0.1.0";
const IDLE_CONNECTION_TIMEOUT: Duration = Duration::from_secs(60);
#[derive(NetworkBehaviour)]
struct BrahmanBehaviour {
stream: stream::Behaviour,
kad: kad::Behaviour<kad::store::MemoryStore>,
identify: identify::Behaviour,
}
#[derive(Debug, thiserror::Error)]
pub enum NodeError {
#[error("transport build failed: {0}")]
Build(String),
}
#[derive(Debug)]
enum Command {
Dial(Multiaddr),
Listen(Multiaddr),
AddDhtPeer(PeerId, Multiaddr),
FindClosestPeers(PeerId, oneshot::Sender<Vec<DiscoveredPeer>>),
StartProviding(Vec<u8>),
GetProviders(Vec<u8>, oneshot::Sender<Vec<PeerId>>),
}
/// Peer descubierto vía DHT: identidad + direcciones conocidas.
#[derive(Debug, Clone)]
pub struct DiscoveredPeer {
pub peer_id: PeerId,
pub addrs: Vec<Multiaddr>,
}
/// Nodo Brahman en la malla P2P. Maneja el swarm libp2p y expone
/// API uniforme para listen/dial/DHT/streams.
pub struct BrahmanNet {
/// Identidad libp2p de este nodo. Estable mientras viva la
/// keypair (efímera por default; persistente si pasaste una
/// vía [`with_keypair`]).
pub peer_id: PeerId,
cmd_tx: mpsc::UnboundedSender<Command>,
listen_rx: Mutex<mpsc::UnboundedReceiver<Multiaddr>>,
/// Control para abrir y aceptar streams. Cada protocolo
/// (handshake brahman, sync minga, etc.) llama
/// `control.accept(StreamProtocol::new("/foo/1.0.0"))` para
/// recibir streams entrantes, o `control.open_stream(peer, proto)`
/// para abrirlos. Multiplexado y demultiplexado lo hace libp2p.
pub control: stream::Control,
}
impl BrahmanNet {
/// Crea un nodo con keypair Ed25519 generada al vuelo (peer_id
/// efímero — cambia en cada arranque).
pub fn new() -> Result<Self, NodeError> {
Self::with_keypair(identity::Keypair::generate_ed25519())
}
/// Crea un nodo con una keypair libp2p específica. Usá esto para
/// `peer_id` estable (por ejemplo si tu identidad se persiste a
/// disco, o si la derivás de la identidad criptográfica del
/// módulo).
pub fn with_keypair(keypair: identity::Keypair) -> Result<Self, NodeError> {
let peer_id = keypair.public().to_peer_id();
let mut swarm: Swarm<BrahmanBehaviour> = SwarmBuilder::with_existing_identity(keypair)
.with_tokio()
.with_tcp(
tcp::Config::default(),
noise::Config::new,
yamux::Config::default,
)
.map_err(|e| NodeError::Build(format!("{e}")))?
.with_behaviour(|key| {
let local = key.public().to_peer_id();
let mut kad =
kad::Behaviour::new(local, kad::store::MemoryStore::new(local));
// Modo Server: respondemos a queries del DHT. Auto
// requiere detectar reachability; para entornos
// controlados (localhost, redes privadas) Server es
// lo correcto.
kad.set_mode(Some(kad::Mode::Server));
let identify = identify::Behaviour::new(
identify::Config::new(IDENTIFY_PROTOCOL.to_string(), key.public())
.with_agent_version(format!("brahman-net/{}", env!("CARGO_PKG_VERSION"))),
);
BrahmanBehaviour {
stream: stream::Behaviour::new(),
kad,
identify,
}
})
.map_err(|e| NodeError::Build(format!("{e}")))?
.with_swarm_config(|c| c.with_idle_connection_timeout(IDLE_CONNECTION_TIMEOUT))
.build();
let control = swarm.behaviour().stream.new_control();
let (cmd_tx, mut cmd_rx) = mpsc::unbounded_channel::<Command>();
let (listen_tx, listen_rx) = mpsc::unbounded_channel::<Multiaddr>();
tokio::spawn(async move {
let mut pending_finds: HashMap<
kad::QueryId,
oneshot::Sender<Vec<DiscoveredPeer>>,
> = HashMap::new();
let mut pending_providers: HashMap<
kad::QueryId,
(Vec<PeerId>, oneshot::Sender<Vec<PeerId>>),
> = HashMap::new();
loop {
tokio::select! {
Some(cmd) = cmd_rx.recv() => {
match cmd {
Command::Dial(addr) => {
let _ = swarm.dial(addr);
}
Command::Listen(addr) => {
let _ = swarm.listen_on(addr);
}
Command::AddDhtPeer(peer, addr) => {
swarm.behaviour_mut().kad.add_address(&peer, addr);
}
Command::FindClosestPeers(target, tx) => {
let qid = swarm.behaviour_mut().kad.get_closest_peers(target);
pending_finds.insert(qid, tx);
}
Command::StartProviding(key) => {
// Best-effort: si falla (sin peers cercanos para
// replicar), seguirá viviendo en el local store
// y se servirá vía get_providers de quien tenga
// conexión con nosotros.
let _ = swarm.behaviour_mut().kad.start_providing(key.into());
}
Command::GetProviders(key, tx) => {
let qid = swarm.behaviour_mut().kad.get_providers(key.into());
pending_providers.insert(qid, (Vec::new(), tx));
}
}
}
event = swarm.select_next_some() => {
match event {
SwarmEvent::NewListenAddr { address, .. } => {
let _ = listen_tx.send(address);
}
// Identify nos dice las listen-addrs reales del
// peer. Las inyectamos a Kad para poblar el
// routing table sin necesidad de add_dht_peer
// manual — la propagación pasa a ser automática.
SwarmEvent::Behaviour(BrahmanBehaviourEvent::Identify(
identify::Event::Received { peer_id, info, .. }
)) => {
for addr in info.listen_addrs {
swarm.behaviour_mut().kad.add_address(&peer_id, addr);
}
}
SwarmEvent::Behaviour(BrahmanBehaviourEvent::Kad(
kad::Event::OutboundQueryProgressed { id, result, step, .. }
)) => {
match result {
kad::QueryResult::GetClosestPeers(Ok(ok)) if step.last => {
if let Some(tx) = pending_finds.remove(&id) {
let infos = ok.peers.into_iter()
.map(|p| DiscoveredPeer {
peer_id: p.peer_id,
addrs: p.addrs,
})
.collect();
let _ = tx.send(infos);
}
}
kad::QueryResult::GetClosestPeers(Err(_)) if step.last => {
if let Some(tx) = pending_finds.remove(&id) {
let _ = tx.send(Vec::new());
}
}
kad::QueryResult::GetProviders(Ok(ok)) => {
if let Some((collected, _)) =
pending_providers.get_mut(&id)
{
if let kad::GetProvidersOk::FoundProviders {
providers, ..
} = ok
{
for p in providers {
if !collected.contains(&p) {
collected.push(p);
}
}
}
}
if step.last {
if let Some((providers, tx)) =
pending_providers.remove(&id)
{
let _ = tx.send(providers);
}
}
}
kad::QueryResult::GetProviders(Err(_)) if step.last => {
if let Some((providers, tx)) =
pending_providers.remove(&id)
{
let _ = tx.send(providers);
}
}
_ => {}
}
}
_ => {}
}
}
}
}
});
Ok(Self {
peer_id,
cmd_tx,
listen_rx: Mutex::new(listen_rx),
control,
})
}
/// Empieza a escuchar en `addr`. Bloquea hasta que el listener
/// publique su dirección real (Multiaddr resuelta — útil cuando
/// pediste `/ip4/0.0.0.0/tcp/0` y querés saber qué puerto te tocó).
pub async fn listen(&self, addr: Multiaddr) -> Multiaddr {
self.cmd_tx
.send(Command::Listen(addr))
.expect("swarm task alive");
let mut rx = self.listen_rx.lock().await;
rx.recv().await.expect("listen address arrives")
}
/// Inicia conexión con un peer en `addr`. No-op si ya hay
/// conexión. Best-effort — fallos se loggean al swarm pero no se
/// propagan al caller (consistente con libp2p).
pub fn dial(&self, addr: Multiaddr) {
let _ = self.cmd_tx.send(Command::Dial(addr));
}
/// Añade un peer al routing table de Kademlia. Punto de entrada
/// para bootstrap: tras esto, el nodo puede dirigir queries DHT
/// a través de este peer.
pub fn add_dht_peer(&self, peer: PeerId, addr: Multiaddr) {
let _ = self.cmd_tx.send(Command::AddDhtPeer(peer, addr));
}
/// Consulta el DHT por los peers más cercanos al `target` PeerId.
/// Devuelve la lista resuelta (vacía si la query falla o si no
/// hay peers conocidos). Bloquea hasta que la query completa.
pub async fn find_closest_peers(&self, target: PeerId) -> Vec<DiscoveredPeer> {
let (tx, rx) = oneshot::channel();
let _ = self
.cmd_tx
.send(Command::FindClosestPeers(target, tx));
rx.await.unwrap_or_default()
}
/// Anuncia en el DHT que este peer tiene el contenido identificado
/// por `key`. Otros peers pueden luego descubrirlo vía
/// [`find_providers`](Self::find_providers). Best-effort: si la
/// replicación falla inicialmente, el record vive en el store
/// local hasta que llegue conexión.
pub fn start_providing(&self, key: &[u8]) {
let _ = self.cmd_tx.send(Command::StartProviding(key.to_vec()));
}
/// Consulta el DHT por peers que han anunciado proveer `key`.
/// Devuelve la lista de `PeerId`s que se reportan como providers.
/// Lista vacía si nadie anuncia.
pub async fn find_providers(&self, key: &[u8]) -> Vec<PeerId> {
let (tx, rx) = oneshot::channel();
let _ = self
.cmd_tx
.send(Command::GetProviders(key.to_vec(), tx));
rx.await.unwrap_or_default()
}
}