feat(explorer+daemon): discovery dinamico via broker + query socket

Cierra el "explorer encuentra al daemon de forma totalmente dinamica"
del meta-plan. La UI deja de hardcodear el socket admin: descubre al
daemon nouser via MatchEvent::Available del broker y le consulta sus
Monadas directo.

Pipeline end-to-end:
- Daemon publica engine Card con service_socket = $XDG_RUNTIME_DIR/
  nouser-engine.sock y flow.output = monad-list:json.
- Daemon binda Unix socket en ese path con listener blocking que
  sirve nouser_card::query::QueryRequest::ListMonads, responde
  ListMonadsResponse { engine, monads: Vec<MonadView> }.
- Explorer construye consumer Card con flow.input matched,
  brahman_sidecar::await_provider_blocking le devuelve el socket,
  y nouser_core::engine_socket::client::list_monads lo consulta.
- Cachea el socket; cualquier fallo de query lo invalida y la
  proxima iteracion re-descubre.

Wire types nuevos en nouser_card::query:
- QueryRequest::ListMonads
- ListMonadsResponse { engine: EngineInfo, monads: Vec<MonadView> }
- MonadView: proyeccion slim de MonadManifest SIN centroid ni
  members (KB que no tienen por que viajar cada poll).
- transport::default_socket_path() con env override.

Listener en nouser_core::engine_socket: spawn_listener + client
blocking con QueryError tipado. 3 tests integracion verdes.

Refactor explorer:
- Drop dep brahman-admin, add brahman-sidecar/nouser-card/nouser-core.
- State: socket cache + snapshot + socket_source informativo.
- TickOutcome enum desacopla la I/O del UI.

Trade-offs: polling 2s (no streaming — broker no empuja Data cards
hoy), re-discovery full en error (discovery es barato).

