diff --git a/CHANGELOG.md b/CHANGELOG.md index 4239500..3661db1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,68 @@ ratio/diff ver `git show `. ## 2026-05-09 +### feat(explorer+daemon): discovery dinámico vía broker + query socket +La UI deja de hardcodear el socket admin: ahora descubre al daemon +nouser vía `MatchEvent::Available` del broker brahman y le consulta +sus Mónadas directo, sin pasar por brahman-admin. Cierra el "explorer +encuentra al daemon de forma totalmente dinámica" del meta-plan. + +Pipeline end-to-end: +- Daemon publica engine Card con `service_socket = $XDG_RUNTIME_DIR/nouser-engine.sock` + y `flow.output = monad-list:json`. +- Daemon binda un Unix socket en ese path y monta un listener + blocking que sirve `nouser_card::query::QueryRequest::ListMonads`, + responde `ListMonadsResponse { engine, monads: Vec }`. +- Explorer construye un consumer Card con `flow.input = monad-list:json` + vía `brahman_sidecar::build_consumer_card`, llama + `await_provider_blocking(card, 3s)` y recibe el socket descubierto. +- Cachea ese socket; cada poll (2s) llama + `nouser_core::engine_socket::client::list_monads(socket, 2s)`. + Fallo de query → invalida cache → próximo tick re-descubre. + +Wire types nuevos en `nouser_card::query`: +- `QueryRequest::ListMonads` (single variant por ahora). +- `ListMonadsResponse { engine: EngineInfo, monads: Vec }`. +- `MonadView`: proyección slim de `MonadManifest` SIN `centroid` ni + `members` — la UI no los necesita y eran KB por Mónada que no + tenían por qué viajar cada poll. +- `transport::default_socket_path()` con env override + `NOUSER_ENGINE_SOCKET`. +- Const `FLOW_MONAD_LIST = "monad-list"`, `FLOW_TYPE_NAME = "json"`. + +Listener en `nouser_core::engine_socket`: +- `spawn_listener(config, db)` arma std::os::unix::net::UnixListener + en thread blocking dedicado. Frecuencia esperada (UI cada 2s) no + amerita tokio. +- `client::list_monads(socket, timeout)` — cliente blocking con + `QueryError` tipado (Connect / Io / Serde / Daemon / Timeout / Empty). +- 3 tests integración: roundtrip vacío, Mónadas reales, request + inválido devuelve ErrorResponse. + +Refactor explorer: +- Drop dep `brahman-admin`, add deps `brahman-sidecar`, `nouser-card`, + `nouser-core`. +- State: `socket: Option` cache + `snapshot: Option` + + `socket_source: "discovery"|"cache"` (sólo informativo). +- Tick: `tick(prior_socket)` separado del UI, devuelve un enum + `TickOutcome::{Ok, DiscoveryFailed, QueryFailed}`. Cualquier + fallo invalida la cache → re-discovery automática. +- Header reformulado: `Engine 'nouser_engine' · N mónada(s) · + socket: /... (cache|discovery) · watching: /tmp/x`. +- Render pintado de un engine card + Mónadas, sin ya iterar + `BrokeredCard` del admin. + +Trade-offs aceptados: +- Polling 2s (no streaming). El broker no empuja cambios de Data + cards hoy; agregar streaming requiere extender el protocolo + handshake. Para snapshot UI, polling 2s es suficiente. +- Re-descubrimiento full en cada error de query (en lugar de retry + con backoff). Discovery es barato (~ms vs broker), no vale la + pena la complejidad. + +Tests: 10 (nouser-card, +3 query) + 27 (nouser-core, +3 engine_socket) ++ 4 (sidecar) verdes. Explorer compila clean. + ### feat(nous-real): cache de embeddings + write-through al CAS de arje Cierra el ciclo de la crítica del usuario: "Si un archivo no ha cambiado su hash en el CAS, Nouser ni siquiera debería pedirle al diff --git a/Cargo.lock b/Cargo.lock index 0137cc0..51beaa6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6382,9 +6382,11 @@ dependencies = [ name = "nouser-explorer" version = "0.1.0" dependencies = [ - "brahman-admin", "brahman-card", + "brahman-sidecar", "gpui", + "nouser-card", + "nouser-core", ] [[package]] diff --git a/crates/apps/nouser-explorer/Cargo.toml b/crates/apps/nouser-explorer/Cargo.toml index b90d7fd..a050959 100644 --- a/crates/apps/nouser-explorer/Cargo.toml +++ b/crates/apps/nouser-explorer/Cargo.toml @@ -3,11 +3,13 @@ name = "nouser-explorer" version.workspace = true edition.workspace = true license.workspace = true -description = "Explorador GPUI de Mónadas: panel que consulta brahman-admin y renderea las sesiones como cards." +description = "Explorador GPUI de Mónadas: panel que descubre al daemon nouser vía broker brahman y consulta sus Mónadas dinámicamente." [dependencies] -brahman-admin = { path = "../../core/brahman-admin" } brahman-card = { path = "../../core/brahman-card" } +brahman-sidecar = { path = "../../shared/brahman-sidecar" } +nouser-card = { path = "../../modules/nouser/card" } +nouser-core = { path = "../../modules/nouser/core" } gpui = { workspace = true } [[bin]] diff --git a/crates/apps/nouser-explorer/src/main.rs b/crates/apps/nouser-explorer/src/main.rs index 3fbe271..ef05d3b 100644 --- a/crates/apps/nouser-explorer/src/main.rs +++ b/crates/apps/nouser-explorer/src/main.rs @@ -1,29 +1,38 @@ -//! `nouser-explorer` — panel GPUI que muestra las Mónadas (y demás -//! sesiones) registradas en el Init brahman. +//! `nouser-explorer` — panel GPUI que descubre al daemon `nouser` +//! vía broker brahman y muestra sus Mónadas en vivo. //! -//! Diseño: ventana standalone que cada N segundos consulta el socket -//! admin (`brahman_admin::client::query_blocking`) y renderea las -//! sesiones como cards. Sin integración con yahweh-shell — es su -//! propio binario para que el ecosistema sea visible incluso sin la -//! shell completa. +//! Diseño: ventana standalone que cada N segundos consulta el query +//! socket del daemon (`nouser_core::engine_socket::client::list_monads`). +//! El path del socket NO está hardcoded — se descubre vía +//! `brahman_sidecar::await_provider_blocking` para el flow +//! `monad-list:json`. Si el daemon cae, el socket cacheado se invalida +//! y la próxima iteración re-descubre. +//! +//! Sin integración con yahweh-shell — es su propio binario para que el +//! ecosistema sea visible incluso sin la shell completa. //! //! Uso: //! ```sh //! cargo run -p nouser-explorer -//! # con override de socket admin: -//! BRAHMAN_ADMIN_SOCKET=/tmp/brahman-admin.sock cargo run -p nouser-explorer +//! # con override del init socket (heredado de brahman-handshake): +//! BRAHMAN_INIT_SOCKET=/tmp/init.sock cargo run -p nouser-explorer //! ``` +use std::path::PathBuf; use std::time::Duration; -use brahman_admin::{client::query_blocking, transport, StatusSnapshot}; -use brahman_card::CardKind; +use brahman_sidecar::{await_provider_blocking, build_consumer_card, ConsumerError}; use gpui::{ div, prelude::*, px, rgb, App, Application, Bounds, Context, IntoElement, Render, SharedString, Window, WindowBounds, WindowOptions, }; +use nouser_card::query::{ListMonadsResponse, FLOW_MONAD_LIST, FLOW_TYPE_NAME}; +use nouser_card::Lens; +use nouser_core::engine_socket::client as query_client; const REFRESH_INTERVAL: Duration = Duration::from_secs(2); +const DISCOVERY_TIMEOUT: Duration = Duration::from_secs(3); +const QUERY_TIMEOUT: Duration = Duration::from_secs(2); fn main() { Application::new().run(|cx: &mut App| { @@ -44,35 +53,52 @@ fn main() { }); } -/// Vista raíz: contiene el último snapshot recibido y el último error. +/// Vista raíz: cachea el socket descubierto, el último snapshot y el +/// último error. El socket cacheado se invalida ante cualquier fallo +/// de query, forzando re-discovery en la próxima iteración. struct Explorer { - snapshot: Option, + socket: Option, + snapshot: Option, error: Option, + /// Última fuente del socket activo: "discovery" (vía broker) o + /// "cache" (reusando el de la iteración anterior). Sólo informativo. + socket_source: Option<&'static str>, } impl Explorer { fn new(cx: &mut Context) -> Self { - // Loop de refresh: cada `REFRESH_INTERVAL`, query al admin y - // actualiza el modelo. `cx.spawn` corre en el GPUI executor; - // el `query_blocking` sí bloquea pero sólo brevemente — admin - // responde con un snapshot pequeño. + // Loop de refresh: cada `REFRESH_INTERVAL`: + // 1. Si no tenemos socket cacheado → discovery vía broker. + // 2. Si tenemos → query directo. Fallo → invalida cache. cx.spawn(async move |this, cx| { let timer = cx.background_executor().clone(); loop { - let path = transport::default_socket_path(); - let result = query_blocking(&path); + let prior_socket = this + .read_with(cx, |me, _| me.socket.clone()) + .ok() + .flatten(); + + let result = tick(prior_socket); + let _ = this.update(cx, |me, cx| { match result { - Ok(snap) => { - me.snapshot = Some(snap); + TickOutcome::Ok { socket, source, snapshot } => { + me.socket = Some(socket); + me.socket_source = Some(source); + me.snapshot = Some(snapshot); me.error = None; } - Err(e) => { - me.error = Some(SharedString::from(format!( - "no conectado a {}: {}", - path.display(), - e - ))); + TickOutcome::DiscoveryFailed(msg) => { + me.socket = None; + me.socket_source = None; + me.error = Some(SharedString::from(msg)); + } + TickOutcome::QueryFailed(msg) => { + // Invalidamos el socket cacheado: la + // próxima iteración re-descubre. + me.socket = None; + me.socket_source = None; + me.error = Some(SharedString::from(msg)); } } cx.notify(); @@ -83,33 +109,79 @@ impl Explorer { .detach(); Self { + socket: None, snapshot: None, error: None, + socket_source: None, } } } +enum TickOutcome { + Ok { + socket: PathBuf, + source: &'static str, + snapshot: ListMonadsResponse, + }, + DiscoveryFailed(String), + QueryFailed(String), +} + +/// Resuelve el socket (cache o discovery) y consulta `ListMonads`. +/// Pensado para correr en background: no toca GPUI, sólo I/O. +fn tick(prior_socket: Option) -> TickOutcome { + let (socket, source) = match prior_socket { + Some(p) => (p, "cache"), + None => match discover() { + Ok(p) => (p, "discovery"), + Err(e) => return TickOutcome::DiscoveryFailed(format!("discovery: {e}")), + }, + }; + + match query_client::list_monads(&socket, QUERY_TIMEOUT) { + Ok(resp) => TickOutcome::Ok { + socket, + source, + snapshot: resp, + }, + Err(e) => TickOutcome::QueryFailed(format!( + "query a {}: {e} — re-descubriendo en próxima iteración", + socket.display() + )), + } +} + +/// Discovery del daemon vía broker brahman. Construye un consumer +/// Card con `flow.input = monad-list:json`, espera al primer +/// `MatchEvent::Available`, devuelve el `producer_service_socket`. +fn discover() -> Result { + let card = build_consumer_card("nouser-explorer", FLOW_MONAD_LIST, FLOW_TYPE_NAME); + await_provider_blocking(card, DISCOVERY_TIMEOUT) +} + impl Render for Explorer { fn render(&mut self, _w: &mut Window, _cx: &mut Context) -> impl IntoElement { let bg = rgb(0x14171c); let card_bg = rgb(0x1d2128); let text_dim = rgb(0x9ba1ad); let text = rgb(0xe6e8ec); - let accent_ente = rgb(0x88c0d0); + let accent_engine = rgb(0x88c0d0); let accent_data = rgb(0xb48ead); - let header_text = match &self.snapshot { - Some(s) => format!( - "Init · protocol={} · attached={} · {} sesión(es){}", - s.protocol_version, - s.init_attached, - s.sessions.len(), - s.current_context + let header_text = match (&self.snapshot, &self.socket, self.socket_source) { + (Some(s), Some(sock), Some(src)) => format!( + "Engine '{}' · {} mónada(s) · socket: {} ({}){}", + s.engine.label, + s.monads.len(), + sock.display(), + src, + s.engine + .watching .as_deref() - .map(|c| format!(" · context: {}", c)) + .map(|w| format!(" · watching: {}", w)) .unwrap_or_default() ), - None => "Esperando snapshot del Init brahman…".to_string(), + _ => "Buscando daemon nouser vía brahman-broker…".to_string(), }; let header = div() @@ -134,62 +206,11 @@ impl Render for Explorer { let cards: Vec = match &self.snapshot { None => vec![], - Some(snap) => snap - .sessions - .iter() - .map(|s| { - let (kind_label, accent) = match s.kind { - CardKind::Ente => ("ente", accent_ente), - CardKind::Data => ("data", accent_data), - }; - - let summary_line = s - .data - .as_ref() - .map(|d| d.summary.clone()) - .unwrap_or_default(); - - let keywords = s - .data - .as_ref() - .map(|d| d.keywords.join(", ")) - .unwrap_or_default(); - - let lens_line = s - .data - .as_ref() - .map(|d| d.presentation_hint.clone()) - .filter(|h| !h.is_empty()) - .map(|h| format!("lens: {h}")) - .unwrap_or_default(); - - let sock_line = s - .service_socket - .as_ref() - .map(|p| format!("socket: {}", p.display())) - .unwrap_or_default(); - - let refs_line = if s.references.is_empty() { - String::new() - } else { - let parts: Vec = s - .references - .iter() - .map(|r| { - format!( - "{:?}→{}", - r.kind, - if r.target_label.is_empty() { - "?" - } else { - r.target_label.as_str() - } - ) - }) - .collect(); - format!("refs: {}", parts.join(" ")) - }; + Some(snap) => { + let mut out = Vec::with_capacity(snap.monads.len() + 1); + // Engine card primero — el "ser" que owns las Mónadas. + out.push( div() .flex() .flex_col() @@ -198,7 +219,7 @@ impl Render for Explorer { .bg(card_bg) .rounded(px(6.)) .border_l_4() - .border_color(accent) + .border_color(accent_engine) .gap(px(2.)) .child( div() @@ -208,72 +229,131 @@ impl Render for Explorer { .items_center() .child( div() - .text_color(accent) + .text_color(accent_engine) .text_size(px(11.)) - .child(format!("[{kind_label}]")), + .child("[engine]"), ) .child( div() .text_color(text) .text_size(px(15.)) - .child(s.label.clone()), - ) - .child( - div() - .text_color(text_dim) - .text_size(px(11.)) - .child(format!("{:?}", s.lifecycle)), + .child(snap.engine.label.clone()), ), ) .child( div() .text_color(text_dim) .text_size(px(11.)) - .child(format!("id: {}", s.session)), + .child(format!("id: {}", snap.engine.id)), ) - .when(!summary_line.is_empty(), |d| { - d.child( - div() - .text_color(text) - .text_size(px(12.)) - .child(summary_line.clone()), - ) - }) - .when(!keywords.is_empty(), |d| { + .when_some(snap.engine.watching.clone(), |d, w| { d.child( div() .text_color(text_dim) .text_size(px(11.)) - .child(format!("keywords: {keywords}")), + .child(format!("watching: {w}")), ) }) - .when(!lens_line.is_empty(), |d| { - d.child( + .into_any_element(), + ); + + // Mónadas (kind=Data por construcción). + for m in &snap.monads { + let lens = lens_label(m.dominant_lens); + let keywords = m.keywords.join(", "); + let path_hint_line = m + .path_hint + .as_deref() + .filter(|p| !p.is_empty()) + .map(|p| format!("path: {p}")); + let model_line = m + .centroid_model + .as_deref() + .filter(|m| !m.is_empty()) + .map(|m| format!("model: {m}")); + + out.push( + div() + .flex() + .flex_col() + .p(px(12.)) + .mb(px(8.)) + .bg(card_bg) + .rounded(px(6.)) + .border_l_4() + .border_color(accent_data) + .gap(px(2.)) + .child( + div() + .flex() + .flex_row() + .gap(px(8.)) + .items_center() + .child( + div() + .text_color(accent_data) + .text_size(px(11.)) + .child("[monad]"), + ) + .child( + div() + .text_color(text) + .text_size(px(15.)) + .child(m.label.clone()), + ) + .child( + div() + .text_color(text_dim) + .text_size(px(11.)) + .child(format!( + "{} files · ent {:.2} · {}", + m.cardinality, m.entropy, lens + )), + ), + ) + .child( div() .text_color(text_dim) .text_size(px(11.)) - .child(lens_line), + .child(format!("id: {}", m.id)), ) - }) - .when(!sock_line.is_empty(), |d| { - d.child( - div() - .text_color(text_dim) - .text_size(px(11.)) - .child(sock_line), - ) - }) - .when(!refs_line.is_empty(), |d| { - d.child( - div() - .text_color(text_dim) - .text_size(px(11.)) - .child(refs_line), - ) - }) - .into_any_element() - }) - .collect(), + .when(!m.summary.is_empty(), |d| { + d.child( + div() + .text_color(text) + .text_size(px(12.)) + .child(m.summary.clone()), + ) + }) + .when(!keywords.is_empty(), |d| { + d.child( + div() + .text_color(text_dim) + .text_size(px(11.)) + .child(format!("keywords: {keywords}")), + ) + }) + .when_some(path_hint_line, |d, line| { + d.child( + div() + .text_color(text_dim) + .text_size(px(11.)) + .child(line), + ) + }) + .when_some(model_line, |d, line| { + d.child( + div() + .text_color(text_dim) + .text_size(px(11.)) + .child(line), + ) + }) + .into_any_element(), + ); + } + out + } }; let body = div() @@ -293,3 +373,14 @@ impl Render for Explorer { .child(body) } } + +fn lens_label(l: Lens) -> &'static str { + match l { + Lens::Grid => "grid", + Lens::Code => "code", + Lens::Gallery => "gallery", + Lens::Database => "database", + Lens::Markdown => "markdown", + Lens::Tree => "tree", + } +} diff --git a/crates/modules/nouser/card/src/lib.rs b/crates/modules/nouser/card/src/lib.rs index b448fb9..d94fd70 100644 --- a/crates/modules/nouser/card/src/lib.rs +++ b/crates/modules/nouser/card/src/lib.rs @@ -33,6 +33,8 @@ use ulid::Ulid; // Re-export para consumidores pub use ::ulid; +pub mod query; + /// Versión del esquema del manifiesto. Bump al cambiar el schema. pub const MONAD_SCHEMA_VERSION: u16 = 1; diff --git a/crates/modules/nouser/card/src/query.rs b/crates/modules/nouser/card/src/query.rs new file mode 100644 index 0000000..a0eb61a --- /dev/null +++ b/crates/modules/nouser/card/src/query.rs @@ -0,0 +1,209 @@ +//! Wire types para consultar al daemon `nouser` por sus Mónadas. +//! +//! El daemon expone un Unix socket (cuyo path se publica en +//! `Card.service_socket` y se descubre vía broker MatchEvent). Cada +//! conexión es single-shot: una request JSON terminada en `\n`, +//! una response JSON terminada en `\n`, cierre. +//! +//! Mismo patrón que `nouser-nous` (mock/real ↔ nouser-core), reusado +//! ahora para que la UI (`nouser-explorer`) descubra y consulte al +//! daemon sin hardcodear sockets ni pasar por brahman-admin. +//! +//! ## Contrato +//! +//! ```text +//! C → S: {"kind":"list_monads"}\n +//! S → C: {"engine":{...},"monads":[...]}\n +//! ``` +//! +//! En caso de error: +//! +//! ```text +//! S → C: {"error":"unsupported kind"}\n +//! ``` + +use serde::{Deserialize, Serialize}; +use thiserror::Error; +use ulid::Ulid; + +use crate::{Lens, MonadId, MonadManifest}; + +// ===================================================================== +// Constants compartidos para el broker brahman +// ===================================================================== + +/// Nombre del flow output del daemon (input del consumer/explorer). +pub const FLOW_MONAD_LIST: &str = "monad-list"; + +/// Tipo del flow: el wire es JSON, así que el TypeRef es `primitive::json`. +pub const FLOW_TYPE_NAME: &str = "json"; + +// ===================================================================== +// Wire request +// ===================================================================== + +/// Request al daemon. El wire es JSON line-delimited (un objeto + `\n` +/// por conexión). +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +#[serde(tag = "kind", rename_all = "snake_case")] +pub enum QueryRequest { + /// Lista todas las Mónadas vivas del daemon, junto con metadata + /// del engine. Pensado para que la UI haga snapshot polling. + ListMonads, +} + +// ===================================================================== +// Wire response +// ===================================================================== + +/// Response a `ListMonads`. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ListMonadsResponse { + /// Datos del engine (la Card que es "dueña" de las Mónadas). + pub engine: EngineInfo, + /// Mónadas vivas en este momento. Vista slim sin centroide ni + /// member set para que el wire sea liviano: una Mónada con 50k + /// archivos no debe transmitir 50k ULIDs cada poll. + pub monads: Vec, +} + +/// Identidad del engine (Card kind=Ente que owns las Mónadas). +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct EngineInfo { + pub id: Ulid, + pub label: String, + /// Path del directorio que el daemon está observando. `None` si + /// el daemon corre sin watcher. + #[serde(default)] + pub watching: Option, +} + +/// Vista slim de una Mónada — los campos que la UI necesita para +/// renderizar una card sin pull del centroide ni del member set. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct MonadView { + pub id: MonadId, + pub label: String, + #[serde(default)] + pub summary: String, + #[serde(default)] + pub keywords: Vec, + pub cardinality: u32, + #[serde(default)] + pub entropy: f32, + #[serde(default)] + pub dominant_lens: Lens, + #[serde(default)] + pub path_hint: Option, + #[serde(default)] + pub centroid_model: Option, +} + +impl MonadView { + /// Proyecta un MonadManifest completo a su vista slim para wire. + pub fn from_manifest(m: &MonadManifest) -> Self { + Self { + id: m.id, + label: m.label.clone(), + summary: m.summary.clone(), + keywords: m.keywords.clone(), + cardinality: m.cardinality, + entropy: m.entropy, + dominant_lens: m.dominant_lens, + path_hint: m.path_hint.clone(), + centroid_model: m.centroid_model.clone(), + } + } +} + +/// Error de protocolo retornado en lugar de la response normal. +#[derive(Debug, Clone, Serialize, Deserialize, Error)] +#[error("nouser-engine: {error}")] +pub struct ErrorResponse { + pub error: String, +} + +// ===================================================================== +// Transport +// ===================================================================== + +pub mod transport { + use std::path::PathBuf; + + /// Variable de entorno para sobreescribir la ruta del socket del + /// daemon (útil para tests / multi-daemon). + pub const SOCKET_ENV: &str = "NOUSER_ENGINE_SOCKET"; + + /// Nombre por defecto del socket. + pub const SOCKET_NAME: &str = "nouser-engine.sock"; + + /// Ruta canónica al socket del daemon. Honra `NOUSER_ENGINE_SOCKET` + /// si está set, sino arma sobre `$XDG_RUNTIME_DIR` (con fallback + /// `$TMPDIR`). + pub fn default_socket_path() -> PathBuf { + if let Ok(p) = std::env::var(SOCKET_ENV) { + return PathBuf::from(p); + } + std::env::var_os("XDG_RUNTIME_DIR") + .map(PathBuf::from) + .unwrap_or_else(std::env::temp_dir) + .join(SOCKET_NAME) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn request_roundtrips_json_with_tag() { + let req = QueryRequest::ListMonads; + let s = serde_json::to_string(&req).unwrap(); + assert_eq!(s, r#"{"kind":"list_monads"}"#); + let back: QueryRequest = serde_json::from_str(&s).unwrap(); + assert_eq!(back, req); + } + + #[test] + fn response_roundtrip_preserves_view() { + let m = MonadManifest::new("x/src"); + let view = MonadView::from_manifest(&m); + let resp = ListMonadsResponse { + engine: EngineInfo { + id: Ulid::new(), + label: "brahman.nouser_engine".into(), + watching: Some("/tmp/x".into()), + }, + monads: vec![view.clone()], + }; + let s = serde_json::to_string(&resp).unwrap(); + let back: ListMonadsResponse = serde_json::from_str(&s).unwrap(); + assert_eq!(back.monads.len(), 1); + assert_eq!(back.monads[0].label, view.label); + assert_eq!(back.engine.label, "brahman.nouser_engine"); + } + + #[test] + fn view_is_slim_no_centroid_no_members() { + // Construimos una Mónada con centroid + members "pesados", + // proyectamos a view, verificamos que esos campos no viajan. + let mut m = MonadManifest::new("test"); + m.centroid = vec![0.1; 384]; // peso "real-fastembed" + m.members.insert(Ulid::new()); + m.members.insert(Ulid::new()); + m.cardinality = 2; + let view = MonadView::from_manifest(&m); + let s = serde_json::to_string(&view).unwrap(); + // Chequeo con `:` para distinguir el field "centroid" del + // field "centroid_model" (que sí es metadata liviana y debe ir). + assert!( + !s.contains("\"centroid\":"), + "MonadView no debe serializar el vector centroid: {s}" + ); + assert!( + !s.contains("\"members\":"), + "MonadView no debe serializar members: {s}" + ); + assert!(s.contains("\"cardinality\":2"), "cardinality sí va: {s}"); + } +} diff --git a/crates/modules/nouser/core/src/bin/nouser.rs b/crates/modules/nouser/core/src/bin/nouser.rs index 4c4a12c..55b13b9 100644 --- a/crates/modules/nouser/core/src/bin/nouser.rs +++ b/crates/modules/nouser/core/src/bin/nouser.rs @@ -179,13 +179,21 @@ fn cmd_daemon(args: &[String]) -> Cmd { brahman_sidecar::SidecarPool::new().map_err(|e| format!("crear pool: {e}"))?, ); - // 1. Engine como Ente. - let engine_card = build_engine_card(); + // 1. Decidir el path del query socket ANTES de armar el engine + // Card (porque viaja como service_socket en la Card). + let query_socket = nouser_card::query::transport::default_socket_path(); + + // 2. Engine como Ente. Declara service_socket + flow.output para + // que el broker pueda emitir MatchEvent::Available a consumers + // interesados en `flow.input = monad-list:json`. + let engine_card = build_engine_card(query_socket.clone()); let engine_id = engine_card.id; let engine_label = engine_card.label.clone(); eprintln!( - "nouser daemon: publicando engine '{}' (kind=Ente, id={})", - engine_label, engine_id + "nouser daemon: publicando engine '{}' (kind=Ente, id={}, socket={})", + engine_label, + engine_id, + query_socket.display() ); pool.spawn(engine_card); @@ -272,11 +280,39 @@ fn cmd_daemon(args: &[String]) -> Cmd { scanned_count, newly_spawned ); + // Engine query socket: bind antes del watcher para que cualquier + // consumer descubierto vía broker pueda consultarnos enseguida. + // Si el bind falla, seguimos sin él — la UI degrada a "no + // alcanzable" pero el daemon sigue procesando cambios. + let db_shared = std::sync::Arc::new(std::sync::Mutex::new(db)); + let _query_listener = match nouser_core::engine_socket::spawn_listener( + nouser_core::engine_socket::ListenerConfig { + socket_path: query_socket.clone(), + engine_id, + engine_label: engine_label.clone(), + watching: Some(dir.clone()), + }, + db_shared.clone(), + ) { + Ok(h) => { + eprintln!( + "nouser daemon: query socket activo en {} (proto: nouser_card::query)", + query_socket.display() + ); + Some(h) + } + Err(e) => { + eprintln!( + "nouser daemon: query socket NO disponible ({e}) — explorer no podrá consultar" + ); + None + } + }; + // Watcher: cada cambio en el árbol — coalescido con debounce de // 150ms — dispara un re-scan + re-cluster del directorio y // re-publica al broker las Mónadas afectadas (drop + spawn por id, // gracias al replace en `SidecarPool::spawn`). - let db_shared = std::sync::Arc::new(std::sync::Mutex::new(db)); let _watcher = match spawn_fs_watcher( dir.clone(), db_shared.clone(), @@ -301,6 +337,8 @@ fn cmd_daemon(args: &[String]) -> Cmd { std::thread::park(); drop(_watcher); + drop(_query_listener); + let _ = std::fs::remove_file(&query_socket); // best-effort cleanup drop(pool); Ok(()) } @@ -726,14 +764,32 @@ fn embed_via( /// Card del propio engine (kind=Ente). Es el "ser" que produce y /// administra Mónadas; aparece en brahman-status junto a sus Mónadas. -fn build_engine_card() -> brahman_card::Card { - use brahman_card::{Card, CardKind, Lifecycle, Payload, Priority, Supervision}; +/// +/// Declara `service_socket` y `flow.output = monad-list:json` para +/// que un consumer (UI, CLI) pueda descubrir al daemon vía broker +/// MatchEvent y consultarle por sus Mónadas sin pasar por +/// brahman-admin. +fn build_engine_card(service_socket: std::path::PathBuf) -> brahman_card::Card { + use brahman_card::{Card, CardKind, Flow, Flows, Lifecycle, Payload, Priority, Supervision, TypeRef}; + use nouser_card::query::{FLOW_MONAD_LIST, FLOW_TYPE_NAME}; + Card { payload: Payload::Virtual, supervision: Supervision::Delegate, lifecycle: Lifecycle::Daemon, priority: Priority::Normal, kind: CardKind::Ente, + service_socket: Some(service_socket), + flow: Flows { + input: vec![], + output: vec![Flow { + name: FLOW_MONAD_LIST.into(), + ty: TypeRef::Primitive { + name: FLOW_TYPE_NAME.into(), + }, + pin_to: None, + }], + }, ..Card::new("brahman.nouser_engine") } } diff --git a/crates/modules/nouser/core/src/engine_socket.rs b/crates/modules/nouser/core/src/engine_socket.rs new file mode 100644 index 0000000..326515e --- /dev/null +++ b/crates/modules/nouser/core/src/engine_socket.rs @@ -0,0 +1,300 @@ +//! Listener Unix-socket que sirve [`nouser_card::query::QueryRequest`]. +//! +//! El daemon `nouser` lo monta para que cualquier consumer (UI, CLI, +//! otro módulo) pueda preguntarle por sus Mónadas sin pasar por +//! brahman-admin. El path del socket viaja en el `Card.service_socket` +//! del engine; el broker brahman lo enseña vía MatchEvent::Available +//! cuando un consumer declara `flow.input = monad-list:json`. +//! +//! Wire: line-delimited JSON, single-shot por conexión. Mismo patrón +//! que `nouser-nous` (mock/real ↔ nouser-core), reutilizado. +//! +//! Threading: un thread dedicado, blocking I/O. No vale la pena traer +//! tokio acá — la frecuencia esperada es muy baja (UI poll cada 2s) +//! y el handler es trivial (lock db → snapshot → write). + +use std::io::{BufRead, BufReader, Write}; +use std::os::unix::net::{UnixListener, UnixStream}; +use std::path::PathBuf; +use std::sync::{Arc, Mutex}; + +use nouser_card::query::{ + EngineInfo, ErrorResponse, ListMonadsResponse, MonadView, QueryRequest, +}; +use nouser_card::ulid::Ulid; + +use crate::db::MonadDb; + +/// Configuración del listener. +pub struct ListenerConfig { + pub socket_path: PathBuf, + pub engine_id: Ulid, + pub engine_label: String, + /// Path del directorio que el daemon está observando, para incluir + /// en `EngineInfo.watching`. `None` si el daemon no observa nada. + pub watching: Option, +} + +/// Bind del socket + spawn de un thread con accept loop. Devuelve el +/// path final (útil para confirmar) y un `JoinHandle` para shutdown +/// explícito (drop = thread sigue, listener queda). +/// +/// Si el socket ya existe (sesión anterior crasheada), se intenta +/// removerlo antes del bind. Errores de bind se propagan al caller. +pub fn spawn_listener( + config: ListenerConfig, + db: Arc>, +) -> std::io::Result> { + if config.socket_path.exists() { + let _ = std::fs::remove_file(&config.socket_path); + } + if let Some(parent) = config.socket_path.parent() { + std::fs::create_dir_all(parent)?; + } + let listener = UnixListener::bind(&config.socket_path)?; + + let handle = std::thread::Builder::new() + .name("nouser-engine-listener".into()) + .spawn(move || { + for conn in listener.incoming() { + match conn { + Ok(stream) => { + // Handler sincrónico inline. La frecuencia + // esperada (UI poll cada N segundos) no + // amerita spawn-per-connection; si en el + // futuro hay carga, agregar un threadpool. + if let Err(e) = handle_conn(stream, &db, &config) { + eprintln!("[engine-socket] conn falló: {e}"); + } + } + Err(e) => { + eprintln!("[engine-socket] accept falló: {e}"); + } + } + } + })?; + + Ok(handle) +} + +fn handle_conn( + mut stream: UnixStream, + db: &Arc>, + config: &ListenerConfig, +) -> std::io::Result<()> { + let mut reader = BufReader::new(stream.try_clone()?); + let mut line = String::new(); + let n = reader.read_line(&mut line)?; + if n == 0 { + return Ok(()); + } + + let resp_bytes = match serde_json::from_str::(line.trim()) { + Ok(QueryRequest::ListMonads) => match handle_list_monads(db, config) { + Ok(json) => json, + Err(e) => encode_error(format!("list_monads falló: {e}")), + }, + Err(e) => encode_error(format!("JSON inválido: {e}")), + }; + + stream.write_all(resp_bytes.as_bytes())?; + stream.write_all(b"\n")?; + stream.flush()?; + let _ = stream.shutdown(std::net::Shutdown::Both); + Ok(()) +} + +fn handle_list_monads( + db: &Arc>, + config: &ListenerConfig, +) -> Result { + let db_lock = db.lock().map_err(|_| "mutex envenenado".to_string())?; + let monads: Vec = db_lock.monads().map(MonadView::from_manifest).collect(); + let resp = ListMonadsResponse { + engine: EngineInfo { + id: config.engine_id, + label: config.engine_label.clone(), + watching: config.watching.as_ref().map(|p| p.display().to_string()), + }, + monads, + }; + serde_json::to_string(&resp).map_err(|e| format!("encode: {e}")) +} + +fn encode_error(msg: String) -> String { + let err = ErrorResponse { error: msg }; + serde_json::to_string(&err).unwrap_or_else(|_| "{\"error\":\"encode\"}".into()) +} + +/// Cliente blocking — `client::list_monads(socket)` para que la UI no +/// reimplemente el handshake JSON cada vez. +pub mod client { + use std::io::{BufRead, BufReader, Write}; + use std::os::unix::net::UnixStream; + use std::path::Path; + use std::time::Duration; + + use nouser_card::query::{ErrorResponse, ListMonadsResponse, QueryRequest}; + + #[derive(Debug, thiserror::Error)] + pub enum QueryError { + #[error("conectar a {path}: {source}")] + Connect { + path: std::path::PathBuf, + #[source] + source: std::io::Error, + }, + #[error("I/O: {0}")] + Io(#[from] std::io::Error), + #[error("serializacion: {0}")] + Serde(#[from] serde_json::Error), + #[error("daemon: {0}")] + Daemon(String), + #[error("timeout esperando response")] + Timeout, + #[error("response vacía del daemon")] + Empty, + } + + /// Envía `ListMonads` y devuelve la response. Timeout aplicado + /// tanto al connect como al read. + pub fn list_monads( + socket: &Path, + timeout: Duration, + ) -> Result { + let mut stream = UnixStream::connect(socket).map_err(|e| QueryError::Connect { + path: socket.to_path_buf(), + source: e, + })?; + stream.set_read_timeout(Some(timeout))?; + stream.set_write_timeout(Some(timeout))?; + + let req = QueryRequest::ListMonads; + let line = serde_json::to_string(&req)?; + stream.write_all(line.as_bytes())?; + stream.write_all(b"\n")?; + stream.flush()?; + + let mut reader = BufReader::new(stream); + let mut response = String::new(); + let n = reader.read_line(&mut response)?; + if n == 0 { + return Err(QueryError::Empty); + } + + if let Ok(resp) = serde_json::from_str::(response.trim()) { + return Ok(resp); + } + let err: ErrorResponse = serde_json::from_str(response.trim())?; + Err(QueryError::Daemon(err.error)) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::db::MonadDb; + use nouser_card::MonadManifest; + use std::time::Duration; + + fn fresh_socket_path(name: &str) -> PathBuf { + let dir = std::env::temp_dir(); + let unique = format!("{}-{}-{}.sock", name, std::process::id(), Ulid::new()); + dir.join(unique) + } + + #[test] + fn list_monads_roundtrip_empty() { + let socket = fresh_socket_path("nouser-engine-test"); + let db = Arc::new(Mutex::new(MonadDb::new())); + let engine_id = Ulid::new(); + let _h = spawn_listener( + ListenerConfig { + socket_path: socket.clone(), + engine_id, + engine_label: "test-engine".into(), + watching: Some(PathBuf::from("/tmp/x")), + }, + db.clone(), + ) + .unwrap(); + + // Pequeña espera para que el bind se asiente (en práctica el + // socket existe inmediatamente tras el bind, pero algunos FS + // necesitan un tick). Si esto resulta flaky, agregar un loop + // de wait_for(socket.exists()). + std::thread::sleep(Duration::from_millis(50)); + + let resp = client::list_monads(&socket, Duration::from_secs(2)).unwrap(); + assert_eq!(resp.engine.id, engine_id); + assert_eq!(resp.engine.label, "test-engine"); + assert_eq!(resp.engine.watching.as_deref(), Some("/tmp/x")); + assert!(resp.monads.is_empty()); + + let _ = std::fs::remove_file(&socket); + } + + #[test] + fn list_monads_returns_views() { + let socket = fresh_socket_path("nouser-engine-test-views"); + let db = Arc::new(Mutex::new(MonadDb::new())); + let m1 = MonadManifest::new("alpha"); + let m2 = MonadManifest::new("beta"); + { + let mut g = db.lock().unwrap(); + g.replace_monads(vec![m1.clone(), m2.clone()]); + } + let _h = spawn_listener( + ListenerConfig { + socket_path: socket.clone(), + engine_id: Ulid::new(), + engine_label: "test".into(), + watching: None, + }, + db.clone(), + ) + .unwrap(); + std::thread::sleep(Duration::from_millis(50)); + + let resp = client::list_monads(&socket, Duration::from_secs(2)).unwrap(); + assert_eq!(resp.monads.len(), 2); + let labels: Vec<_> = resp.monads.iter().map(|m| m.label.as_str()).collect(); + assert!(labels.contains(&"alpha")); + assert!(labels.contains(&"beta")); + + let _ = std::fs::remove_file(&socket); + } + + #[test] + fn invalid_request_returns_error_response() { + let socket = fresh_socket_path("nouser-engine-test-bad"); + let db = Arc::new(Mutex::new(MonadDb::new())); + let _h = spawn_listener( + ListenerConfig { + socket_path: socket.clone(), + engine_id: Ulid::new(), + engine_label: "test".into(), + watching: None, + }, + db.clone(), + ) + .unwrap(); + std::thread::sleep(Duration::from_millis(50)); + + // Bypass del cliente tipado: mandamos JSON inválido a mano. + use std::io::{BufRead, BufReader, Write}; + let mut stream = UnixStream::connect(&socket).unwrap(); + stream.write_all(b"not json\n").unwrap(); + stream.flush().unwrap(); + let mut reader = BufReader::new(stream); + let mut response = String::new(); + reader.read_line(&mut response).unwrap(); + + assert!( + response.contains("\"error\""), + "esperaba ErrorResponse, got: {response}" + ); + + let _ = std::fs::remove_file(&socket); + } +} diff --git a/crates/modules/nouser/core/src/lib.rs b/crates/modules/nouser/core/src/lib.rs index 3c1e45c..0bf2e19 100644 --- a/crates/modules/nouser/core/src/lib.rs +++ b/crates/modules/nouser/core/src/lib.rs @@ -28,6 +28,7 @@ pub mod cluster; pub mod db; pub mod embed; +pub mod engine_socket; pub mod scanner; pub use nouser_card::*;