Files
Sergio 53dbdf0f1d chore: monorepo inicial con arje + minga + yahweh absorbidos
Workspace en 4 ejes (core/modules/apps/shared):

- core/: 24 crates de arje (Init systemd-compatible: ente-card, ente-zero,
  ente-kernel, ente-bus, ente-cas, ente-soma, ente-wasm, ente-snapshot,
  ente-brain, ente-echo, ente-policy-provider, + 12 crates *-compat)
- modules/semantic_dht/: 5 crates de minga (minga-core con AST/CAS/MST,
  minga-p2p con libp2p Kad, minga-store, minga-vfs, minga-cli)
- modules/ui_engine/: 11 crates de yahweh (libs/{core,theme,bus,providers},
  widgets/{tree,splitter,tabs,tiled,container_core,text_input})
- apps/: 5 crates de yahweh (file_explorer, database_explorer, text_viewer,
  image_viewer, yahweh-shell)
- shared_wit/protocol.wit: handshake/lifecycle inicial

Cargo.toml unificado: thiserror bumped a 2 (transparente para arje), tokio
"full", paths intra-workspace de yahweh redirigidos a su nueva ubicación.

cargo check --workspace: 0 errores, 17 warnings (dead code preexistente).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-08 04:45:44 +00:00

162 lines
5.8 KiB
Rust

