feat(brahman-handshake+ente-zero): denylist + hot reload de policy de peers

Consolida PeerAllowlist + nueva denylist en un unico PeerPolicy con
allow + deny + hot reload via notify. Cubre los dos pendientes
documentados en el commit anterior y simplifica la API hacia un solo
punto de entrada.

API consolidada en brahman_handshake::peer_policy:
- PeerPolicy::open() — todo permitido (default).
- PeerPolicy::from_sets(allow, deny) — politica inline para tests.
- PeerPolicy::from_files(allow_path?, deny_path?) — carga ambos
  archivos opcionales.
- PeerPolicy::evaluate(peer) -> Decision { Admit | DeniedByDenylist
  | NotInAllowlist }. Decision lleva reason() para logging.
- PeerPolicy::reload() — recarga atomica desde paths asociados.
  Si un archivo falla, conserva la version anterior (un typo no
  baja la politica activa).
- PeerPolicy::spawn_watcher() -> JoinHandle — vigila los archivos
  via notify, debounce 250ms (coalesce de eventos por save), recarga
  atomica al detectar cambio.

Orden de evaluacion: deny-first.
1. peer in denylist -> DeniedByDenylist.
2. allowlist set y peer no in allowlist -> NotInAllowlist.
3. resto -> Admit.

Deny gana sobre allow (un peer en ambas es rechazado): la denylist
es la primitiva de "kill switch".

Watcher: vigila el directorio padre del archivo, no el archivo
mismo. Razon: editores tipicos hacen rename-and-replace que rompe
el watch del archivo pero no del dir. Filtra eventos por path al
procesar.

Wire en server: ServerConfig.allowlist -> ServerConfig.policy:
Option<PeerPolicy> (rename, scope local).

Wire en Arje (ente-zero): nueva env BRAHMAN_PEER_DENYLIST complementa
BRAHMAN_PEER_ALLOWLIST. setup_brahman_policy carga + spawn watcher
y devuelve (policy, JoinHandle) — el handle se conserva en main
para que el thread no aborte.

Activacion completa con todas las capas:
  BRAHMAN_LISTEN_MULTIADDR=/ip4/0.0.0.0/tcp/4101 \\
  BRAHMAN_PEER_ALLOWLIST=/etc/brahman/allow.txt \\
  BRAHMAN_PEER_DENYLIST=/etc/brahman/deny.txt \\
  ente-zero
# Editar deny.txt en caliente entra en efecto en ~250ms sin restart.

Tests: 10 unit en peer_policy (incluido watcher_reloads_on_file_change
con notify real) + 1 E2E nuevo libp2p_handshake_denylist_blocks_
listed_peer. 30 tests verdes en brahman-handshake. Sin regresion en
ente-zero.

Lo que cierra: politica completa (open/allow/deny/both), hot reload
sin restart, atomicidad de la recarga, resiliencia ante typos.

