4b8bd389c9a008efe640296f940cab5a57983d73
7 Commits
| Author | SHA1 | Message | Date | |
|---|---|---|---|---|
|
|
d98a2b6b7c |
feat(brahman-handshake+ente-zero): denylist + hot reload de policy de peers
Consolida PeerAllowlist + nueva denylist en un unico PeerPolicy con
allow + deny + hot reload via notify. Cubre los dos pendientes
documentados en el commit anterior y simplifica la API hacia un solo
punto de entrada.
API consolidada en brahman_handshake::peer_policy:
- PeerPolicy::open() — todo permitido (default).
- PeerPolicy::from_sets(allow, deny) — politica inline para tests.
- PeerPolicy::from_files(allow_path?, deny_path?) — carga ambos
archivos opcionales.
- PeerPolicy::evaluate(peer) -> Decision { Admit | DeniedByDenylist
| NotInAllowlist }. Decision lleva reason() para logging.
- PeerPolicy::reload() — recarga atomica desde paths asociados.
Si un archivo falla, conserva la version anterior (un typo no
baja la politica activa).
- PeerPolicy::spawn_watcher() -> JoinHandle — vigila los archivos
via notify, debounce 250ms (coalesce de eventos por save), recarga
atomica al detectar cambio.
Orden de evaluacion: deny-first.
1. peer in denylist -> DeniedByDenylist.
2. allowlist set y peer no in allowlist -> NotInAllowlist.
3. resto -> Admit.
Deny gana sobre allow (un peer en ambas es rechazado): la denylist
es la primitiva de "kill switch".
Watcher: vigila el directorio padre del archivo, no el archivo
mismo. Razon: editores tipicos hacen rename-and-replace que rompe
el watch del archivo pero no del dir. Filtra eventos por path al
procesar.
Wire en server: ServerConfig.allowlist -> ServerConfig.policy:
Option<PeerPolicy> (rename, scope local).
Wire en Arje (ente-zero): nueva env BRAHMAN_PEER_DENYLIST complementa
BRAHMAN_PEER_ALLOWLIST. setup_brahman_policy carga + spawn watcher
y devuelve (policy, JoinHandle) — el handle se conserva en main
para que el thread no aborte.
Activacion completa con todas las capas:
BRAHMAN_LISTEN_MULTIADDR=/ip4/0.0.0.0/tcp/4101 \\
BRAHMAN_PEER_ALLOWLIST=/etc/brahman/allow.txt \\
BRAHMAN_PEER_DENYLIST=/etc/brahman/deny.txt \\
ente-zero
# Editar deny.txt en caliente entra en efecto en ~250ms sin restart.
Tests: 10 unit en peer_policy (incluido watcher_reloads_on_file_change
con notify real) + 1 E2E nuevo libp2p_handshake_denylist_blocks_
listed_peer. 30 tests verdes en brahman-handshake. Sin regresion en
ente-zero.
Lo que cierra: politica completa (open/allow/deny/both), hot reload
sin restart, atomicidad de la recarga, resiliencia ante typos.
Pendientes futuros: aplicar policy a nivel de swarm via
libp2p_allow_block_list::Behaviour (rechazar antes del Noise
handshake), rotacion de keypair sin perder peer_id.
|
||
|
|
2059af4fb9 |
feat(brahman-handshake): Fase 2 — discovery remoto via DHT por flow type
Tercer paso del plan "el encuentro entre Entes no se restringe a
local". Cuando un Init local acepta una sesion cuya Card declara
outputs, anuncia al DHT (Kademlia, via brahman-net) que el provee
esos flow types. Cualquier nodo conectado al mismo DHT puede
consultar y obtener la lista de PeerId's que sirven el flow.
API nueva en brahman_handshake::network:
- flow_dht_key(flow_name, type_ref) -> [u8; 32]: blake3 hash de
"brahman-flow|v1|{flow}|{type_canon}". Determinista cross-host.
Cambiar la canonicalizacion rompe compatibilidad — el prefijo v1
documenta version del esquema y obliga a bump al modificar.
- announce_outputs(net, card): start_providing por cada flow.output.
Idempotente, fire-and-forget.
- find_remote_providers(net, flow_name, type_ref) -> Vec<PeerId>:
query DHT. Lista vacia si nadie anuncia.
Wire en el server:
- ServerConfig gana pub net: Option<Arc<BrahmanNet>>. Si esta set,
cada Card registrada con outputs se anuncia automaticamente al DHT
desde register_session. None = server "ciego al DHT".
- Debug manual de ServerConfig (BrahmanNet no es Debug).
Canonicalizacion del TypeRef:
- Primitive { name } -> "prim:{name}"
- Wit { package, interface, name } -> "wit:{package}#{interface_or_empty}#{name}"
Tests: 2 nuevos en tests/network_discovery.rs:
- dht_discovery_finds_remote_provider: 2 nodos, A registra Card con
flow.output = monad-list:json, B dial-ea a A, B llama
find_remote_providers y descubre el peer_id de A.
- dht_discovery_negative_unknown_flow: B busca flow inexistente,
devuelve [] sin colgarse.
Callers actualizados con net: None: tests existentes + ente-zero
(arje aun no expone red; pasar Some(Arc<BrahmanNet>) cuando quiera
publicar al DHT remoto).
Lo que esto desbloquea: un nouser daemon en maquina A puede ser
descubierto por nouser-explorer en maquina B sin conocimiento previo
del peer — solo necesitan compartir DHT (via bootstrap inicial).
Pendiente para Fase 3: trust (firma Ed25519 en Cards remotas) +
stop_providing al cleanup de sesion.
|
||
|
|
73dadbb166 |
feat(brahman-handshake): Fase 1 — handshake brahman sobre stream libp2p
Segundo paso del plan "el encuentro entre Entes no se restringe a
local". El protocolo brahman (Hello / HelloAck / Ping / Pong /
MatchEvent / Farewell, frames postcard length-prefixed) ahora tambien
viaja sobre streams libp2p de la malla brahman-net — el mismo Init
acepta sesiones por Unix socket Y por libp2p indistintamente, y un
consumer remoto puede dial-ar al multiaddr y completar handshake.
Cambios:
- Session<S> y Client<S> genericos: ambos dejan de estar atados a
UnixStream y pasan a ser genericos sobre S: AsyncRead + AsyncWrite
+ Unpin + Send + 'static. El path Unix queda como
Client = Client<UnixStream> (default generico). Constructores
nuevos: Server::session_from_stream(stream),
Client::connect_with_stream(stream, card, wit).
- Refactor del post-handshake con split: tokio::select! sobre
&mut self.stream requeria S: Sync indirectamente, y libp2p::Stream
no es Sync. Reemplazado por tokio::io::split(stream) -> reader loop
principal + writer task separada que drena el push channel. Writer
compartido bajo Arc<Mutex<WriteHalf<S>>> para serializar Pong/Error
inline con los MatchEvents pusheados. Cleanup garantizado en todas
las ramas. La logica del post-handshake migra a funciones libres
(run_post_handshake, handle_inbound_frame, cleanup,
broadcast_match_diffs, do_handshake, register_session,
validate_hello).
- Nuevo modulo brahman-handshake::network:
- BRAHMAN_HANDSHAKE_PROTOCOL = "/brahman/handshake/1.0.0"
- LibP2pHandshakeStream = Compat<libp2p::Stream>
- run_libp2p_accept_loop(server, net): accept loop que delega cada
stream entrante a session_from_stream(stream.compat()). Sesiones
libp2p y Unix conviven en el mismo Server — comparten broker,
push table, last_matches.
- connect_libp2p(net, peer, card, wit): abre stream libp2p al peer
y arranca handshake.
- NetworkError tipado.
Deps: brahman-handshake gana brahman-net, futures, tokio-util.
brahman-net re-exporta Multiaddr, PeerId, Stream, StreamProtocol,
Protocol, OpenStreamError para que callers no necesiten dep directa
a libp2p.
Tests: 9 verdes en el path Unix (sin regresion). Nuevo
tests/network_libp2p.rs E2E que arma server con BrahmanNet, hace
listen TCP, monta accept loop; cliente con su propio BrahmanNet
dial-ea al peer_id, completa handshake remoto, ping, farewell.
Verifica que la sesion se registro durante la conversacion y se
removio tras farewell.
|
||
|
|
8a83a26de0 |
feat(handshake): notificación push de matches del broker al cliente
El servidor empuja MatchEvent (Available | Lost) a los consumers cuando
sus inputs cambian de match — sea porque un productor llegó, porque
otro mejor lo desplazó, o porque desapareció.
Mecánica:
- Frame::MatchEvent con MatchEventKind { Available, Lost } y los datos
del match (consumer_flow, producer_session/label/flow, ty, via, pinned).
- Server: SessionTxTable (Arc<Mutex<HashMap<SessionId, mpsc::Sender>>>)
+ LastMatches (último match conocido por consumer/input). En cada
register/unregister, broadcast_match_diffs recomputa con el broker
y emite SOLO los diffs respecto al estado anterior.
- Session::run_post_handshake usa tokio::select! para multiplexar
read_frame del cliente y rx.recv() de su tx push.
- Cleanup ahora también limpia push_table y last_matches y dispara un
broadcast (para notificar a quienes pierden el match).
- Client: VecDeque<MatchEvent> bufferea eventos que llegan mezclados
con respuestas a Ping. API:
- take_event() — non-blocking, drena buffer
- await_event(timeout) — bloquea hasta evento o timeout
- ping() ahora drena MatchEvents intermedios hasta encontrar el Pong.
Capacity del canal push por sesión: 32 frames (try_send no-blocking;
si se llena, los eventos extra se descartan — se documenta como
ephemeral, el cliente puede re-consultar via brahman-status).
Test nuevo en brahman-handshake/tests/handshake.rs:
- match_event_pushed_on_producer_arrival: consumer espera, no recibe
evento → llega productor → recibe Available → productor se va →
recibe Lost.
Example nuevo: brahman-handshake/examples/subscriber.rs — cliente que
loguea cada MatchEvent en tiempo real. Útil para ver la dinámica del
broker. Pings cada 25s para keepalive.
Demo end-to-end verificada (4 eventos, 3 ya cubren el ciclo completo):
T+0.3 alpha llega → Available ← demo.alpha.out
T+0.8 beta llega → (sin evento: alpha gana por orden alfabético)
T+1.3 alpha killed → Available ← demo.beta.out (re-evaluación)
T+1.8 beta killed → Lost ← <none>
El broker emite diff: ningún evento cuando un nuevo productor llega
sin desplazar al ganador actual.
Tests: 28/28 (handshake integ 6→7). cargo check --workspace: 0 errores.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
|
||
|
|
df9d10cc52 |
feat(ente-zero): enchufa el handshake server al Init real
ente-zero (PID 1 del fractal arje) ahora levanta el server de
brahman-handshake junto al ente-bus existente, escuchando en
\$BRAHMAN_INIT_SOCKET (default \$XDG_RUNTIME_DIR/brahman-init.sock).
Es un canal paralelo dedicado a módulos brahman-conscientes que se
presentan con una Card y declaran flujos tipados.
Cambios:
- crates/core/brahman-handshake/src/transport.rs: helper nuevo con
resolución XDG_RUNTIME_DIR → TMPDIR, override por var de entorno
BRAHMAN_INIT_SOCKET. Test unitario para el override.
- crates/core/brahman-handshake/Cargo.toml: example "probe" + dev-dep
anyhow. Probe sirve como herramienta de diagnóstico para conectar
contra cualquier server vivo.
- crates/core/brahman-handshake/examples/probe.rs: cliente mínimo que
hace Hello → Ping → Farewell e imprime el HelloAck recibido.
- crates/core/ente-zero/Cargo.toml: dependencias brahman-handshake
+ brahman-broker.
- crates/core/ente-zero/src/main.rs: en primordial_loop, tras spawn
del ente-bus, crea Arc<Mutex<Broker>> compartido y llama
Server::bind. Si el bind falla (FS no escribible, socket en uso),
loggea y degrada a "modo bus-only" — la doctrina PID 1 no rompe por
subsistemas opcionales (mismo patrón que uevents).
Validación end-to-end manual:
$ BRAHMAN_INIT_SOCKET=/tmp/e2e.sock ./target/debug/ente-zero &
$ BRAHMAN_INIT_SOCKET=/tmp/e2e.sock cargo run --example probe
HelloAck: session=01KR41Q8... server=0.1.0 protocol=0.1.0 init_attached=true
Pong: ts=1778252489714ms
Farewell OK
Tests: 27/27 (broker 11 + card 8 + handshake codec+transport 2 + integ 6).
cargo check --workspace: 0 errores.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
|
||
|
|
07d77a335f |
feat(handshake): integra el broker con el ciclo de sesiones
ServerConfig acepta un Option<Arc<Mutex<Broker>>> compartido. Cuando está
presente, el servidor lo mantiene en sincronía con las sesiones:
- Tras un Hello aceptado, register_session indexa la Card en el broker
ANTES de insertar en el SessionRegistry y de emitir HelloAck.
- Al cerrar la sesión (Farewell, EOF, o error en run_post_handshake), un
cleanup() unificado llama unregister en el broker y remove en el
SessionRegistry. Garantizado por refactor de Session::handle a
do_handshake → run_post_handshake → cleanup.
Tests nuevos en handshake.rs:
- broker_registers_and_unregisters_with_session: confirma el ciclo
register → farewell → unregister.
- broker_matches_two_live_modules: dos clientes (productor + consumidor)
conectados; el broker resuelve find_producer_for(consumer.session, "in")
→ producer "dht". Tras farewell del productor, el match desaparece.
Fix colateral: brahman-card::TypeRef pasa de internally-tagged
(#[serde(tag = "kind")]) a externally-tagged (default). Postcard no
soporta internally-tagged en formatos no self-describing — sin este
cambio el wire de Hello con Cards que tengan flujos no codificaba.
JSON cambia de {"kind":"primitive","name":"x"} a
{"primitive":{"name":"x"}}. Documentado en el doc-comment de TypeRef.
26/26 tests verdes (broker 11 + card 8 + handshake codec 1 + integ 6).
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
|
||
|
|
814390feec |
feat(core): brahman-handshake — protocolo runtime Init↔módulo
Crate nuevo en crates/core/brahman-handshake que implementa el handshake real del shared_wit/protocol.wit como wire format Rust↔Rust sobre Unix socket con frames length-prefixed + cuerpo postcard. Componentes: - src/messages.rs: Hello, HelloAck, Ping, Pong, Farewell, HandshakeError y Frame (enum-suma). HandshakeError ya implementa thiserror::Error y cruza el wire. - src/codec.rs: write_frame / read_frame asíncronos con MAX_FRAME_BYTES de 4 MiB. Test interno de roundtrip. - src/server.rs: Server::bind crea el listener en Unix socket; emite ResolvedCard tras validar la Card y devuelve ULID como SessionId. ServerConfig.init_attached se reporta en HelloAck. - src/client.rs: Client::connect hace pre-validación local de la Card (fail fast), envía Hello, parsea HelloAck. ping() y farewell() expuestos. - tests/handshake.rs: 4 tests de integración: * full_handshake_roundtrip — happy path con 3 pings + farewell * rejects_invalid_card_client_side — label vacío rechazado pre-envío * server_rejects_protocol_mismatch — protocol_version 999.0.0 → Error * ping_before_hello_rejected — Ping sin Hello previo → Rejected Limitación conocida: postcard no serializa serde_json::Value (variantes Array/Object con length dinámico). Se removieron por eso los campos `extensions` (Card) y `extra` (Permissions). Forward-compat queda cubierta por schema_version + protocol_version negotiation; si más adelante necesitamos preservar campos JSON desconocidos, irá en un WireCard separado o un envelope. 13/13 tests verdes (brahman-card 8 + brahman-handshake codec 1 + integ 4). cargo check --workspace: 0 errores. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> |