79d42aba28
Cleanup de drift de convenciones: nakui-core era el unico crate del
monorepo que mantenía version, edition y thiserror hardcoded, mientras
el resto heredaba del workspace y usaba thiserror v2. Eso significaba
que un bump global de version o edition se olvidaba sistematicamente
de nakui.
Cambios:
- [package]: version, edition, rust-version, license, authors, publish
-> todos *.workspace = true. Agregado description (convencion).
- Deps compartidas migradas a { workspace = true }: serde, serde_json,
thiserror (v1->v2), tokio, ulid, sha2.
- uuid migrado a { workspace = true, features = ["serde"] } — la feature
serde no esta en el workspace dep porque nakui es el unico user;
queda local opt-in en lugar de inflar el dep comun.
- Deps especificas de nakui (sin comparticion posible): rhai, petgraph,
surrealdb permanecen inline con version local.
Verificacion: cargo build -p nakui-core verde tras el bump thiserror
v1->v2 — los 14+ enums de error de nakui no requirieron ajustes
(derive backwards-compat para patrones simples). cargo test -p
nakui-core --lib: 27/27 verdes.
Bonus en este commit: discovery.rs movio el import Ulid a #[cfg(test)]
porque el refactor a Card::new lo dejo unused en module-scope.
214 lines
7.4 KiB
Rust
214 lines
7.4 KiB
Rust
//! `brahman-sidecar::discovery` — API reusable para que un módulo
|
|
//! consumer encuentre proveedores vivos vía broker, sin hardcodear
|
|
//! sockets ni reimplementar el patrón a mano.
|
|
//!
|
|
//! Es la generalización de `discover_producer_socket` del CLI
|
|
//! `nouser attract --remote`: declarás el `TypeRef` que querés
|
|
//! consumir y el broker te empuja un `MatchEvent::Available` con el
|
|
//! `producer_service_socket` del primer proveedor matched.
|
|
//!
|
|
//! Pipeline:
|
|
//! 1. `build_consumer_card(label, flow_name, type_name)` arma una
|
|
//! Card mínima (Ente, Oneshot, Virtual) con un input flow.
|
|
//! 2. `await_provider(card, timeout)` se conecta al brahman-init,
|
|
//! espera hasta `timeout` por `MatchEvent::Available`, devuelve
|
|
//! el socket del proveedor electo, y envía Farewell.
|
|
//! 3. Para mundos blocking (CLIs, tests, std-thread loops) hay
|
|
//! `await_provider_blocking` que arma su propio runtime
|
|
//! `current_thread`.
|
|
//!
|
|
//! Quién elige al proveedor es el broker, no este módulo. Si el
|
|
//! broker tiene `priority_contexts` activo, podés cambiar de
|
|
//! proveedor sin tocar el consumer; el matching dinámico se respeta.
|
|
|
|
use std::path::PathBuf;
|
|
use std::time::{Duration, Instant};
|
|
|
|
use brahman_card::{
|
|
Card, CardKind, Flow, Flows, Lifecycle, Payload, Priority, Supervision, TypeRef,
|
|
};
|
|
use brahman_handshake::client::{Client, ClientError};
|
|
use brahman_handshake::messages::MatchEventKind;
|
|
use brahman_handshake::transport;
|
|
|
|
#[derive(Debug, thiserror::Error)]
|
|
pub enum ConsumerError {
|
|
#[error("no se pudo conectar al init en {socket}: {source}")]
|
|
Connect {
|
|
socket: PathBuf,
|
|
#[source]
|
|
source: ClientError,
|
|
},
|
|
#[error("error en cliente brahman: {0}")]
|
|
Client(#[from] ClientError),
|
|
#[error("timeout {timeout:?} sin proveedor disponible para flow '{flow}' (type '{type_ref}')")]
|
|
NoProvider {
|
|
flow: String,
|
|
type_ref: String,
|
|
timeout: Duration,
|
|
},
|
|
#[error("no se pudo crear runtime tokio: {0}")]
|
|
Runtime(String),
|
|
}
|
|
|
|
/// Construye una Card mínima de consumer que declara un input flow
|
|
/// con el `TypeRef::Primitive { name }` solicitado. Usá esto para
|
|
/// el caso común; si necesitás algo más rico (output flows,
|
|
/// permissions, references) construí la Card a mano y pasala a
|
|
/// [`await_provider`] directamente.
|
|
pub fn build_consumer_card(
|
|
consumer_label: impl Into<String>,
|
|
flow_name: impl Into<String>,
|
|
type_name: impl Into<String>,
|
|
) -> Card {
|
|
Card {
|
|
payload: Payload::Virtual,
|
|
supervision: Supervision::OneShot,
|
|
lifecycle: Lifecycle::Oneshot,
|
|
priority: Priority::Normal,
|
|
kind: CardKind::Ente,
|
|
flow: Flows {
|
|
input: vec![Flow {
|
|
name: flow_name.into(),
|
|
ty: TypeRef::Primitive {
|
|
name: type_name.into(),
|
|
},
|
|
pin_to: None,
|
|
}],
|
|
output: vec![],
|
|
},
|
|
..Card::new(consumer_label)
|
|
}
|
|
}
|
|
|
|
/// Conecta al brahman-init, registra `consumer_card`, espera el
|
|
/// primer `MatchEvent::Available` y devuelve el `producer_service_socket`
|
|
/// que el broker emitió. Cierra la sesión con Farewell antes de
|
|
/// retornar (best-effort).
|
|
///
|
|
/// La `consumer_card` debe declarar al menos un `flow.input`; si no,
|
|
/// el broker no puede hacer matching y el await siempre dará timeout.
|
|
pub async fn await_provider(
|
|
consumer_card: Card,
|
|
timeout: Duration,
|
|
) -> Result<PathBuf, ConsumerError> {
|
|
let init_path = transport::default_socket_path();
|
|
|
|
// Capturamos descriptor para el mensaje de error antes de mover
|
|
// la card al cliente.
|
|
let (flow_name, type_ref_name) = describe_first_input(&consumer_card);
|
|
|
|
let mut client = Client::connect(&init_path, consumer_card)
|
|
.await
|
|
.map_err(|source| ConsumerError::Connect {
|
|
socket: init_path.clone(),
|
|
source,
|
|
})?;
|
|
|
|
let deadline = Instant::now() + timeout;
|
|
let socket = loop {
|
|
let remaining = deadline.saturating_duration_since(Instant::now());
|
|
if remaining.is_zero() {
|
|
break None;
|
|
}
|
|
match client.await_event(remaining).await? {
|
|
Some(ev) if ev.kind == MatchEventKind::Available => {
|
|
break ev.producer_service_socket;
|
|
}
|
|
Some(_) => continue, // Lost u otros: seguir esperando hasta el deadline
|
|
None => break None,
|
|
}
|
|
};
|
|
|
|
let _ = client.farewell().await; // best-effort cleanup
|
|
|
|
socket.ok_or(ConsumerError::NoProvider {
|
|
flow: flow_name,
|
|
type_ref: type_ref_name,
|
|
timeout,
|
|
})
|
|
}
|
|
|
|
/// Wrapper bloqueante de [`await_provider`]. Crea un runtime tokio
|
|
/// `current_thread` efímero y bloquea el thread llamador. Útil para
|
|
/// CLIs, tests y módulos std-thread (p. ej. el frontend GPUI antes
|
|
/// de tener su propio runtime async).
|
|
pub fn await_provider_blocking(
|
|
consumer_card: Card,
|
|
timeout: Duration,
|
|
) -> Result<PathBuf, ConsumerError> {
|
|
let rt = tokio::runtime::Builder::new_current_thread()
|
|
.enable_io()
|
|
.enable_time()
|
|
.build()
|
|
.map_err(|e| ConsumerError::Runtime(e.to_string()))?;
|
|
rt.block_on(await_provider(consumer_card, timeout))
|
|
}
|
|
|
|
fn describe_first_input(card: &Card) -> (String, String) {
|
|
match card.flow.input.first() {
|
|
Some(flow) => {
|
|
let type_name = match &flow.ty {
|
|
TypeRef::Primitive { name } => name.clone(),
|
|
TypeRef::Wit { package, name, .. } => format!("{package}#{name}"),
|
|
};
|
|
(flow.name.clone(), type_name)
|
|
}
|
|
None => ("(sin input)".into(), "(sin tipo)".into()),
|
|
}
|
|
}
|
|
|
|
#[cfg(test)]
mod tests {
    use super::*;
    use brahman_card::ulid::Ulid;

