diff --git a/CHANGELOG.md b/CHANGELOG.md index a0a0e3f..23e5c6c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,44 @@ ratio/diff ver `git show `. ## 2026-05-09 +### feat(brahman-net+handshake): stop_providing automático en cleanup de sesión +Cierra el pendiente conocido del DHT: hasta ahora cuando una sesión +con outputs cerraba (Farewell, EOF, error), el record que la +anunciaba en el DHT seguía vivo hasta su TTL natural (~24h en kad +default). Consumers remotos podían descubrir un peer "vivo" que ya +no servía nada. + +Cambios: +- **`BrahmanNet::stop_providing(key)`** (nuevo): contraparte simétrica + de `start_providing`. Manda `Command::StopProviding` al swarm que + llama `kad.stop_providing(&key)`. Borra el record del provider + store local al instante; replicas en peers remotos siguen + expirando por TTL (kad no expone retracción cross-peer, simétrico + al hecho de que `start_providing` también propaga eventualmente). +- **`brahman_handshake::network::withdraw_outputs(net, card)`** + (nuevo): contraparte de `announce_outputs`. Itera `card.flow.output` + y llama `net.stop_providing(flow_dht_key(...))` por cada uno. +- **`server::cleanup`**: extrae la `ResolvedCard` removida del registro + de sesiones (en lugar de descartarla con `remove`) y, si + `config.net` está set, llama `withdraw_outputs(net, &card)` antes + de `broadcast_match_diffs`. + +Tests: nuevo E2E `dht_discovery_withdraws_on_session_cleanup`: +1. A registra Card con `flow.output = monad-list:json`. +2. B descubre a A vía `find_remote_providers` — confirma + `before.contains(&a_peer)`. +3. Cliente local de A hace `farewell` → cleanup → withdraw_outputs. +4. Espera a que la sesión salga del registro (señal de cleanup + completado) + 100ms para que el swarm procese el Command. +5. Nueva query desde B: `after` NO debe contener `a_peer`. + +3 tests verdes en `network_discovery.rs` (positivo, negativo, +withdraw). 18 tests totales en handshake + net. + +Pendiente futuro: retracción cross-peer en kad (requeriría extensión +del protocolo libp2p, no soportada hoy). Aceptable: simétrico al +modelo de propagación eventual del DHT. + ### feat(ente-zero): wire de Arje con brahman-net (red P2P opcional + identidad persistente) Cierra el último pendiente del plan de red: Arje ahora puede arrancar opcionalmente con `BrahmanNet` configurado, persistir su identidad diff --git a/crates/core/brahman-handshake/src/network.rs b/crates/core/brahman-handshake/src/network.rs index 0c0d496..ceb818d 100644 --- a/crates/core/brahman-handshake/src/network.rs +++ b/crates/core/brahman-handshake/src/network.rs @@ -220,6 +220,22 @@ pub fn announce_outputs(net: &BrahmanNet, card: &Card) { } } +/// Retira los anuncios DHT previos de [`announce_outputs`] para esta +/// `card`. Llamado desde `cleanup` cuando una sesión cierra (Farewell, +/// EOF, error). El record local se borra al instante; copias +/// replicadas en peers remotos expiran por TTL natural de kad. +pub fn withdraw_outputs(net: &BrahmanNet, card: &Card) { + for flow in &card.flow.output { + let key = flow_dht_key(&flow.name, &flow.ty); + debug!( + target: "brahman_handshake::network", + flow = %flow.name, + "withdraw_output → DHT (stop_providing)" + ); + net.stop_providing(&key); + } +} + /// Consulta el DHT por peers que han anunciado proveer el flow /// `(flow_name, type_ref)`. Devuelve la lista resuelta de `PeerId`s. /// Lista vacía si nadie anuncia, si la query timeout-ea, o si el diff --git a/crates/core/brahman-handshake/src/server.rs b/crates/core/brahman-handshake/src/server.rs index 898d2bb..af93bd6 100644 --- a/crates/core/brahman-handshake/src/server.rs +++ b/crates/core/brahman-handshake/src/server.rs @@ -388,10 +388,10 @@ where } } -/// Limpieza atómica de TRES vistas: registro de sesiones + broker + -/// canal push. Se ejecuta tanto si la sesión cierra por Farewell, EOF, -/// o error. Tras desregistrar, emite diffs a las sesiones que perdieron -/// el match contra ésta. +/// Limpieza atómica de las vistas registradas + (si net activo) retiro +/// de anuncios DHT de los outputs de la Card. Se ejecuta tanto si la +/// sesión cierra por Farewell, EOF, o error. Tras desregistrar, emite +/// diffs a las sesiones que perdieron el match contra ésta. async fn cleanup( session_id: SessionId, sessions: &SessionRegistry, @@ -399,12 +399,18 @@ async fn cleanup( last_matches: &LastMatches, config: &ServerConfig, ) { - sessions.lock().await.remove(&session_id); + // Tomamos la Card ANTES de borrarla — si net está configurado + // necesitamos sus outputs para llamar withdraw_outputs. `remove` + // devuelve el valor extraído. + let removed_card = sessions.lock().await.remove(&session_id); push_table.lock().await.remove(&session_id); last_matches.lock().await.remove(&session_id); if let Some(broker) = &config.broker { broker.lock().await.unregister(session_id); } + if let (Some(net), Some(resolved)) = (&config.net, removed_card) { + crate::network::withdraw_outputs(net, &resolved.card); + } broadcast_match_diffs(push_table, last_matches, config).await; } diff --git a/crates/core/brahman-handshake/tests/network_discovery.rs b/crates/core/brahman-handshake/tests/network_discovery.rs index 16b8dce..08a8d3d 100644 --- a/crates/core/brahman-handshake/tests/network_discovery.rs +++ b/crates/core/brahman-handshake/tests/network_discovery.rs @@ -228,3 +228,110 @@ async fn dht_discovery_negative_unknown_flow() { local.farewell().await.ok(); } + +/// stop_providing test: A registra Card con flow X, B descubre a A. +/// El cliente local de A hace farewell → cleanup llama +/// withdraw_outputs → A se quita del provider local store. Una nueva +/// query desde B (que rutea por A, único peer en el DHT) ya no debe +/// listarlo. +#[tokio::test(flavor = "multi_thread", worker_threads = 4)] +async fn dht_discovery_withdraws_on_session_cleanup() { + let tmp = TempDir::new().unwrap(); + let a_unix = tmp.path().join("a.sock"); + let a_broker = Arc::new(Mutex::new(Broker::new(BrokerConfig::default()))); + let a_net = Arc::new(BrahmanNet::new().unwrap()); + let a_peer = a_net.peer_id; + + let a_server = Arc::new( + Server::bind( + &a_unix, + ServerConfig { + init_attached: true, + broker: Some(a_broker), + net: Some(a_net.clone()), + }, + ) + .unwrap(), + ); + let sessions = a_server.sessions(); + + let a_addr = a_net.listen("/ip4/127.0.0.1/tcp/0".parse().unwrap()).await; + let mut a_full = a_addr.clone(); + a_full.push(Protocol::P2p(a_peer)); + + tokio::spawn(run_libp2p_accept_loop(a_server.clone(), a_net.clone())); + { + let s = a_server.clone(); + tokio::spawn(async move { + loop { + match s.accept_one().await { + Ok(session) => { + tokio::spawn(async move { + let _ = session.handle().await; + }); + } + Err(_) => break, + } + } + }); + } + + // Card con un flow output anunciable. + let card = provider_card("test.withdraws", "monad-list", "json"); + let local = brahman_handshake::client::Client::connect(&a_unix, card) + .await + .expect("registro local en A"); + + let b_net = BrahmanNet::new().unwrap(); + b_net.dial(a_full); + tokio::time::sleep(Duration::from_millis(500)).await; + + // Confirmación previa: A es discoverable. + let before = find_remote_providers( + &b_net, + "monad-list", + &TypeRef::Primitive { + name: "json".into(), + }, + ) + .await; + assert!( + before.contains(&a_peer), + "antes del farewell A debería ser discoverable. got: {:?}", + before + ); + + // Farewell del cliente local → server.cleanup → withdraw_outputs. + local.farewell().await.ok(); + + // Esperamos a que la sesión salga del registro de A (señal de + // que cleanup completó). + let mut waited = 0; + while !sessions.lock().await.is_empty() && waited < 50 { + tokio::time::sleep(Duration::from_millis(20)).await; + waited += 1; + } + assert!( + sessions.lock().await.is_empty(), + "sesión debería estar removida tras farewell" + ); + + // Pequeño margen extra para que el Command::StopProviding lo + // procese el swarm task (no es await-able desde fuera). + tokio::time::sleep(Duration::from_millis(100)).await; + + // Nueva query: A ya no debería listarse como provider. + let after = find_remote_providers( + &b_net, + "monad-list", + &TypeRef::Primitive { + name: "json".into(), + }, + ) + .await; + assert!( + !after.contains(&a_peer), + "tras farewell + withdraw_outputs, A NO debería ser discoverable. got: {:?}", + after + ); +} diff --git a/crates/shared/brahman-net/src/lib.rs b/crates/shared/brahman-net/src/lib.rs index 59e1a8e..501be05 100644 --- a/crates/shared/brahman-net/src/lib.rs +++ b/crates/shared/brahman-net/src/lib.rs @@ -84,6 +84,7 @@ enum Command { AddDhtPeer(PeerId, Multiaddr), FindClosestPeers(PeerId, oneshot::Sender>), StartProviding(Vec), + StopProviding(Vec), GetProviders(Vec, oneshot::Sender>), } @@ -211,6 +212,18 @@ impl BrahmanNet { // conexión con nosotros. let _ = swarm.behaviour_mut().kad.start_providing(key.into()); } + Command::StopProviding(key) => { + // Quitamos el record local del provider store. + // Los peers cercanos eventualmente expiran su + // copia replicada por TTL natural (~24h en + // libp2p kad default); para retiro inmediato + // habría que enviar un republish con sentinel, + // pero kad no expone esa primitiva. Aceptable + // para el caso "el provider local desapareció": + // queries que pasen por nosotros dejan de + // listarnos al instante. + swarm.behaviour_mut().kad.stop_providing(&key.into()); + } Command::GetProviders(key, tx) => { let qid = swarm.behaviour_mut().kad.get_providers(key.into()); pending_providers.insert(qid, (Vec::new(), tx)); @@ -355,6 +368,16 @@ impl BrahmanNet { let _ = self.cmd_tx.send(Command::StartProviding(key.to_vec())); } + /// Retira el anuncio previo de [`start_providing`] para `key`. + /// El record local se borra al instante (queries que lleguen a + /// nosotros dejan de listarnos). Los records replicados en peers + /// remotos viven hasta su TTL — kad no expone primitiva para + /// retracción inmediata cross-peer. Aceptable: simétrico al + /// caso "el provider apareció" (también propagación eventual). + pub fn stop_providing(&self, key: &[u8]) { + let _ = self.cmd_tx.send(Command::StopProviding(key.to_vec())); + } + /// Consulta el DHT por peers que han anunciado proveer `key`. /// Devuelve la lista de `PeerId`s que se reportan como providers. /// Lista vacía si nadie anuncia.