Files
arje/crates/ente-bus/src/lib.rs
T
Sergio badf4257ec audit-verify, autopromote, histogramas y hot-reload caps
- audit::verify_chain_from_cas() recorre prev_sha desde head hasta genesis,
  valida sha256(blob)==sha del CAS para detectar tampering. Endpoint
  VerifyAudit + brainctl audit-verify.
- autopromote loop background: cada N segundos detecta cristales sobre
  threshold y los promueve sin operador. Anti-doble vía HashSet de pares
  (ant, con). Flag --autopromote-secs.
- GapHistogram con buckets exponenciales (1ms..1000s) capturado en
  observer.record(). top_gap_pairs(K) limita cardinalidad. Expuesto en
  Prometheus como ente_brain_pair_gap_seconds histogram per pair.
- BusRequest::UpdateCapabilities con auth obligatorio (sólo el dueño puede
  modificar sus caps). Incarnated.dynamic_provides separa runtime de la
  Card immutable. Graph mediator actualiza providers index + revoca grants.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-03 23:32:54 +00:00

215 lines
7.5 KiB
Rust

//! ente-bus: bus de capacidades interno del fractal.
//!
//! Wire format: Unix SOCK_STREAM con framing length-prefijo (u32 BE) + payload
//! postcard. Bidireccional pero por ahora request-response síncrono.
//!
//! Identidad: cada Ente hijo recibe `ENTE_BUS_SOCK` y `ENTE_ID` en su entorno.
//! El cliente lee ambos vía `BusClient::from_env`.
use ente_card::Capability;
use serde::{Deserialize, Serialize};
use std::path::Path;
use std::str::FromStr;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::UnixStream;
use ulid::Ulid;
pub const ENV_BUS_SOCK: &str = "ENTE_BUS_SOCK";
pub const ENV_ENTE_ID: &str = "ENTE_ID";
pub const MAX_FRAME: usize = 1 << 20; // 1 MiB — protección contra OOM
/// Credenciales del peer extraídas vía SO_PEERCRED al accept del bus.
/// Imposibles de falsear desde el cliente — el kernel las inyecta.
/// Definidas aquí (no en ente-zero) porque conceptualmente son atributo
/// del protocolo del bus, no del init.
#[derive(Debug, Clone, Copy)]
pub struct PeerCreds {
pub pid: i32,
pub uid: u32,
pub gid: u32,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BusMessage {
pub from: Option<Ulid>,
pub seq: u64,
pub payload: BusPayload,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum BusPayload {
Request(BusRequest),
Response(BusResponse),
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum BusRequest {
/// Saludo. El Ente anuncia que está vivo y declara sus capacidades.
/// El Init usa esto para saber que el child arrancó correctamente,
/// independientemente de su exit code.
Announce { capabilities: Vec<Capability> },
/// Listar Entes vivos. Útil para debugging y para Entes-supervisor.
ListEntes,
/// Control de estado del fractal. Traducido desde D-Bus por compat-logind.
PowerOff { interactive: bool },
Reboot { interactive: bool },
Suspend { interactive: bool },
Hibernate { interactive: bool },
/// Invocación genérica de capacidad. `cap` debe estar provista por algún
/// Ente del grafo; el blob es el argumento opaco que el proveedor parsea.
Invoke { cap: Capability, blob: Vec<u8> },
/// Actualización dinámica del set de capacidades del Ente que llama.
/// Sólo aplicable al `from_authenticated` — un Ente sólo puede modificar
/// sus propias caps. La Card original (immutable) no se toca; la mutación
/// va al `dynamic_provides` del Incarnated.
UpdateCapabilities {
adds: Vec<Capability>,
removes: Vec<Capability>,
},
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum BusResponse {
Ok,
Error(String),
Entes(Vec<EnteInfo>),
Invoked { result: Vec<u8> },
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EnteInfo {
pub id: Ulid,
pub label: String,
pub provides: Vec<Capability>,
pub pid: Option<i32>,
}
pub async fn write_frame<W: AsyncWriteExt + Unpin>(w: &mut W, msg: &BusMessage) -> anyhow::Result<()> {
let bytes = postcard::to_stdvec(msg)?;
if bytes.len() > MAX_FRAME {
anyhow::bail!("frame too large: {} > {}", bytes.len(), MAX_FRAME);
}
w.write_u32(bytes.len() as u32).await?;
w.write_all(&bytes).await?;
Ok(())
}
pub async fn read_frame<R: AsyncReadExt + Unpin>(r: &mut R) -> anyhow::Result<BusMessage> {
let len = r.read_u32().await? as usize;
if len > MAX_FRAME {
anyhow::bail!("frame oversize: {len}");
}
let mut buf = vec![0u8; len];
r.read_exact(&mut buf).await?;
Ok(postcard::from_bytes(&buf)?)
}
pub struct BusClient {
stream: UnixStream,
seq: u64,
self_id: Option<Ulid>,
}
/// Trait que un Ente proveedor implementa para servir invokes que el bus le
/// forwarda. Sync por simplicidad — un handler async se cubre con
/// `tokio::task::block_in_place` o un canal hacia un task externo.
pub trait InvokeHandler {
fn handle(&mut self, cap: ente_card::Capability, blob: Vec<u8>) -> BusResponse;
}
/// Conexión long-lived para Entes que proveen capacidades. A diferencia de
/// `BusClient` (request-response y desconecta), `BusServer`:
/// 1. Anuncia su identidad al bus
/// 2. Mantiene la conexión abierta
/// 3. Atiende invokes forwardeados por el bus en bucle
pub struct BusServer {
stream: UnixStream,
self_id: Ulid,
}
impl BusServer {
pub async fn from_env() -> anyhow::Result<Self> {
let path = std::env::var(ENV_BUS_SOCK)
.map_err(|_| anyhow::anyhow!("{} no definido", ENV_BUS_SOCK))?;
let id_s = std::env::var(ENV_ENTE_ID)
.map_err(|_| anyhow::anyhow!("{} no definido", ENV_ENTE_ID))?;
let self_id = Ulid::from_str(&id_s)
.map_err(|_| anyhow::anyhow!("{} no es un Ulid válido: {id_s}", ENV_ENTE_ID))?;
let stream = UnixStream::connect(&path).await?;
Ok(Self { stream, self_id })
}
pub async fn announce(&mut self, capabilities: Vec<ente_card::Capability>) -> anyhow::Result<()> {
let req = BusMessage {
from: Some(self.self_id),
seq: 1,
payload: BusPayload::Request(BusRequest::Announce { capabilities }),
};
write_frame(&mut self.stream, &req).await?;
let resp = read_frame(&mut self.stream).await?;
match resp.payload {
BusPayload::Response(BusResponse::Ok) => Ok(()),
BusPayload::Response(other) => anyhow::bail!("Announce rechazado: {other:?}"),
BusPayload::Request(_) => anyhow::bail!("expected Response, got Request"),
}
}
/// Bucle principal del proveedor. Atiende invokes hasta que la conexión
/// se cierra (el bus muere o el Ente recibe SIGTERM y nosotros dropeamos).
pub async fn serve<H: InvokeHandler>(mut self, mut handler: H) -> anyhow::Result<()> {
loop {
let msg = read_frame(&mut self.stream).await?;
let resp = match msg.payload {
BusPayload::Request(BusRequest::Invoke { cap, blob }) => {
handler.handle(cap, blob)
}
BusPayload::Request(other) => {
BusResponse::Error(format!("BusServer no maneja {other:?}"))
}
BusPayload::Response(_) => continue,
};
let out = BusMessage {
from: Some(self.self_id),
seq: msg.seq,
payload: BusPayload::Response(resp),
};
write_frame(&mut self.stream, &out).await?;
}
}
}
impl BusClient {
pub async fn connect(path: impl AsRef<Path>) -> anyhow::Result<Self> {
let stream = UnixStream::connect(path).await?;
let self_id = std::env::var(ENV_ENTE_ID)
.ok()
.and_then(|s| Ulid::from_str(&s).ok());
Ok(Self { stream, seq: 0, self_id })
}
pub async fn from_env() -> anyhow::Result<Self> {
let path = std::env::var(ENV_BUS_SOCK)
.map_err(|_| anyhow::anyhow!("{} no definido", ENV_BUS_SOCK))?;
Self::connect(&path).await
}
pub async fn call(&mut self, req: BusRequest) -> anyhow::Result<BusResponse> {
self.seq = self.seq.wrapping_add(1);
let msg = BusMessage {
from: self.self_id,
seq: self.seq,
payload: BusPayload::Request(req),
};
write_frame(&mut self.stream, &msg).await?;
let resp = read_frame(&mut self.stream).await?;
match resp.payload {
BusPayload::Response(r) => Ok(r),
BusPayload::Request(_) => anyhow::bail!("expected response, got request"),
}
}
}