    /// The builder yields an Ente/Oneshot card with exactly one primitive
    /// input flow, no outputs, and a non-nil id.
    #[test]
    fn builder_sets_input_flow_with_primitive_type() {
        let card = build_consumer_card("nouser.cli", "embed-result", "json");

        assert_eq!(card.label, "nouser.cli");
        assert_eq!(card.kind, CardKind::Ente);
        assert!(matches!(card.lifecycle, Lifecycle::Oneshot));
        assert!(matches!(card.supervision, Supervision::OneShot));

        assert_eq!(card.flow.input.len(), 1);
        let input = &card.flow.input[0];
        assert_eq!(input.name, "embed-result");
        if let TypeRef::Primitive { name } = &input.ty {
            assert_eq!(name, "json");
        } else {
            panic!("expected primitive type");
        }
        assert!(card.flow.output.is_empty());

        // The builder must assign a real (non-nil) id so that the broker
        // does not collide this consumer with others.
        assert!(card.id != Ulid::nil(), "consumer card id no debe ser nil");
    }

    /// Two calls with identical arguments still produce different ids.
    #[test]
    fn builder_assigns_distinct_ids_per_call() {
        let first = build_consumer_card("a", "f", "t");
        let second = build_consumer_card("a", "f", "t");
        assert_ne!(first.id, second.id, "cada Card debería tener id propio");
    }

    /// With no input flow the descriptor falls back to the placeholders.
    #[test]
    fn describe_falls_back_when_no_input_flow() {
        let mut card = build_consumer_card("x", "f", "t");
        card.flow.input.clear();

        let (flow_desc, type_desc) = describe_first_input(&card);
        assert_eq!(flow_desc, "(sin input)");
        assert_eq!(type_desc, "(sin tipo)");
    }

    /// A WIT type is rendered as `package#name`.
    #[test]
    fn describe_formats_wit_type() {
        let wit_type = TypeRef::Wit {
            package: "brahman:dht".into(),
            interface: None,
            name: "entity-result".into(),
        };
        let mut card = build_consumer_card("x", "f", "t");
        card.flow.input[0].ty = wit_type;

        let (_, type_desc) = describe_first_input(&card);
        assert_eq!(type_desc, "brahman:dht#entity-result");
    }
}
|