Pendientes futuros: aplicar policy a nivel de swarm via
libp2p_allow_block_list::Behaviour (rechazar antes del Noise
handshake), rotacion de keypair sin perder peer_id.
This commit is contained in:
Sergio
2026-05-09 15:35:00 +00:00
parent 505748dd41
commit d98a2b6b7c
11 changed files with 796 additions and 259 deletions
+1
View File
@@ -14,6 +14,7 @@ brahman-broker = { path = "../brahman-broker" }
brahman-net = { path = "../../shared/brahman-net" }
blake3 = { workspace = true }
futures = { workspace = true }
notify = { workspace = true }
serde = { workspace = true }
postcard = { workspace = true }
tokio = { workspace = true }
+1 -1
View File
@@ -22,7 +22,7 @@ pub mod messages;
pub mod server;
pub mod client;
pub mod network;
pub mod peer_allowlist;
pub mod peer_policy;
pub mod signature;
pub mod transport;
@@ -1,198 +0,0 @@
//! Allowlist explícita de `PeerId`s para el trust gate remoto.
//!
//! Capa de política sobre el trust criptográfico de Fase 3
//! (firma Ed25519 anclada al peer libp2p). Hoy cualquier peer con
//! keypair Ed25519 válida pasa el handshake; con allowlist activa,
//! sólo los peers explícitamente listados.
//!
//! Aplica únicamente al path libp2p — el path Unix sigue usando
//! `SO_PEERCRED` del kernel para autenticación local. Sin allowlist
//! configurada, comportamiento abierto (compatible con todo lo
//! anterior).
//!
//! ## Formato del archivo
//!
//! Texto plano, una entrada por línea:
//! - `PeerId` en formato base58 (canónico de libp2p, lo que muestra
//! `peer_id.to_string()` y lo que aparece en multiaddrs `/p2p/...`).
//! - Líneas vacías y líneas que empiezan con `#` se ignoran.
//!
//! Ejemplo:
//!
//! ```text
//! # allowlist brahman para máquina prod-eu-1
//! 12D3KooWFooBarBazFooBarBazFooBarBazFooBarBazFooBarBaz
//! # operador secundario
//! 12D3KooWQuxQuxQuxQuxQuxQuxQuxQuxQuxQuxQuxQuxQuxQuxQux
//! ```
use std::collections::BTreeSet;
use std::path::Path;
use brahman_net::PeerId;
/// Política de admisión por `PeerId` para conexiones libp2p.
///
/// **Nota**: la allowlist sólo se evalúa cuando el server tiene
/// `expected_peer = Some(...)` (path libp2p). El path Unix la
/// ignora — autenticación local va por SO_PEERCRED.
#[derive(Debug, Clone)]
pub struct PeerAllowlist {
allowed: BTreeSet<PeerId>,
}
#[derive(Debug, thiserror::Error)]
pub enum AllowlistError {
#[error("leer allowlist en {path}: {source}")]
Io {
path: std::path::PathBuf,
#[source]
source: std::io::Error,
},
#[error("línea {line_no} de {path}: PeerId inválido '{value}'")]
InvalidPeerId {
path: std::path::PathBuf,
line_no: usize,
value: String,
},
}
impl PeerAllowlist {
/// Construye una allowlist a partir de un iterable de `PeerId`s.
/// Útil para tests o para listas hardcodeadas.
pub fn from_iter<I>(peers: I) -> Self
where
I: IntoIterator<Item = PeerId>,
{
Self {
allowed: peers.into_iter().collect(),
}
}
/// Carga la allowlist desde un archivo. Cada línea no-vacía y no
/// comentario debe ser un `PeerId` parseable. Errores incluyen
/// el número de línea para facilitar el debug.
pub fn from_file(path: impl AsRef<Path>) -> Result<Self, AllowlistError> {
let path = path.as_ref();
let contents = std::fs::read_to_string(path).map_err(|e| AllowlistError::Io {
path: path.to_path_buf(),
source: e,
})?;
let mut allowed = BTreeSet::new();
for (idx, raw) in contents.lines().enumerate() {
let line_no = idx + 1;
let trimmed = raw.split('#').next().unwrap_or("").trim();
if trimmed.is_empty() {
continue;
}
let peer = trimmed
.parse::<PeerId>()
.map_err(|_| AllowlistError::InvalidPeerId {
path: path.to_path_buf(),
line_no,
value: trimmed.to_string(),
})?;
allowed.insert(peer);
}
Ok(Self { allowed })
}
/// Indica si `peer` está en la lista. Si la lista está vacía,
/// devuelve `false` (la "lista vacía" no es lo mismo que "sin
/// política" — para sin política, no construyas un `PeerAllowlist`
/// y dejá `ServerConfig.allowlist = None`).
pub fn is_allowed(&self, peer: &PeerId) -> bool {
self.allowed.contains(peer)
}
/// Cantidad de peers en la lista. Útil para logs.
pub fn len(&self) -> usize {
self.allowed.len()
}
/// `true` si no hay ningún peer en la lista.
pub fn is_empty(&self) -> bool {
self.allowed.is_empty()
}
/// Itera los `PeerId`s permitidos en orden determinístico.
pub fn iter(&self) -> impl Iterator<Item = &PeerId> {
self.allowed.iter()
}
}
#[cfg(test)]
mod tests {
use super::*;
use brahman_net::Keypair;
use tempfile::TempDir;
fn fresh_peer() -> PeerId {
Keypair::generate_ed25519().public().to_peer_id()
}
#[test]
fn from_iter_and_is_allowed() {
let p1 = fresh_peer();
let p2 = fresh_peer();
let p3 = fresh_peer();
let list = PeerAllowlist::from_iter([p1, p2]);
assert!(list.is_allowed(&p1));
assert!(list.is_allowed(&p2));
assert!(!list.is_allowed(&p3));
assert_eq!(list.len(), 2);
}
#[test]
fn from_file_parses_clean() {
let p1 = fresh_peer();
let p2 = fresh_peer();
let tmp = TempDir::new().unwrap();
let path = tmp.path().join("allow.txt");
let content = format!(
"# header comment\n\n{}\n # indented comment ignored\n{}\n\n",
p1, p2
);
std::fs::write(&path, content).unwrap();
let list = PeerAllowlist::from_file(&path).unwrap();
assert_eq!(list.len(), 2);
assert!(list.is_allowed(&p1));
assert!(list.is_allowed(&p2));
}
#[test]
fn from_file_supports_inline_comment() {
let p1 = fresh_peer();
let tmp = TempDir::new().unwrap();
let path = tmp.path().join("allow.txt");
let content = format!("{} # operador foo\n", p1);
std::fs::write(&path, content).unwrap();
let list = PeerAllowlist::from_file(&path).unwrap();
assert!(list.is_allowed(&p1));
}
#[test]
fn from_file_rejects_invalid_peer_id() {
let tmp = TempDir::new().unwrap();
let path = tmp.path().join("bad.txt");
std::fs::write(&path, "not-a-real-peer-id\n").unwrap();
let err = PeerAllowlist::from_file(&path).unwrap_err();
match err {
AllowlistError::InvalidPeerId { line_no, .. } => assert_eq!(line_no, 1),
other => panic!("wrong error: {other:?}"),
}
}
#[test]
fn from_file_missing_returns_io_error() {
let err = PeerAllowlist::from_file("/no/such/path/allow.txt").unwrap_err();
assert!(matches!(err, AllowlistError::Io { .. }));
}
#[test]
fn empty_list_rejects_everything() {
let list = PeerAllowlist::from_iter(std::iter::empty());
assert!(list.is_empty());
assert!(!list.is_allowed(&fresh_peer()));
}
}
@@ -0,0 +1,521 @@
//! Política de admisión de peers libp2p: allowlist + denylist con hot
//! reload opcional.
//!
//! Capa de política sobre el trust criptográfico de Fase 3. Combina:
//!
//! - **Denylist**: peers explícitamente baneados. Si está, deny gana.
//! - **Allowlist**: si está set, sólo los peers listados pasan.
//! Si no está set, modo abierto (todo peer Ed25519-válido pasa,
//! sujeto sólo a denylist).
//!
//! Sin denylist y sin allowlist → modo totalmente abierto (compat
//! con todo lo anterior). Con allowlist y denylist a la vez, el
//! orden de evaluación es: deny first → allow check → admit.
//!
//! Aplica únicamente al path libp2p — el path Unix usa SO_PEERCRED
//! del kernel para autenticación local, no PeerId.
//!
//! ## Hot reload
//!
//! Si la política se construyó con [`PeerPolicy::watch_files`], un
//! thread dedicado vigila los archivos de allow/deny vía `notify`.
//! Cualquier cambio (write, create, modify, remove) dispara una
//! recarga atómica con debounce de 250ms (los editores típicos
//! producen varios eventos por save).
//!
//! Errores de reload (parse fallido, archivo eliminado) se loggean
//! pero NO bajan la política existente — aceptamos la versión
//! anterior hasta que el archivo vuelva a parsearse limpio. Esto
//! evita que un error de tipeo deje al Init en modo inseguro.
//!
//! ## Formato del archivo
//!
//! Idéntico para allow y deny: PeerId base58 por línea, `#` para
//! comentarios (línea entera o inline), líneas vacías ignoradas.
use std::collections::BTreeSet;
use std::path::{Path, PathBuf};
use std::sync::{Arc, RwLock};
use std::time::{Duration, Instant};
use brahman_net::PeerId;
use tracing::{debug, info, warn};
/// Política de admisión combinada (allow + deny). Clone barato (todos
/// los campos son Arc o referencias inmutables).
#[derive(Clone)]
pub struct PeerPolicy {
inner: Arc<RwLock<PolicyInner>>,
paths: Arc<PolicyPaths>,
}
#[derive(Default)]
struct PolicyInner {
/// `Some(set)`: sólo peers en el set pasan. `None`: modo abierto.
allow: Option<BTreeSet<PeerId>>,
/// Peers baneados. Vacío = sin denylist.
deny: BTreeSet<PeerId>,
}
#[derive(Default)]
struct PolicyPaths {
allow_path: Option<PathBuf>,
deny_path: Option<PathBuf>,
}
/// Decisión del gate de política para un peer dado.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Decision {
/// El peer es admitido (no está en deny y, si hay allow, está en allow).
Admit,
/// El peer está explícitamente en la denylist.
DeniedByDenylist,
/// Hay allowlist configurada y el peer no está en ella.
NotInAllowlist,
}
impl Decision {
pub fn is_admitted(self) -> bool {
matches!(self, Decision::Admit)
}
pub fn reason(self) -> &'static str {
match self {
Decision::Admit => "admit",
Decision::DeniedByDenylist => "explicitly denied",
Decision::NotInAllowlist => "not in allowlist",
}
}
}
#[derive(Debug, thiserror::Error)]
pub enum PolicyError {
#[error("leer política en {path}: {source}")]
Io {
path: PathBuf,
#[source]
source: std::io::Error,
},
#[error("línea {line_no} de {path}: PeerId inválido '{value}'")]
InvalidPeerId {
path: PathBuf,
line_no: usize,
value: String,
},
}
impl PeerPolicy {
/// Política totalmente abierta: todo peer pasa. Útil como default
/// cuando no hay archivos configurados.
pub fn open() -> Self {
Self {
inner: Arc::new(RwLock::new(PolicyInner::default())),
paths: Arc::new(PolicyPaths::default()),
}
}
/// Construye una política inline con sets explícitos. Sin paths
/// asociados, así que `reload` y `watch_files` son no-ops.
pub fn from_sets(allow: Option<BTreeSet<PeerId>>, deny: BTreeSet<PeerId>) -> Self {
Self {
inner: Arc::new(RwLock::new(PolicyInner { allow, deny })),
paths: Arc::new(PolicyPaths::default()),
}
}
/// Carga política desde archivos. Cada path es opcional: `None`
/// significa "esa lista no aplica" (allow=None ⇒ modo abierto;
/// deny=None ⇒ sin baneados). Asocia los paths internamente para
/// que `reload` y `watch_files` los re-lean.
pub fn from_files(
allow_path: Option<&Path>,
deny_path: Option<&Path>,
) -> Result<Self, PolicyError> {
let allow = match allow_path {
Some(p) => Some(parse_peer_set(p)?),
None => None,
};
let deny = match deny_path {
Some(p) => parse_peer_set(p)?,
None => BTreeSet::new(),
};
Ok(Self {
inner: Arc::new(RwLock::new(PolicyInner { allow, deny })),
paths: Arc::new(PolicyPaths {
allow_path: allow_path.map(Path::to_path_buf),
deny_path: deny_path.map(Path::to_path_buf),
}),
})
}
/// Evalúa si `peer` puede registrarse. Toma read lock — barato,
/// concurrente, sin awaits.
pub fn evaluate(&self, peer: &PeerId) -> Decision {
let inner = match self.inner.read() {
Ok(g) => g,
Err(_) => {
// Lock envenenado: degrada a "deny por seguridad".
warn!("policy lock envenenado — deny por defecto");
return Decision::DeniedByDenylist;
}
};
if inner.deny.contains(peer) {
return Decision::DeniedByDenylist;
}
if let Some(allow) = &inner.allow {
if !allow.contains(peer) {
return Decision::NotInAllowlist;
}
}
Decision::Admit
}
/// Tamaño actual de cada lista, para logging. Tupla `(allow_count,
/// deny_count)`. `allow_count = None` significa "modo abierto"
/// (sin allowlist).
pub fn sizes(&self) -> (Option<usize>, usize) {
match self.inner.read() {
Ok(g) => (g.allow.as_ref().map(|s| s.len()), g.deny.len()),
Err(_) => (Some(0), 0),
}
}
/// Recarga atómica desde los paths asociados. Si un archivo
/// falla, la versión anterior persiste y el error se devuelve.
/// Esto evita que un typo en el archivo deje al Init en modo
/// inseguro.
pub fn reload(&self) -> Result<(), PolicyError> {
let new_allow = match &self.paths.allow_path {
Some(p) => Some(parse_peer_set(p)?),
None => None,
};
let new_deny = match &self.paths.deny_path {
Some(p) => parse_peer_set(p)?,
None => BTreeSet::new(),
};
if let Ok(mut inner) = self.inner.write() {
inner.allow = new_allow;
inner.deny = new_deny;
}
Ok(())
}
/// Arranca un thread que vigila los archivos asociados con
/// `notify` y llama [`Self::reload`] cuando cambian. Debounce
/// 250ms para coalescer múltiples eventos por save (los editores
/// hacen Create+Modify+más).
///
/// Devuelve un `JoinHandle` que el caller debe mantener vivo.
/// Drop del handle no detiene el thread (notify watcher es
/// sticky); para detener, terminar el proceso.
///
/// No-op si no hay paths asociados (devuelve un handle dummy
/// que termina inmediatamente).
pub fn spawn_watcher(&self) -> std::io::Result<std::thread::JoinHandle<()>> {
let allow_path = self.paths.allow_path.clone();
let deny_path = self.paths.deny_path.clone();
let policy = self.clone();
if allow_path.is_none() && deny_path.is_none() {
// Sin archivos a vigilar: spawn un thread que termina ya.
return std::thread::Builder::new()
.name("brahman-policy-watcher-noop".into())
.spawn(|| {});
}
std::thread::Builder::new()
.name("brahman-policy-watcher".into())
.spawn(move || {
run_watcher(policy, allow_path, deny_path);
})
}
}
impl std::fmt::Debug for PeerPolicy {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let (allow, deny) = self.sizes();
f.debug_struct("PeerPolicy")
.field("allow", &allow)
.field("deny", &deny)
.field("allow_path", &self.paths.allow_path)
.field("deny_path", &self.paths.deny_path)
.finish()
}
}
fn parse_peer_set(path: &Path) -> Result<BTreeSet<PeerId>, PolicyError> {
let contents = std::fs::read_to_string(path).map_err(|e| PolicyError::Io {
path: path.to_path_buf(),
source: e,
})?;
let mut out = BTreeSet::new();
for (idx, raw) in contents.lines().enumerate() {
let line_no = idx + 1;
let trimmed = raw.split('#').next().unwrap_or("").trim();
if trimmed.is_empty() {
continue;
}
let peer = trimmed
.parse::<PeerId>()
.map_err(|_| PolicyError::InvalidPeerId {
path: path.to_path_buf(),
line_no,
value: trimmed.to_string(),
})?;
out.insert(peer);
}
Ok(out)
}
const DEBOUNCE_MS: u64 = 250;
fn run_watcher(
policy: PeerPolicy,
allow_path: Option<PathBuf>,
deny_path: Option<PathBuf>,
) {
use notify::{RecursiveMode, Watcher};
let (tx, rx) = std::sync::mpsc::channel::<notify::Result<notify::Event>>();
let mut watcher = match notify::recommended_watcher(move |res| {
let _ = tx.send(res);
}) {
Ok(w) => w,
Err(e) => {
warn!(?e, "notify watcher para policy no se pudo crear — hot reload deshabilitado");
return;
}
};
// Vigilamos los DIRECTORIOS de los archivos, no los archivos
// directos. Los editores típicos hacen rename-and-replace (escriben
// a tmp, rename al destino), lo que rompe el watch del archivo
// pero NO el del directorio. Trade-off: recibimos más eventos
// (cualquier archivo del dir), filtramos por path al procesar.
for p in [&allow_path, &deny_path].iter().filter_map(|x| x.as_ref()) {
if let Some(parent) = p.parent() {
if let Err(e) = watcher.watch(parent, RecursiveMode::NonRecursive) {
warn!(path = %parent.display(), ?e, "watch failed");
}
}
}
let debounce = Duration::from_millis(DEBOUNCE_MS);
let mut pending_at: Option<Instant> = None;
loop {
let timeout = match pending_at {
Some(at) => debounce.saturating_sub(at.elapsed()).max(Duration::from_millis(10)),
None => Duration::from_secs(60), // wakeup periódico, no esencial
};
match rx.recv_timeout(timeout) {
Ok(Ok(event)) => {
// Sólo nos interesan eventos sobre los paths exactos.
let touches_us = event.paths.iter().any(|p| {
Some(p) == allow_path.as_ref() || Some(p) == deny_path.as_ref()
});
if !touches_us {
continue;
}
debug!(?event.kind, "policy file event recibido — debounce");
pending_at = Some(Instant::now());
}
Ok(Err(e)) => {
warn!(?e, "notify error en policy watcher");
}
Err(std::sync::mpsc::RecvTimeoutError::Timeout) => {
if let Some(at) = pending_at {
if at.elapsed() >= debounce {
match policy.reload() {
Ok(()) => {
let (a, d) = policy.sizes();
info!(
allow = ?a,
deny = d,
"policy hot-reload completo"
);
}
Err(e) => {
warn!(?e, "policy hot-reload falló — manteniendo versión anterior");
}
}
pending_at = None;
}
}
}
Err(std::sync::mpsc::RecvTimeoutError::Disconnected) => {
warn!("policy watcher channel cerrado — terminando thread");
return;
}
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use brahman_net::Keypair;
use tempfile::TempDir;
fn fresh_peer() -> PeerId {
Keypair::generate_ed25519().public().to_peer_id()
}
#[test]
fn open_admits_anyone() {
let p = PeerPolicy::open();
assert_eq!(p.evaluate(&fresh_peer()), Decision::Admit);
}
#[test]
fn allow_only_admits_listed() {
let p1 = fresh_peer();
let p2 = fresh_peer();
let policy = PeerPolicy::from_sets(
Some([p1].into_iter().collect()),
BTreeSet::new(),
);
assert_eq!(policy.evaluate(&p1), Decision::Admit);
assert_eq!(policy.evaluate(&p2), Decision::NotInAllowlist);
}
#[test]
fn deny_overrides_open() {
let p1 = fresh_peer();
let p2 = fresh_peer();
let policy = PeerPolicy::from_sets(None, [p1].into_iter().collect());
assert_eq!(policy.evaluate(&p1), Decision::DeniedByDenylist);
assert_eq!(policy.evaluate(&p2), Decision::Admit);
}
#[test]
fn deny_overrides_allow() {
// Conflicto explícito: p1 está en ambas. Deny gana.
let p1 = fresh_peer();
let policy = PeerPolicy::from_sets(
Some([p1].into_iter().collect()),
[p1].into_iter().collect(),
);
assert_eq!(policy.evaluate(&p1), Decision::DeniedByDenylist);
}
#[test]
fn from_files_with_both_lists() {
let p1 = fresh_peer();
let p2 = fresh_peer();
let p3 = fresh_peer();
let tmp = TempDir::new().unwrap();
let allow = tmp.path().join("allow.txt");
let deny = tmp.path().join("deny.txt");
std::fs::write(&allow, format!("{}\n{}\n", p1, p2)).unwrap();
std::fs::write(&deny, format!("# baneado\n{}\n", p2)).unwrap();
let policy = PeerPolicy::from_files(Some(&allow), Some(&deny)).unwrap();
assert_eq!(policy.evaluate(&p1), Decision::Admit);
assert_eq!(policy.evaluate(&p2), Decision::DeniedByDenylist); // deny gana
assert_eq!(policy.evaluate(&p3), Decision::NotInAllowlist);
}
#[test]
fn from_files_only_deny() {
let p1 = fresh_peer();
let p2 = fresh_peer();
let tmp = TempDir::new().unwrap();
let deny = tmp.path().join("deny.txt");
std::fs::write(&deny, format!("{}\n", p1)).unwrap();
let policy = PeerPolicy::from_files(None, Some(&deny)).unwrap();
assert_eq!(policy.evaluate(&p1), Decision::DeniedByDenylist);
assert_eq!(policy.evaluate(&p2), Decision::Admit);
}
#[test]
fn reload_picks_up_changes() {
let p1 = fresh_peer();
let p2 = fresh_peer();
let tmp = TempDir::new().unwrap();
let allow = tmp.path().join("allow.txt");
std::fs::write(&allow, format!("{}\n", p1)).unwrap();
let policy = PeerPolicy::from_files(Some(&allow), None).unwrap();
assert_eq!(policy.evaluate(&p1), Decision::Admit);
assert_eq!(policy.evaluate(&p2), Decision::NotInAllowlist);
// Mutar el archivo: ahora p2 está, p1 no.
std::fs::write(&allow, format!("{}\n", p2)).unwrap();
policy.reload().unwrap();
assert_eq!(policy.evaluate(&p1), Decision::NotInAllowlist);
assert_eq!(policy.evaluate(&p2), Decision::Admit);
}
#[test]
fn reload_failure_preserves_previous_state() {
let p1 = fresh_peer();
let tmp = TempDir::new().unwrap();
let allow = tmp.path().join("allow.txt");
std::fs::write(&allow, format!("{}\n", p1)).unwrap();
let policy = PeerPolicy::from_files(Some(&allow), None).unwrap();
assert_eq!(policy.evaluate(&p1), Decision::Admit);
// Romper el archivo con basura.
std::fs::write(&allow, "this-is-not-a-peer-id\n").unwrap();
let err = policy.reload();
assert!(err.is_err(), "reload con typo debe fallar");
// Estado anterior se mantiene.
assert_eq!(
policy.evaluate(&p1),
Decision::Admit,
"policy debe conservar la versión anterior tras fallo de reload"
);
}
#[test]
fn invalid_file_rejected_at_load() {
let tmp = TempDir::new().unwrap();
let path = tmp.path().join("bad.txt");
std::fs::write(&path, "not-a-peer-id\n").unwrap();
let err = PeerPolicy::from_files(Some(&path), None).unwrap_err();
assert!(matches!(err, PolicyError::InvalidPeerId { .. }));
}
#[test]
fn watcher_reloads_on_file_change() {
// Test integración del watcher: arma policy con file, spawn
// watcher, modifica el archivo, espera el debounce, verifica
// que la policy refleja el cambio.
let p1 = fresh_peer();
let p2 = fresh_peer();
let tmp = TempDir::new().unwrap();
let allow = tmp.path().join("allow.txt");
std::fs::write(&allow, format!("{}\n", p1)).unwrap();
let policy = PeerPolicy::from_files(Some(&allow), None).unwrap();
let _watcher = policy.spawn_watcher().unwrap();
// Le damos un instante al watcher para subscribirse al dir.
std::thread::sleep(Duration::from_millis(100));
// Mutamos el archivo: p2 reemplaza a p1.
std::fs::write(&allow, format!("{}\n", p2)).unwrap();
// Esperamos > debounce + margen.
let deadline = Instant::now() + Duration::from_secs(3);
while Instant::now() < deadline {
if policy.evaluate(&p2) == Decision::Admit {
break;
}
std::thread::sleep(Duration::from_millis(50));
}
assert_eq!(
policy.evaluate(&p2),
Decision::Admit,
"watcher debería haber recargado la policy"
);
assert_eq!(
policy.evaluate(&p1),
Decision::NotInAllowlist,
"p1 debería haber salido tras el reload"
);
}
}
+19 -18
View File
@@ -57,13 +57,12 @@ pub struct ServerConfig {
/// locales (lo cual es correcto cuando no hay conectividad o no
/// se desea exponer al exterior).
pub net: Option<Arc<BrahmanNet>>,
/// Política de admisión de peers libp2p. Si está presente, el
/// trust gate del path libp2p exige además que el `peer_id`
/// autenticado por Noise esté en la lista. `None` → modo abierto
/// (cualquier peer Ed25519-válido pasa, comportamiento de Fase 3
/// sin restricción adicional). El path Unix la ignora — la
/// allowlist es a nivel libp2p, no de filesystem.
pub allowlist: Option<crate::peer_allowlist::PeerAllowlist>,
/// Política de admisión de peers libp2p (allow + deny + hot
/// reload opcional). Si está presente, el trust gate del path
/// libp2p evalúa cada `peer_id` (ya autenticado por Noise)
/// contra esta política. `None` → modo totalmente abierto
/// (cualquier peer Ed25519-válido pasa). El path Unix la ignora.
pub policy: Option<crate::peer_policy::PeerPolicy>,
}
// Manual Debug porque BrahmanNet no implementa Debug (libp2p Swarm
@@ -74,7 +73,7 @@ impl std::fmt::Debug for ServerConfig {
.field("init_attached", &self.init_attached)
.field("broker", &self.broker.as_ref().map(|_| "<broker>"))
.field("net", &self.net.as_ref().map(|_| "<net>"))
.field("allowlist", &self.allowlist.as_ref().map(|a| a.len()))
.field("policy", &self.policy.as_ref().map(|p| p.sizes()))
.finish()
}
}
@@ -548,22 +547,24 @@ where
return Ok(None);
}
// Allowlist gate (path libp2p): si está configurada, el peer
// autenticado por Noise debe estar en la lista. Se chequea
// ANTES de la firma porque es comparación O(log n) sin crypto
// — ahorra ciclos contra peers no permitidos. La allowlist no
// se aplica al path Unix (autenticación por SO_PEERCRED, no
// por libp2p PeerId).
if let (Some(peer), Some(allowlist)) = (expected_peer, &config.allowlist) {
if !allowlist.is_allowed(&peer) {
// Policy gate (path libp2p): si está configurada, el peer
// autenticado por Noise debe pasar la política (deny first,
// luego allow). Se chequea ANTES de la firma porque es
// comparación O(log n) sin crypto — ahorra ciclos contra peers
// no permitidos. La política no se aplica al path Unix
// (autenticación por SO_PEERCRED, no por libp2p PeerId).
if let (Some(peer), Some(policy)) = (expected_peer, &config.policy) {
let decision = policy.evaluate(&peer);
if !decision.is_admitted() {
write_frame(
stream,
&Frame::Error(HandshakeError::Unauthorized(format!(
"peer {peer} no está en la allowlist"
"peer {peer}: {}",
decision.reason()
))),
)
.await?;
debug!(peer = %peer, "rechazado por allowlist");
debug!(peer = %peer, reason = decision.reason(), "rechazado por policy");
return Ok(None);
}
}
@@ -58,7 +58,7 @@ fn sock_path(name: &str) -> std::path::PathBuf {
#[tokio::test]
async fn full_handshake_roundtrip() {
let path = sock_path("happy");
let server = Server::bind(&path, ServerConfig { init_attached: true, broker: None, net: None, allowlist: None }).unwrap();
let server = Server::bind(&path, ServerConfig { init_attached: true, broker: None, net: None, policy: None }).unwrap();
let session_handle = tokio::spawn({
async move {
@@ -195,7 +195,7 @@ async fn broker_registers_and_unregisters_with_session() {
init_attached: false,
broker: Some(broker.clone()),
net: None,
allowlist: None,
policy: None,
},
)
.unwrap();
@@ -239,7 +239,7 @@ async fn broker_matches_two_live_modules() {
init_attached: false,
broker: Some(broker.clone()),
net: None,
allowlist: None,
policy: None,
},
)
.unwrap();
@@ -316,7 +316,7 @@ async fn match_event_pushed_on_producer_arrival() {
init_attached: false,
broker: Some(broker.clone()),
net: None,
allowlist: None,
policy: None,
},
)
.unwrap();
@@ -85,7 +85,7 @@ async fn dht_discovery_finds_remote_provider() {
init_attached: true,
broker: Some(a_broker.clone()),
net: Some(a_net.clone()), // ← clave Fase 2: anuncia al DHT
allowlist: None,
policy: None,
},
)
.unwrap(),
@@ -172,7 +172,7 @@ async fn dht_discovery_negative_unknown_flow() {
init_attached: true,
broker: Some(a_broker),
net: Some(a_net.clone()),
allowlist: None,
policy: None,
},
)
.unwrap(),
@@ -251,7 +251,7 @@ async fn dht_discovery_withdraws_on_session_cleanup() {
init_attached: true,
broker: Some(a_broker),
net: Some(a_net.clone()),
allowlist: None,
policy: None,
},
)
.unwrap(),
@@ -19,7 +19,7 @@ use brahman_card::{
CARD_SCHEMA_VERSION,
};
use brahman_handshake::network::{connect_libp2p, run_libp2p_accept_loop};
use brahman_handshake::peer_allowlist::PeerAllowlist;
use brahman_handshake::peer_policy::PeerPolicy;
use brahman_handshake::server::{Server, ServerConfig};
use brahman_net::{BrahmanNet, Keypair, Multiaddr, PeerId, Protocol};
use tempfile::TempDir;
@@ -57,7 +57,7 @@ async fn libp2p_handshake_roundtrip() {
init_attached: true,
broker: Some(broker.clone()),
net: None,
allowlist: None,
policy: None,
},
)
.unwrap(),
@@ -136,7 +136,7 @@ async fn libp2p_handshake_rejects_mismatched_signing_key() {
init_attached: true,
broker: None,
net: None,
allowlist: None,
policy: None,
},
)
.unwrap(),
@@ -197,7 +197,10 @@ async fn libp2p_handshake_allowlist_admits_listed_rejects_others() {
init_attached: true,
broker: None,
net: None,
allowlist: Some(PeerAllowlist::from_iter([allowed_peer])),
policy: Some(PeerPolicy::from_sets(
Some([allowed_peer].into_iter().collect()),
std::collections::BTreeSet::new(),
)),
},
)
.unwrap(),
@@ -247,3 +250,74 @@ async fn libp2p_handshake_allowlist_admits_listed_rejects_others() {
assert_eq!(s.len(), 0, "ninguna sesión adicional registrada tras intento denegado");
}
}
/// Denylist gate: A configura `policy` con un peer en la denylist.
/// Modo abierto para todo lo demás (sin allowlist), pero el peer
/// baneado es rechazado aún teniendo Ed25519 válida y peer_id que
/// derivaría limpio del Noise handshake.
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn libp2p_handshake_denylist_blocks_listed_peer() {
let banned_kp = Keypair::generate_ed25519();
let banned_peer = banned_kp.public().to_peer_id();
let other_kp = Keypair::generate_ed25519();
let tmp = TempDir::new().unwrap();
let unix_socket = tmp.path().join("brahman-init.sock");
let server = Arc::new(
Server::bind(
&unix_socket,
ServerConfig {
init_attached: true,
broker: None,
net: None,
policy: Some(PeerPolicy::from_sets(
None, // sin allowlist (abierto)
[banned_peer].into_iter().collect(),
)),
},
)
.unwrap(),
);
let sessions = server.sessions();
let server_net = Arc::new(BrahmanNet::new().unwrap());
let server_peer = server_net.peer_id;
let actual = server_net
.listen("/ip4/127.0.0.1/tcp/0".parse().unwrap())
.await;
let mut full = actual.clone();
full.push(Protocol::P2p(server_peer));
tokio::spawn(run_libp2p_accept_loop(server.clone(), server_net.clone()));
// Cliente baneado: connect debe fallar.
let banned_net = BrahmanNet::with_keypair(banned_kp.clone()).unwrap();
banned_net.dial(full.clone());
tokio::time::sleep(Duration::from_millis(200)).await;
let card_x = sample_card("test.banned");
let result = connect_libp2p(&banned_net, server_peer, card_x, None, &banned_kp).await;
assert!(
result.is_err(),
"peer en denylist debe ser rechazado, got Ok"
);
{
let s = sessions.lock().await;
assert_eq!(s.len(), 0, "el peer baneado no debería tener sesión");
}
// Cliente no-baneado pasa.
let other_net = BrahmanNet::with_keypair(other_kp.clone()).unwrap();
other_net.dial(full.clone());
tokio::time::sleep(Duration::from_millis(200)).await;
let card_ok = sample_card("test.other");
let mut other_client = connect_libp2p(&other_net, server_peer, card_ok, None, &other_kp)
.await
.expect("peer fuera de denylist debe pasar");
{
let s = sessions.lock().await;
assert_eq!(s.len(), 1, "sesión del peer no-baneado registrada");
}
other_client.farewell().await.ok();
}
+67 -31
View File
@@ -168,11 +168,12 @@ async fn primordial_loop(
// reboots).
let brahman_net = setup_brahman_net(dev_mode).await;
// Allowlist opcional de peers libp2p: si BRAHMAN_PEER_ALLOWLIST
// apunta a un archivo, cualquier handshake remoto requiere que
// su peer_id esté en la lista. Sin la env, modo abierto (todo
// peer Ed25519-válido pasa el trust gate de Fase 3).
let brahman_allowlist = setup_brahman_allowlist();
// Política opcional de peers libp2p: allowlist + denylist + hot
// reload. Activada si BRAHMAN_PEER_ALLOWLIST o BRAHMAN_PEER_DENYLIST
// están set. Sin ninguna, modo totalmente abierto (Fase 3 sin
// restricción adicional). El watcher se queda vivo en background
// observando los archivos para hot reload.
let (brahman_policy, _policy_watcher) = setup_brahman_policy();
let brahman_sock = brahman_handshake::transport::default_socket_path();
match brahman_handshake::server::Server::bind(
@@ -181,7 +182,7 @@ async fn primordial_loop(
init_attached: true,
broker: Some(brahman_broker.clone()),
net: brahman_net.clone(),
allowlist: brahman_allowlist.clone(),
policy: brahman_policy.clone(),
},
) {
Ok(server) => {
@@ -706,35 +707,70 @@ async fn setup_brahman_net(
Some(net)
}
/// Carga la allowlist de peers libp2p desde el archivo apuntado por
/// `BRAHMAN_PEER_ALLOWLIST`. Sin la env, devuelve `None` (modo abierto:
/// cualquier peer Ed25519-válido pasa el trust gate). Si la env está
/// pero el archivo falla, loggea y degrada a None — la doctrina PID 1
/// de no romper por subsistemas opcionales se mantiene.
fn setup_brahman_allowlist() -> Option<brahman_handshake::peer_allowlist::PeerAllowlist> {
let path = match std::env::var("BRAHMAN_PEER_ALLOWLIST") {
Ok(s) if !s.is_empty() => s,
_ => {
tracing::debug!("BRAHMAN_PEER_ALLOWLIST no set — modo abierto (todo peer pasa)");
return None;
}
};
match brahman_handshake::peer_allowlist::PeerAllowlist::from_file(&path) {
Ok(list) => {
info!(
path = %path,
peers = list.len(),
"allowlist de peers libp2p cargada"
);
Some(list)
}
/// Carga la política de peers libp2p desde los archivos apuntados por
/// `BRAHMAN_PEER_ALLOWLIST` y/o `BRAHMAN_PEER_DENYLIST`, y arranca un
/// watcher para hot reload sobre cualquier cambio.
///
/// - Sin ninguna env: `(None, None)` → modo totalmente abierto.
/// - Con cualquiera (o ambas) set: política activa + watcher vivo.
/// - Si los archivos fallan al cargar: degrada a `(None, None)`,
/// loggea, NO rompe el bucle primordial (doctrina PID 1).
///
/// Devuelve la política y el `JoinHandle` del watcher (que el caller
/// debe mantener para que el thread no se aborte). Si no hay paths,
/// el watcher es un no-op que termina inmediato.
fn setup_brahman_policy() -> (
Option<brahman_handshake::peer_policy::PeerPolicy>,
Option<std::thread::JoinHandle<()>>,
) {
let allow_path = std::env::var("BRAHMAN_PEER_ALLOWLIST")
.ok()
.filter(|s| !s.is_empty());
let deny_path = std::env::var("BRAHMAN_PEER_DENYLIST")
.ok()
.filter(|s| !s.is_empty());
if allow_path.is_none() && deny_path.is_none() {
tracing::debug!(
"BRAHMAN_PEER_ALLOWLIST y BRAHMAN_PEER_DENYLIST no set — modo abierto (todo peer pasa)"
);
return (None, None);
}
let allow_pb = allow_path.as_deref().map(std::path::Path::new);
let deny_pb = deny_path.as_deref().map(std::path::Path::new);
let policy = match brahman_handshake::peer_policy::PeerPolicy::from_files(allow_pb, deny_pb) {
Ok(p) => p,
Err(e) => {
warn!(
path = %path,
?e,
"BRAHMAN_PEER_ALLOWLIST inválido — degradando a modo abierto (sin restricción)"
allow = ?allow_path,
deny = ?deny_path,
"policy de peers inválida — degradando a modo abierto (sin restricción)"
);
return (None, None);
}
};
let (allow_count, deny_count) = policy.sizes();
info!(
allow = ?allow_count,
deny = deny_count,
allow_path = ?allow_path,
deny_path = ?deny_path,
"policy de peers libp2p cargada"
);
// Spawn watcher para hot reload. Errores aquí no son fatales —
// tendrías política sin reload, que es razonable.
let watcher = match policy.spawn_watcher() {
Ok(h) => Some(h),
Err(e) => {
warn!(?e, "policy watcher no se pudo crear — hot reload deshabilitado");
None
}
}
};
(Some(policy), watcher)
}