feat(core): brahman-broker — matching híbrido productor↔consumidor

Crate nuevo en crates/core/brahman-broker que indexa Cards por SessionId
y empareja flow.input de un consumidor con flow.output del productor más
adecuado.

Tres ejes de matching:

1. Estrategia (MatchStrategy):
   - Exact: igualdad estricta de TypeRef.
   - Structural: misma forma — para Wit, mismo package + name (ignora
     interface); para Primitive, mismo name.
   - ExactThenStructural (default): prefiere Exact; cae en Structural si
     no hay. Reporta cuál ganó en Match.via.

2. Override pin_to: si el consumidor declara pin_to = "label", el broker
   prefiere productores con ese label (siempre que el tipo matchee).
   Si la pista falla, cae en type-search general. Match.pinned indica
   qué camino se siguió.

3. Prioridad: empate de tipo se resuelve por Card.priority (Critical >
   High > Normal > Low). Empate de prioridad se resuelve lexicográfica-
   mente por label (estable y determinista).

API mínima:
- Broker::new(config)
- register(session, &Card) / unregister(session)
- find_producer_for(consumer_session, input_name) -> Option<Match>
- all_matches() -> Vec<Match> (introspección)

El broker es stateless w.r.t. routes: cada query se computa bajo demanda.
Sólo persiste el índice de BrokeredCard (vista mínima: label, priority,
inputs, outputs).

Cambio aditivo en brahman-card: Priority deriva PartialOrd/Ord/Hash para
ser usable como tiebreaker.

