diff --git a/CHANGELOG.md b/CHANGELOG.md index e95a324..ca484ee 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,57 @@ ratio/diff ver `git show `. ## 2026-05-09 +### feat(brahman-handshake): Fase 2 — discovery remoto vía DHT por flow type +Tercer paso del plan "el encuentro entre Entes no se restringe a +local". Cuando un Init local acepta una sesión cuya Card declara +outputs, anuncia al DHT (Kademlia, vía `brahman-net`) que él provee +esos flow types. Cualquier nodo conectado al mismo DHT puede +consultar y obtener la lista de `PeerId`s que sirven el flow. + +API nueva en `brahman_handshake::network`: +- `flow_dht_key(flow_name, type_ref) -> [u8; 32]`: blake3 hash de + `"brahman-flow|v1|{flow}|{type_canon}"`. Determinístico cross-host. + Cambiar la canonicalización rompe compatibilidad — el prefijo `v1` + documenta la versión del esquema y obliga a bump al modificar. +- `announce_outputs(net, card)`: llama `start_providing` en el DHT + por cada `Flow` en `card.flow.output`. Idempotente, fire-and-forget. +- `find_remote_providers(net, flow_name, type_ref) -> Vec`: + query DHT por la key derivada. Lista vacía si nadie anuncia o si + la query no resuelve dentro del timeout interno de Kad. + +Wire en el server: +- `ServerConfig` gana `pub net: Option>`. Si está set, + cada Card registrada con outputs se anuncia automáticamente al DHT + desde `register_session`. `None` = server "ciego al DHT" (correcto + cuando no hay conectividad o el operador no quiere exponer). +- `ServerConfig` ahora tiene `Debug` manual (BrahmanNet no implementa + Debug; loggeamos sólo presencia/ausencia). + +Canonicalización del TypeRef: +- `Primitive { name }` → `prim:{name}` +- `Wit { package, interface, name }` → `wit:{package}#{interface_or_empty}#{name}` + +Tests: 2 nuevos en `tests/network_discovery.rs`: +- `dht_discovery_finds_remote_provider`: dos nodos, A registra Card + con `flow.output = monad-list:json`, B dial-ea a A y descubre el + `peer_id` de A vía `find_remote_providers`. Asserts contains. +- `dht_discovery_negative_unknown_flow`: B busca un flow que nadie + anunció, devuelve lista vacía sin colgarse. + +Lo que esto desbloquea: +- Un `nouser daemon` corriendo en máquina A puede ser descubierto por + un `nouser-explorer` en máquina B sin conocimiento previo del peer + — sólo necesitan compartir DHT (vía bootstrap inicial). +- La cadena completa "explorer → daemon → llm-provider" puede cruzar + máquinas, no sólo procesos. + +Lo que queda para Fase 3 (trust): +- Cards remotas se aceptan hoy sin verificación. Para producción se + necesita firma Ed25519 sobre la Card y verificación antes de + aceptar el Hello remoto. Local sigue confiando en SO_PEERCRED. +- Stop-providing al cleanup de sesión (hoy records DHT viven hasta + TTL ~24h aunque la sesión cierre). + ### feat(brahman-handshake): Fase 1 — handshake brahman sobre stream libp2p Segundo paso del plan "el encuentro entre Entes no se restringe a local". El protocolo brahman (Hello / HelloAck / Ping / Pong / diff --git a/Cargo.lock b/Cargo.lock index 15aeb83..258a2f8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1199,6 +1199,7 @@ name = "brahman-handshake" version = "0.1.0" dependencies = [ "anyhow", + "blake3", "brahman-broker", "brahman-card", "brahman-net", diff --git a/crates/core/brahman-handshake/Cargo.toml b/crates/core/brahman-handshake/Cargo.toml index b931377..057a003 100644 --- a/crates/core/brahman-handshake/Cargo.toml +++ b/crates/core/brahman-handshake/Cargo.toml @@ -12,6 +12,7 @@ description = "Brahman — handshake runtime Init↔módulo. Local sobre Unix so brahman-card = { path = "../brahman-card" } brahman-broker = { path = "../brahman-broker" } brahman-net = { path = "../../shared/brahman-net" } +blake3 = { workspace = true } futures = { workspace = true } serde = { workspace = true } postcard = { workspace = true } diff --git a/crates/core/brahman-handshake/src/network.rs b/crates/core/brahman-handshake/src/network.rs index d032062..c1af3a4 100644 --- a/crates/core/brahman-handshake/src/network.rs +++ b/crates/core/brahman-handshake/src/network.rs @@ -43,11 +43,11 @@ use std::sync::Arc; -use brahman_card::{Card, WitInterface}; +use brahman_card::{Card, TypeRef, WitInterface}; use brahman_net::{BrahmanNet, OpenStreamError, PeerId, Stream, StreamProtocol}; use futures::StreamExt; use tokio_util::compat::{Compat, FuturesAsyncReadCompatExt}; -use tracing::warn; +use tracing::{debug, warn}; use crate::client::{Client, ClientError}; use crate::server::Server; @@ -138,3 +138,92 @@ pub async fn connect_libp2p( let client = Client::connect_with_stream(stream.compat(), card, wit).await?; Ok(client) } + +// ===================================================================== +// Discovery remoto via DHT — Fase 2 +// ===================================================================== +// +// Cuando un Ente registra una Card con outputs en el Init local, el +// Init anuncia al DHT (`net.start_providing(key)`) bajo una key +// derivada de `(flow_name, TypeRef)`. Cualquier nodo conectado al +// mismo DHT puede consultar `find_remote_providers(flow_name, type)` +// y obtener la lista de `PeerId`s que dijeron proveer ese flow. +// +// La key es **estable y libre de colisiones** entre versiones del +// monorepo: usa blake3 sobre un canon textual `brahman-flow|{name}|{type_canon}`. +// Cambiar la canonicalización rompe el discovery cross-version, así +// que cualquier modificación requiere bump de versión documentado. + +/// Prefijo de namespace para todas las keys DHT del subprotocolo +/// brahman. Discrimina contra otros usos del mismo DHT (sync minga, +/// futuros) — protege contra colisiones accidentales. +const FLOW_KEY_PREFIX: &str = "brahman-flow|v1|"; + +/// Canonicaliza un `TypeRef` a string estable. Cambios aquí rompen +/// la compatibilidad de discovery cross-version; bump documentado +/// en `FLOW_KEY_PREFIX` al modificar. +fn canonicalize_type(t: &TypeRef) -> String { + match t { + TypeRef::Primitive { name } => format!("prim:{}", name), + TypeRef::Wit { + package, + interface, + name, + } => format!( + "wit:{}#{}#{}", + package, + interface.as_deref().unwrap_or(""), + name + ), + } +} + +/// Deriva la key del DHT para un `(flow_name, type_ref)` específico. +/// blake3-32B determinístico — la misma tupla en cualquier máquina +/// produce la misma key. +pub fn flow_dht_key(flow_name: &str, type_ref: &TypeRef) -> [u8; 32] { + let canon = format!( + "{}{}|{}", + FLOW_KEY_PREFIX, + flow_name, + canonicalize_type(type_ref) + ); + *blake3::hash(canon.as_bytes()).as_bytes() +} + +/// Anuncia al DHT que este nodo provee cada output flow declarado +/// en `card`. Llamarlo tras `register_session` propaga la +/// disponibilidad a todos los peers que comparten DHT con éste. +/// +/// Idempotente: re-anunciar la misma key actualiza el TTL del record +/// en el DHT. Best-effort: si `start_providing` falla por falta de +/// peers cercanos (DHT vacío), el record vive en el store local +/// hasta que llegue una conexión. +pub fn announce_outputs(net: &BrahmanNet, card: &Card) { + for flow in &card.flow.output { + let key = flow_dht_key(&flow.name, &flow.ty); + debug!( + target: "brahman_handshake::network", + flow = %flow.name, + "announce_output → DHT" + ); + net.start_providing(&key); + } +} + +/// Consulta el DHT por peers que han anunciado proveer el flow +/// `(flow_name, type_ref)`. Devuelve la lista resuelta de `PeerId`s. +/// Lista vacía si nadie anuncia, si la query timeout-ea, o si el +/// DHT no ha encontrado providers. +/// +/// Para cada `PeerId` devuelto, el caller puede luego dial-ar al +/// peer (a sus addrs conocidas vía Identify) y abrir un sub-handshake +/// remoto con [`connect_libp2p`]. +pub async fn find_remote_providers( + net: &BrahmanNet, + flow_name: &str, + type_ref: &TypeRef, +) -> Vec { + let key = flow_dht_key(flow_name, type_ref); + net.find_providers(&key).await +} diff --git a/crates/core/brahman-handshake/src/server.rs b/crates/core/brahman-handshake/src/server.rs index 11511f6..61a363f 100644 --- a/crates/core/brahman-handshake/src/server.rs +++ b/crates/core/brahman-handshake/src/server.rs @@ -7,6 +7,7 @@ use std::time::{SystemTime, UNIX_EPOCH}; use brahman_broker::{Broker, Endpoint}; use brahman_card::{Card, ResolvedCard, WitInterface, CARD_SCHEMA_VERSION}; +use brahman_net::BrahmanNet; use tokio::io::{split, AsyncRead, AsyncWrite, WriteHalf}; use tokio::net::{UnixListener, UnixStream}; use tokio::sync::{mpsc, Mutex}; @@ -39,7 +40,7 @@ type LastMatches = Arc>>>; const PUSH_CHANNEL_CAPACITY: usize = 32; /// Configuración del servidor. -#[derive(Debug, Clone, Default)] +#[derive(Clone, Default)] pub struct ServerConfig { /// `true` si el Init está atado al servidor (se reporta en `HelloAck`). pub init_attached: bool, @@ -47,6 +48,27 @@ pub struct ServerConfig { /// `register` tras un Hello aceptado y `unregister` al cerrar la /// sesión (Farewell o EOF). Si es `None`, el broker no se usa. pub broker: Option, + /// Capa P2P compartida. Si está presente, cada Card registrada + /// con outputs se anuncia automáticamente al DHT vía + /// [`brahman_handshake::network::announce_outputs`], permitiendo + /// que un consumer remoto los descubra con + /// [`brahman_handshake::network::find_remote_providers`]. Si es + /// `None`, el server queda "ciego al DHT" — sólo matchea sesiones + /// locales (lo cual es correcto cuando no hay conectividad o no + /// se desea exponer al exterior). + pub net: Option>, +} + +// Manual Debug porque BrahmanNet no implementa Debug (libp2p Swarm +// no es Debug). Sólo loggeamos los campos relevantes para tracing. +impl std::fmt::Debug for ServerConfig { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("ServerConfig") + .field("init_attached", &self.init_attached) + .field("broker", &self.broker.as_ref().map(|_| "")) + .field("net", &self.net.as_ref().map(|_| "")) + .finish() + } } /// Servidor de handshake escuchando en un Unix socket. @@ -508,6 +530,12 @@ async fn register_session( .await .register(session_id, &card, wit.clone()); } + // Si el server tiene net configurado, anunciar los outputs al + // DHT para que peers remotos puedan descubrirlos. Idempotente + // y best-effort — fallos de Kad no propagan al handshake. + if let Some(net) = &config.net { + crate::network::announce_outputs(net, &card); + } let resolved = match wit { Some(w) => ResolvedCard::from_conscious(card, w), None => ResolvedCard::from_agnostic(card), diff --git a/crates/core/brahman-handshake/tests/handshake.rs b/crates/core/brahman-handshake/tests/handshake.rs index ac20ff9..23b7a05 100644 --- a/crates/core/brahman-handshake/tests/handshake.rs +++ b/crates/core/brahman-handshake/tests/handshake.rs @@ -58,7 +58,7 @@ fn sock_path(name: &str) -> std::path::PathBuf { #[tokio::test] async fn full_handshake_roundtrip() { let path = sock_path("happy"); - let server = Server::bind(&path, ServerConfig { init_attached: true, broker: None }).unwrap(); + let server = Server::bind(&path, ServerConfig { init_attached: true, broker: None, net: None }).unwrap(); let session_handle = tokio::spawn({ async move { @@ -193,6 +193,7 @@ async fn broker_registers_and_unregisters_with_session() { ServerConfig { init_attached: false, broker: Some(broker.clone()), + net: None, }, ) .unwrap(); @@ -235,6 +236,7 @@ async fn broker_matches_two_live_modules() { ServerConfig { init_attached: false, broker: Some(broker.clone()), + net: None, }, ) .unwrap(); @@ -310,6 +312,7 @@ async fn match_event_pushed_on_producer_arrival() { ServerConfig { init_attached: false, broker: Some(broker.clone()), + net: None, }, ) .unwrap(); diff --git a/crates/core/brahman-handshake/tests/network_discovery.rs b/crates/core/brahman-handshake/tests/network_discovery.rs new file mode 100644 index 0000000..16b8dce --- /dev/null +++ b/crates/core/brahman-handshake/tests/network_discovery.rs @@ -0,0 +1,230 @@ +//! Test E2E de Fase 2: discovery remoto vía DHT. +//! +//! Pipeline: +//! 1. **Provider node (A)**: arma server con `BrahmanNet` configurado; +//! listen TCP; un cliente local registra una Card con un output +//! flow. El server llama `announce_outputs` automáticamente, lo +//! que hace `start_providing` en el DHT bajo la key derivada del +//! flow. +//! 2. **Consumer node (B)**: arma su propio `BrahmanNet`; dial-ea al +//! multiaddr del provider para que ambos se conozcan vía Identify +//! (esto popula sus respectivos routing tables de Kademlia). +//! 3. **B llama `find_remote_providers(flow_name, type)`**: la query +//! DHT propaga vía Kad, y eventually el provider responde con su +//! `PeerId`. +//! 4. **Verificación**: el `PeerId` que B descubre coincide con el +//! de A. +//! +//! Notas: +//! - Kademlia replication factor por defecto es 20; con 2 nodos no +//! hay propagación material — A es el único provider, B llega a A +//! vía la conexión directa establecida en step 2 y obtiene el record +//! del store local de A. +//! - El test usa flow `monad-list:json` por familiaridad (es el flow +//! real que `nouser daemon` declara). Sirve también como prueba de +//! que el sistema completo (daemon + DHT) funcionaría con cero +//! cambios en la Card. + +use std::collections::BTreeSet; +use std::sync::Arc; +use std::time::Duration; + +use brahman_broker::{Broker, BrokerConfig}; +use brahman_card::{ + ulid::Ulid, Card, CardKind, Flow, Flows, Lifecycle, Payload, Priority, Supervision, TypeRef, + CARD_SCHEMA_VERSION, +}; +use brahman_handshake::network::{find_remote_providers, run_libp2p_accept_loop}; +use brahman_handshake::server::{Server, ServerConfig}; +use brahman_net::{BrahmanNet, Multiaddr, Protocol}; +use tempfile::TempDir; +use tokio::sync::Mutex; + +fn provider_card(label: &str, flow_name: &str, type_name: &str) -> Card { + Card { + schema_version: CARD_SCHEMA_VERSION, + id: Ulid::new(), + label: label.into(), + provides: BTreeSet::new(), + requires: BTreeSet::new(), + permissions: Default::default(), + soma: Default::default(), + payload: Payload::Virtual, + supervision: Supervision::Delegate, + lifecycle: Lifecycle::Daemon, + priority: Priority::Normal, + kind: CardKind::Ente, + flow: Flows { + input: vec![], + output: vec![Flow { + name: flow_name.into(), + ty: TypeRef::Primitive { + name: type_name.into(), + }, + pin_to: None, + }], + }, + ..Default::default() + } +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 4)] +async fn dht_discovery_finds_remote_provider() { + // ---- Node A (provider): server + libp2p net + Card con output ---- + let tmp = TempDir::new().unwrap(); + let a_unix = tmp.path().join("a.sock"); + + let a_broker = Arc::new(Mutex::new(Broker::new(BrokerConfig::default()))); + let a_net = Arc::new(BrahmanNet::new().unwrap()); + let a_peer = a_net.peer_id; + + let a_server = Arc::new( + Server::bind( + &a_unix, + ServerConfig { + init_attached: true, + broker: Some(a_broker.clone()), + net: Some(a_net.clone()), // ← clave Fase 2: anuncia al DHT + }, + ) + .unwrap(), + ); + + let listen_addr: Multiaddr = "/ip4/127.0.0.1/tcp/0".parse().unwrap(); + let a_addr = a_net.listen(listen_addr).await; + let mut a_full_addr = a_addr.clone(); + a_full_addr.push(Protocol::P2p(a_peer)); + + tokio::spawn(run_libp2p_accept_loop(a_server.clone(), a_net.clone())); + + // Unix accept loop: necesario para que Client::connect al socket + // local no cuelgue (Server no se auto-accepta; el caller arma el + // loop). Cada session entrante corre en su propia task. + { + let s = a_server.clone(); + tokio::spawn(async move { + loop { + match s.accept_one().await { + Ok(session) => { + tokio::spawn(async move { + let _ = session.handle().await; + }); + } + Err(_) => break, + } + } + }); + } + + // Registrar la Card local en A con un flow output. + let card = provider_card("test.engine_remote", "monad-list", "json"); + let mut local_client = brahman_handshake::client::Client::connect(&a_unix, card) + .await + .expect("registro local en A"); + + // ---- Node B (consumer): otro net que dial-a a A ---- + let b_net = BrahmanNet::new().unwrap(); + b_net.dial(a_full_addr.clone()); + + // Esperar a que la conexión se establezca y Identify popule el + // routing table de Kad. En localhost con 2 peers, ~250ms es de + // sobra; sumamos margen para CI. + tokio::time::sleep(Duration::from_millis(500)).await; + + // ---- Discovery: B busca providers de "monad-list:json" ---- + let providers = find_remote_providers( + &b_net, + "monad-list", + &TypeRef::Primitive { + name: "json".into(), + }, + ) + .await; + + assert!( + providers.contains(&a_peer), + "B debería descubrir a A vía DHT. Encontrados: {:?}, esperado: {}", + providers, + a_peer + ); + + // Sanidad: el cliente local sigue vivo durante todo el test (lo + // que mantiene la Card registrada y por tanto el record DHT vivo). + local_client.farewell().await.ok(); +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 4)] +async fn dht_discovery_negative_unknown_flow() { + // Mismo setup que el test happy-path, pero B busca un flow que A + // NO ofrece. Debe devolver lista vacía dentro del timeout + // razonable (no colgarse). + let tmp = TempDir::new().unwrap(); + let a_unix = tmp.path().join("a.sock"); + let a_broker = Arc::new(Mutex::new(Broker::new(BrokerConfig::default()))); + let a_net = Arc::new(BrahmanNet::new().unwrap()); + let a_peer = a_net.peer_id; + + let a_server = Arc::new( + Server::bind( + &a_unix, + ServerConfig { + init_attached: true, + broker: Some(a_broker), + net: Some(a_net.clone()), + }, + ) + .unwrap(), + ); + + let a_addr = a_net.listen("/ip4/127.0.0.1/tcp/0".parse().unwrap()).await; + let mut a_full = a_addr.clone(); + a_full.push(Protocol::P2p(a_peer)); + + tokio::spawn(run_libp2p_accept_loop(a_server.clone(), a_net.clone())); + + // Unix accept loop: necesario para que Client::connect al socket + // local no cuelgue (Server no se auto-accepta; el caller arma el + // loop). Cada session entrante corre en su propia task. + { + let s = a_server.clone(); + tokio::spawn(async move { + loop { + match s.accept_one().await { + Ok(session) => { + tokio::spawn(async move { + let _ = session.handle().await; + }); + } + Err(_) => break, + } + } + }); + } + + let card = provider_card("test.engine_other", "monad-list", "json"); + let mut local = brahman_handshake::client::Client::connect(&a_unix, card) + .await + .unwrap(); + + let b_net = BrahmanNet::new().unwrap(); + b_net.dial(a_full); + tokio::time::sleep(Duration::from_millis(500)).await; + + // Buscamos un flow que NADIE anunció. + let providers = find_remote_providers( + &b_net, + "flow-que-no-existe", + &TypeRef::Primitive { + name: "json".into(), + }, + ) + .await; + + assert!( + providers.is_empty(), + "no debería haber providers para un flow inexistente, got: {:?}", + providers + ); + + local.farewell().await.ok(); +} diff --git a/crates/core/brahman-handshake/tests/network_libp2p.rs b/crates/core/brahman-handshake/tests/network_libp2p.rs index 5b94812..94a6527 100644 --- a/crates/core/brahman-handshake/tests/network_libp2p.rs +++ b/crates/core/brahman-handshake/tests/network_libp2p.rs @@ -55,6 +55,7 @@ async fn libp2p_handshake_roundtrip() { ServerConfig { init_attached: true, broker: Some(broker.clone()), + net: None, }, ) .unwrap(), diff --git a/crates/core/ente-zero/src/main.rs b/crates/core/ente-zero/src/main.rs index db5c526..5c9b7f1 100644 --- a/crates/core/ente-zero/src/main.rs +++ b/crates/core/ente-zero/src/main.rs @@ -164,6 +164,10 @@ async fn primordial_loop( brahman_handshake::server::ServerConfig { init_attached: true, broker: Some(brahman_broker.clone()), + // Fase 2: el Init aún no expone red P2P por default. Cuando + // arje quiera publicar Cards al DHT remoto, pasar aquí un + // `Some(Arc)` con la malla configurada. + net: None, }, ) { Ok(server) => {