//! Test de integración real con libp2p.
//!
//! Dos `LibP2pNode`s independientes en localhost:
//! - cada uno con su propia identidad libp2p,
//! - conectados por TCP (con cifrado Noise + multiplexado Yamux),
//! - intercambiando una sesión completa de sync vía bidirectional
//! streams sobre el protocolo `/minga/sync/1.0.0`.
//!
//! Lo único que el wire añade respecto al harness in-memory es el
//! transporte. La lógica del protocolo y el state machine son los
//! mismos — eso es exactamente lo que queríamos demostrar.
use std::time::Duration;
use futures::StreamExt;
use minga_core::{parse, ContentHash, Keypair, MemStore, Mst, NodeStore};
use minga_p2p::{run_sync_async, LibP2pNode, SyncSession, SYNC_PROTOCOL};
use tokio_util::compat::FuturesAsyncReadCompatExt;
/// Builds a test keypair from a 32-byte seed in which every byte equals `seed`.
fn kp(seed: u8) -> Keypair {
    let seed_bytes = [seed; 32];
    Keypair::from_seed(&seed_bytes)
}
/// Builds an in-memory repository fixture from Rust source snippets.
///
/// Every entry of `sources` is parsed with `parse::rust`, the resulting node
/// is put into a fresh `MemStore`, and the hash returned by the store is
/// inserted into a fresh `Mst`.
///
/// Returns the tree, the store, and the hashes in the same order as `sources`.
///
/// # Panics
///
/// Panics if `parse::rust` does not yield a node for one of the snippets.
fn build_repo(sources: &[&str]) -> (Mst, MemStore, Vec<ContentHash>) {
    let mut mst = Mst::new();
    let mut store = MemStore::new();
    let roots: Vec<ContentHash> = sources
        .iter()
        .map(|src| {
            let node = parse::rust(src).unwrap();
            let hash = store.put(&node);
            mst.insert(hash);
            hash
        })
        .collect();
    (mst, store, roots)
}
/// Two independent `LibP2pNode`s on localhost, each holding one function the
/// other lacks, run a sync session over a `SYNC_PROTOCOL` stream and must end
/// up with the same two-entry MST.
#[tokio::test]
async fn libp2p_sync_two_peers_over_tcp() {
    let node_a = LibP2pNode::new().unwrap();
    let node_b = LibP2pNode::new().unwrap();
    let peer_b = node_b.peer_id;

    // Only B has to listen; A is the side that dials.
    let addr_b = node_b
        .listen("/ip4/127.0.0.1/tcp/0".parse().unwrap())
        .await;

    // B's repository: a single function that A does not have.
    let (mst_b, store_b, _) = build_repo(&["fn from_b(x: i32) -> i32 { x + 1 }"]);
    let session_b = SyncSession::without_attestations(mst_b, store_b, kp(2));

    // B accepts Minga protocol streams in a background task and runs its half
    // of the sync on the first stream that arrives.
    let mut control_b = node_b.control.clone();
    let responder = tokio::spawn(async move {
        let mut incoming = control_b.accept(SYNC_PROTOCOL).unwrap();
        let (_peer, stream) = incoming.next().await.expect("incoming stream");
        run_sync_async(session_b, stream.compat()).await
    });

    // A dials B and opens a stream. Opening is retried every 50 ms for up to
    // 5 s because the connection may need a few milliseconds to come up
    // (Noise + Yamux handshake).
    node_a.dial(addr_b);
    let mut control_a = node_a.control.clone();
    let deadline = std::time::Instant::now() + Duration::from_secs(5);
    let stream_a = loop {
        match control_a.open_stream(peer_b, SYNC_PROTOCOL).await {
            Ok(stream) => break stream,
            Err(e) => {
                // Past the deadline the last error is reported instead of retrying.
                if std::time::Instant::now() >= deadline {
                    panic!("no se pudo abrir stream tras 5s: {e:?}");
                }
                tokio::time::sleep(Duration::from_millis(50)).await;
            }
        }
    };

    // A's repository: a single function that B does not have.
    let (mst_a, store_a, _) = build_repo(&["fn from_a() -> i32 { 0 }"]);
    let session_a = SyncSession::without_attestations(mst_a, store_a, kp(1));
    let initiator =
        tokio::spawn(async move { run_sync_async(session_a, stream_a.compat()).await });

    let result_a = initiator.await.expect("task A").expect("sync A");
    let result_b = responder.await.expect("task B").expect("sync B");

    // Convergence after travelling over real TCP: same root, both entries on each side.
    assert_eq!(result_a.mst().root_hash(), result_b.mst().root_hash());
    assert_eq!(result_a.mst().len(), 2);
    assert_eq!(result_b.mst().len(), 2);

    // Each side finished with a peer DID recorded.
    // (Author's note: libp2p identities are not the same as Minga DIDs — the
    // former authenticate the channel, the latter sign content.)
    assert!(result_a.peer_did().is_some());
    assert!(result_b.peer_did().is_some());
}
/// Two independent `LibP2pNode`s on localhost sync repositories that each
/// carry one attestation: every side attests its own root with its own
/// keypair. After the sync, each side must report the other's DID as the only
/// author of the other's root.
#[tokio::test]
async fn libp2p_sync_with_attestations() {
    use minga_core::{Attestation, AttestationStore};

    let node_a = LibP2pNode::new().unwrap();
    let node_b = LibP2pNode::new().unwrap();
    let peer_b = node_b.peer_id;

    // Only B listens; A dials the address B reports.
    let addr_b = node_b
        .listen("/ip4/127.0.0.1/tcp/0".parse().unwrap())
        .await;

    let kp_a = kp(10);
    let kp_b = kp(20);
    let (mst_a, store_a, roots_a) = build_repo(&["fn signed_by_a() -> i32 { 1 }"]);
    let (mst_b, store_b, roots_b) = build_repo(&["fn signed_by_b() -> i32 { 2 }"]);

    // Each side starts out knowing only its own attestation.
    let mut atts_a = AttestationStore::new();
    atts_a
        .add(Attestation::create(&kp_a, roots_a[0]))
        .expect("add attestation A");
    let mut atts_b = AttestationStore::new();
    atts_b
        .add(Attestation::create(&kp_b, roots_b[0]))
        .expect("add attestation B");

    // The keypairs are cloned because the originals are needed for the final
    // DID comparisons.
    let session_a = SyncSession::new(mst_a, store_a, atts_a, kp_a.clone());
    let session_b = SyncSession::new(mst_b, store_b, atts_b, kp_b.clone());

    // B accepts Minga protocol streams in a background task and runs its half
    // of the sync on the first stream that arrives.
    let mut control_b = node_b.control.clone();
    let task_b = tokio::spawn(async move {
        let mut incoming = control_b.accept(SYNC_PROTOCOL).unwrap();
        let (_peer, stream) = incoming.next().await.expect("incoming stream");
        run_sync_async(session_b, stream.compat()).await
    });

    // A dials B and retries opening the stream every 50 ms for up to 5 s,
    // since the connection may not be established yet on the first attempts.
    node_a.dial(addr_b);
    let mut control_a = node_a.control.clone();
    let deadline = std::time::Instant::now() + Duration::from_secs(5);
    let stream_a = loop {
        match control_a.open_stream(peer_b, SYNC_PROTOCOL).await {
            Ok(stream) => break stream,
            Err(e) => {
                // Past the deadline the last error is reported instead of retrying.
                if std::time::Instant::now() >= deadline {
                    panic!("no se pudo abrir stream: {e:?}");
                }
                tokio::time::sleep(Duration::from_millis(50)).await;
            }
        }
    };

    let task_a = tokio::spawn(async move { run_sync_async(session_a, stream_a.compat()).await });

    // Labelled `expect`s (same labels as the plain two-peer test) so a failure
    // says whether the task itself or the sync it ran went wrong, and on
    // which side.
    let result_a = task_a.await.expect("task A").expect("sync A");
    let result_b = task_b.await.expect("task B").expect("sync B");

    // The attestations crossed over: each side now lists the other's DID as
    // the author of the other's root.
    assert_eq!(
        result_a.attestations().authors_of(&roots_b[0]),
        vec![kp_b.did()]
    );
    assert_eq!(
        result_b.attestations().authors_of(&roots_a[0]),
        vec![kp_a.did()]
    );
}