Tests: 11/11.
- exact_match_same_typeref
- structural_ignores_interface
- exact_strategy_rejects_interface_mismatch
- exact_then_structural_prefers_exact
- pin_to_overrides_type_search
- pin_to_unresolvable_falls_back_to_type_match
- priority_breaks_ties
- label_alpha_breaks_priority_ties
- unregister_removes_producer
- no_self_loops
- all_matches_lists_pairs

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Sergio
2026-05-08 14:30:21 +00:00
parent 814390feec
commit 5091106c21
5 changed files with 730 additions and 2 deletions
Generated
+10
View File
@@ -1139,6 +1139,16 @@ dependencies = [
"syn 2.0.117", "syn 2.0.117",
] ]
[[package]]
name = "brahman-broker"
version = "0.1.0"
dependencies = [
"brahman-card",
"serde",
"thiserror 2.0.18",
"ulid",
]
[[package]] [[package]]
name = "brahman-card" name = "brahman-card"
version = "0.1.0" version = "0.1.0"
+1
View File
@@ -6,6 +6,7 @@ members = [
# ============================================================ # ============================================================
"crates/core/brahman-card", "crates/core/brahman-card",
"crates/core/brahman-handshake", "crates/core/brahman-handshake",
"crates/core/brahman-broker",
"crates/core/ente-card", "crates/core/ente-card",
"crates/core/ente-bus", "crates/core/ente-bus",
"crates/core/ente-cas", "crates/core/ente-cas",
+15
View File
@@ -0,0 +1,15 @@
[package]
name = "brahman-broker"
version.workspace = true
edition.workspace = true
rust-version.workspace = true
license.workspace = true
authors.workspace = true
publish.workspace = true
description = "Brahman — broker de tipos: empareja productores↔consumidores por TypeRef con matching híbrido (exact + structural) y prioridad."
[dependencies]
brahman-card = { path = "../brahman-card" }
serde = { workspace = true }
thiserror = { workspace = true }
ulid = { workspace = true }
+701
View File
@@ -0,0 +1,701 @@
//! `brahman-broker` — empareja productores y consumidores por tipo de flujo.
//!
//! El broker indexa [`brahman_card::Card`]s registradas por `SessionId` y,
//! para cada `flow.input` de un consumidor, busca el `flow.output`
//! compatible de mejor calidad entre los demás. Tres ejes:
//!
//! 1. **Estrategia de matching** ([`MatchStrategy`]):
//! - `Exact`: igualdad estricta de [`brahman_card::TypeRef`].
//! - `Structural`: misma forma (mismo `package` + `name` para Wit;
//! ignora `interface`).
//! - `ExactThenStructural`: prefiere exact; cae en structural si no hay.
//!
//! 2. **Override `pin_to`**: si el consumidor declara `pin_to = "label"`,
//! el broker prefiere productores cuya Card tenga ese `label` (siempre
//! que el tipo siga matcheando). Si la pista no resuelve, cae en
//! matching por tipo normal.
//!
//! 3. **Prioridad**: empate de tipo se resuelve por
//! [`brahman_card::Priority`] del productor (mayor gana). Empate de
//! prioridad se resuelve lexicográficamente por `label` (estable y
//! determinista).
//!
//! El broker es **stateless w.r.t. routes**: cada `find_producer_for` o
//! `all_matches` se calcula bajo demanda. La única persistencia es el
//! índice de Cards registradas. Esto permite re-evaluar matches cuando
//! cambia el set sin invalidar caches.
#![forbid(unsafe_code)]
#![warn(rust_2018_idioms)]
use std::collections::BTreeMap;
use brahman_card::{Card, Flow, Priority, TypeRef};
use serde::{Deserialize, Serialize};
use ulid::Ulid;
/// Identificador de sesión emitido por el handshake. Idéntico al usado por
/// `brahman-handshake` (no es un re-export para evitar la dependencia).
pub type SessionId = Ulid;
/// Estrategia de matching de tipos.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
pub enum MatchStrategy {
/// Igualdad estricta de `TypeRef`.
Exact,
/// Misma forma: para `Wit`, mismo `package` + `name`; para
/// `Primitive`, mismo `name`.
Structural,
/// Híbrido: intenta `Exact` primero; si no matchea, `Structural`.
/// Reporta cuál estrategia ganó en [`Match::via`].
#[default]
ExactThenStructural,
}
/// Configuración del broker.
#[derive(Debug, Clone, Default)]
pub struct BrokerConfig {
pub strategy: MatchStrategy,
}
/// Vista mínima de una Card que el broker necesita.
#[derive(Debug, Clone)]
pub struct BrokeredCard {
pub session: SessionId,
pub label: String,
pub priority: Priority,
pub inputs: Vec<Flow>,
pub outputs: Vec<Flow>,
}
impl BrokeredCard {
fn from_card(session: SessionId, card: &Card) -> Self {
Self {
session,
label: card.label.clone(),
priority: card.priority,
inputs: card.flow.input.clone(),
outputs: card.flow.output.clone(),
}
}
}
/// Punto extremo de un flujo: qué sesión + nombre del flow dentro de su Card.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Endpoint {
pub session: SessionId,
pub flow_name: String,
}
/// Match concreto entre un consumidor y un productor.
#[derive(Debug, Clone)]
pub struct Match {
pub consumer: Endpoint,
pub consumer_label: String,
pub producer: Endpoint,
pub producer_label: String,
/// Tipo del flow (lado consumidor — lado productor coincide en
/// estrategia Exact, puede diferir en `interface` en Structural).
pub ty: TypeRef,
/// Estrategia que efectivamente matcheó.
pub via: MatchStrategy,
/// `true` si el match fue resuelto por `pin_to` y no por type-search.
pub pinned: bool,
}
// =====================================================================
// Broker
// =====================================================================
/// El broker. Registra Cards por SessionId, computa matches bajo demanda.
#[derive(Debug, Clone, Default)]
pub struct Broker {
cards: BTreeMap<SessionId, BrokeredCard>,
config: BrokerConfig,
}
impl Broker {
pub fn new(config: BrokerConfig) -> Self {
Self {
cards: BTreeMap::new(),
config,
}
}
/// Registra una Card. Devuelve `Some(prev)` si reemplazó una existente.
pub fn register(&mut self, session: SessionId, card: &Card) -> Option<BrokeredCard> {
self.cards.insert(session, BrokeredCard::from_card(session, card))
}
/// Quita una Card por sesión.
pub fn unregister(&mut self, session: SessionId) -> Option<BrokeredCard> {
self.cards.remove(&session)
}
/// Cardinalidad del registro.
pub fn len(&self) -> usize {
self.cards.len()
}
pub fn is_empty(&self) -> bool {
self.cards.is_empty()
}
/// Iterador sobre las sesiones registradas.
pub fn sessions(&self) -> impl Iterator<Item = SessionId> + '_ {
self.cards.keys().copied()
}
/// Busca el mejor productor para un input específico de un consumidor.
///
/// Algoritmo:
/// 1. Resuelve el flow input en el consumidor.
/// 2. Si tiene `pin_to`, prefiere productores con ese `label` que
/// matcheen el tipo (cualquier estrategia configurada).
/// 3. Si no hay pin_to o la pista falló, escanea todos los outputs
/// de las otras Cards. Filtra por compatibilidad de tipo.
/// 4. Ordena por (priority desc, label asc) y devuelve el primero.
pub fn find_producer_for(&self, consumer: SessionId, input_name: &str) -> Option<Match> {
let cons = self.cards.get(&consumer)?;
let input = cons.inputs.iter().find(|f| f.name == input_name)?;
// pin_to: short-circuit. Si la pista resuelve, gana.
if let Some(pin) = &input.pin_to {
for prod in self.cards.values() {
if prod.session == consumer || &prod.label != pin {
continue;
}
for out in &prod.outputs {
if let Some(via) = self.types_match(&input.ty, &out.ty) {
return Some(self.make_match(cons, prod, input, out, via, true));
}
}
}
// Fall through: pin no resuelto, type-search general.
}
let mut candidates: Vec<(&BrokeredCard, &Flow, MatchStrategy)> = Vec::new();
for prod in self.cards.values() {
if prod.session == consumer {
continue;
}
for out in &prod.outputs {
if let Some(via) = self.types_match(&input.ty, &out.ty) {
candidates.push((prod, out, via));
}
}
}
candidates.sort_by(|(a, _, _), (b, _, _)| {
b.priority
.cmp(&a.priority)
.then_with(|| a.label.cmp(&b.label))
});
let (prod, out, via) = candidates.into_iter().next()?;
Some(self.make_match(cons, prod, input, out, via, false))
}
/// Calcula todos los matches consumer→producer en el set actual.
/// Útil para introspección o para que el Admin emita rutas en lote.
pub fn all_matches(&self) -> Vec<Match> {
let mut out = Vec::new();
for cons in self.cards.values() {
for input in &cons.inputs {
if let Some(m) = self.find_producer_for(cons.session, &input.name) {
out.push(m);
}
}
}
out
}
fn types_match(&self, consumer_ty: &TypeRef, producer_ty: &TypeRef) -> Option<MatchStrategy> {
match self.config.strategy {
MatchStrategy::Exact => exact_match(consumer_ty, producer_ty).then_some(MatchStrategy::Exact),
MatchStrategy::Structural => {
structural_match(consumer_ty, producer_ty).then_some(MatchStrategy::Structural)
}
MatchStrategy::ExactThenStructural => {
if exact_match(consumer_ty, producer_ty) {
Some(MatchStrategy::Exact)
} else if structural_match(consumer_ty, producer_ty) {
Some(MatchStrategy::Structural)
} else {
None
}
}
}
}
fn make_match(
&self,
cons: &BrokeredCard,
prod: &BrokeredCard,
input: &Flow,
output: &Flow,
via: MatchStrategy,
pinned: bool,
) -> Match {
Match {
consumer: Endpoint {
session: cons.session,
flow_name: input.name.clone(),
},
consumer_label: cons.label.clone(),
producer: Endpoint {
session: prod.session,
flow_name: output.name.clone(),
},
producer_label: prod.label.clone(),
ty: input.ty.clone(),
via,
pinned,
}
}
}
// =====================================================================
// Predicados de matching (libres, testeables aislados)
// =====================================================================
fn exact_match(a: &TypeRef, b: &TypeRef) -> bool {
a == b
}
fn structural_match(a: &TypeRef, b: &TypeRef) -> bool {
match (a, b) {
(TypeRef::Primitive { name: na }, TypeRef::Primitive { name: nb }) => na == nb,
(
TypeRef::Wit {
package: pa, name: na, ..
},
TypeRef::Wit {
package: pb, name: nb, ..
},
) => pa == pb && na == nb,
_ => false,
}
}
// =====================================================================
// Tests
// =====================================================================
#[cfg(test)]
mod tests {
use super::*;
use brahman_card::{Card, Flows, Payload, Supervision, CARD_SCHEMA_VERSION};
fn card(label: &str, priority: Priority, flows: Flows) -> Card {
Card {
schema_version: CARD_SCHEMA_VERSION,
id: Ulid::new(),
label: label.into(),
payload: Payload::Virtual,
supervision: Supervision::OneShot,
priority,
flow: flows,
..Default::default()
}
}
fn prim(name: &str) -> TypeRef {
TypeRef::Primitive { name: name.into() }
}
fn wit(pkg: &str, iface: Option<&str>, name: &str) -> TypeRef {
TypeRef::Wit {
package: pkg.into(),
interface: iface.map(|s| s.into()),
name: name.into(),
}
}
fn flow(name: &str, ty: TypeRef, pin: Option<&str>) -> Flow {
Flow {
name: name.into(),
ty,
pin_to: pin.map(|s| s.into()),
}
}
#[test]
fn exact_match_same_typeref() {
let mut b = Broker::new(BrokerConfig {
strategy: MatchStrategy::Exact,
});
let producer = card(
"dht",
Priority::Normal,
Flows {
input: vec![],
output: vec![flow("results", prim("string"), None)],
},
);
let consumer = card(
"ui",
Priority::Normal,
Flows {
input: vec![flow("query", prim("string"), None)],
output: vec![],
},
);
let s_prod = Ulid::new();
let s_cons = Ulid::new();
b.register(s_prod, &producer);
b.register(s_cons, &consumer);
let m = b.find_producer_for(s_cons, "query").expect("match");
assert_eq!(m.producer_label, "dht");
assert_eq!(m.via, MatchStrategy::Exact);
assert!(!m.pinned);
}
#[test]
fn structural_ignores_interface() {
let mut b = Broker::new(BrokerConfig {
strategy: MatchStrategy::Structural,
});
let producer = card(
"dht",
Priority::Normal,
Flows {
input: vec![],
output: vec![flow(
"out",
wit("brahman:dht", Some("v1"), "entity-result"),
None,
)],
},
);
let consumer = card(
"ui",
Priority::Normal,
Flows {
input: vec![flow(
"in",
wit("brahman:dht", Some("v2"), "entity-result"),
None,
)],
output: vec![],
},
);
let s_prod = Ulid::new();
let s_cons = Ulid::new();
b.register(s_prod, &producer);
b.register(s_cons, &consumer);
let m = b.find_producer_for(s_cons, "in").expect("match");
assert_eq!(m.via, MatchStrategy::Structural);
}
#[test]
fn exact_strategy_rejects_interface_mismatch() {
let mut b = Broker::new(BrokerConfig {
strategy: MatchStrategy::Exact,
});
let producer = card(
"dht",
Priority::Normal,
Flows {
input: vec![],
output: vec![flow(
"out",
wit("brahman:dht", Some("v1"), "entity-result"),
None,
)],
},
);
let consumer = card(
"ui",
Priority::Normal,
Flows {
input: vec![flow(
"in",
wit("brahman:dht", Some("v2"), "entity-result"),
None,
)],
output: vec![],
},
);
b.register(Ulid::new(), &producer);
let s_cons = Ulid::new();
b.register(s_cons, &consumer);
assert!(b.find_producer_for(s_cons, "in").is_none());
}
#[test]
fn exact_then_structural_prefers_exact() {
let mut b = Broker::new(BrokerConfig {
strategy: MatchStrategy::ExactThenStructural,
});
// Productor 1: match estructural (interface diferente)
let p_struct = card(
"dht-cache",
Priority::Normal,
Flows {
input: vec![],
output: vec![flow(
"out",
wit("brahman:dht", Some("v2"), "entity-result"),
None,
)],
},
);
// Productor 2: match exact (interface igual)
let p_exact = card(
"dht",
Priority::Normal,
Flows {
input: vec![],
output: vec![flow(
"out",
wit("brahman:dht", Some("v1"), "entity-result"),
None,
)],
},
);
let consumer = card(
"ui",
Priority::Normal,
Flows {
input: vec![flow(
"in",
wit("brahman:dht", Some("v1"), "entity-result"),
None,
)],
output: vec![],
},
);
b.register(Ulid::new(), &p_struct);
b.register(Ulid::new(), &p_exact);
let s_cons = Ulid::new();
b.register(s_cons, &consumer);
let m = b.find_producer_for(s_cons, "in").expect("match");
// El exact gana incluso si tiene priority igual: por estrategia.
assert_eq!(m.producer_label, "dht");
assert_eq!(m.via, MatchStrategy::Exact);
}
#[test]
fn pin_to_overrides_type_search() {
let mut b = Broker::new(BrokerConfig::default());
// Dos productores que producen el mismo tipo.
let p1 = card(
"dht-prod",
Priority::Normal,
Flows {
input: vec![],
output: vec![flow("out", prim("string"), None)],
},
);
let p2 = card(
"dht-test",
Priority::Normal,
Flows {
input: vec![],
output: vec![flow("out", prim("string"), None)],
},
);
let consumer = card(
"ui",
Priority::Normal,
Flows {
input: vec![flow("in", prim("string"), Some("dht-test"))],
output: vec![],
},
);
b.register(Ulid::new(), &p1);
b.register(Ulid::new(), &p2);
let s_cons = Ulid::new();
b.register(s_cons, &consumer);
let m = b.find_producer_for(s_cons, "in").expect("match");
assert_eq!(m.producer_label, "dht-test");
assert!(m.pinned);
}
#[test]
fn pin_to_unresolvable_falls_back_to_type_match() {
let mut b = Broker::new(BrokerConfig::default());
let p = card(
"real-dht",
Priority::Normal,
Flows {
input: vec![],
output: vec![flow("out", prim("string"), None)],
},
);
let consumer = card(
"ui",
Priority::Normal,
Flows {
input: vec![flow("in", prim("string"), Some("nonexistent"))],
output: vec![],
},
);
b.register(Ulid::new(), &p);
let s_cons = Ulid::new();
b.register(s_cons, &consumer);
let m = b.find_producer_for(s_cons, "in").expect("match");
assert_eq!(m.producer_label, "real-dht");
assert!(!m.pinned);
}
#[test]
fn priority_breaks_ties() {
let mut b = Broker::new(BrokerConfig::default());
let p_low = card(
"z-dht",
Priority::Low,
Flows {
input: vec![],
output: vec![flow("out", prim("string"), None)],
},
);
let p_high = card(
"a-dht",
Priority::High,
Flows {
input: vec![],
output: vec![flow("out", prim("string"), None)],
},
);
let consumer = card(
"ui",
Priority::Normal,
Flows {
input: vec![flow("in", prim("string"), None)],
output: vec![],
},
);
b.register(Ulid::new(), &p_low);
b.register(Ulid::new(), &p_high);
let s_cons = Ulid::new();
b.register(s_cons, &consumer);
let m = b.find_producer_for(s_cons, "in").expect("match");
assert_eq!(m.producer_label, "a-dht"); // priority High > Low
}
#[test]
fn label_alpha_breaks_priority_ties() {
let mut b = Broker::new(BrokerConfig::default());
let p1 = card(
"z-dht",
Priority::Normal,
Flows {
input: vec![],
output: vec![flow("out", prim("string"), None)],
},
);
let p2 = card(
"a-dht",
Priority::Normal,
Flows {
input: vec![],
output: vec![flow("out", prim("string"), None)],
},
);
let consumer = card(
"ui",
Priority::Normal,
Flows {
input: vec![flow("in", prim("string"), None)],
output: vec![],
},
);
b.register(Ulid::new(), &p1);
b.register(Ulid::new(), &p2);
let s_cons = Ulid::new();
b.register(s_cons, &consumer);
let m = b.find_producer_for(s_cons, "in").expect("match");
assert_eq!(m.producer_label, "a-dht"); // alfabético gana
}
#[test]
fn unregister_removes_producer() {
let mut b = Broker::new(BrokerConfig::default());
let p = card(
"dht",
Priority::Normal,
Flows {
input: vec![],
output: vec![flow("out", prim("string"), None)],
},
);
let consumer = card(
"ui",
Priority::Normal,
Flows {
input: vec![flow("in", prim("string"), None)],
output: vec![],
},
);
let s_p = Ulid::new();
b.register(s_p, &p);
let s_c = Ulid::new();
b.register(s_c, &consumer);
assert!(b.find_producer_for(s_c, "in").is_some());
b.unregister(s_p);
assert!(b.find_producer_for(s_c, "in").is_none());
}
#[test]
fn no_self_loops() {
let mut b = Broker::new(BrokerConfig::default());
let same = card(
"echo",
Priority::Normal,
Flows {
input: vec![flow("in", prim("string"), None)],
output: vec![flow("out", prim("string"), None)],
},
);
let s = Ulid::new();
b.register(s, &same);
// Solo una Card registrada — no hay otra que produzca string.
assert!(b.find_producer_for(s, "in").is_none());
}
#[test]
fn all_matches_lists_pairs() {
let mut b = Broker::new(BrokerConfig::default());
let dht = card(
"dht",
Priority::Normal,
Flows {
input: vec![flow("query", prim("string"), None)],
output: vec![flow("results", prim("bytes"), None)],
},
);
let ui = card(
"ui",
Priority::Normal,
Flows {
input: vec![flow("data", prim("bytes"), None)],
output: vec![flow("user-input", prim("string"), None)],
},
);
b.register(Ulid::new(), &dht);
b.register(Ulid::new(), &ui);
let matches = b.all_matches();
assert_eq!(matches.len(), 2);
// dht.query ← ui.user-input y ui.data ← dht.results
let pairs: Vec<_> = matches
.iter()
.map(|m| (m.consumer_label.as_str(), m.producer_label.as_str()))
.collect();
assert!(pairs.contains(&("dht", "ui")));
assert!(pairs.contains(&("ui", "dht")));
}
}
+3 -2
View File
@@ -360,8 +360,9 @@ pub enum Lifecycle {
Widget, Widget,
} }
/// Prioridad de scheduling. /// Prioridad de scheduling. Orden: `Low < Normal < High < Critical` —
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize)] /// usable como tiebreaker en el broker (mayor priority gana).
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Default, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")] #[serde(rename_all = "lowercase")]
pub enum Priority { pub enum Priority {
Low, Low,