diff --git a/CHANGELOG.md b/CHANGELOG.md index 335a036..bd4189e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,31 @@ ratio/diff ver `git show `. ## 2026-05-08 +### feat(broker): priority contexts — biases per-contexto operativo +- `brahman-card::ContextBias { pin_to: Option, priority_offset: i8 }` + declara un override per-contexto. +- `Card.priority_contexts: BTreeMap` y mismo en + `WireCard` (cruza el wire). Las conversiones `From` lo propagan. +- `BrokerConfig.current_context: Option`. Cuando el broker corre + bajo un contexto y una Card declara biases para ese nombre, se aplican: + - Como **consumidor**: `pin_to` sobreescribe el `Flow.pin_to` estático. + - Como **productor**: `priority_offset` se suma a la priority base + (clamp en `[Low=0, Critical=3]`) para el ranking. +- `BrokeredCard` propaga `priority_contexts`. `find_producer_for` usa + `effective_priority(card)` y `effective_pin(card, input)` antes de + los tiebreaks. +- `brahman-admin::AdminConfig.current_context` + `StatusSnapshot.current_context` + espejan el contexto activo. `brahman-status` lo imprime como + `Context: ` justo debajo de `Init: ...`. +- `ente-zero` lee `BRAHMAN_BROKER_CONTEXT` env var y la propaga al + broker y al admin. Sin var, biases per-contexto inactivos. +- 4 tests nuevos en brahman-broker: + `context_priority_offset_lifts_producer_above_alphabetic_winner`, + `context_pin_to_overrides_static_pin`, `unknown_context_no_op`, + `priority_offset_clamps_to_critical`. +- Validación end-to-end: `BRAHMAN_BROKER_CONTEXT=test ente-zero` → + `brahman-status` muestra `Context: test`. + ### feat(card): WireCard + extensions — forward-compat sin romper postcard - `Card.extensions: BTreeMap` restaurado con `#[serde(flatten, default, skip_serializing_if = is_empty)]`. Los diff --git a/crates/core/brahman-admin/examples/brahman-status.rs b/crates/core/brahman-admin/examples/brahman-status.rs index 3c6f004..d8dc44c 100644 --- a/crates/core/brahman-admin/examples/brahman-status.rs +++ b/crates/core/brahman-admin/examples/brahman-status.rs @@ -14,6 +14,9 @@ async fn main() -> anyhow::Result<()> { "Init: server={} protocol={} attached={}", snap.server_version, snap.protocol_version, snap.init_attached ); + if let Some(ctx) = &snap.current_context { + println!("Context: {}", ctx); + } println!(); println!("Sessions ({}):", snap.sessions.len()); if snap.sessions.is_empty() { diff --git a/crates/core/brahman-admin/src/server.rs b/crates/core/brahman-admin/src/server.rs index a04285e..0aa5b56 100644 --- a/crates/core/brahman-admin/src/server.rs +++ b/crates/core/brahman-admin/src/server.rs @@ -16,6 +16,8 @@ use crate::snapshot::StatusSnapshot; pub struct AdminConfig { /// `true` si el Init está atado al servidor que aloja este admin. pub init_attached: bool, + /// Contexto operativo del broker, espejado en el snapshot. + pub current_context: Option, } /// Servidor admin escuchando en un Unix socket. @@ -101,6 +103,7 @@ async fn build_snapshot(broker: &Arc>, config: &AdminConfig) -> St server_version: crate::ADMIN_VERSION.to_string(), protocol_version: brahman_card::PROTOCOL_VERSION.to_string(), init_attached: config.init_attached, + current_context: config.current_context.clone(), sessions, matches, } diff --git a/crates/core/brahman-admin/src/snapshot.rs b/crates/core/brahman-admin/src/snapshot.rs index 8a4df41..9056996 100644 --- a/crates/core/brahman-admin/src/snapshot.rs +++ b/crates/core/brahman-admin/src/snapshot.rs @@ -12,6 +12,11 @@ pub struct StatusSnapshot { pub protocol_version: String, /// `true` si el Init está atado al servidor. pub init_attached: bool, + /// Contexto operativo activo del broker (p. ej. `"test"`, `"prod"`). + /// `None` si no hay contexto configurado — los biases per-contexto + /// declarados en las Cards quedan inactivos. + #[serde(default)] + pub current_context: Option, /// Cards actualmente registradas (sesiones vivas). pub sessions: Vec, /// Matches consumer↔producer derivados del set actual. diff --git a/crates/core/brahman-broker/src/lib.rs b/crates/core/brahman-broker/src/lib.rs index 8aa6dfe..33799d8 100644 --- a/crates/core/brahman-broker/src/lib.rs +++ b/crates/core/brahman-broker/src/lib.rs @@ -30,7 +30,7 @@ use std::collections::BTreeMap; -use brahman_card::{Card, Flow, Lifecycle, Priority, TypeRef, WitInterface}; +use brahman_card::{Card, ContextBias, Flow, Lifecycle, Priority, TypeRef, WitInterface}; use serde::{Deserialize, Serialize}; use ulid::Ulid; @@ -57,6 +57,10 @@ pub enum MatchStrategy { #[derive(Debug, Clone, Default)] pub struct BrokerConfig { pub strategy: MatchStrategy, + /// Contexto operativo activo. Si una Card declara un + /// `priority_contexts.`, ese bias se aplica durante el match. + /// `None` = sin biases per-contexto, sólo se usa lo estático. + pub current_context: Option, } /// Vista mínima de una Card que el broker necesita. @@ -70,6 +74,9 @@ pub struct BrokeredCard { pub outputs: Vec, /// Interfaz WIT extraída si el módulo es "consciente"; `None` si agnóstico. pub wit: Option, + /// Biases per-contexto, propagados desde `Card.priority_contexts`. + #[serde(default, skip_serializing_if = "BTreeMap::is_empty")] + pub priority_contexts: BTreeMap, } impl BrokeredCard { @@ -82,6 +89,7 @@ impl BrokeredCard { inputs: card.flow.input.clone(), outputs: card.flow.output.clone(), wit, + priority_contexts: card.priority_contexts.clone(), } } } @@ -178,10 +186,16 @@ impl Broker { let cons = self.cards.get(&consumer)?; let input = cons.inputs.iter().find(|f| f.name == input_name)?; - // pin_to: short-circuit. Si la pista resuelve, gana. - if let Some(pin) = &input.pin_to { + // pin_to efectivo: bias del contexto activo (si la Card declara + // override consumer-side) > pin_to estático del Flow. + let context_pin = self + .context_bias(cons) + .and_then(|b| b.pin_to.as_deref()); + let effective_pin = context_pin.or(input.pin_to.as_deref()); + + if let Some(pin) = effective_pin { for prod in self.cards.values() { - if prod.session == consumer || &prod.label != pin { + if prod.session == consumer || prod.label != pin { continue; } for out in &prod.outputs { @@ -205,9 +219,11 @@ impl Broker { } } + // Sort por (effective priority desc, label asc). El bias del + // contexto puede subir o bajar la priority del productor. candidates.sort_by(|(a, _, _), (b, _, _)| { - b.priority - .cmp(&a.priority) + self.effective_priority(b) + .cmp(&self.effective_priority(a)) .then_with(|| a.label.cmp(&b.label)) }); @@ -215,6 +231,26 @@ impl Broker { Some(self.make_match(cons, prod, input, out, via, false)) } + /// Devuelve el `ContextBias` que aplica a este Card en el contexto + /// activo (si lo hay). + fn context_bias<'a>(&self, card: &'a BrokeredCard) -> Option<&'a ContextBias> { + self.config + .current_context + .as_ref() + .and_then(|ctx| card.priority_contexts.get(ctx)) + } + + /// Priority efectiva del Card como productor, considerando el bias + /// del contexto activo. El offset se clampa a `[Low=0, Critical=3]`. + fn effective_priority(&self, card: &BrokeredCard) -> i16 { + let base = priority_value(card.priority); + let offset = self + .context_bias(card) + .map(|b| b.priority_offset as i16) + .unwrap_or(0); + (base + offset).clamp(0, 3) + } + /// Calcula todos los matches consumer→producer en el set actual. /// Útil para introspección o para que el Admin emita rutas en lote. pub fn all_matches(&self) -> Vec { @@ -278,6 +314,15 @@ impl Broker { // Predicados de matching (libres, testeables aislados) // ===================================================================== +fn priority_value(p: Priority) -> i16 { + match p { + Priority::Low => 0, + Priority::Normal => 1, + Priority::High => 2, + Priority::Critical => 3, + } +} + fn exact_match(a: &TypeRef, b: &TypeRef) -> bool { a == b } @@ -343,6 +388,7 @@ mod tests { fn exact_match_same_typeref() { let mut b = Broker::new(BrokerConfig { strategy: MatchStrategy::Exact, + current_context: None, }); let producer = card( "dht", @@ -375,6 +421,7 @@ mod tests { fn structural_ignores_interface() { let mut b = Broker::new(BrokerConfig { strategy: MatchStrategy::Structural, + current_context: None, }); let producer = card( "dht", @@ -413,6 +460,7 @@ mod tests { fn exact_strategy_rejects_interface_mismatch() { let mut b = Broker::new(BrokerConfig { strategy: MatchStrategy::Exact, + current_context: None, }); let producer = card( "dht", @@ -449,6 +497,7 @@ mod tests { fn exact_then_structural_prefers_exact() { let mut b = Broker::new(BrokerConfig { strategy: MatchStrategy::ExactThenStructural, + current_context: None, }); // Productor 1: match estructural (interface diferente) let p_struct = card( @@ -716,4 +765,210 @@ mod tests { assert!(pairs.contains(&("dht", "ui"))); assert!(pairs.contains(&("ui", "dht"))); } + + // =========================================================== + // Priority contexts + // =========================================================== + + #[test] + fn context_priority_offset_lifts_producer_above_alphabetic_winner() { + // Sin contexto, "a-prod" gana contra "b-prod" (alfabético). + // En contexto "test", b-prod tiene offset +1 → debería ganar. + let mut a_prod = card( + "a-prod", + Priority::Normal, + Flows { + input: vec![], + output: vec![flow("out", prim("string"), None)], + }, + ); + a_prod.priority_contexts = std::collections::BTreeMap::new(); // explícito vacío + + let mut b_prod = card( + "b-prod", + Priority::Normal, + Flows { + input: vec![], + output: vec![flow("out", prim("string"), None)], + }, + ); + b_prod.priority_contexts.insert( + "test".into(), + ContextBias { + pin_to: None, + priority_offset: 1, + }, + ); + + let consumer = card( + "ui", + Priority::Normal, + Flows { + input: vec![flow("in", prim("string"), None)], + output: vec![], + }, + ); + + let s_cons = Ulid::new(); + + // Caso 1: sin contexto → a-prod gana (alfabético). + let mut b = Broker::new(BrokerConfig { + strategy: MatchStrategy::default(), + current_context: None, + }); + b.register(Ulid::new(), &a_prod, None); + b.register(Ulid::new(), &b_prod, None); + b.register(s_cons, &consumer, None); + let m = b.find_producer_for(s_cons, "in").unwrap(); + assert_eq!(m.producer_label, "a-prod"); + + // Caso 2: contexto "test" → b-prod gana por offset +1. + let mut b = Broker::new(BrokerConfig { + strategy: MatchStrategy::default(), + current_context: Some("test".into()), + }); + b.register(Ulid::new(), &a_prod, None); + b.register(Ulid::new(), &b_prod, None); + b.register(s_cons, &consumer, None); + let m = b.find_producer_for(s_cons, "in").unwrap(); + assert_eq!(m.producer_label, "b-prod"); + } + + #[test] + fn context_pin_to_overrides_static_pin() { + // Consumer pinea estático a "real-dht", pero en contexto "test" + // declara override a "mock-dht". + let real = card( + "real-dht", + Priority::Normal, + Flows { + input: vec![], + output: vec![flow("out", prim("string"), None)], + }, + ); + let mock = card( + "mock-dht", + Priority::Normal, + Flows { + input: vec![], + output: vec![flow("out", prim("string"), None)], + }, + ); + let mut consumer = card( + "ui", + Priority::Normal, + Flows { + input: vec![flow("in", prim("string"), Some("real-dht"))], + output: vec![], + }, + ); + consumer.priority_contexts.insert( + "test".into(), + ContextBias { + pin_to: Some("mock-dht".into()), + priority_offset: 0, + }, + ); + + let s_cons = Ulid::new(); + + // Caso 1: sin contexto → static pin gana ("real-dht"). + let mut b = Broker::new(BrokerConfig::default()); + b.register(Ulid::new(), &real, None); + b.register(Ulid::new(), &mock, None); + b.register(s_cons, &consumer, None); + let m = b.find_producer_for(s_cons, "in").unwrap(); + assert_eq!(m.producer_label, "real-dht"); + assert!(m.pinned); + + // Caso 2: contexto "test" → context override gana ("mock-dht"). + let mut b = Broker::new(BrokerConfig { + strategy: MatchStrategy::default(), + current_context: Some("test".into()), + }); + b.register(Ulid::new(), &real, None); + b.register(Ulid::new(), &mock, None); + b.register(s_cons, &consumer, None); + let m = b.find_producer_for(s_cons, "in").unwrap(); + assert_eq!(m.producer_label, "mock-dht"); + assert!(m.pinned); + } + + #[test] + fn unknown_context_no_op() { + // Si la Card declara biases para "test" pero el broker está en + // "prod", los biases no aplican. + let mut b_prod = card( + "b-prod", + Priority::Normal, + Flows { + input: vec![], + output: vec![flow("out", prim("string"), None)], + }, + ); + b_prod.priority_contexts.insert( + "test".into(), + ContextBias { + pin_to: None, + priority_offset: 5, + }, + ); + let a_prod = card( + "a-prod", + Priority::Normal, + Flows { + input: vec![], + output: vec![flow("out", prim("string"), None)], + }, + ); + let consumer = card( + "ui", + Priority::Normal, + Flows { + input: vec![flow("in", prim("string"), None)], + output: vec![], + }, + ); + + let mut b = Broker::new(BrokerConfig { + strategy: MatchStrategy::default(), + current_context: Some("prod".into()), + }); + let s_cons = Ulid::new(); + b.register(Ulid::new(), &a_prod, None); + b.register(Ulid::new(), &b_prod, None); + b.register(s_cons, &consumer, None); + + // En contexto "prod" sin biases declarados, gana por alfabético. + let m = b.find_producer_for(s_cons, "in").unwrap(); + assert_eq!(m.producer_label, "a-prod"); + } + + #[test] + fn priority_offset_clamps_to_critical() { + // Offset enorme no debe hacer overflow ni saltar fuera del rango. + let mut prod = card( + "p", + Priority::Normal, + Flows { + input: vec![], + output: vec![flow("out", prim("string"), None)], + }, + ); + prod.priority_contexts.insert( + "x".into(), + ContextBias { + pin_to: None, + priority_offset: 100, + }, + ); + + let b = Broker::new(BrokerConfig { + strategy: MatchStrategy::default(), + current_context: Some("x".into()), + }); + let bc = BrokeredCard::from_card(Ulid::new(), &prod, None); + // effective_priority debe estar clampada a 3 (Critical), no 101. + assert_eq!(b.effective_priority(&bc), 3); + } } diff --git a/crates/core/brahman-card/src/lib.rs b/crates/core/brahman-card/src/lib.rs index fb5e99e..ae364e6 100644 --- a/crates/core/brahman-card/src/lib.rs +++ b/crates/core/brahman-card/src/lib.rs @@ -130,6 +130,13 @@ pub struct Card { #[serde(default)] pub genesis: Vec, + /// Biases per-contexto. La key es el nombre del contexto (p. ej. + /// `"test"`, `"prod"`, `"foreground"`). Cuando el broker está + /// configurado bajo ese contexto, el bias se aplica. Sin contexto + /// activo o sin entrada matching, este campo no afecta el ranking. + #[serde(default, skip_serializing_if = "BTreeMap::is_empty")] + pub priority_contexts: BTreeMap, + /// Campos JSON/TOML desconocidos preservados durante I/O de archivos /// (forward-compat). **No se transmiten por wire (postcard)** — la /// proyección a [`WireCard`] los descarta porque `serde_json::Value` @@ -159,6 +166,7 @@ impl Default for Card { priority: Priority::default(), flow: Flows::default(), genesis: Vec::new(), + priority_contexts: BTreeMap::new(), extensions: BTreeMap::new(), } } @@ -386,6 +394,36 @@ pub enum Priority { Critical, } +/// Override per-contexto sobre los matches del broker. +/// +/// La Card declara biases bajo `priority_contexts.` que se +/// activan cuando el broker corre bajo ese contexto. Aplicación según rol: +/// +/// - **Como consumidor**: `pin_to` sobreescribe el `pin_to` estático del +/// `Flow.pin_to` durante la búsqueda de productores. +/// - **Como productor**: `priority_offset` se suma a la priority base +/// (saturando en `[Low, Critical]`) para el ranking de candidatos. +/// +/// Casos de uso típicos: test↔prod (mock vs real), foreground↔background +/// (latencia vs costo), trust gates (sólo productores con cierto nivel). +#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)] +pub struct ContextBias { + /// Override del `pin_to` estático cuando el broker está en este + /// contexto y la Card actúa como consumidor. + #[serde(default, skip_serializing_if = "Option::is_none")] + pub pin_to: Option, + + /// Modifica la priority efectiva del Card como productor. + /// `+1` lo eleva, `-1` lo baja. El resultado se clampa al rango de + /// `Priority` ([Low, Critical]). + #[serde(default, skip_serializing_if = "is_zero_i8")] + pub priority_offset: i8, +} + +fn is_zero_i8(v: &i8) -> bool { + *v == 0 +} + // ===================================================================== // Flujos tipados (del modelo brahman) // ===================================================================== @@ -702,6 +740,8 @@ pub struct WireCard { pub flow: Flows, #[serde(default)] pub genesis: Vec, + #[serde(default)] + pub priority_contexts: BTreeMap, } impl From for WireCard { @@ -721,6 +761,7 @@ impl From for WireCard { priority: c.priority, flow: c.flow, genesis: c.genesis.into_iter().map(WireCard::from).collect(), + priority_contexts: c.priority_contexts, } } } @@ -742,6 +783,7 @@ impl From for Card { priority: w.priority, flow: w.flow, genesis: w.genesis.into_iter().map(Card::from).collect(), + priority_contexts: w.priority_contexts, extensions: BTreeMap::new(), } } diff --git a/crates/core/ente-zero/src/main.rs b/crates/core/ente-zero/src/main.rs index b0c156a..db5c526 100644 --- a/crates/core/ente-zero/src/main.rs +++ b/crates/core/ente-zero/src/main.rs @@ -145,8 +145,18 @@ async fn primordial_loop( // tipados. Si el bind falla (socket en uso, FS no escribible), // degradamos a "modo bus-only" — la doctrina de PID 1 no rompe // por subsistemas opcionales. + // Contexto operativo del broker: configurable por env var. Útil para + // distinguir test/prod/foreground sin recompilar. Sin la var, los + // biases per-contexto declarados en las Cards quedan inactivos. + let broker_context = std::env::var("BRAHMAN_BROKER_CONTEXT").ok(); + if let Some(ctx) = &broker_context { + info!(context = %ctx, "brahman broker bajo contexto operativo"); + } let brahman_broker = std::sync::Arc::new(tokio::sync::Mutex::new( - brahman_broker::Broker::new(brahman_broker::BrokerConfig::default()), + brahman_broker::Broker::new(brahman_broker::BrokerConfig { + strategy: brahman_broker::MatchStrategy::default(), + current_context: broker_context.clone(), + }), )); let brahman_sock = brahman_handshake::transport::default_socket_path(); match brahman_handshake::server::Server::bind( @@ -177,6 +187,7 @@ async fn primordial_loop( brahman_broker.clone(), brahman_admin::server::AdminConfig { init_attached: true, + current_context: broker_context.clone(), }, ) { Ok(admin) => {