Tests: 10 (nouser-card +3 query) + 27 (nouser-core +3 engine_socket)
+ 4 (sidecar) verdes. Explorer compila clean.
This commit is contained in:
Sergio
2026-05-09 03:08:20 +00:00
parent b23ddf2980
commit 2ae888bc8f
9 changed files with 871 additions and 146 deletions
+2
View File
@@ -33,6 +33,8 @@ use ulid::Ulid;
// Re-export para consumidores
pub use ::ulid;
pub mod query;
/// Versión del esquema del manifiesto. Bump al cambiar el schema.
pub const MONAD_SCHEMA_VERSION: u16 = 1;
+209
View File
@@ -0,0 +1,209 @@
//! Wire types para consultar al daemon `nouser` por sus Mónadas.
//!
//! El daemon expone un Unix socket (cuyo path se publica en
//! `Card.service_socket` y se descubre vía broker MatchEvent). Cada
//! conexión es single-shot: una request JSON terminada en `\n`,
//! una response JSON terminada en `\n`, cierre.
//!
//! Mismo patrón que `nouser-nous` (mock/real ↔ nouser-core), reusado
//! ahora para que la UI (`nouser-explorer`) descubra y consulte al
//! daemon sin hardcodear sockets ni pasar por brahman-admin.
//!
//! ## Contrato
//!
//! ```text
//! C → S: {"kind":"list_monads"}\n
//! S → C: {"engine":{...},"monads":[...]}\n
//! ```
//!
//! En caso de error:
//!
//! ```text
//! S → C: {"error":"unsupported kind"}\n
//! ```
use serde::{Deserialize, Serialize};
use thiserror::Error;
use ulid::Ulid;
use crate::{Lens, MonadId, MonadManifest};
// =====================================================================
// Constants compartidos para el broker brahman
// =====================================================================
/// Nombre del flow output del daemon (input del consumer/explorer).
pub const FLOW_MONAD_LIST: &str = "monad-list";
/// Tipo del flow: el wire es JSON, así que el TypeRef es `primitive::json`.
pub const FLOW_TYPE_NAME: &str = "json";
// =====================================================================
// Wire request
// =====================================================================
/// Request al daemon. El wire es JSON line-delimited (un objeto + `\n`
/// por conexión).
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(tag = "kind", rename_all = "snake_case")]
pub enum QueryRequest {
/// Lista todas las Mónadas vivas del daemon, junto con metadata
/// del engine. Pensado para que la UI haga snapshot polling.
ListMonads,
}
// =====================================================================
// Wire response
// =====================================================================
/// Response a `ListMonads`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ListMonadsResponse {
/// Datos del engine (la Card que es "dueña" de las Mónadas).
pub engine: EngineInfo,
/// Mónadas vivas en este momento. Vista slim sin centroide ni
/// member set para que el wire sea liviano: una Mónada con 50k
/// archivos no debe transmitir 50k ULIDs cada poll.
pub monads: Vec<MonadView>,
}
/// Identidad del engine (Card kind=Ente que owns las Mónadas).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EngineInfo {
pub id: Ulid,
pub label: String,
/// Path del directorio que el daemon está observando. `None` si
/// el daemon corre sin watcher.
#[serde(default)]
pub watching: Option<String>,
}
/// Vista slim de una Mónada — los campos que la UI necesita para
/// renderizar una card sin pull del centroide ni del member set.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MonadView {
pub id: MonadId,
pub label: String,
#[serde(default)]
pub summary: String,
#[serde(default)]
pub keywords: Vec<String>,
pub cardinality: u32,
#[serde(default)]
pub entropy: f32,
#[serde(default)]
pub dominant_lens: Lens,
#[serde(default)]
pub path_hint: Option<String>,
#[serde(default)]
pub centroid_model: Option<String>,
}
impl MonadView {
/// Proyecta un MonadManifest completo a su vista slim para wire.
pub fn from_manifest(m: &MonadManifest) -> Self {
Self {
id: m.id,
label: m.label.clone(),
summary: m.summary.clone(),
keywords: m.keywords.clone(),
cardinality: m.cardinality,
entropy: m.entropy,
dominant_lens: m.dominant_lens,
path_hint: m.path_hint.clone(),
centroid_model: m.centroid_model.clone(),
}
}
}
/// Error de protocolo retornado en lugar de la response normal.
#[derive(Debug, Clone, Serialize, Deserialize, Error)]
#[error("nouser-engine: {error}")]
pub struct ErrorResponse {
pub error: String,
}
// =====================================================================
// Transport
// =====================================================================
pub mod transport {
use std::path::PathBuf;
/// Variable de entorno para sobreescribir la ruta del socket del
/// daemon (útil para tests / multi-daemon).
pub const SOCKET_ENV: &str = "NOUSER_ENGINE_SOCKET";
/// Nombre por defecto del socket.
pub const SOCKET_NAME: &str = "nouser-engine.sock";
/// Ruta canónica al socket del daemon. Honra `NOUSER_ENGINE_SOCKET`
/// si está set, sino arma sobre `$XDG_RUNTIME_DIR` (con fallback
/// `$TMPDIR`).
pub fn default_socket_path() -> PathBuf {
if let Ok(p) = std::env::var(SOCKET_ENV) {
return PathBuf::from(p);
}
std::env::var_os("XDG_RUNTIME_DIR")
.map(PathBuf::from)
.unwrap_or_else(std::env::temp_dir)
.join(SOCKET_NAME)
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn request_roundtrips_json_with_tag() {
let req = QueryRequest::ListMonads;
let s = serde_json::to_string(&req).unwrap();
assert_eq!(s, r#"{"kind":"list_monads"}"#);
let back: QueryRequest = serde_json::from_str(&s).unwrap();
assert_eq!(back, req);
}
#[test]
fn response_roundtrip_preserves_view() {
let m = MonadManifest::new("x/src");
let view = MonadView::from_manifest(&m);
let resp = ListMonadsResponse {
engine: EngineInfo {
id: Ulid::new(),
label: "brahman.nouser_engine".into(),
watching: Some("/tmp/x".into()),
},
monads: vec![view.clone()],
};
let s = serde_json::to_string(&resp).unwrap();
let back: ListMonadsResponse = serde_json::from_str(&s).unwrap();
assert_eq!(back.monads.len(), 1);
assert_eq!(back.monads[0].label, view.label);
assert_eq!(back.engine.label, "brahman.nouser_engine");
}
#[test]
fn view_is_slim_no_centroid_no_members() {
// Construimos una Mónada con centroid + members "pesados",
// proyectamos a view, verificamos que esos campos no viajan.
let mut m = MonadManifest::new("test");
m.centroid = vec![0.1; 384]; // peso "real-fastembed"
m.members.insert(Ulid::new());
m.members.insert(Ulid::new());
m.cardinality = 2;
let view = MonadView::from_manifest(&m);
let s = serde_json::to_string(&view).unwrap();
// Chequeo con `:` para distinguir el field "centroid" del
// field "centroid_model" (que sí es metadata liviana y debe ir).
assert!(
!s.contains("\"centroid\":"),
"MonadView no debe serializar el vector centroid: {s}"
);
assert!(
!s.contains("\"members\":"),
"MonadView no debe serializar members: {s}"
);
assert!(s.contains("\"cardinality\":2"), "cardinality sí va: {s}");
}
}
+63 -7
View File
@@ -179,13 +179,21 @@ fn cmd_daemon(args: &[String]) -> Cmd {
brahman_sidecar::SidecarPool::new().map_err(|e| format!("crear pool: {e}"))?,
);
// 1. Engine como Ente.
let engine_card = build_engine_card();
// 1. Decidir el path del query socket ANTES de armar el engine
// Card (porque viaja como service_socket en la Card).
let query_socket = nouser_card::query::transport::default_socket_path();
// 2. Engine como Ente. Declara service_socket + flow.output para
// que el broker pueda emitir MatchEvent::Available a consumers
// interesados en `flow.input = monad-list:json`.
let engine_card = build_engine_card(query_socket.clone());
let engine_id = engine_card.id;
let engine_label = engine_card.label.clone();
eprintln!(
"nouser daemon: publicando engine '{}' (kind=Ente, id={})",
engine_label, engine_id
"nouser daemon: publicando engine '{}' (kind=Ente, id={}, socket={})",
engine_label,
engine_id,
query_socket.display()
);
pool.spawn(engine_card);
@@ -272,11 +280,39 @@ fn cmd_daemon(args: &[String]) -> Cmd {
scanned_count, newly_spawned
);
// Engine query socket: bind antes del watcher para que cualquier
// consumer descubierto vía broker pueda consultarnos enseguida.
// Si el bind falla, seguimos sin él — la UI degrada a "no
// alcanzable" pero el daemon sigue procesando cambios.
let db_shared = std::sync::Arc::new(std::sync::Mutex::new(db));
let _query_listener = match nouser_core::engine_socket::spawn_listener(
nouser_core::engine_socket::ListenerConfig {
socket_path: query_socket.clone(),
engine_id,
engine_label: engine_label.clone(),
watching: Some(dir.clone()),
},
db_shared.clone(),
) {
Ok(h) => {
eprintln!(
"nouser daemon: query socket activo en {} (proto: nouser_card::query)",
query_socket.display()
);
Some(h)
}
Err(e) => {
eprintln!(
"nouser daemon: query socket NO disponible ({e}) — explorer no podrá consultar"
);
None
}
};
// Watcher: cada cambio en el árbol — coalescido con debounce de
// 150ms — dispara un re-scan + re-cluster del directorio y
// re-publica al broker las Mónadas afectadas (drop + spawn por id,
// gracias al replace en `SidecarPool::spawn`).
let db_shared = std::sync::Arc::new(std::sync::Mutex::new(db));
let _watcher = match spawn_fs_watcher(
dir.clone(),
db_shared.clone(),
@@ -301,6 +337,8 @@ fn cmd_daemon(args: &[String]) -> Cmd {
std::thread::park();
drop(_watcher);
drop(_query_listener);
let _ = std::fs::remove_file(&query_socket); // best-effort cleanup
drop(pool);
Ok(())
}
@@ -726,14 +764,32 @@ fn embed_via(
/// Card del propio engine (kind=Ente). Es el "ser" que produce y
/// administra Mónadas; aparece en brahman-status junto a sus Mónadas.
fn build_engine_card() -> brahman_card::Card {
use brahman_card::{Card, CardKind, Lifecycle, Payload, Priority, Supervision};
///
/// Declara `service_socket` y `flow.output = monad-list:json` para
/// que un consumer (UI, CLI) pueda descubrir al daemon vía broker
/// MatchEvent y consultarle por sus Mónadas sin pasar por
/// brahman-admin.
fn build_engine_card(service_socket: std::path::PathBuf) -> brahman_card::Card {
use brahman_card::{Card, CardKind, Flow, Flows, Lifecycle, Payload, Priority, Supervision, TypeRef};
use nouser_card::query::{FLOW_MONAD_LIST, FLOW_TYPE_NAME};
Card {
payload: Payload::Virtual,
supervision: Supervision::Delegate,
lifecycle: Lifecycle::Daemon,
priority: Priority::Normal,
kind: CardKind::Ente,
service_socket: Some(service_socket),
flow: Flows {
input: vec![],
output: vec![Flow {
name: FLOW_MONAD_LIST.into(),
ty: TypeRef::Primitive {
name: FLOW_TYPE_NAME.into(),
},
pin_to: None,
}],
},
..Card::new("brahman.nouser_engine")
}
}
@@ -0,0 +1,300 @@
//! Listener Unix-socket que sirve [`nouser_card::query::QueryRequest`].
//!
//! El daemon `nouser` lo monta para que cualquier consumer (UI, CLI,
//! otro módulo) pueda preguntarle por sus Mónadas sin pasar por
//! brahman-admin. El path del socket viaja en el `Card.service_socket`
//! del engine; el broker brahman lo enseña vía MatchEvent::Available
//! cuando un consumer declara `flow.input = monad-list:json`.
//!
//! Wire: line-delimited JSON, single-shot por conexión. Mismo patrón
//! que `nouser-nous` (mock/real ↔ nouser-core), reutilizado.
//!
//! Threading: un thread dedicado, blocking I/O. No vale la pena traer
//! tokio acá — la frecuencia esperada es muy baja (UI poll cada 2s)
//! y el handler es trivial (lock db → snapshot → write).
use std::io::{BufRead, BufReader, Write};
use std::os::unix::net::{UnixListener, UnixStream};
use std::path::PathBuf;
use std::sync::{Arc, Mutex};
use nouser_card::query::{
EngineInfo, ErrorResponse, ListMonadsResponse, MonadView, QueryRequest,
};
use nouser_card::ulid::Ulid;
use crate::db::MonadDb;
/// Configuración del listener.
pub struct ListenerConfig {
pub socket_path: PathBuf,
pub engine_id: Ulid,
pub engine_label: String,
/// Path del directorio que el daemon está observando, para incluir
/// en `EngineInfo.watching`. `None` si el daemon no observa nada.
pub watching: Option<PathBuf>,
}
/// Bind del socket + spawn de un thread con accept loop. Devuelve el
/// path final (útil para confirmar) y un `JoinHandle` para shutdown
/// explícito (drop = thread sigue, listener queda).
///
/// Si el socket ya existe (sesión anterior crasheada), se intenta
/// removerlo antes del bind. Errores de bind se propagan al caller.
pub fn spawn_listener(
config: ListenerConfig,
db: Arc<Mutex<MonadDb>>,
) -> std::io::Result<std::thread::JoinHandle<()>> {
if config.socket_path.exists() {
let _ = std::fs::remove_file(&config.socket_path);
}
if let Some(parent) = config.socket_path.parent() {
std::fs::create_dir_all(parent)?;
}
let listener = UnixListener::bind(&config.socket_path)?;
let handle = std::thread::Builder::new()
.name("nouser-engine-listener".into())
.spawn(move || {
for conn in listener.incoming() {
match conn {
Ok(stream) => {
// Handler sincrónico inline. La frecuencia
// esperada (UI poll cada N segundos) no
// amerita spawn-per-connection; si en el
// futuro hay carga, agregar un threadpool.
if let Err(e) = handle_conn(stream, &db, &config) {
eprintln!("[engine-socket] conn falló: {e}");
}
}
Err(e) => {
eprintln!("[engine-socket] accept falló: {e}");
}
}
}
})?;
Ok(handle)
}
fn handle_conn(
mut stream: UnixStream,
db: &Arc<Mutex<MonadDb>>,
config: &ListenerConfig,
) -> std::io::Result<()> {
let mut reader = BufReader::new(stream.try_clone()?);
let mut line = String::new();
let n = reader.read_line(&mut line)?;
if n == 0 {
return Ok(());
}
let resp_bytes = match serde_json::from_str::<QueryRequest>(line.trim()) {
Ok(QueryRequest::ListMonads) => match handle_list_monads(db, config) {
Ok(json) => json,
Err(e) => encode_error(format!("list_monads falló: {e}")),
},
Err(e) => encode_error(format!("JSON inválido: {e}")),
};
stream.write_all(resp_bytes.as_bytes())?;
stream.write_all(b"\n")?;
stream.flush()?;
let _ = stream.shutdown(std::net::Shutdown::Both);
Ok(())
}
fn handle_list_monads(
db: &Arc<Mutex<MonadDb>>,
config: &ListenerConfig,
) -> Result<String, String> {
let db_lock = db.lock().map_err(|_| "mutex envenenado".to_string())?;
let monads: Vec<MonadView> = db_lock.monads().map(MonadView::from_manifest).collect();
let resp = ListMonadsResponse {
engine: EngineInfo {
id: config.engine_id,
label: config.engine_label.clone(),
watching: config.watching.as_ref().map(|p| p.display().to_string()),
},
monads,
};
serde_json::to_string(&resp).map_err(|e| format!("encode: {e}"))
}
fn encode_error(msg: String) -> String {
let err = ErrorResponse { error: msg };
serde_json::to_string(&err).unwrap_or_else(|_| "{\"error\":\"encode\"}".into())
}
/// Cliente blocking — `client::list_monads(socket)` para que la UI no
/// reimplemente el handshake JSON cada vez.
pub mod client {
use std::io::{BufRead, BufReader, Write};
use std::os::unix::net::UnixStream;
use std::path::Path;
use std::time::Duration;
use nouser_card::query::{ErrorResponse, ListMonadsResponse, QueryRequest};
#[derive(Debug, thiserror::Error)]
pub enum QueryError {
#[error("conectar a {path}: {source}")]
Connect {
path: std::path::PathBuf,
#[source]
source: std::io::Error,
},
#[error("I/O: {0}")]
Io(#[from] std::io::Error),
#[error("serializacion: {0}")]
Serde(#[from] serde_json::Error),
#[error("daemon: {0}")]
Daemon(String),
#[error("timeout esperando response")]
Timeout,
#[error("response vacía del daemon")]
Empty,
}
/// Envía `ListMonads` y devuelve la response. Timeout aplicado
/// tanto al connect como al read.
pub fn list_monads(
socket: &Path,
timeout: Duration,
) -> Result<ListMonadsResponse, QueryError> {
let mut stream = UnixStream::connect(socket).map_err(|e| QueryError::Connect {
path: socket.to_path_buf(),
source: e,
})?;
stream.set_read_timeout(Some(timeout))?;
stream.set_write_timeout(Some(timeout))?;
let req = QueryRequest::ListMonads;
let line = serde_json::to_string(&req)?;
stream.write_all(line.as_bytes())?;
stream.write_all(b"\n")?;
stream.flush()?;
let mut reader = BufReader::new(stream);
let mut response = String::new();
let n = reader.read_line(&mut response)?;
if n == 0 {
return Err(QueryError::Empty);
}
if let Ok(resp) = serde_json::from_str::<ListMonadsResponse>(response.trim()) {
return Ok(resp);
}
let err: ErrorResponse = serde_json::from_str(response.trim())?;
Err(QueryError::Daemon(err.error))
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::db::MonadDb;
use nouser_card::MonadManifest;
use std::time::Duration;
fn fresh_socket_path(name: &str) -> PathBuf {
let dir = std::env::temp_dir();
let unique = format!("{}-{}-{}.sock", name, std::process::id(), Ulid::new());
dir.join(unique)
}
#[test]
fn list_monads_roundtrip_empty() {
let socket = fresh_socket_path("nouser-engine-test");
let db = Arc::new(Mutex::new(MonadDb::new()));
let engine_id = Ulid::new();
let _h = spawn_listener(
ListenerConfig {
socket_path: socket.clone(),
engine_id,
engine_label: "test-engine".into(),
watching: Some(PathBuf::from("/tmp/x")),
},
db.clone(),
)
.unwrap();
// Pequeña espera para que el bind se asiente (en práctica el
// socket existe inmediatamente tras el bind, pero algunos FS
// necesitan un tick). Si esto resulta flaky, agregar un loop
// de wait_for(socket.exists()).
std::thread::sleep(Duration::from_millis(50));
let resp = client::list_monads(&socket, Duration::from_secs(2)).unwrap();
assert_eq!(resp.engine.id, engine_id);
assert_eq!(resp.engine.label, "test-engine");
assert_eq!(resp.engine.watching.as_deref(), Some("/tmp/x"));
assert!(resp.monads.is_empty());
let _ = std::fs::remove_file(&socket);
}
#[test]
fn list_monads_returns_views() {
let socket = fresh_socket_path("nouser-engine-test-views");
let db = Arc::new(Mutex::new(MonadDb::new()));
let m1 = MonadManifest::new("alpha");
let m2 = MonadManifest::new("beta");
{
let mut g = db.lock().unwrap();
g.replace_monads(vec![m1.clone(), m2.clone()]);
}
let _h = spawn_listener(
ListenerConfig {
socket_path: socket.clone(),
engine_id: Ulid::new(),
engine_label: "test".into(),
watching: None,
},
db.clone(),
)
.unwrap();
std::thread::sleep(Duration::from_millis(50));
let resp = client::list_monads(&socket, Duration::from_secs(2)).unwrap();
assert_eq!(resp.monads.len(), 2);
let labels: Vec<_> = resp.monads.iter().map(|m| m.label.as_str()).collect();
assert!(labels.contains(&"alpha"));
assert!(labels.contains(&"beta"));
let _ = std::fs::remove_file(&socket);
}
#[test]
fn invalid_request_returns_error_response() {
let socket = fresh_socket_path("nouser-engine-test-bad");
let db = Arc::new(Mutex::new(MonadDb::new()));
let _h = spawn_listener(
ListenerConfig {
socket_path: socket.clone(),
engine_id: Ulid::new(),
engine_label: "test".into(),
watching: None,
},
db.clone(),
)
.unwrap();
std::thread::sleep(Duration::from_millis(50));
// Bypass del cliente tipado: mandamos JSON inválido a mano.
use std::io::{BufRead, BufReader, Write};
let mut stream = UnixStream::connect(&socket).unwrap();
stream.write_all(b"not json\n").unwrap();
stream.flush().unwrap();
let mut reader = BufReader::new(stream);
let mut response = String::new();
reader.read_line(&mut response).unwrap();
assert!(
response.contains("\"error\""),
"esperaba ErrorResponse, got: {response}"
);
let _ = std::fs::remove_file(&socket);
}
}
+1
View File
@@ -28,6 +28,7 @@
pub mod cluster;
pub mod db;
pub mod embed;
pub mod engine_socket;
pub mod scanner;
pub use nouser_card::*;