feat(brahman-net+handshake): stop_providing automatico en cleanup

Cierra el pendiente conocido del DHT: hasta ahora cuando una sesion
con outputs cerraba (Farewell, EOF, error), el record que la
anunciaba en el DHT seguia vivo hasta su TTL natural (~24h en kad
default). Consumers remotos podian descubrir un peer "vivo" que ya
no servia nada.

Cambios:
- BrahmanNet::stop_providing(key) (nuevo): contraparte simetrica de
  start_providing. Manda Command::StopProviding al swarm que llama
  kad.stop_providing(&key). Borra el record local al instante;
  replicas remotas siguen expirando por TTL (kad no expone retraccion
  cross-peer, simetrico al hecho de que start_providing tambien
  propaga eventualmente).
- brahman_handshake::network::withdraw_outputs(net, card) (nuevo):
  contraparte de announce_outputs. Itera card.flow.output y llama
  net.stop_providing(flow_dht_key(...)) por cada uno.
- server::cleanup: extrae la ResolvedCard removida del registro de
  sesiones (en lugar de descartarla) y, si config.net esta set,
  llama withdraw_outputs(net, &card) antes de broadcast_match_diffs.

Tests: nuevo E2E dht_discovery_withdraws_on_session_cleanup:
1. A registra Card con flow.output = monad-list:json.
2. B descubre a A via find_remote_providers (assert before contains
   a_peer).
3. Cliente local de A hace farewell -> cleanup -> withdraw_outputs.
4. Espera a que la sesion salga del registro + 100ms para que el
   swarm procese el Command.
5. Nueva query desde B: after NO debe contener a_peer.

