chore: monorepo inicial con arje + minga + yahweh absorbidos

Workspace en 4 ejes (core/modules/apps/shared):

- core/: 24 crates de arje (Init systemd-compatible: ente-card, ente-zero,
  ente-kernel, ente-bus, ente-cas, ente-soma, ente-wasm, ente-snapshot,
  ente-brain, ente-echo, ente-policy-provider, + 12 crates *-compat)
- modules/semantic_dht/: 5 crates de minga (minga-core con AST/CAS/MST,
  minga-p2p con libp2p Kad, minga-store, minga-vfs, minga-cli)
- modules/ui_engine/: 11 crates de yahweh (libs/{core,theme,bus,providers},
  widgets/{tree,splitter,tabs,tiled,container_core,text_input})
- apps/: 5 crates de yahweh (file_explorer, database_explorer, text_viewer,
  image_viewer, yahweh-shell)
- shared_wit/protocol.wit: handshake/lifecycle inicial

Cargo.toml unificado: thiserror bumped a 2 (transparente para arje), tokio
"full", paths intra-workspace de yahweh redirigidos a su nueva ubicación.

cargo check --workspace: 0 errores, 17 warnings (dead code preexistente).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Sergio
2026-05-08 04:45:44 +00:00
commit 53dbdf0f1d
176 changed files with 34845 additions and 0 deletions
@@ -0,0 +1,100 @@
//! Driver de sincronización sobre I/O asíncrona.
//!
//! Bridge entre la `SyncSession` puramente lógica y cualquier
//! transporte que implemente `AsyncRead + AsyncWrite`. Encuadre
//! length-prefixed: cada `Message` se serializa con postcard y se
//! envía precedido de un `u32 LE` con su longitud en bytes.
//!
//! La estructura del bucle es:
//! 1. Drenar todos los `Message`s pendientes a la salida.
//! 2. Si la sesión declara `is_done`, salir.
//! 3. Bloquear esperando un `Message` entrante; alimentarlo a la
//! sesión y volver al paso 1.
//!
//! Esto funciona porque cada paso del state machine emite los
//! mensajes que necesita inmediatamente — nunca quedan colgados
//! mensajes por un `Message` futuro. La única espera real ocurre en
//! el paso 3, cuando estamos esperando que el peer responda.
use std::collections::VecDeque;
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
use crate::message::Message;
use crate::session::SyncSession;
/// Cota dura sobre el tamaño de un frame, para evitar que un peer
/// malicioso (o un bug) cause asignaciones desbocadas. 16 MB es de
/// sobra para mensajes de sync — un `AttestPush` de cien mil
/// atestaciones cabe en ~13 MB.
const MAX_FRAME_SIZE: u32 = 16 * 1024 * 1024;
#[derive(Debug, thiserror::Error)]
pub enum AsyncSyncError {
#[error("io: {0}")]
Io(#[from] std::io::Error),
#[error("decode postcard: {0}")]
Decode(#[from] postcard::Error),
#[error("frame demasiado grande: {0} bytes")]
FrameTooLarge(u32),
#[error("la sesión cerró sin alcanzar `is_done`")]
UnexpectedClose,
}
/// Ejecuta una sesión de sincronización completa sobre una stream
/// duplex. Devuelve la `SyncSession` resultante (con el `Mst`,
/// `MemStore` y `AttestationStore` ya mergeados con el peer).
pub async fn run_sync_async<S>(
mut session: SyncSession,
mut stream: S,
) -> Result<SyncSession, AsyncSyncError>
where
S: AsyncRead + AsyncWrite + Unpin,
{
let mut outbound: VecDeque<Message> = session.start().into();
loop {
while let Some(msg) = outbound.pop_front() {
send_frame(&mut stream, &msg).await?;
}
if session.is_done() {
return Ok(session);
}
let msg = recv_frame(&mut stream).await?;
outbound.extend(session.handle(msg));
}
}
async fn send_frame<S>(stream: &mut S, msg: &Message) -> Result<(), AsyncSyncError>
where
S: AsyncWrite + Unpin,
{
let bytes = msg.encode();
let len = bytes.len() as u32;
if len > MAX_FRAME_SIZE {
return Err(AsyncSyncError::FrameTooLarge(len));
}
stream.write_all(&len.to_le_bytes()).await?;
stream.write_all(&bytes).await?;
stream.flush().await?;
Ok(())
}
async fn recv_frame<S>(stream: &mut S) -> Result<Message, AsyncSyncError>
where
S: AsyncRead + Unpin,
{
let mut len_buf = [0u8; 4];
stream.read_exact(&mut len_buf).await?;
let len = u32::from_le_bytes(len_buf);
if len > MAX_FRAME_SIZE {
return Err(AsyncSyncError::FrameTooLarge(len));
}
let mut buf = vec![0u8; len as usize];
stream.read_exact(&mut buf).await?;
Ok(Message::decode(&buf)?)
}
@@ -0,0 +1,89 @@
//! Harness in-memory determinístico para correr dos `SyncSession`s
//! una contra la otra y verificar invariantes del protocolo.
use std::collections::VecDeque;
use crate::message::Message;
use crate::session::SyncSession;
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct SyncStats {
pub challenges: usize,
pub hellos: usize,
pub probe_reqs: usize,
pub probe_ress: usize,
pub fetches: usize,
pub delivers: usize,
pub attest_pushes: usize,
pub dones: usize,
}
impl SyncStats {
fn record(&mut self, m: &Message) {
match m {
Message::Challenge { .. } => self.challenges += 1,
Message::Hello { .. } => self.hellos += 1,
Message::ProbeReq { .. } => self.probe_reqs += 1,
Message::ProbeRes { .. } => self.probe_ress += 1,
Message::Fetch { .. } => self.fetches += 1,
Message::Deliver { .. } => self.delivers += 1,
Message::AttestPush { .. } => self.attest_pushes += 1,
Message::Done => self.dones += 1,
}
}
pub fn total(&self) -> usize {
self.challenges
+ self.hellos
+ self.probe_reqs
+ self.probe_ress
+ self.fetches
+ self.delivers
+ self.attest_pushes
+ self.dones
}
}
/// Ejecuta la sincronización entre dos sesiones hasta convergencia.
///
/// Pánico si la conversación termina sin que ambas partes alcancen
/// `is_done()` — eso sería un deadlock del protocolo y una regresión.
pub fn run_sync(a: &mut SyncSession, b: &mut SyncSession) -> SyncStats {
let mut from_a: VecDeque<Message> = VecDeque::new();
let mut from_b: VecDeque<Message> = VecDeque::new();
let mut stats = SyncStats::default();
from_a.extend(a.start());
from_b.extend(b.start());
loop {
let mut progress = false;
if let Some(msg) = from_a.pop_front() {
stats.record(&msg);
for out in b.handle(msg) {
from_b.push_back(out);
}
progress = true;
}
if let Some(msg) = from_b.pop_front() {
stats.record(&msg);
for out in a.handle(msg) {
from_a.push_back(out);
}
progress = true;
}
if !progress {
break;
}
}
assert!(
a.is_done() && b.is_done(),
"deadlock: sync terminó sin que ambos peers cerraran"
);
stats
}
@@ -0,0 +1,26 @@
//! minga-p2p: protocolo de sincronización entre repositorios Minga.
//!
//! Este crate define el **protocolo** y la **máquina de estados** de la
//! sincronización P2P, sin acoplarse a un transporte concreto. Un peer
//! manipula una `SyncSession` (puramente lógica) que consume mensajes
//! entrantes y produce mensajes salientes; el transporte real —libp2p,
//! HTTP, in-memory, lo que sea— se reduce a serializar/deserializar y
//! mover bytes.
//!
//! Este orden refleja el principio bottom-up del proyecto: validamos la
//! convergencia del protocolo con un `harness` in-memory determinístico
//! antes de invertir en async runtime + libp2p.
pub mod async_driver;
pub mod harness;
pub mod message;
pub mod network;
pub mod peer;
pub mod session;
pub use async_driver::{run_sync_async, AsyncSyncError};
pub use harness::{run_sync, SyncStats};
pub use message::Message;
pub use network::{DiscoveredPeer, LibP2pNode, NodeError, SYNC_PROTOCOL};
pub use peer::{MingaPeer, PeerOpenError, PeerSyncError};
pub use session::SyncSession;
@@ -0,0 +1,94 @@
//! Mensajes del protocolo de sincronización (versión recursiva sobre
//! la estructura del MST).
//!
//! El protocolo es simétrico — ambos peers ejecutan el mismo rol y
//! emiten los mismos mensajes — y consta de seis tipos:
//!
//! 1. `Hello { root_subtree_hash }` anuncia el hash Merkle del MST raíz
//! del emisor. Si ambos hashes coinciden, los dos repos son idénticos
//! y la sincronización termina sin un solo byte adicional.
//!
//! 2. `ProbeReq { subtree_hash }` solicita la **estructura** (level +
//! keys + child_hashes) de un subárbol previamente anunciado por el
//! otro peer. Es lo que permite descender el árbol del peer paso a
//! paso, podando ramas idénticas por igualdad de hash.
//!
//! 3. `ProbeRes { subtree_hash, probe }` responde con el `NodeProbe`,
//! o `None` si el subárbol era el vacío. Cada subárbol que el peer
//! no reconoce dispara un `ProbeReq` recursivo; cuando el peer ya
//! tiene un subárbol con el mismo hash, la rama se poda.
//!
//! 4. `Fetch { hash }` y `Deliver { hash, stored }` mueven los nodos
//! propiamente dichos. El receptor del `Deliver` **verifica
//! criptográficamente** que `hash_stored(stored) == hash` antes de
//! insertar — un peer malicioso no puede colar un `StoredNode`
//! distinto bajo un hash anunciado.
//!
//! 5. `Done` cierra el lado del emisor: ya recibió el `Hello` del otro,
//! no tiene probes ni fetches pendientes. Cuando ambos `Done`s han
//! cruzado, la sesión termina con ambos repos convergentes.
use minga_core::{Attestation, ContentHash, Did, NodeProbe, Signature, StoredNode};
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub enum Message {
/// Reto de session-handshake: 32 bytes aleatorios. Cada peer envía
/// uno al inicio. El otro lado lo incrustará en el payload del
/// `Hello` que firme con su llave privada — así un `Hello`
/// capturado en una sesión no puede replayearse en otra (que
/// tendrá un nonce distinto).
Challenge {
nonce: [u8; 32],
},
/// Saludo autenticado anti-replay: el emisor presenta su DID, el
/// hash del subárbol raíz de su MST, y una firma sobre el payload
/// `(peer_did || root_subtree_hash || nonce_recibido_del_peer)`.
/// El receptor reconstruye el payload con su PROPIO nonce (el que
/// envió en su Challenge) y verifica con la llave pública del
/// peer. Sin Challenge previo no hay Hello válido posible.
Hello {
peer_did: Did,
root_subtree_hash: ContentHash,
signature: Signature,
},
ProbeReq {
subtree_hash: ContentHash,
},
ProbeRes {
subtree_hash: ContentHash,
probe: Option<NodeProbe>,
},
Fetch {
hash: ContentHash,
},
Deliver {
hash: ContentHash,
stored: StoredNode,
},
/// Empuje de atestaciones: el emisor entrega al peer las pruebas
/// criptográficas de autoría que conoce. Cada `Attestation` es
/// auto-verificable (firma + autor + contenido), así que el
/// receptor puede validar y mezclar sin confiar en la palabra del
/// remitente. Se envían tras el `Hello` autenticado para que el
/// peer verifique la identidad del remitente antes de procesarlas.
AttestPush {
attestations: Vec<Attestation>,
},
Done,
}
impl Message {
/// Codifica el mensaje a bytes vía postcard. Diseñado para
/// transferir sobre cualquier transporte que mueva `Vec<u8>`.
/// Postcard es compacto, sin overhead de schema runtime.
pub fn encode(&self) -> Vec<u8> {
postcard::to_allocvec(self).expect("postcard encoding cannot fail for our types")
}
/// Decodifica bytes a un `Message`. `Err` si los bytes son
/// malformados o no representan un `Message` válido.
pub fn decode(bytes: &[u8]) -> Result<Self, postcard::Error> {
postcard::from_bytes(bytes)
}
}
@@ -0,0 +1,282 @@
//! Integración libp2p con behaviour compuesto: streams Minga +
//! Kademlia DHT.
//!
//! - **TCP + Noise + Yamux**: transporte autenticado y multiplexado.
//! - **`stream::Behaviour`**: streams bidireccionales para el
//! protocolo `/minga/sync/1.0.0`.
//! - **`kad::Behaviour<MemoryStore>`**: tabla de routing distribuida
//! para descubrimiento. Cada nodo arranca en modo `Server` y
//! responde a queries del DHT.
//!
//! El swarm corre en una task tokio dedicada que procesa comandos
//! externos (Dial, Listen, AddDhtPeer, FindClosestPeers) y eventos
//! del swarm (NewListenAddr para señalar address resuelto, eventos
//! Kad para completar queries). Los métodos públicos solo envían
//! comandos por canal.
use std::collections::HashMap;
use std::time::Duration;
use futures::StreamExt;
use libp2p::{
identify, identity, kad, noise,
swarm::{NetworkBehaviour, SwarmEvent},
tcp, yamux, Multiaddr, PeerId, StreamProtocol, Swarm, SwarmBuilder,
};
use libp2p_stream as stream;
use tokio::sync::{mpsc, oneshot, Mutex};
pub const SYNC_PROTOCOL: StreamProtocol = StreamProtocol::new("/minga/sync/1.0.0");
const IDENTIFY_PROTOCOL: &str = "/minga/0.1.0";
#[derive(NetworkBehaviour)]
struct MingaBehaviour {
stream: stream::Behaviour,
kad: kad::Behaviour<kad::store::MemoryStore>,
identify: identify::Behaviour,
}
#[derive(Debug, thiserror::Error)]
pub enum NodeError {
#[error("transport build failed: {0}")]
Build(String),
}
#[derive(Debug)]
enum Command {
Dial(Multiaddr),
Listen(Multiaddr),
AddDhtPeer(PeerId, Multiaddr),
FindClosestPeers(PeerId, oneshot::Sender<Vec<DiscoveredPeer>>),
StartProviding(Vec<u8>),
GetProviders(Vec<u8>, oneshot::Sender<Vec<PeerId>>),
}
/// Peer descubierto vía DHT: identidad + direcciones conocidas.
#[derive(Debug, Clone)]
pub struct DiscoveredPeer {
pub peer_id: PeerId,
pub addrs: Vec<Multiaddr>,
}
pub struct LibP2pNode {
pub peer_id: PeerId,
cmd_tx: mpsc::UnboundedSender<Command>,
listen_rx: Mutex<mpsc::UnboundedReceiver<Multiaddr>>,
/// Control para abrir/aceptar streams.
pub control: stream::Control,
}
impl LibP2pNode {
pub fn new() -> Result<Self, NodeError> {
let id = identity::Keypair::generate_ed25519();
let peer_id = id.public().to_peer_id();
let mut swarm: Swarm<MingaBehaviour> = SwarmBuilder::with_existing_identity(id)
.with_tokio()
.with_tcp(
tcp::Config::default(),
noise::Config::new,
yamux::Config::default,
)
.map_err(|e| NodeError::Build(format!("{e}")))?
.with_behaviour(|key| {
let local = key.public().to_peer_id();
let mut kad =
kad::Behaviour::new(local, kad::store::MemoryStore::new(local));
// Modo Server: respondemos a queries del DHT. Por
// defecto kad arranca en Auto, que requiere detectar
// reachability. Para tests en localhost forzamos Server.
kad.set_mode(Some(kad::Mode::Server));
let identify = identify::Behaviour::new(
identify::Config::new(IDENTIFY_PROTOCOL.to_string(), key.public())
.with_agent_version(format!("minga/{}", env!("CARGO_PKG_VERSION"))),
);
MingaBehaviour {
stream: stream::Behaviour::new(),
kad,
identify,
}
})
.map_err(|e| NodeError::Build(format!("{e}")))?
.with_swarm_config(|c| c.with_idle_connection_timeout(Duration::from_secs(60)))
.build();
let control = swarm.behaviour().stream.new_control();
let (cmd_tx, mut cmd_rx) = mpsc::unbounded_channel::<Command>();
let (listen_tx, listen_rx) = mpsc::unbounded_channel::<Multiaddr>();
tokio::spawn(async move {
let mut pending_finds: HashMap<
kad::QueryId,
oneshot::Sender<Vec<DiscoveredPeer>>,
> = HashMap::new();
let mut pending_providers: HashMap<
kad::QueryId,
(Vec<PeerId>, oneshot::Sender<Vec<PeerId>>),
> = HashMap::new();
loop {
tokio::select! {
Some(cmd) = cmd_rx.recv() => {
match cmd {
Command::Dial(addr) => {
let _ = swarm.dial(addr);
}
Command::Listen(addr) => {
let _ = swarm.listen_on(addr);
}
Command::AddDhtPeer(peer, addr) => {
swarm.behaviour_mut().kad.add_address(&peer, addr);
}
Command::FindClosestPeers(target, tx) => {
let qid = swarm.behaviour_mut().kad.get_closest_peers(target);
pending_finds.insert(qid, tx);
}
Command::StartProviding(key) => {
// Best-effort: si falla (sin peers cercanos para
// replicar), seguirá viviendo en el local store
// y se servirá vía get_providers de quien
// tenga conexión con nosotros.
let _ = swarm.behaviour_mut().kad.start_providing(key.into());
}
Command::GetProviders(key, tx) => {
let qid = swarm.behaviour_mut().kad.get_providers(key.into());
pending_providers.insert(qid, (Vec::new(), tx));
}
}
}
event = swarm.select_next_some() => {
match event {
SwarmEvent::NewListenAddr { address, .. } => {
let _ = listen_tx.send(address);
}
// Identify nos dice las listen-addrs reales del
// peer. Las inyectamos a Kad para poblar el
// routing table sin necesidad de add_dht_peer
// manual — la propagación pasa a ser automática.
SwarmEvent::Behaviour(MingaBehaviourEvent::Identify(
identify::Event::Received { peer_id, info, .. }
)) => {
for addr in info.listen_addrs {
swarm.behaviour_mut().kad.add_address(&peer_id, addr);
}
}
SwarmEvent::Behaviour(MingaBehaviourEvent::Kad(
kad::Event::OutboundQueryProgressed { id, result, step, .. }
)) => {
match result {
kad::QueryResult::GetClosestPeers(Ok(ok)) if step.last => {
if let Some(tx) = pending_finds.remove(&id) {
let infos = ok.peers.into_iter()
.map(|p| DiscoveredPeer {
peer_id: p.peer_id,
addrs: p.addrs,
})
.collect();
let _ = tx.send(infos);
}
}
kad::QueryResult::GetClosestPeers(Err(_)) if step.last => {
if let Some(tx) = pending_finds.remove(&id) {
let _ = tx.send(Vec::new());
}
}
kad::QueryResult::GetProviders(Ok(ok)) => {
if let Some((collected, _)) =
pending_providers.get_mut(&id)
{
if let kad::GetProvidersOk::FoundProviders {
providers, ..
} = ok
{
for p in providers {
if !collected.contains(&p) {
collected.push(p);
}
}
}
}
if step.last {
if let Some((providers, tx)) =
pending_providers.remove(&id)
{
let _ = tx.send(providers);
}
}
}
kad::QueryResult::GetProviders(Err(_)) if step.last => {
if let Some((providers, tx)) =
pending_providers.remove(&id)
{
let _ = tx.send(providers);
}
}
_ => {}
}
}
_ => {}
}
}
}
}
});
Ok(Self {
peer_id,
cmd_tx,
listen_rx: Mutex::new(listen_rx),
control,
})
}
pub async fn listen(&self, addr: Multiaddr) -> Multiaddr {
self.cmd_tx
.send(Command::Listen(addr))
.expect("swarm task alive");
let mut rx = self.listen_rx.lock().await;
rx.recv().await.expect("listen address arrives")
}
pub fn dial(&self, addr: Multiaddr) {
let _ = self.cmd_tx.send(Command::Dial(addr));
}
/// Añade un peer al routing table de Kademlia. Punto de entrada
/// para bootstrap: tras esto, el nodo puede dirigir queries DHT
/// a través de este peer.
pub fn add_dht_peer(&self, peer: PeerId, addr: Multiaddr) {
let _ = self.cmd_tx.send(Command::AddDhtPeer(peer, addr));
}
/// Consulta el DHT por los peers más cercanos al `target` PeerId.
/// Devuelve la lista resuelta (vacía si la query falla o si no
/// hay peers conocidos). Bloquea hasta que la query completa.
pub async fn find_closest_peers(&self, target: PeerId) -> Vec<DiscoveredPeer> {
let (tx, rx) = oneshot::channel();
let _ = self
.cmd_tx
.send(Command::FindClosestPeers(target, tx));
rx.await.unwrap_or_default()
}
/// Anuncia en el DHT que este peer tiene el contenido identificado
/// por `key`. Otros peers pueden luego descubrirlo vía
/// `find_providers(key)`. Best-effort: si la replicación falla
/// inicialmente, el record vive en el store local.
pub fn start_providing(&self, key: &[u8]) {
let _ = self.cmd_tx.send(Command::StartProviding(key.to_vec()));
}
/// Consulta el DHT por peers que han anunciado proveer `key`.
/// Devuelve la lista de `PeerId`s que se reportan como providers.
/// Lista vacía si nadie anuncia.
pub async fn find_providers(&self, key: &[u8]) -> Vec<PeerId> {
let (tx, rx) = oneshot::channel();
let _ = self
.cmd_tx
.send(Command::GetProviders(key.to_vec(), tx));
rx.await.unwrap_or_default()
}
}
@@ -0,0 +1,313 @@
//! `MingaPeer`: API de alto nivel para un nodo Minga "always-on".
//!
//! Envuelve `LibP2pNode` con estado compartido (`Mst` + `MemStore` +
//! `AttestationStore` + `Keypair`) protegido por un `Mutex` async, y
//! expone:
//! - `run_passive_accept()`: lanza un bucle que acepta streams de
//! sync continuamente, procesa cada uno en una task paralela, y
//! mergea el resultado al estado compartido.
//! - `sync_with(peer_id)`: inicia un sync activo con un peer conocido.
//! - `snapshot()`: instantánea del estado actual.
//!
//! Modelo de concurrencia: cada sync entrante toma un *clone* del
//! estado, ejecuta la sesión sobre la copia, y al terminar mergea las
//! novedades al estado compartido. Múltiples syncs pueden correr en
//! paralelo; el merge final adquiere el lock brevemente. Eventualmente
//! consistente: un sync que empezó antes que un merge terminado puede
//! no ver esas novedades, pero el siguiente sync sí.
use std::path::Path;
use std::sync::Arc;
use futures::StreamExt;
use libp2p::{Multiaddr, PeerId, Stream};
use tokio::sync::Mutex;
use tokio_util::compat::FuturesAsyncReadCompatExt;
use minga_core::{AttestationStore, ContentHash, Keypair, MemStore, Mst, NodeStore, SemanticNode};
use minga_store::{PersistentRepo, StoreError};
use crate::async_driver::{run_sync_async, AsyncSyncError};
use crate::network::{DiscoveredPeer, LibP2pNode, NodeError, SYNC_PROTOCOL};
use crate::session::SyncSession;
#[derive(Debug, thiserror::Error)]
pub enum PeerSyncError {
#[error("open stream: {0}")]
OpenStream(#[from] libp2p_stream::OpenStreamError),
#[error("sync: {0}")]
AsyncSync(#[from] AsyncSyncError),
}
#[derive(Debug, thiserror::Error)]
pub enum PeerOpenError {
#[error("network: {0}")]
Network(#[from] NodeError),
#[error("store: {0}")]
Store(#[from] StoreError),
}
struct PeerState {
mst: Mst,
store: MemStore,
attestations: AttestationStore,
keypair: Keypair,
/// Backing persistente opcional. Si está presente, todo cambio
/// de estado escribe a disco vía write-through.
persistent: Option<Arc<PersistentRepo>>,
}
pub struct MingaPeer {
node: LibP2pNode,
state: Arc<Mutex<PeerState>>,
}
impl MingaPeer {
pub fn new(
keypair: Keypair,
mst: Mst,
store: MemStore,
attestations: AttestationStore,
) -> Result<Self, NodeError> {
let node = LibP2pNode::new()?;
let state = Arc::new(Mutex::new(PeerState {
mst,
store,
attestations,
keypair,
persistent: None,
}));
Ok(Self { node, state })
}
/// Abre o crea un peer persistente sobre `path`. Si el directorio
/// no contiene un repo, se crea vacío. Si lo contiene, se carga
/// el estado completo (MST, nodos, atestaciones) en memoria.
/// Cualquier cambio posterior se escribe a disco vía write-through.
pub fn open(keypair: Keypair, path: impl AsRef<Path>) -> Result<Self, PeerOpenError> {
let repo = Arc::new(PersistentRepo::open(path)?);
// Cargar MST desde disco.
let mut mst = Mst::new();
for r in repo.mst.iter() {
mst.insert(r?);
}
// Cargar nodos desde disco.
let mut store = MemStore::new();
for r in repo.nodes.iter() {
let (h, node) = r?;
store.put_chunked(h, node);
}
// Cargar atestaciones desde disco.
let mut attestations = AttestationStore::new();
for r in repo.attestations.iter() {
let att = r?;
// `add` re-verifica criptográficamente. Lo persistido ya
// estaba verificado, pero re-validar es cheap insurance.
let _ = attestations.add(att);
}
let node = LibP2pNode::new()?;
let state = Arc::new(Mutex::new(PeerState {
mst,
store,
attestations,
keypair,
persistent: Some(repo),
}));
Ok(Self { node, state })
}
pub fn peer_id(&self) -> PeerId {
self.node.peer_id
}
pub async fn listen(&self, addr: Multiaddr) -> Multiaddr {
self.node.listen(addr).await
}
pub fn dial(&self, addr: Multiaddr) {
self.node.dial(addr);
}
/// Añade un peer al routing table de Kademlia (bootstrap).
pub fn add_dht_peer(&self, peer: PeerId, addr: Multiaddr) {
self.node.add_dht_peer(peer, addr);
}
/// Consulta DHT por los peers más cercanos al `target`.
pub async fn find_closest_peers(&self, target: PeerId) -> Vec<DiscoveredPeer> {
self.node.find_closest_peers(target).await
}
/// Anuncia en el DHT que este peer provee el contenido `hash`.
/// Otros peers podrán descubrirlo vía `find_providers(hash)`.
pub fn announce_provider(&self, hash: ContentHash) {
self.node.start_providing(&hash.0);
}
/// Consulta el DHT por peers que han anunciado proveer este
/// contenido. La unión de los `PeerId`s permite a quien busque
/// `hash` decidir a quién dial directamente para sincronizar.
pub async fn find_providers(&self, hash: ContentHash) -> Vec<PeerId> {
self.node.find_providers(&hash.0).await
}
/// Lanza el bucle de aceptación pasiva. Devuelve un `JoinHandle`
/// que el caller puede mantener vivo (o ignorar — la task se
/// aborta al cerrar el runtime).
///
/// Cada stream entrante dispara un sync en una task aislada que
/// trabaja sobre un clone del estado y mergea al final.
pub fn run_passive_accept(&self) -> tokio::task::JoinHandle<()> {
let mut control = self.node.control.clone();
let state = Arc::clone(&self.state);
tokio::spawn(async move {
let mut incoming = control
.accept(SYNC_PROTOCOL)
.expect("only one accept handle per protocol");
while let Some((_peer, stream)) = incoming.next().await {
let state = Arc::clone(&state);
tokio::spawn(handle_incoming(stream, state));
}
})
}
/// Inicia un sync activo con un peer del que ya tenemos conexión
/// (vía `dial` previo). Toma un snapshot del estado, corre la
/// sesión, y mergea novedades al volver.
pub async fn sync_with(&self, peer_id: PeerId) -> Result<(), PeerSyncError> {
let mut control = self.node.control.clone();
let stream = control.open_stream(peer_id, SYNC_PROTOCOL).await?;
let session = self.snapshot_session().await;
let result = run_sync_async(session, stream.compat()).await?;
self.merge_back(result).await;
Ok(())
}
async fn snapshot_session(&self) -> SyncSession {
let s = self.state.lock().await;
SyncSession::new(
s.mst.clone(),
s.store.clone(),
s.attestations.clone(),
s.keypair.clone(),
)
}
async fn merge_back(&self, session: SyncSession) {
let (new_mst, new_store, new_atts) = session.into_parts();
let mut s = self.state.lock().await;
merge_into_state(&mut s, new_mst, new_store, new_atts);
}
/// Instantánea del estado actual (mst + store + attestations).
pub async fn snapshot(&self) -> (Mst, MemStore, AttestationStore) {
let s = self.state.lock().await;
(s.mst.clone(), s.store.clone(), s.attestations.clone())
}
/// Inserta un árbol directamente en el estado del peer (sin sync).
/// Si el peer está respaldado por disco, también lo persiste.
/// Anuncia automáticamente al peer como proveedor del contenido en
/// el DHT — de esa forma cualquier otro peer puede descubrirlo
/// preguntando "¿quién tiene este hash?".
/// Devuelve el `ContentHash` raíz del árbol.
pub async fn ingest(&self, node: &SemanticNode) -> ContentHash {
let mut s = self.state.lock().await;
let h = s.store.put(node);
s.mst.insert(h);
if let Some(repo) = &s.persistent {
let _ = repo.nodes.put(node);
let _ = repo.mst.insert(h);
}
drop(s);
// Anunciamos como proveedores en el DHT. Best-effort: si no
// hay peers cercanos para replicar, el record vive local hasta
// que llegue una conexión.
self.node.start_providing(&h.0);
h
}
/// Inserta una atestación en el peer. Si el peer es persistente,
/// también la escribe a disco. Falla si la firma no verifica.
pub async fn ingest_attestation(
&self,
att: minga_core::Attestation,
) -> Result<(), minga_core::AttestationError> {
let mut s = self.state.lock().await;
s.attestations.add(att.clone())?;
if let Some(repo) = &s.persistent {
let _ = repo.attestations.add(att);
}
Ok(())
}
/// Fuerza un flush del backing persistente a disco. No hace nada
/// si el peer es solo en memoria.
pub async fn flush(&self) -> Result<(), StoreError> {
let s = self.state.lock().await;
if let Some(repo) = &s.persistent {
repo.flush()?;
}
Ok(())
}
}
async fn handle_incoming(stream: Stream, state: Arc<Mutex<PeerState>>) {
let session = {
let s = state.lock().await;
SyncSession::new(
s.mst.clone(),
s.store.clone(),
s.attestations.clone(),
s.keypair.clone(),
)
};
if let Ok(result) = run_sync_async(session, stream.compat()).await {
let (new_mst, new_store, new_atts) = result.into_parts();
let mut s = state.lock().await;
merge_into_state(&mut s, new_mst, new_store, new_atts);
}
// Errores de sync se ignoran: cada sesión es independiente, una
// sesión rota no debería tumbar el peer entero. Una iteración
// futura puede contar errores para telemetría.
}
fn merge_into_state(
state: &mut PeerState,
new_mst: Mst,
new_store: MemStore,
new_atts: AttestationStore,
) {
// Write-through: cada inserción en memoria también va al backing
// persistente si existe. Errores de IO se ignoran (best-effort);
// el estado en memoria sigue siendo la fuente de verdad inmediata
// y un siguiente sync re-popula lo que se haya perdido.
for h in new_mst.iter() {
state.mst.insert(*h);
if let Some(repo) = &state.persistent {
let _ = repo.mst.insert(*h);
}
}
for (h, node) in new_store.iter() {
state.store.put_chunked(*h, node.clone());
if let Some(repo) = &state.persistent {
let _ = repo.nodes.put_chunked(*h, node);
}
}
for att in new_atts.all() {
if state.attestations.add(att.clone()).is_ok() {
// Solo persistimos las que pasaron verificación en memoria.
if let Some(repo) = &state.persistent {
let _ = repo.attestations.add(att.clone());
}
}
}
}
@@ -0,0 +1,461 @@
//! Máquina de estados de sincronización recursiva sobre la estructura
//! del MST, con verificación criptográfica de cada nodo entregado.
//!
//! La sesión es **pura**: no hace IO, no toca la red, no usa async. El
//! transporte la alimenta vía `handle(msg)` y consume sus salidas como
//! `Vec<Message>`.
//!
//! ## Algoritmo
//!
//! 1. Cada peer construye al inicio un `own_probes: HashMap<ContentHash,
//! NodeProbe>` que indexa cada nodo interno de su MST por su hash
//! Merkle de subárbol. Es la tabla con la que respondemos
//! `ProbeReq`s en `O(1)`.
//!
//! 2. Cada peer envía `Hello` con el hash de su raíz. Si el peer
//! contrario reconoce ese hash en su propio `own_probes` (o coincide
//! con su propia raíz, o es la raíz vacía), no hay nada estructural
//! que descubrir — la rama está ya alineada.
//!
//! 3. Si el hash no se reconoce, el peer emite un `ProbeReq` para
//! pedirle al otro la estructura de ese subárbol. Cuando llega el
//! `ProbeRes`, el peer:
//! - Para cada **clave** del probe que no tiene en su MST, programa
//! un `Fetch` (la clave entrará al MST cuando llegue su `Deliver`).
//! - Para cada **child_hash** del probe que no aparece en
//! `own_probes`, recurre con un nuevo `ProbeReq`. Si el child_hash
//! ya está en `own_probes`, la rama se poda — toda esa subestructura
//! es idéntica a la nuestra.
//!
//! 4. Cuando un peer recibe un `Deliver`, verifica que el hash
//! anunciado coincida con el `hash_stored` real del nodo. Si no,
//! descarta. Si sí, inserta en el `MemStore` y, si el hash venía de
//! la raíz del MST del peer (no de un descendiente), también lo
//! inserta en su MST.
//!
//! 5. Cada `StoredNode` recibido contiene los hashes de sus hijos. Si
//! el receptor no los tiene, los pide vía `Fetch` (sync transitivo).
//!
//! 6. Un peer envía `Done` cuando: emitió y recibió `Hello`, no tiene
//! probes pendientes, ni fetches pendientes (raíz o hijo). La sesión
//! cierra cuando ambos `Done`s han cruzado.
use minga_core::{
cas::ContentHash, empty_subtree_hash, hash_stored, AttestationStore, Did, Keypair, MemStore,
Mst, NodeProbe, NodeStore,
};
use rand::rngs::OsRng;
use rand::RngCore;
use std::collections::{HashMap, HashSet};
use crate::message::Message;
/// Construye el payload firmado del `Hello` con orden fijo:
/// `verifier_nonce(32) || peer_did(32) || root_subtree_hash(32) = 96 bytes`.
/// El `verifier_nonce` es el nonce que emitió el peer que verificará
/// la firma; al firmar sobre él se vincula la firma a esta sesión.
/// Cualquier cambio al formato es incompatible al protocolo.
pub(crate) fn hello_payload(
verifier_nonce: &[u8; 32],
did: &Did,
root: &ContentHash,
) -> [u8; 96] {
let mut p = [0u8; 96];
p[..32].copy_from_slice(verifier_nonce);
p[32..64].copy_from_slice(&did.0);
p[64..].copy_from_slice(&root.0);
p
}
pub struct SyncSession {
mst: Mst,
store: MemStore,
attestations: AttestationStore,
/// Llave del peer local: firma el `Hello` y queda asociada al
/// `Did` que el peer remoto verá.
keypair: Keypair,
/// Identidad del peer remoto, capturada tras verificar la firma
/// de su `Hello`.
peer_did: Option<Did>,
own_probes: HashMap<ContentHash, NodeProbe>,
own_root_subtree_hash: ContentHash,
awaited_probes: HashSet<ContentHash>,
seen_probes: HashSet<ContentHash>,
awaiting_root: HashSet<ContentHash>,
awaiting_child: HashSet<ContentHash>,
rejected_hellos: usize,
rejected_delivers: usize,
/// Contador de atestaciones rechazadas: firma rota, llegada antes
/// de autenticar al peer, o cualquier otra inconsistencia que el
/// `AttestationStore` rechace.
rejected_attests: usize,
/// Nonce aleatorio que **nosotros** emitimos en `Challenge`. La
/// firma del `Hello` del peer debe ser sobre este nonce.
self_nonce: [u8; 32],
/// Nonce que el peer publicó en su `Challenge` — sobre este
/// nonce firmamos nosotros nuestro `Hello`.
peer_nonce: Option<[u8; 32]>,
sent_challenge: bool,
received_challenge: bool,
sent_hello: bool,
received_hello: bool,
sent_attestations: bool,
sent_done: bool,
received_done: bool,
}
impl SyncSession {
pub fn new(
mst: Mst,
store: MemStore,
attestations: AttestationStore,
keypair: Keypair,
) -> Self {
let own_probes = mst.build_probe_index();
let own_root_subtree_hash = mst.root_hash();
let mut self_nonce = [0u8; 32];
OsRng.fill_bytes(&mut self_nonce);
Self {
mst,
store,
attestations,
keypair,
peer_did: None,
own_probes,
own_root_subtree_hash,
awaited_probes: HashSet::new(),
seen_probes: HashSet::new(),
awaiting_root: HashSet::new(),
awaiting_child: HashSet::new(),
rejected_hellos: 0,
rejected_delivers: 0,
rejected_attests: 0,
self_nonce,
peer_nonce: None,
sent_challenge: false,
received_challenge: false,
sent_hello: false,
received_hello: false,
sent_attestations: false,
sent_done: false,
received_done: false,
}
}
/// Conveniencia para sesiones sin atestaciones previas. Equivalente
/// a `new(mst, store, AttestationStore::new(), keypair)`.
pub fn without_attestations(mst: Mst, store: MemStore, keypair: Keypair) -> Self {
Self::new(mst, store, AttestationStore::new(), keypair)
}
/// Mensaje inicial: `Challenge` con un nonce aleatorio. El `Hello`
/// y las atestaciones llegarán como respuesta al `Challenge` del
/// otro peer (cuando lo recibamos, ya tendremos su nonce sobre el
/// que firmar nuestra identidad).
pub fn start(&mut self) -> Vec<Message> {
if self.sent_challenge {
return Vec::new();
}
self.sent_challenge = true;
let mut out = vec![Message::Challenge {
nonce: self.self_nonce,
}];
out.extend(self.maybe_done());
out
}
pub fn handle(&mut self, msg: Message) -> Vec<Message> {
let mut out = Vec::new();
match msg {
Message::Challenge { nonce } => {
if self.received_challenge {
// Challenge duplicado: ignoramos. Un peer
// legítimo no debería enviar dos.
return out;
}
self.received_challenge = true;
self.peer_nonce = Some(nonce);
// Ahora podemos firmar nuestro Hello sobre el nonce
// del peer — lo que ata la firma a esta sesión.
let payload =
hello_payload(&nonce, &self.keypair.did(), &self.own_root_subtree_hash);
let signature = self.keypair.sign(&payload);
self.sent_hello = true;
out.push(Message::Hello {
peer_did: self.keypair.did(),
root_subtree_hash: self.own_root_subtree_hash,
signature,
});
// Empuje de atestaciones: el peer ya nos verificará
// como remitente cuando reciba nuestro Hello.
let atts: Vec<_> = self.attestations.all().cloned().collect();
if !atts.is_empty() {
out.push(Message::AttestPush { attestations: atts });
}
self.sent_attestations = true;
}
Message::Hello {
peer_did,
root_subtree_hash,
signature,
} => {
// ── Autenticación del peer + anti-replay ─────────
// La firma debe ser sobre nuestro `self_nonce` (que
// emitimos en nuestro Challenge), atándola a esta
// sesión. Un Hello capturado de otra sesión tendría
// un nonce distinto y la verificación fallaría.
let payload = hello_payload(&self.self_nonce, &peer_did, &root_subtree_hash);
if !peer_did.verify(&payload, &signature) {
self.rejected_hellos += 1;
return out;
}
self.peer_did = Some(peer_did);
self.received_hello = true;
if self.should_probe(&root_subtree_hash) {
self.awaited_probes.insert(root_subtree_hash);
out.push(Message::ProbeReq {
subtree_hash: root_subtree_hash,
});
}
}
Message::ProbeReq { subtree_hash } => {
let probe = self.own_probes.get(&subtree_hash).cloned();
// Si el subárbol pedido era vacío (o desconocido para
// nosotros), respondemos con `None` — el peer lo
// tratará como un punto sin descendientes que descubrir.
out.push(Message::ProbeRes {
subtree_hash,
probe,
});
}
Message::ProbeRes {
subtree_hash,
probe,
} => {
self.awaited_probes.remove(&subtree_hash);
self.seen_probes.insert(subtree_hash);
if let Some(probe) = probe {
out.extend(self.process_probe(&probe));
}
}
Message::Fetch { hash } => {
if let Some(stored) = self.store.get(&hash).cloned() {
out.push(Message::Deliver { hash, stored });
}
// Si no lo tenemos, callamos. El peer no debería estar
// pidiéndonos algo que no le hayamos anunciado.
}
Message::Deliver { hash, stored } => {
// ── Verificación criptográfica ────────────────────
// Recomputamos el hash del nodo entregado a partir de
// sus componentes. Si no coincide con el anunciado,
// alguien (peer malicioso o ruido en transporte) está
// intentando colar contenido distinto bajo un hash que
// no le corresponde. Descartamos silenciosamente y
// contamos para diagnóstico.
if hash_stored(&stored) != hash {
self.rejected_delivers += 1;
// No tocamos awaiting_*: la solicitud sigue
// pendiente y el peer (legítimo o no) puede
// reintentarla.
return out;
}
let was_root = self.awaiting_root.remove(&hash);
self.awaiting_child.remove(&hash);
// Antes de mover `stored`, descubrimos qué hijos
// faltan y los pedimos.
let mut new_fetches = Vec::new();
for ch in &stored.children {
if !self.store.contains(ch)
&& !self.awaiting_root.contains(ch)
&& !self.awaiting_child.contains(ch)
{
self.awaiting_child.insert(*ch);
new_fetches.push(*ch);
}
}
self.store.put_chunked(hash, stored);
if was_root {
self.mst.insert(hash);
}
for h in new_fetches {
out.push(Message::Fetch { hash: h });
}
}
Message::AttestPush { attestations } => {
// Antes de procesar atestaciones del peer, exigimos
// haber autenticado su identidad. Un push antes del
// `Hello` es protocolo malformado o ataque — todas las
// atestaciones se cuentan como rechazadas.
if !self.received_hello {
self.rejected_attests += attestations.len();
return out;
}
for att in attestations {
// `AttestationStore::add` re-verifica cada firma.
// Una sola atestación corrupta no contamina las
// demás del lote.
if self.attestations.add(att).is_err() {
self.rejected_attests += 1;
}
}
}
Message::Done => {
self.received_done = true;
}
}
out.extend(self.maybe_done());
out
}
fn process_probe(&mut self, probe: &NodeProbe) -> Vec<Message> {
let mut out = Vec::new();
// Cada clave del probe que no tenemos pasa a `awaiting_root` y
// generamos un Fetch. Si ya está en el store (sin estar aún en
// el MST), simplemente la promovemos al MST sin pedirla.
for k in &probe.keys {
if self.mst.contains(k) {
continue;
}
if self.store.contains(k) {
self.mst.insert(*k);
continue;
}
if self.awaiting_root.contains(k) {
continue;
}
self.awaiting_root.insert(*k);
out.push(Message::Fetch { hash: *k });
}
// Para cada subárbol hijo, decidimos si recurrir o podar:
// - el vacío se reconoce por hash sin red,
// - los que ya tenemos en `own_probes` (igualdad de hash =
// subestructura idéntica) se podan,
// - los ya vistos o solicitados no se duplican,
// - el resto dispara un `ProbeReq` recursivo.
for ch in &probe.child_hashes {
if self.should_probe(ch) {
self.awaited_probes.insert(*ch);
out.push(Message::ProbeReq { subtree_hash: *ch });
}
}
out
}
/// Decide si vale la pena solicitar un probe sobre `h`. Cuatro
/// razones para NO pedirlo:
/// - es el subárbol vacío (lo conocemos por convención),
/// - coincide con nuestra propia raíz (igualdad estructural),
/// - aparece en `own_probes` (ya tenemos un subárbol idéntico),
/// - ya lo solicitamos o ya lo recibimos.
fn should_probe(&self, h: &ContentHash) -> bool {
if *h == empty_subtree_hash() {
return false;
}
if *h == self.own_root_subtree_hash {
return false;
}
if self.own_probes.contains_key(h) {
return false;
}
if self.awaited_probes.contains(h) || self.seen_probes.contains(h) {
return false;
}
true
}
fn maybe_done(&mut self) -> Vec<Message> {
if self.sent_done {
return Vec::new();
}
if !self.sent_challenge || !self.received_challenge {
return Vec::new();
}
if !self.sent_hello || !self.received_hello {
return Vec::new();
}
if !self.sent_attestations {
return Vec::new();
}
if !self.awaited_probes.is_empty() {
return Vec::new();
}
if !self.awaiting_root.is_empty() || !self.awaiting_child.is_empty() {
return Vec::new();
}
self.sent_done = true;
vec![Message::Done]
}
pub fn is_done(&self) -> bool {
self.sent_done && self.received_done
}
pub fn rejected_delivers(&self) -> usize {
self.rejected_delivers
}
pub fn rejected_hellos(&self) -> usize {
self.rejected_hellos
}
pub fn rejected_attests(&self) -> usize {
self.rejected_attests
}
pub fn attestations(&self) -> &AttestationStore {
&self.attestations
}
/// Identidad del peer remoto, capturada tras verificar su `Hello`.
/// `None` si todavía no llegó un `Hello` válido.
pub fn peer_did(&self) -> Option<Did> {
self.peer_did
}
pub fn local_did(&self) -> Did {
self.keypair.did()
}
/// Nonce aleatorio que esta sesión emitió en su `Challenge`.
/// Expuesto principalmente para tests y debugging — el nonce
/// viaja en claro por el wire y no es secreto.
pub fn self_nonce(&self) -> [u8; 32] {
self.self_nonce
}
pub fn mst(&self) -> &Mst {
&self.mst
}
pub fn store(&self) -> &MemStore {
&self.store
}
pub fn into_parts(self) -> (Mst, MemStore, AttestationStore) {
(self.mst, self.store, self.attestations)
}
}