feat(broker): priority contexts — biases per-contexto operativo
Cierra el último pendiente de feature: el broker ahora puede operar
bajo un contexto (test/prod/foreground/secure/etc) que activa biases
declarados en las Cards.
Schema (brahman-card):
- ContextBias { pin_to: Option<String>, priority_offset: i8 }.
- Card.priority_contexts: BTreeMap<String, ContextBias>, también en
WireCard. Las conversiones From propagan el campo.
Comportamiento (brahman-broker):
- BrokerConfig.current_context: Option<String>. Cuando es Some(ctx) y
una Card tiene priority_contexts.get(ctx), el bias aplica:
- Consumer-side: bias.pin_to sobreescribe Flow.pin_to estático.
- Producer-side: bias.priority_offset se suma a la priority base
(clamp en [Low=0, Critical=3]).
- BrokeredCard propaga priority_contexts. find_producer_for usa
effective_priority y context_bias en lugar de comparar Priority
directo.
Observabilidad:
- AdminConfig.current_context + StatusSnapshot.current_context.
- brahman-status imprime "Context: <nombre>" si está activo.
Wiring:
- ente-zero lee BRAHMAN_BROKER_CONTEXT del entorno y la propaga al
broker y al admin. Sin var, biases inactivos (back-compat total).
Tests nuevos (brahman-broker, +4):
- context_priority_offset_lifts_producer_above_alphabetic_winner:
sin contexto a-prod gana por alfabético; con context "test" b-prod
gana por offset +1.
- context_pin_to_overrides_static_pin: static pin "real-dht", test
override "mock-dht" → mock gana en context "test".
- unknown_context_no_op: biases declarados para "test" no aplican
cuando broker está en "prod".
- priority_offset_clamps_to_critical: offset enorme se clampa a 3.
Validación end-to-end manual:
$ BRAHMAN_BROKER_CONTEXT=test ente-zero &
$ brahman-status
Init: server=0.1.0 protocol=0.1.0 attached=true
Context: test
Tests acumulados: 39 (card 11, broker 15, handshake codec+transport 2 +
integ 7, card-wit 4, admin 0). cargo check --workspace: 0 errores, 0
warnings.
Con esto cierran TODOS los pendientes técnicos abiertos. El único
"pendiente" que queda es el caso real para extender (priority
contexts per-deployment, scheduling biases dinámicos, etc.).
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -6,6 +6,31 @@ ratio/diff ver `git show <sha>`.
|
|||||||
|
|
||||||
## 2026-05-08
|
## 2026-05-08
|
||||||
|
|
||||||
|
### feat(broker): priority contexts — biases per-contexto operativo
|
||||||
|
- `brahman-card::ContextBias { pin_to: Option<String>, priority_offset: i8 }`
|
||||||
|
declara un override per-contexto.
|
||||||
|
- `Card.priority_contexts: BTreeMap<String, ContextBias>` y mismo en
|
||||||
|
`WireCard` (cruza el wire). Las conversiones `From` lo propagan.
|
||||||
|
- `BrokerConfig.current_context: Option<String>`. Cuando el broker corre
|
||||||
|
bajo un contexto y una Card declara biases para ese nombre, se aplican:
|
||||||
|
- Como **consumidor**: `pin_to` sobreescribe el `Flow.pin_to` estático.
|
||||||
|
- Como **productor**: `priority_offset` se suma a la priority base
|
||||||
|
(clamp en `[Low=0, Critical=3]`) para el ranking.
|
||||||
|
- `BrokeredCard` propaga `priority_contexts`. `find_producer_for` usa
|
||||||
|
`effective_priority(card)` y `effective_pin(card, input)` antes de
|
||||||
|
los tiebreaks.
|
||||||
|
- `brahman-admin::AdminConfig.current_context` + `StatusSnapshot.current_context`
|
||||||
|
espejan el contexto activo. `brahman-status` lo imprime como
|
||||||
|
`Context: <nombre>` justo debajo de `Init: ...`.
|
||||||
|
- `ente-zero` lee `BRAHMAN_BROKER_CONTEXT` env var y la propaga al
|
||||||
|
broker y al admin. Sin var, biases per-contexto inactivos.
|
||||||
|
- 4 tests nuevos en brahman-broker:
|
||||||
|
`context_priority_offset_lifts_producer_above_alphabetic_winner`,
|
||||||
|
`context_pin_to_overrides_static_pin`, `unknown_context_no_op`,
|
||||||
|
`priority_offset_clamps_to_critical`.
|
||||||
|
- Validación end-to-end: `BRAHMAN_BROKER_CONTEXT=test ente-zero` →
|
||||||
|
`brahman-status` muestra `Context: test`.
|
||||||
|
|
||||||
### feat(card): WireCard + extensions — forward-compat sin romper postcard
|
### feat(card): WireCard + extensions — forward-compat sin romper postcard
|
||||||
- `Card.extensions: BTreeMap<String, serde_json::Value>` restaurado con
|
- `Card.extensions: BTreeMap<String, serde_json::Value>` restaurado con
|
||||||
`#[serde(flatten, default, skip_serializing_if = is_empty)]`. Los
|
`#[serde(flatten, default, skip_serializing_if = is_empty)]`. Los
|
||||||
|
|||||||
@@ -14,6 +14,9 @@ async fn main() -> anyhow::Result<()> {
|
|||||||
"Init: server={} protocol={} attached={}",
|
"Init: server={} protocol={} attached={}",
|
||||||
snap.server_version, snap.protocol_version, snap.init_attached
|
snap.server_version, snap.protocol_version, snap.init_attached
|
||||||
);
|
);
|
||||||
|
if let Some(ctx) = &snap.current_context {
|
||||||
|
println!("Context: {}", ctx);
|
||||||
|
}
|
||||||
println!();
|
println!();
|
||||||
println!("Sessions ({}):", snap.sessions.len());
|
println!("Sessions ({}):", snap.sessions.len());
|
||||||
if snap.sessions.is_empty() {
|
if snap.sessions.is_empty() {
|
||||||
|
|||||||
@@ -16,6 +16,8 @@ use crate::snapshot::StatusSnapshot;
|
|||||||
pub struct AdminConfig {
|
pub struct AdminConfig {
|
||||||
/// `true` si el Init está atado al servidor que aloja este admin.
|
/// `true` si el Init está atado al servidor que aloja este admin.
|
||||||
pub init_attached: bool,
|
pub init_attached: bool,
|
||||||
|
/// Contexto operativo del broker, espejado en el snapshot.
|
||||||
|
pub current_context: Option<String>,
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Servidor admin escuchando en un Unix socket.
|
/// Servidor admin escuchando en un Unix socket.
|
||||||
@@ -101,6 +103,7 @@ async fn build_snapshot(broker: &Arc<Mutex<Broker>>, config: &AdminConfig) -> St
|
|||||||
server_version: crate::ADMIN_VERSION.to_string(),
|
server_version: crate::ADMIN_VERSION.to_string(),
|
||||||
protocol_version: brahman_card::PROTOCOL_VERSION.to_string(),
|
protocol_version: brahman_card::PROTOCOL_VERSION.to_string(),
|
||||||
init_attached: config.init_attached,
|
init_attached: config.init_attached,
|
||||||
|
current_context: config.current_context.clone(),
|
||||||
sessions,
|
sessions,
|
||||||
matches,
|
matches,
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -12,6 +12,11 @@ pub struct StatusSnapshot {
|
|||||||
pub protocol_version: String,
|
pub protocol_version: String,
|
||||||
/// `true` si el Init está atado al servidor.
|
/// `true` si el Init está atado al servidor.
|
||||||
pub init_attached: bool,
|
pub init_attached: bool,
|
||||||
|
/// Contexto operativo activo del broker (p. ej. `"test"`, `"prod"`).
|
||||||
|
/// `None` si no hay contexto configurado — los biases per-contexto
|
||||||
|
/// declarados en las Cards quedan inactivos.
|
||||||
|
#[serde(default)]
|
||||||
|
pub current_context: Option<String>,
|
||||||
/// Cards actualmente registradas (sesiones vivas).
|
/// Cards actualmente registradas (sesiones vivas).
|
||||||
pub sessions: Vec<BrokeredCard>,
|
pub sessions: Vec<BrokeredCard>,
|
||||||
/// Matches consumer↔producer derivados del set actual.
|
/// Matches consumer↔producer derivados del set actual.
|
||||||
|
|||||||
@@ -30,7 +30,7 @@
|
|||||||
|
|
||||||
use std::collections::BTreeMap;
|
use std::collections::BTreeMap;
|
||||||
|
|
||||||
use brahman_card::{Card, Flow, Lifecycle, Priority, TypeRef, WitInterface};
|
use brahman_card::{Card, ContextBias, Flow, Lifecycle, Priority, TypeRef, WitInterface};
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
use ulid::Ulid;
|
use ulid::Ulid;
|
||||||
|
|
||||||
@@ -57,6 +57,10 @@ pub enum MatchStrategy {
|
|||||||
#[derive(Debug, Clone, Default)]
|
#[derive(Debug, Clone, Default)]
|
||||||
pub struct BrokerConfig {
|
pub struct BrokerConfig {
|
||||||
pub strategy: MatchStrategy,
|
pub strategy: MatchStrategy,
|
||||||
|
/// Contexto operativo activo. Si una Card declara un
|
||||||
|
/// `priority_contexts.<this>`, ese bias se aplica durante el match.
|
||||||
|
/// `None` = sin biases per-contexto, sólo se usa lo estático.
|
||||||
|
pub current_context: Option<String>,
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Vista mínima de una Card que el broker necesita.
|
/// Vista mínima de una Card que el broker necesita.
|
||||||
@@ -70,6 +74,9 @@ pub struct BrokeredCard {
|
|||||||
pub outputs: Vec<Flow>,
|
pub outputs: Vec<Flow>,
|
||||||
/// Interfaz WIT extraída si el módulo es "consciente"; `None` si agnóstico.
|
/// Interfaz WIT extraída si el módulo es "consciente"; `None` si agnóstico.
|
||||||
pub wit: Option<WitInterface>,
|
pub wit: Option<WitInterface>,
|
||||||
|
/// Biases per-contexto, propagados desde `Card.priority_contexts`.
|
||||||
|
#[serde(default, skip_serializing_if = "BTreeMap::is_empty")]
|
||||||
|
pub priority_contexts: BTreeMap<String, ContextBias>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl BrokeredCard {
|
impl BrokeredCard {
|
||||||
@@ -82,6 +89,7 @@ impl BrokeredCard {
|
|||||||
inputs: card.flow.input.clone(),
|
inputs: card.flow.input.clone(),
|
||||||
outputs: card.flow.output.clone(),
|
outputs: card.flow.output.clone(),
|
||||||
wit,
|
wit,
|
||||||
|
priority_contexts: card.priority_contexts.clone(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -178,10 +186,16 @@ impl Broker {
|
|||||||
let cons = self.cards.get(&consumer)?;
|
let cons = self.cards.get(&consumer)?;
|
||||||
let input = cons.inputs.iter().find(|f| f.name == input_name)?;
|
let input = cons.inputs.iter().find(|f| f.name == input_name)?;
|
||||||
|
|
||||||
// pin_to: short-circuit. Si la pista resuelve, gana.
|
// pin_to efectivo: bias del contexto activo (si la Card declara
|
||||||
if let Some(pin) = &input.pin_to {
|
// override consumer-side) > pin_to estático del Flow.
|
||||||
|
let context_pin = self
|
||||||
|
.context_bias(cons)
|
||||||
|
.and_then(|b| b.pin_to.as_deref());
|
||||||
|
let effective_pin = context_pin.or(input.pin_to.as_deref());
|
||||||
|
|
||||||
|
if let Some(pin) = effective_pin {
|
||||||
for prod in self.cards.values() {
|
for prod in self.cards.values() {
|
||||||
if prod.session == consumer || &prod.label != pin {
|
if prod.session == consumer || prod.label != pin {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
for out in &prod.outputs {
|
for out in &prod.outputs {
|
||||||
@@ -205,9 +219,11 @@ impl Broker {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Sort por (effective priority desc, label asc). El bias del
|
||||||
|
// contexto puede subir o bajar la priority del productor.
|
||||||
candidates.sort_by(|(a, _, _), (b, _, _)| {
|
candidates.sort_by(|(a, _, _), (b, _, _)| {
|
||||||
b.priority
|
self.effective_priority(b)
|
||||||
.cmp(&a.priority)
|
.cmp(&self.effective_priority(a))
|
||||||
.then_with(|| a.label.cmp(&b.label))
|
.then_with(|| a.label.cmp(&b.label))
|
||||||
});
|
});
|
||||||
|
|
||||||
@@ -215,6 +231,26 @@ impl Broker {
|
|||||||
Some(self.make_match(cons, prod, input, out, via, false))
|
Some(self.make_match(cons, prod, input, out, via, false))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Devuelve el `ContextBias` que aplica a este Card en el contexto
|
||||||
|
/// activo (si lo hay).
|
||||||
|
fn context_bias<'a>(&self, card: &'a BrokeredCard) -> Option<&'a ContextBias> {
|
||||||
|
self.config
|
||||||
|
.current_context
|
||||||
|
.as_ref()
|
||||||
|
.and_then(|ctx| card.priority_contexts.get(ctx))
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Priority efectiva del Card como productor, considerando el bias
|
||||||
|
/// del contexto activo. El offset se clampa a `[Low=0, Critical=3]`.
|
||||||
|
fn effective_priority(&self, card: &BrokeredCard) -> i16 {
|
||||||
|
let base = priority_value(card.priority);
|
||||||
|
let offset = self
|
||||||
|
.context_bias(card)
|
||||||
|
.map(|b| b.priority_offset as i16)
|
||||||
|
.unwrap_or(0);
|
||||||
|
(base + offset).clamp(0, 3)
|
||||||
|
}
|
||||||
|
|
||||||
/// Calcula todos los matches consumer→producer en el set actual.
|
/// Calcula todos los matches consumer→producer en el set actual.
|
||||||
/// Útil para introspección o para que el Admin emita rutas en lote.
|
/// Útil para introspección o para que el Admin emita rutas en lote.
|
||||||
pub fn all_matches(&self) -> Vec<Match> {
|
pub fn all_matches(&self) -> Vec<Match> {
|
||||||
@@ -278,6 +314,15 @@ impl Broker {
|
|||||||
// Predicados de matching (libres, testeables aislados)
|
// Predicados de matching (libres, testeables aislados)
|
||||||
// =====================================================================
|
// =====================================================================
|
||||||
|
|
||||||
|
fn priority_value(p: Priority) -> i16 {
|
||||||
|
match p {
|
||||||
|
Priority::Low => 0,
|
||||||
|
Priority::Normal => 1,
|
||||||
|
Priority::High => 2,
|
||||||
|
Priority::Critical => 3,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
fn exact_match(a: &TypeRef, b: &TypeRef) -> bool {
|
fn exact_match(a: &TypeRef, b: &TypeRef) -> bool {
|
||||||
a == b
|
a == b
|
||||||
}
|
}
|
||||||
@@ -343,6 +388,7 @@ mod tests {
|
|||||||
fn exact_match_same_typeref() {
|
fn exact_match_same_typeref() {
|
||||||
let mut b = Broker::new(BrokerConfig {
|
let mut b = Broker::new(BrokerConfig {
|
||||||
strategy: MatchStrategy::Exact,
|
strategy: MatchStrategy::Exact,
|
||||||
|
current_context: None,
|
||||||
});
|
});
|
||||||
let producer = card(
|
let producer = card(
|
||||||
"dht",
|
"dht",
|
||||||
@@ -375,6 +421,7 @@ mod tests {
|
|||||||
fn structural_ignores_interface() {
|
fn structural_ignores_interface() {
|
||||||
let mut b = Broker::new(BrokerConfig {
|
let mut b = Broker::new(BrokerConfig {
|
||||||
strategy: MatchStrategy::Structural,
|
strategy: MatchStrategy::Structural,
|
||||||
|
current_context: None,
|
||||||
});
|
});
|
||||||
let producer = card(
|
let producer = card(
|
||||||
"dht",
|
"dht",
|
||||||
@@ -413,6 +460,7 @@ mod tests {
|
|||||||
fn exact_strategy_rejects_interface_mismatch() {
|
fn exact_strategy_rejects_interface_mismatch() {
|
||||||
let mut b = Broker::new(BrokerConfig {
|
let mut b = Broker::new(BrokerConfig {
|
||||||
strategy: MatchStrategy::Exact,
|
strategy: MatchStrategy::Exact,
|
||||||
|
current_context: None,
|
||||||
});
|
});
|
||||||
let producer = card(
|
let producer = card(
|
||||||
"dht",
|
"dht",
|
||||||
@@ -449,6 +497,7 @@ mod tests {
|
|||||||
fn exact_then_structural_prefers_exact() {
|
fn exact_then_structural_prefers_exact() {
|
||||||
let mut b = Broker::new(BrokerConfig {
|
let mut b = Broker::new(BrokerConfig {
|
||||||
strategy: MatchStrategy::ExactThenStructural,
|
strategy: MatchStrategy::ExactThenStructural,
|
||||||
|
current_context: None,
|
||||||
});
|
});
|
||||||
// Productor 1: match estructural (interface diferente)
|
// Productor 1: match estructural (interface diferente)
|
||||||
let p_struct = card(
|
let p_struct = card(
|
||||||
@@ -716,4 +765,210 @@ mod tests {
|
|||||||
assert!(pairs.contains(&("dht", "ui")));
|
assert!(pairs.contains(&("dht", "ui")));
|
||||||
assert!(pairs.contains(&("ui", "dht")));
|
assert!(pairs.contains(&("ui", "dht")));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ===========================================================
|
||||||
|
// Priority contexts
|
||||||
|
// ===========================================================
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn context_priority_offset_lifts_producer_above_alphabetic_winner() {
|
||||||
|
// Sin contexto, "a-prod" gana contra "b-prod" (alfabético).
|
||||||
|
// En contexto "test", b-prod tiene offset +1 → debería ganar.
|
||||||
|
let mut a_prod = card(
|
||||||
|
"a-prod",
|
||||||
|
Priority::Normal,
|
||||||
|
Flows {
|
||||||
|
input: vec![],
|
||||||
|
output: vec![flow("out", prim("string"), None)],
|
||||||
|
},
|
||||||
|
);
|
||||||
|
a_prod.priority_contexts = std::collections::BTreeMap::new(); // explícito vacío
|
||||||
|
|
||||||
|
let mut b_prod = card(
|
||||||
|
"b-prod",
|
||||||
|
Priority::Normal,
|
||||||
|
Flows {
|
||||||
|
input: vec![],
|
||||||
|
output: vec![flow("out", prim("string"), None)],
|
||||||
|
},
|
||||||
|
);
|
||||||
|
b_prod.priority_contexts.insert(
|
||||||
|
"test".into(),
|
||||||
|
ContextBias {
|
||||||
|
pin_to: None,
|
||||||
|
priority_offset: 1,
|
||||||
|
},
|
||||||
|
);
|
||||||
|
|
||||||
|
let consumer = card(
|
||||||
|
"ui",
|
||||||
|
Priority::Normal,
|
||||||
|
Flows {
|
||||||
|
input: vec![flow("in", prim("string"), None)],
|
||||||
|
output: vec![],
|
||||||
|
},
|
||||||
|
);
|
||||||
|
|
||||||
|
let s_cons = Ulid::new();
|
||||||
|
|
||||||
|
// Caso 1: sin contexto → a-prod gana (alfabético).
|
||||||
|
let mut b = Broker::new(BrokerConfig {
|
||||||
|
strategy: MatchStrategy::default(),
|
||||||
|
current_context: None,
|
||||||
|
});
|
||||||
|
b.register(Ulid::new(), &a_prod, None);
|
||||||
|
b.register(Ulid::new(), &b_prod, None);
|
||||||
|
b.register(s_cons, &consumer, None);
|
||||||
|
let m = b.find_producer_for(s_cons, "in").unwrap();
|
||||||
|
assert_eq!(m.producer_label, "a-prod");
|
||||||
|
|
||||||
|
// Caso 2: contexto "test" → b-prod gana por offset +1.
|
||||||
|
let mut b = Broker::new(BrokerConfig {
|
||||||
|
strategy: MatchStrategy::default(),
|
||||||
|
current_context: Some("test".into()),
|
||||||
|
});
|
||||||
|
b.register(Ulid::new(), &a_prod, None);
|
||||||
|
b.register(Ulid::new(), &b_prod, None);
|
||||||
|
b.register(s_cons, &consumer, None);
|
||||||
|
let m = b.find_producer_for(s_cons, "in").unwrap();
|
||||||
|
assert_eq!(m.producer_label, "b-prod");
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn context_pin_to_overrides_static_pin() {
|
||||||
|
// Consumer pinea estático a "real-dht", pero en contexto "test"
|
||||||
|
// declara override a "mock-dht".
|
||||||
|
let real = card(
|
||||||
|
"real-dht",
|
||||||
|
Priority::Normal,
|
||||||
|
Flows {
|
||||||
|
input: vec![],
|
||||||
|
output: vec![flow("out", prim("string"), None)],
|
||||||
|
},
|
||||||
|
);
|
||||||
|
let mock = card(
|
||||||
|
"mock-dht",
|
||||||
|
Priority::Normal,
|
||||||
|
Flows {
|
||||||
|
input: vec![],
|
||||||
|
output: vec![flow("out", prim("string"), None)],
|
||||||
|
},
|
||||||
|
);
|
||||||
|
let mut consumer = card(
|
||||||
|
"ui",
|
||||||
|
Priority::Normal,
|
||||||
|
Flows {
|
||||||
|
input: vec![flow("in", prim("string"), Some("real-dht"))],
|
||||||
|
output: vec![],
|
||||||
|
},
|
||||||
|
);
|
||||||
|
consumer.priority_contexts.insert(
|
||||||
|
"test".into(),
|
||||||
|
ContextBias {
|
||||||
|
pin_to: Some("mock-dht".into()),
|
||||||
|
priority_offset: 0,
|
||||||
|
},
|
||||||
|
);
|
||||||
|
|
||||||
|
let s_cons = Ulid::new();
|
||||||
|
|
||||||
|
// Caso 1: sin contexto → static pin gana ("real-dht").
|
||||||
|
let mut b = Broker::new(BrokerConfig::default());
|
||||||
|
b.register(Ulid::new(), &real, None);
|
||||||
|
b.register(Ulid::new(), &mock, None);
|
||||||
|
b.register(s_cons, &consumer, None);
|
||||||
|
let m = b.find_producer_for(s_cons, "in").unwrap();
|
||||||
|
assert_eq!(m.producer_label, "real-dht");
|
||||||
|
assert!(m.pinned);
|
||||||
|
|
||||||
|
// Caso 2: contexto "test" → context override gana ("mock-dht").
|
||||||
|
let mut b = Broker::new(BrokerConfig {
|
||||||
|
strategy: MatchStrategy::default(),
|
||||||
|
current_context: Some("test".into()),
|
||||||
|
});
|
||||||
|
b.register(Ulid::new(), &real, None);
|
||||||
|
b.register(Ulid::new(), &mock, None);
|
||||||
|
b.register(s_cons, &consumer, None);
|
||||||
|
let m = b.find_producer_for(s_cons, "in").unwrap();
|
||||||
|
assert_eq!(m.producer_label, "mock-dht");
|
||||||
|
assert!(m.pinned);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn unknown_context_no_op() {
|
||||||
|
// Si la Card declara biases para "test" pero el broker está en
|
||||||
|
// "prod", los biases no aplican.
|
||||||
|
let mut b_prod = card(
|
||||||
|
"b-prod",
|
||||||
|
Priority::Normal,
|
||||||
|
Flows {
|
||||||
|
input: vec![],
|
||||||
|
output: vec![flow("out", prim("string"), None)],
|
||||||
|
},
|
||||||
|
);
|
||||||
|
b_prod.priority_contexts.insert(
|
||||||
|
"test".into(),
|
||||||
|
ContextBias {
|
||||||
|
pin_to: None,
|
||||||
|
priority_offset: 5,
|
||||||
|
},
|
||||||
|
);
|
||||||
|
let a_prod = card(
|
||||||
|
"a-prod",
|
||||||
|
Priority::Normal,
|
||||||
|
Flows {
|
||||||
|
input: vec![],
|
||||||
|
output: vec![flow("out", prim("string"), None)],
|
||||||
|
},
|
||||||
|
);
|
||||||
|
let consumer = card(
|
||||||
|
"ui",
|
||||||
|
Priority::Normal,
|
||||||
|
Flows {
|
||||||
|
input: vec![flow("in", prim("string"), None)],
|
||||||
|
output: vec![],
|
||||||
|
},
|
||||||
|
);
|
||||||
|
|
||||||
|
let mut b = Broker::new(BrokerConfig {
|
||||||
|
strategy: MatchStrategy::default(),
|
||||||
|
current_context: Some("prod".into()),
|
||||||
|
});
|
||||||
|
let s_cons = Ulid::new();
|
||||||
|
b.register(Ulid::new(), &a_prod, None);
|
||||||
|
b.register(Ulid::new(), &b_prod, None);
|
||||||
|
b.register(s_cons, &consumer, None);
|
||||||
|
|
||||||
|
// En contexto "prod" sin biases declarados, gana por alfabético.
|
||||||
|
let m = b.find_producer_for(s_cons, "in").unwrap();
|
||||||
|
assert_eq!(m.producer_label, "a-prod");
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn priority_offset_clamps_to_critical() {
|
||||||
|
// Offset enorme no debe hacer overflow ni saltar fuera del rango.
|
||||||
|
let mut prod = card(
|
||||||
|
"p",
|
||||||
|
Priority::Normal,
|
||||||
|
Flows {
|
||||||
|
input: vec![],
|
||||||
|
output: vec![flow("out", prim("string"), None)],
|
||||||
|
},
|
||||||
|
);
|
||||||
|
prod.priority_contexts.insert(
|
||||||
|
"x".into(),
|
||||||
|
ContextBias {
|
||||||
|
pin_to: None,
|
||||||
|
priority_offset: 100,
|
||||||
|
},
|
||||||
|
);
|
||||||
|
|
||||||
|
let b = Broker::new(BrokerConfig {
|
||||||
|
strategy: MatchStrategy::default(),
|
||||||
|
current_context: Some("x".into()),
|
||||||
|
});
|
||||||
|
let bc = BrokeredCard::from_card(Ulid::new(), &prod, None);
|
||||||
|
// effective_priority debe estar clampada a 3 (Critical), no 101.
|
||||||
|
assert_eq!(b.effective_priority(&bc), 3);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -130,6 +130,13 @@ pub struct Card {
|
|||||||
#[serde(default)]
|
#[serde(default)]
|
||||||
pub genesis: Vec<Card>,
|
pub genesis: Vec<Card>,
|
||||||
|
|
||||||
|
/// Biases per-contexto. La key es el nombre del contexto (p. ej.
|
||||||
|
/// `"test"`, `"prod"`, `"foreground"`). Cuando el broker está
|
||||||
|
/// configurado bajo ese contexto, el bias se aplica. Sin contexto
|
||||||
|
/// activo o sin entrada matching, este campo no afecta el ranking.
|
||||||
|
#[serde(default, skip_serializing_if = "BTreeMap::is_empty")]
|
||||||
|
pub priority_contexts: BTreeMap<String, ContextBias>,
|
||||||
|
|
||||||
/// Campos JSON/TOML desconocidos preservados durante I/O de archivos
|
/// Campos JSON/TOML desconocidos preservados durante I/O de archivos
|
||||||
/// (forward-compat). **No se transmiten por wire (postcard)** — la
|
/// (forward-compat). **No se transmiten por wire (postcard)** — la
|
||||||
/// proyección a [`WireCard`] los descarta porque `serde_json::Value`
|
/// proyección a [`WireCard`] los descarta porque `serde_json::Value`
|
||||||
@@ -159,6 +166,7 @@ impl Default for Card {
|
|||||||
priority: Priority::default(),
|
priority: Priority::default(),
|
||||||
flow: Flows::default(),
|
flow: Flows::default(),
|
||||||
genesis: Vec::new(),
|
genesis: Vec::new(),
|
||||||
|
priority_contexts: BTreeMap::new(),
|
||||||
extensions: BTreeMap::new(),
|
extensions: BTreeMap::new(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -386,6 +394,36 @@ pub enum Priority {
|
|||||||
Critical,
|
Critical,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Override per-contexto sobre los matches del broker.
|
||||||
|
///
|
||||||
|
/// La Card declara biases bajo `priority_contexts.<nombre>` que se
|
||||||
|
/// activan cuando el broker corre bajo ese contexto. Aplicación según rol:
|
||||||
|
///
|
||||||
|
/// - **Como consumidor**: `pin_to` sobreescribe el `pin_to` estático del
|
||||||
|
/// `Flow.pin_to` durante la búsqueda de productores.
|
||||||
|
/// - **Como productor**: `priority_offset` se suma a la priority base
|
||||||
|
/// (saturando en `[Low, Critical]`) para el ranking de candidatos.
|
||||||
|
///
|
||||||
|
/// Casos de uso típicos: test↔prod (mock vs real), foreground↔background
|
||||||
|
/// (latencia vs costo), trust gates (sólo productores con cierto nivel).
|
||||||
|
#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)]
|
||||||
|
pub struct ContextBias {
|
||||||
|
/// Override del `pin_to` estático cuando el broker está en este
|
||||||
|
/// contexto y la Card actúa como consumidor.
|
||||||
|
#[serde(default, skip_serializing_if = "Option::is_none")]
|
||||||
|
pub pin_to: Option<String>,
|
||||||
|
|
||||||
|
/// Modifica la priority efectiva del Card como productor.
|
||||||
|
/// `+1` lo eleva, `-1` lo baja. El resultado se clampa al rango de
|
||||||
|
/// `Priority` ([Low, Critical]).
|
||||||
|
#[serde(default, skip_serializing_if = "is_zero_i8")]
|
||||||
|
pub priority_offset: i8,
|
||||||
|
}
|
||||||
|
|
||||||
|
fn is_zero_i8(v: &i8) -> bool {
|
||||||
|
*v == 0
|
||||||
|
}
|
||||||
|
|
||||||
// =====================================================================
|
// =====================================================================
|
||||||
// Flujos tipados (del modelo brahman)
|
// Flujos tipados (del modelo brahman)
|
||||||
// =====================================================================
|
// =====================================================================
|
||||||
@@ -702,6 +740,8 @@ pub struct WireCard {
|
|||||||
pub flow: Flows,
|
pub flow: Flows,
|
||||||
#[serde(default)]
|
#[serde(default)]
|
||||||
pub genesis: Vec<WireCard>,
|
pub genesis: Vec<WireCard>,
|
||||||
|
#[serde(default)]
|
||||||
|
pub priority_contexts: BTreeMap<String, ContextBias>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl From<Card> for WireCard {
|
impl From<Card> for WireCard {
|
||||||
@@ -721,6 +761,7 @@ impl From<Card> for WireCard {
|
|||||||
priority: c.priority,
|
priority: c.priority,
|
||||||
flow: c.flow,
|
flow: c.flow,
|
||||||
genesis: c.genesis.into_iter().map(WireCard::from).collect(),
|
genesis: c.genesis.into_iter().map(WireCard::from).collect(),
|
||||||
|
priority_contexts: c.priority_contexts,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -742,6 +783,7 @@ impl From<WireCard> for Card {
|
|||||||
priority: w.priority,
|
priority: w.priority,
|
||||||
flow: w.flow,
|
flow: w.flow,
|
||||||
genesis: w.genesis.into_iter().map(Card::from).collect(),
|
genesis: w.genesis.into_iter().map(Card::from).collect(),
|
||||||
|
priority_contexts: w.priority_contexts,
|
||||||
extensions: BTreeMap::new(),
|
extensions: BTreeMap::new(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -145,8 +145,18 @@ async fn primordial_loop(
|
|||||||
// tipados. Si el bind falla (socket en uso, FS no escribible),
|
// tipados. Si el bind falla (socket en uso, FS no escribible),
|
||||||
// degradamos a "modo bus-only" — la doctrina de PID 1 no rompe
|
// degradamos a "modo bus-only" — la doctrina de PID 1 no rompe
|
||||||
// por subsistemas opcionales.
|
// por subsistemas opcionales.
|
||||||
|
// Contexto operativo del broker: configurable por env var. Útil para
|
||||||
|
// distinguir test/prod/foreground sin recompilar. Sin la var, los
|
||||||
|
// biases per-contexto declarados en las Cards quedan inactivos.
|
||||||
|
let broker_context = std::env::var("BRAHMAN_BROKER_CONTEXT").ok();
|
||||||
|
if let Some(ctx) = &broker_context {
|
||||||
|
info!(context = %ctx, "brahman broker bajo contexto operativo");
|
||||||
|
}
|
||||||
let brahman_broker = std::sync::Arc::new(tokio::sync::Mutex::new(
|
let brahman_broker = std::sync::Arc::new(tokio::sync::Mutex::new(
|
||||||
brahman_broker::Broker::new(brahman_broker::BrokerConfig::default()),
|
brahman_broker::Broker::new(brahman_broker::BrokerConfig {
|
||||||
|
strategy: brahman_broker::MatchStrategy::default(),
|
||||||
|
current_context: broker_context.clone(),
|
||||||
|
}),
|
||||||
));
|
));
|
||||||
let brahman_sock = brahman_handshake::transport::default_socket_path();
|
let brahman_sock = brahman_handshake::transport::default_socket_path();
|
||||||
match brahman_handshake::server::Server::bind(
|
match brahman_handshake::server::Server::bind(
|
||||||
@@ -177,6 +187,7 @@ async fn primordial_loop(
|
|||||||
brahman_broker.clone(),
|
brahman_broker.clone(),
|
||||||
brahman_admin::server::AdminConfig {
|
brahman_admin::server::AdminConfig {
|
||||||
init_attached: true,
|
init_attached: true,
|
||||||
|
current_context: broker_context.clone(),
|
||||||
},
|
},
|
||||||
) {
|
) {
|
||||||
Ok(admin) => {
|
Ok(admin) => {
|
||||||
|
|||||||
Reference in New Issue
Block a user