3 tests verdes en network_discovery.rs (positivo, negativo, withdraw).
18 tests totales en handshake + net.
This commit is contained in:
Sergio
2026-05-09 15:10:30 +00:00
parent fa0ed98880
commit 2e6afd0973
5 changed files with 195 additions and 5 deletions
@@ -220,6 +220,22 @@ pub fn announce_outputs(net: &BrahmanNet, card: &Card) {
}
}
/// Retira los anuncios DHT previos de [`announce_outputs`] para esta
/// `card`. Llamado desde `cleanup` cuando una sesión cierra (Farewell,
/// EOF, error). El record local se borra al instante; copias
/// replicadas en peers remotos expiran por TTL natural de kad.
pub fn withdraw_outputs(net: &BrahmanNet, card: &Card) {
for flow in &card.flow.output {
let key = flow_dht_key(&flow.name, &flow.ty);
debug!(
target: "brahman_handshake::network",
flow = %flow.name,
"withdraw_output → DHT (stop_providing)"
);
net.stop_providing(&key);
}
}
/// Consulta el DHT por peers que han anunciado proveer el flow
/// `(flow_name, type_ref)`. Devuelve la lista resuelta de `PeerId`s.
/// Lista vacía si nadie anuncia, si la query timeout-ea, o si el
+11 -5
View File
@@ -388,10 +388,10 @@ where
}
}
/// Limpieza atómica de TRES vistas: registro de sesiones + broker +
/// canal push. Se ejecuta tanto si la sesión cierra por Farewell, EOF,
/// o error. Tras desregistrar, emite diffs a las sesiones que perdieron
/// el match contra ésta.
/// Limpieza atómica de las vistas registradas + (si net activo) retiro
/// de anuncios DHT de los outputs de la Card. Se ejecuta tanto si la
/// sesión cierra por Farewell, EOF, o error. Tras desregistrar, emite
/// diffs a las sesiones que perdieron el match contra ésta.
async fn cleanup(
session_id: SessionId,
sessions: &SessionRegistry,
@@ -399,12 +399,18 @@ async fn cleanup(
last_matches: &LastMatches,
config: &ServerConfig,
) {
sessions.lock().await.remove(&session_id);
// Tomamos la Card ANTES de borrarla — si net está configurado
// necesitamos sus outputs para llamar withdraw_outputs. `remove`
// devuelve el valor extraído.
let removed_card = sessions.lock().await.remove(&session_id);
push_table.lock().await.remove(&session_id);
last_matches.lock().await.remove(&session_id);
if let Some(broker) = &config.broker {
broker.lock().await.unregister(session_id);
}
if let (Some(net), Some(resolved)) = (&config.net, removed_card) {
crate::network::withdraw_outputs(net, &resolved.card);
}
broadcast_match_diffs(push_table, last_matches, config).await;
}
@@ -228,3 +228,110 @@ async fn dht_discovery_negative_unknown_flow() {
local.farewell().await.ok();
}
/// stop_providing test: A registra Card con flow X, B descubre a A.
/// El cliente local de A hace farewell → cleanup llama
/// withdraw_outputs → A se quita del provider local store. Una nueva
/// query desde B (que rutea por A, único peer en el DHT) ya no debe
/// listarlo.
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn dht_discovery_withdraws_on_session_cleanup() {
let tmp = TempDir::new().unwrap();
let a_unix = tmp.path().join("a.sock");
let a_broker = Arc::new(Mutex::new(Broker::new(BrokerConfig::default())));
let a_net = Arc::new(BrahmanNet::new().unwrap());
let a_peer = a_net.peer_id;
let a_server = Arc::new(
Server::bind(
&a_unix,
ServerConfig {
init_attached: true,
broker: Some(a_broker),
net: Some(a_net.clone()),
},
)
.unwrap(),
);
let sessions = a_server.sessions();
let a_addr = a_net.listen("/ip4/127.0.0.1/tcp/0".parse().unwrap()).await;
let mut a_full = a_addr.clone();
a_full.push(Protocol::P2p(a_peer));
tokio::spawn(run_libp2p_accept_loop(a_server.clone(), a_net.clone()));
{
let s = a_server.clone();
tokio::spawn(async move {
loop {
match s.accept_one().await {
Ok(session) => {
tokio::spawn(async move {
let _ = session.handle().await;
});
}
Err(_) => break,
}
}
});
}
// Card con un flow output anunciable.
let card = provider_card("test.withdraws", "monad-list", "json");
let local = brahman_handshake::client::Client::connect(&a_unix, card)
.await
.expect("registro local en A");
let b_net = BrahmanNet::new().unwrap();
b_net.dial(a_full);
tokio::time::sleep(Duration::from_millis(500)).await;
// Confirmación previa: A es discoverable.
let before = find_remote_providers(
&b_net,
"monad-list",
&TypeRef::Primitive {
name: "json".into(),
},
)
.await;
assert!(
before.contains(&a_peer),
"antes del farewell A debería ser discoverable. got: {:?}",
before
);
// Farewell del cliente local → server.cleanup → withdraw_outputs.
local.farewell().await.ok();
// Esperamos a que la sesión salga del registro de A (señal de
// que cleanup completó).
let mut waited = 0;
while !sessions.lock().await.is_empty() && waited < 50 {
tokio::time::sleep(Duration::from_millis(20)).await;
waited += 1;
}
assert!(
sessions.lock().await.is_empty(),
"sesión debería estar removida tras farewell"
);
// Pequeño margen extra para que el Command::StopProviding lo
// procese el swarm task (no es await-able desde fuera).
tokio::time::sleep(Duration::from_millis(100)).await;
// Nueva query: A ya no debería listarse como provider.
let after = find_remote_providers(
&b_net,
"monad-list",
&TypeRef::Primitive {
name: "json".into(),
},
)
.await;
assert!(
!after.contains(&a_peer),
"tras farewell + withdraw_outputs, A NO debería ser discoverable. got: {:?}",
after
);
}
+23
View File
@@ -84,6 +84,7 @@ enum Command {
AddDhtPeer(PeerId, Multiaddr),
FindClosestPeers(PeerId, oneshot::Sender<Vec<DiscoveredPeer>>),
StartProviding(Vec<u8>),
StopProviding(Vec<u8>),
GetProviders(Vec<u8>, oneshot::Sender<Vec<PeerId>>),
}
@@ -211,6 +212,18 @@ impl BrahmanNet {
// conexión con nosotros.
let _ = swarm.behaviour_mut().kad.start_providing(key.into());
}
Command::StopProviding(key) => {
// Quitamos el record local del provider store.
// Los peers cercanos eventualmente expiran su
// copia replicada por TTL natural (~24h en
// libp2p kad default); para retiro inmediato
// habría que enviar un republish con sentinel,
// pero kad no expone esa primitiva. Aceptable
// para el caso "el provider local desapareció":
// queries que pasen por nosotros dejan de
// listarnos al instante.
swarm.behaviour_mut().kad.stop_providing(&key.into());
}
Command::GetProviders(key, tx) => {
let qid = swarm.behaviour_mut().kad.get_providers(key.into());
pending_providers.insert(qid, (Vec::new(), tx));
@@ -355,6 +368,16 @@ impl BrahmanNet {
let _ = self.cmd_tx.send(Command::StartProviding(key.to_vec()));
}
/// Retira el anuncio previo de [`start_providing`] para `key`.
/// El record local se borra al instante (queries que lleguen a
/// nosotros dejan de listarnos). Los records replicados en peers
/// remotos viven hasta su TTL — kad no expone primitiva para
/// retracción inmediata cross-peer. Aceptable: simétrico al
/// caso "el provider apareció" (también propagación eventual).
pub fn stop_providing(&self, key: &[u8]) {
let _ = self.cmd_tx.send(Command::StopProviding(key.to_vec()));
}
/// Consulta el DHT por peers que han anunciado proveer `key`.
/// Devuelve la lista de `PeerId`s que se reportan como providers.
/// Lista vacía si nadie anuncia.