diff --git a/Cargo.lock b/Cargo.lock index 3e20a2e..3ab86cf 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1139,6 +1139,16 @@ dependencies = [ "syn 2.0.117", ] +[[package]] +name = "brahman-broker" +version = "0.1.0" +dependencies = [ + "brahman-card", + "serde", + "thiserror 2.0.18", + "ulid", +] + [[package]] name = "brahman-card" version = "0.1.0" diff --git a/Cargo.toml b/Cargo.toml index 94bc8e9..4907797 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -6,6 +6,7 @@ members = [ # ============================================================ "crates/core/brahman-card", "crates/core/brahman-handshake", + "crates/core/brahman-broker", "crates/core/ente-card", "crates/core/ente-bus", "crates/core/ente-cas", diff --git a/crates/core/brahman-broker/Cargo.toml b/crates/core/brahman-broker/Cargo.toml new file mode 100644 index 0000000..7d33864 --- /dev/null +++ b/crates/core/brahman-broker/Cargo.toml @@ -0,0 +1,15 @@ +[package] +name = "brahman-broker" +version.workspace = true +edition.workspace = true +rust-version.workspace = true +license.workspace = true +authors.workspace = true +publish.workspace = true +description = "Brahman — broker de tipos: empareja productores↔consumidores por TypeRef con matching híbrido (exact + structural) y prioridad." + +[dependencies] +brahman-card = { path = "../brahman-card" } +serde = { workspace = true } +thiserror = { workspace = true } +ulid = { workspace = true } diff --git a/crates/core/brahman-broker/src/lib.rs b/crates/core/brahman-broker/src/lib.rs new file mode 100644 index 0000000..3a7ff4b --- /dev/null +++ b/crates/core/brahman-broker/src/lib.rs @@ -0,0 +1,701 @@ +//! `brahman-broker` — empareja productores y consumidores por tipo de flujo. +//! +//! El broker indexa [`brahman_card::Card`]s registradas por `SessionId` y, +//! para cada `flow.input` de un consumidor, busca el `flow.output` +//! compatible de mejor calidad entre los demás. Tres ejes: +//! +//! 1. **Estrategia de matching** ([`MatchStrategy`]): +//! - `Exact`: igualdad estricta de [`brahman_card::TypeRef`]. +//! - `Structural`: misma forma (mismo `package` + `name` para Wit; +//! ignora `interface`). +//! - `ExactThenStructural`: prefiere exact; cae en structural si no hay. +//! +//! 2. **Override `pin_to`**: si el consumidor declara `pin_to = "label"`, +//! el broker prefiere productores cuya Card tenga ese `label` (siempre +//! que el tipo siga matcheando). Si la pista no resuelve, cae en +//! matching por tipo normal. +//! +//! 3. **Prioridad**: empate de tipo se resuelve por +//! [`brahman_card::Priority`] del productor (mayor gana). Empate de +//! prioridad se resuelve lexicográficamente por `label` (estable y +//! determinista). +//! +//! El broker es **stateless w.r.t. routes**: cada `find_producer_for` o +//! `all_matches` se calcula bajo demanda. La única persistencia es el +//! índice de Cards registradas. Esto permite re-evaluar matches cuando +//! cambia el set sin invalidar caches. + +#![forbid(unsafe_code)] +#![warn(rust_2018_idioms)] + +use std::collections::BTreeMap; + +use brahman_card::{Card, Flow, Priority, TypeRef}; +use serde::{Deserialize, Serialize}; +use ulid::Ulid; + +/// Identificador de sesión emitido por el handshake. Idéntico al usado por +/// `brahman-handshake` (no es un re-export para evitar la dependencia). +pub type SessionId = Ulid; + +/// Estrategia de matching de tipos. +#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Hash, Serialize, Deserialize)] +#[serde(rename_all = "kebab-case")] +pub enum MatchStrategy { + /// Igualdad estricta de `TypeRef`. + Exact, + /// Misma forma: para `Wit`, mismo `package` + `name`; para + /// `Primitive`, mismo `name`. + Structural, + /// Híbrido: intenta `Exact` primero; si no matchea, `Structural`. + /// Reporta cuál estrategia ganó en [`Match::via`]. + #[default] + ExactThenStructural, +} + +/// Configuración del broker. +#[derive(Debug, Clone, Default)] +pub struct BrokerConfig { + pub strategy: MatchStrategy, +} + +/// Vista mínima de una Card que el broker necesita. +#[derive(Debug, Clone)] +pub struct BrokeredCard { + pub session: SessionId, + pub label: String, + pub priority: Priority, + pub inputs: Vec, + pub outputs: Vec, +} + +impl BrokeredCard { + fn from_card(session: SessionId, card: &Card) -> Self { + Self { + session, + label: card.label.clone(), + priority: card.priority, + inputs: card.flow.input.clone(), + outputs: card.flow.output.clone(), + } + } +} + +/// Punto extremo de un flujo: qué sesión + nombre del flow dentro de su Card. +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub struct Endpoint { + pub session: SessionId, + pub flow_name: String, +} + +/// Match concreto entre un consumidor y un productor. +#[derive(Debug, Clone)] +pub struct Match { + pub consumer: Endpoint, + pub consumer_label: String, + pub producer: Endpoint, + pub producer_label: String, + /// Tipo del flow (lado consumidor — lado productor coincide en + /// estrategia Exact, puede diferir en `interface` en Structural). + pub ty: TypeRef, + /// Estrategia que efectivamente matcheó. + pub via: MatchStrategy, + /// `true` si el match fue resuelto por `pin_to` y no por type-search. + pub pinned: bool, +} + +// ===================================================================== +// Broker +// ===================================================================== + +/// El broker. Registra Cards por SessionId, computa matches bajo demanda. +#[derive(Debug, Clone, Default)] +pub struct Broker { + cards: BTreeMap, + config: BrokerConfig, +} + +impl Broker { + pub fn new(config: BrokerConfig) -> Self { + Self { + cards: BTreeMap::new(), + config, + } + } + + /// Registra una Card. Devuelve `Some(prev)` si reemplazó una existente. + pub fn register(&mut self, session: SessionId, card: &Card) -> Option { + self.cards.insert(session, BrokeredCard::from_card(session, card)) + } + + /// Quita una Card por sesión. + pub fn unregister(&mut self, session: SessionId) -> Option { + self.cards.remove(&session) + } + + /// Cardinalidad del registro. + pub fn len(&self) -> usize { + self.cards.len() + } + + pub fn is_empty(&self) -> bool { + self.cards.is_empty() + } + + /// Iterador sobre las sesiones registradas. + pub fn sessions(&self) -> impl Iterator + '_ { + self.cards.keys().copied() + } + + /// Busca el mejor productor para un input específico de un consumidor. + /// + /// Algoritmo: + /// 1. Resuelve el flow input en el consumidor. + /// 2. Si tiene `pin_to`, prefiere productores con ese `label` que + /// matcheen el tipo (cualquier estrategia configurada). + /// 3. Si no hay pin_to o la pista falló, escanea todos los outputs + /// de las otras Cards. Filtra por compatibilidad de tipo. + /// 4. Ordena por (priority desc, label asc) y devuelve el primero. + pub fn find_producer_for(&self, consumer: SessionId, input_name: &str) -> Option { + let cons = self.cards.get(&consumer)?; + let input = cons.inputs.iter().find(|f| f.name == input_name)?; + + // pin_to: short-circuit. Si la pista resuelve, gana. + if let Some(pin) = &input.pin_to { + for prod in self.cards.values() { + if prod.session == consumer || &prod.label != pin { + continue; + } + for out in &prod.outputs { + if let Some(via) = self.types_match(&input.ty, &out.ty) { + return Some(self.make_match(cons, prod, input, out, via, true)); + } + } + } + // Fall through: pin no resuelto, type-search general. + } + + let mut candidates: Vec<(&BrokeredCard, &Flow, MatchStrategy)> = Vec::new(); + for prod in self.cards.values() { + if prod.session == consumer { + continue; + } + for out in &prod.outputs { + if let Some(via) = self.types_match(&input.ty, &out.ty) { + candidates.push((prod, out, via)); + } + } + } + + candidates.sort_by(|(a, _, _), (b, _, _)| { + b.priority + .cmp(&a.priority) + .then_with(|| a.label.cmp(&b.label)) + }); + + let (prod, out, via) = candidates.into_iter().next()?; + Some(self.make_match(cons, prod, input, out, via, false)) + } + + /// Calcula todos los matches consumer→producer en el set actual. + /// Útil para introspección o para que el Admin emita rutas en lote. + pub fn all_matches(&self) -> Vec { + let mut out = Vec::new(); + for cons in self.cards.values() { + for input in &cons.inputs { + if let Some(m) = self.find_producer_for(cons.session, &input.name) { + out.push(m); + } + } + } + out + } + + fn types_match(&self, consumer_ty: &TypeRef, producer_ty: &TypeRef) -> Option { + match self.config.strategy { + MatchStrategy::Exact => exact_match(consumer_ty, producer_ty).then_some(MatchStrategy::Exact), + MatchStrategy::Structural => { + structural_match(consumer_ty, producer_ty).then_some(MatchStrategy::Structural) + } + MatchStrategy::ExactThenStructural => { + if exact_match(consumer_ty, producer_ty) { + Some(MatchStrategy::Exact) + } else if structural_match(consumer_ty, producer_ty) { + Some(MatchStrategy::Structural) + } else { + None + } + } + } + } + + fn make_match( + &self, + cons: &BrokeredCard, + prod: &BrokeredCard, + input: &Flow, + output: &Flow, + via: MatchStrategy, + pinned: bool, + ) -> Match { + Match { + consumer: Endpoint { + session: cons.session, + flow_name: input.name.clone(), + }, + consumer_label: cons.label.clone(), + producer: Endpoint { + session: prod.session, + flow_name: output.name.clone(), + }, + producer_label: prod.label.clone(), + ty: input.ty.clone(), + via, + pinned, + } + } +} + +// ===================================================================== +// Predicados de matching (libres, testeables aislados) +// ===================================================================== + +fn exact_match(a: &TypeRef, b: &TypeRef) -> bool { + a == b +} + +fn structural_match(a: &TypeRef, b: &TypeRef) -> bool { + match (a, b) { + (TypeRef::Primitive { name: na }, TypeRef::Primitive { name: nb }) => na == nb, + ( + TypeRef::Wit { + package: pa, name: na, .. + }, + TypeRef::Wit { + package: pb, name: nb, .. + }, + ) => pa == pb && na == nb, + _ => false, + } +} + +// ===================================================================== +// Tests +// ===================================================================== + +#[cfg(test)] +mod tests { + use super::*; + use brahman_card::{Card, Flows, Payload, Supervision, CARD_SCHEMA_VERSION}; + + fn card(label: &str, priority: Priority, flows: Flows) -> Card { + Card { + schema_version: CARD_SCHEMA_VERSION, + id: Ulid::new(), + label: label.into(), + payload: Payload::Virtual, + supervision: Supervision::OneShot, + priority, + flow: flows, + ..Default::default() + } + } + + fn prim(name: &str) -> TypeRef { + TypeRef::Primitive { name: name.into() } + } + + fn wit(pkg: &str, iface: Option<&str>, name: &str) -> TypeRef { + TypeRef::Wit { + package: pkg.into(), + interface: iface.map(|s| s.into()), + name: name.into(), + } + } + + fn flow(name: &str, ty: TypeRef, pin: Option<&str>) -> Flow { + Flow { + name: name.into(), + ty, + pin_to: pin.map(|s| s.into()), + } + } + + #[test] + fn exact_match_same_typeref() { + let mut b = Broker::new(BrokerConfig { + strategy: MatchStrategy::Exact, + }); + let producer = card( + "dht", + Priority::Normal, + Flows { + input: vec![], + output: vec![flow("results", prim("string"), None)], + }, + ); + let consumer = card( + "ui", + Priority::Normal, + Flows { + input: vec![flow("query", prim("string"), None)], + output: vec![], + }, + ); + let s_prod = Ulid::new(); + let s_cons = Ulid::new(); + b.register(s_prod, &producer); + b.register(s_cons, &consumer); + + let m = b.find_producer_for(s_cons, "query").expect("match"); + assert_eq!(m.producer_label, "dht"); + assert_eq!(m.via, MatchStrategy::Exact); + assert!(!m.pinned); + } + + #[test] + fn structural_ignores_interface() { + let mut b = Broker::new(BrokerConfig { + strategy: MatchStrategy::Structural, + }); + let producer = card( + "dht", + Priority::Normal, + Flows { + input: vec![], + output: vec![flow( + "out", + wit("brahman:dht", Some("v1"), "entity-result"), + None, + )], + }, + ); + let consumer = card( + "ui", + Priority::Normal, + Flows { + input: vec![flow( + "in", + wit("brahman:dht", Some("v2"), "entity-result"), + None, + )], + output: vec![], + }, + ); + let s_prod = Ulid::new(); + let s_cons = Ulid::new(); + b.register(s_prod, &producer); + b.register(s_cons, &consumer); + + let m = b.find_producer_for(s_cons, "in").expect("match"); + assert_eq!(m.via, MatchStrategy::Structural); + } + + #[test] + fn exact_strategy_rejects_interface_mismatch() { + let mut b = Broker::new(BrokerConfig { + strategy: MatchStrategy::Exact, + }); + let producer = card( + "dht", + Priority::Normal, + Flows { + input: vec![], + output: vec![flow( + "out", + wit("brahman:dht", Some("v1"), "entity-result"), + None, + )], + }, + ); + let consumer = card( + "ui", + Priority::Normal, + Flows { + input: vec![flow( + "in", + wit("brahman:dht", Some("v2"), "entity-result"), + None, + )], + output: vec![], + }, + ); + b.register(Ulid::new(), &producer); + let s_cons = Ulid::new(); + b.register(s_cons, &consumer); + + assert!(b.find_producer_for(s_cons, "in").is_none()); + } + + #[test] + fn exact_then_structural_prefers_exact() { + let mut b = Broker::new(BrokerConfig { + strategy: MatchStrategy::ExactThenStructural, + }); + // Productor 1: match estructural (interface diferente) + let p_struct = card( + "dht-cache", + Priority::Normal, + Flows { + input: vec![], + output: vec![flow( + "out", + wit("brahman:dht", Some("v2"), "entity-result"), + None, + )], + }, + ); + // Productor 2: match exact (interface igual) + let p_exact = card( + "dht", + Priority::Normal, + Flows { + input: vec![], + output: vec![flow( + "out", + wit("brahman:dht", Some("v1"), "entity-result"), + None, + )], + }, + ); + let consumer = card( + "ui", + Priority::Normal, + Flows { + input: vec![flow( + "in", + wit("brahman:dht", Some("v1"), "entity-result"), + None, + )], + output: vec![], + }, + ); + b.register(Ulid::new(), &p_struct); + b.register(Ulid::new(), &p_exact); + let s_cons = Ulid::new(); + b.register(s_cons, &consumer); + + let m = b.find_producer_for(s_cons, "in").expect("match"); + // El exact gana incluso si tiene priority igual: por estrategia. + assert_eq!(m.producer_label, "dht"); + assert_eq!(m.via, MatchStrategy::Exact); + } + + #[test] + fn pin_to_overrides_type_search() { + let mut b = Broker::new(BrokerConfig::default()); + // Dos productores que producen el mismo tipo. + let p1 = card( + "dht-prod", + Priority::Normal, + Flows { + input: vec![], + output: vec![flow("out", prim("string"), None)], + }, + ); + let p2 = card( + "dht-test", + Priority::Normal, + Flows { + input: vec![], + output: vec![flow("out", prim("string"), None)], + }, + ); + let consumer = card( + "ui", + Priority::Normal, + Flows { + input: vec![flow("in", prim("string"), Some("dht-test"))], + output: vec![], + }, + ); + b.register(Ulid::new(), &p1); + b.register(Ulid::new(), &p2); + let s_cons = Ulid::new(); + b.register(s_cons, &consumer); + + let m = b.find_producer_for(s_cons, "in").expect("match"); + assert_eq!(m.producer_label, "dht-test"); + assert!(m.pinned); + } + + #[test] + fn pin_to_unresolvable_falls_back_to_type_match() { + let mut b = Broker::new(BrokerConfig::default()); + let p = card( + "real-dht", + Priority::Normal, + Flows { + input: vec![], + output: vec![flow("out", prim("string"), None)], + }, + ); + let consumer = card( + "ui", + Priority::Normal, + Flows { + input: vec![flow("in", prim("string"), Some("nonexistent"))], + output: vec![], + }, + ); + b.register(Ulid::new(), &p); + let s_cons = Ulid::new(); + b.register(s_cons, &consumer); + + let m = b.find_producer_for(s_cons, "in").expect("match"); + assert_eq!(m.producer_label, "real-dht"); + assert!(!m.pinned); + } + + #[test] + fn priority_breaks_ties() { + let mut b = Broker::new(BrokerConfig::default()); + let p_low = card( + "z-dht", + Priority::Low, + Flows { + input: vec![], + output: vec![flow("out", prim("string"), None)], + }, + ); + let p_high = card( + "a-dht", + Priority::High, + Flows { + input: vec![], + output: vec![flow("out", prim("string"), None)], + }, + ); + let consumer = card( + "ui", + Priority::Normal, + Flows { + input: vec![flow("in", prim("string"), None)], + output: vec![], + }, + ); + b.register(Ulid::new(), &p_low); + b.register(Ulid::new(), &p_high); + let s_cons = Ulid::new(); + b.register(s_cons, &consumer); + + let m = b.find_producer_for(s_cons, "in").expect("match"); + assert_eq!(m.producer_label, "a-dht"); // priority High > Low + } + + #[test] + fn label_alpha_breaks_priority_ties() { + let mut b = Broker::new(BrokerConfig::default()); + let p1 = card( + "z-dht", + Priority::Normal, + Flows { + input: vec![], + output: vec![flow("out", prim("string"), None)], + }, + ); + let p2 = card( + "a-dht", + Priority::Normal, + Flows { + input: vec![], + output: vec![flow("out", prim("string"), None)], + }, + ); + let consumer = card( + "ui", + Priority::Normal, + Flows { + input: vec![flow("in", prim("string"), None)], + output: vec![], + }, + ); + b.register(Ulid::new(), &p1); + b.register(Ulid::new(), &p2); + let s_cons = Ulid::new(); + b.register(s_cons, &consumer); + + let m = b.find_producer_for(s_cons, "in").expect("match"); + assert_eq!(m.producer_label, "a-dht"); // alfabético gana + } + + #[test] + fn unregister_removes_producer() { + let mut b = Broker::new(BrokerConfig::default()); + let p = card( + "dht", + Priority::Normal, + Flows { + input: vec![], + output: vec![flow("out", prim("string"), None)], + }, + ); + let consumer = card( + "ui", + Priority::Normal, + Flows { + input: vec![flow("in", prim("string"), None)], + output: vec![], + }, + ); + let s_p = Ulid::new(); + b.register(s_p, &p); + let s_c = Ulid::new(); + b.register(s_c, &consumer); + + assert!(b.find_producer_for(s_c, "in").is_some()); + b.unregister(s_p); + assert!(b.find_producer_for(s_c, "in").is_none()); + } + + #[test] + fn no_self_loops() { + let mut b = Broker::new(BrokerConfig::default()); + let same = card( + "echo", + Priority::Normal, + Flows { + input: vec![flow("in", prim("string"), None)], + output: vec![flow("out", prim("string"), None)], + }, + ); + let s = Ulid::new(); + b.register(s, &same); + + // Solo una Card registrada — no hay otra que produzca string. + assert!(b.find_producer_for(s, "in").is_none()); + } + + #[test] + fn all_matches_lists_pairs() { + let mut b = Broker::new(BrokerConfig::default()); + let dht = card( + "dht", + Priority::Normal, + Flows { + input: vec![flow("query", prim("string"), None)], + output: vec![flow("results", prim("bytes"), None)], + }, + ); + let ui = card( + "ui", + Priority::Normal, + Flows { + input: vec![flow("data", prim("bytes"), None)], + output: vec![flow("user-input", prim("string"), None)], + }, + ); + b.register(Ulid::new(), &dht); + b.register(Ulid::new(), &ui); + + let matches = b.all_matches(); + assert_eq!(matches.len(), 2); + // dht.query ← ui.user-input y ui.data ← dht.results + let pairs: Vec<_> = matches + .iter() + .map(|m| (m.consumer_label.as_str(), m.producer_label.as_str())) + .collect(); + assert!(pairs.contains(&("dht", "ui"))); + assert!(pairs.contains(&("ui", "dht"))); + } +} diff --git a/crates/core/brahman-card/src/lib.rs b/crates/core/brahman-card/src/lib.rs index 8c7d40e..de43442 100644 --- a/crates/core/brahman-card/src/lib.rs +++ b/crates/core/brahman-card/src/lib.rs @@ -360,8 +360,9 @@ pub enum Lifecycle { Widget, } -/// Prioridad de scheduling. -#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize)] +/// Prioridad de scheduling. Orden: `Low < Normal < High < Critical` — +/// usable como tiebreaker en el broker (mayor priority gana). +#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Default, Serialize, Deserialize)] #[serde(rename_all = "lowercase")] pub enum Priority { Low,