feat(nouser): Phase D — proveedor Nous mock + cliente remoto

Cierra el patrón "Nous como módulo aparte intercambiable": el contrato
del proveedor de embeddings vive en su crate, el mock determinístico
implementa ese contrato sirviéndolo por Unix socket, y nouser-core
sabe consumirlo remotamente. El switch mock↔real (futuro) será vía
priority_contexts en el broker.

Crates nuevos:

- crates/modules/nouser/nous: contrato compartido.
  - EmbedRequest { kind: { EmbedFile | EmbedText | Ping }, payload }.
  - EmbedFilePayload (path, ext, size, mtime), EmbedTextPayload.
  - EmbedResponse (embedding, model, elapsed_ms), PingResponse,
    ErrorResponse.
  - Wire: line-delimited JSON sobre Unix socket, single-shot.
  - Constants FLOW_EMBED_REQUEST, FLOW_EMBED_RESULT, FLOW_TYPE_NAME.
  - transport::default_socket_path con env NOUSER_NOUS_SOCKET.

- crates/modules/nouser/nous-mock: bin nouser-nous-mock.
  - Sidecarea a brahman-init con Card kind=Ente declarando los flows
    embed-request/embed-result + priority_contexts.test = +1.
  - Bind del socket Nous + accept loop tokio.
  - EmbedFile delega a nouser_core::embed::embed (Phase C).
  - Modelo: "mock-pseudo-32d".

Cambios:

- nouser-core: dep nueva nouser-nous. Subcomando attract --remote
  abre un UnixStream blocking, envía EmbedRequest, lee response.
  Imprime "embed: local|remote" para ver cuál ruta corrió.

Bug encontrado y corregido:
- ContextBias tenía #[serde(skip_serializing_if = ...)] en sus campos.
  Postcard NO soporta skip-condicional en formatos no self-describing:
  el serializer omitía bytes que el deserializer esperaba, rompiendo
  la wire de cualquier Card con priority_contexts poblada.
  Síntoma: "postcard decode: Hit the end of buffer" en el server,
  "early eof" en el cliente.
- Fix: removidos los skip_serializing_if de ContextBias. JSON pretty
  ahora emite {"pin_to": null, "priority_offset": 0} pero el wire
  funciona. Trade-off aceptado.
- Test wirecard_postcard_with_priority_contexts en brahman-card que
  ejercita el roundtrip postcard con biases poblados.

Validación end-to-end:
  $ ente-zero & nouser-nous-mock & nouser daemon crates/core
  $ brahman-status
  Sessions (7):
    [ente] nouser.nous_mock      flows: embed-request, embed-result
    [ente] brahman.nouser_engine
    [data] src   summary: 6 archivos en crates/core/brahman-handshake/src
    [data] graph summary: 7 archivos en crates/core/ente-zero/src/graph
    ...
  $ nouser attract --remote crates/core <archivo>.rs
    embed: remote
    🧲  0.9058  src  ...
  (mock log: embed_file path=...)

Tests: 75. cargo check --workspace: 0 errores, 0 warnings.

Próximo natural: Phase D-2 — real-nous con ONNX/Llama text-embedding.
Declara la misma Card con priority_contexts.prod = +1 y el swap es
transparente para el consumer.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Sergio
2026-05-08 18:49:25 +00:00
parent 77faf12e82
commit b3c3c00cf2
10 changed files with 693 additions and 11 deletions
+1
View File
@@ -10,6 +10,7 @@ description = "Nouser — explorador de Mónadas: scanner, clustering determinis
[dependencies]
nouser-card = { path = "../card" }
nouser-nous = { path = "../nous" }
brahman-card = { path = "../../../core/brahman-card" }
brahman-sidecar = { path = "../../../shared/brahman-sidecar" }
blake3 = { workspace = true }
+72 -5
View File
@@ -206,16 +206,28 @@ fn cmd_daemon(args: &[String]) -> Cmd {
}
fn cmd_attract(args: &[String]) -> Cmd {
let dir = require_dir(args)?;
let file_path = args.get(1).ok_or("falta argumento <file>")?;
let file_path = std::path::PathBuf::from(file_path);
let mut remote = false;
let mut positional: Vec<&String> = Vec::new();
for a in args {
if a == "--remote" {
remote = true;
} else {
positional.push(a);
}
}
let dir = positional
.first()
.map(|s| std::path::PathBuf::from(s.as_str()))
.ok_or("falta argumento <dir>")?;
let file_path = positional.get(1).ok_or("falta argumento <file>")?;
let file_path = std::path::PathBuf::from(file_path.as_str());
if !file_path.exists() {
return Err(format!("archivo no existe: {}", file_path.display()).into());
}
let (db, _) = run_scan(&dir)?;
// Construimos un FileEntry para el archivo objetivo y sacamos su embedding.
// Construimos un FileEntry para el archivo objetivo.
let metadata = std::fs::metadata(&file_path)?;
let mtime_ms = metadata
.modified()
@@ -234,7 +246,16 @@ fn cmd_attract(args: &[String]) -> Cmd {
.and_then(|s| s.to_str())
.map(|s| s.to_lowercase()),
};
let target_vec = embed::embed(&target);
// Embedding: --remote consulta al socket de nouser-nous; sin flag,
// se computa localmente. El resultado debe ser idéntico mientras
// el proveedor sea el mock determinista.
let (target_vec, source) = if remote {
let v = remote_embed(&target)?;
(v, "remote")
} else {
(embed::embed(&target).to_vec(), "local")
};
// Ranking completo, no sólo el ganador — útil para entender qué
// Mónadas son secundarias.
@@ -252,6 +273,7 @@ fn cmd_attract(args: &[String]) -> Cmd {
println!("archivo: {}", file_path.display());
println!("scan dir: {}", dir.display());
println!("embed: {}", source);
println!("ranking de atracción (cosine similarity):");
println!();
for (i, (m, score)) in ranked.iter().take(5).enumerate() {
@@ -280,6 +302,51 @@ fn cmd_attract(args: &[String]) -> Cmd {
Ok(())
}
/// Cliente blocking del socket nouser-nous. Conecta, envía un
/// `EmbedRequest`, lee la response, devuelve el vector. Single-shot.
fn remote_embed(file: &nouser_card::FileEntry) -> Result<Vec<f32>, Box<dyn std::error::Error>> {
use std::io::{BufRead, BufReader, Write};
use std::os::unix::net::UnixStream;
let sock_path = nouser_nous::transport::default_socket_path();
if !sock_path.exists() {
return Err(format!(
"socket nouser-nous no existe en {} — corrió nouser-nous-mock?",
sock_path.display()
)
.into());
}
let mut stream = UnixStream::connect(&sock_path)?;
let req = nouser_nous::EmbedRequest {
kind: nouser_nous::RequestKind::EmbedFile,
payload: serde_json::to_value(nouser_nous::EmbedFilePayload {
path: file.path.display().to_string(),
extension: file.extension.clone(),
size: file.size,
mtime_ms: file.mtime_ms,
})?,
};
let line = serde_json::to_string(&req)?;
stream.write_all(line.as_bytes())?;
stream.write_all(b"\n")?;
stream.flush()?;
let mut reader = BufReader::new(stream);
let mut response = String::new();
reader.read_line(&mut response)?;
if response.is_empty() {
return Err("nouser-nous cerró sin respuesta".into());
}
// Intentamos primero como response normal; si falla, como error.
if let Ok(resp) = serde_json::from_str::<nouser_nous::EmbedResponse>(&response) {
return Ok(resp.embedding);
}
let err: nouser_nous::ErrorResponse = serde_json::from_str(&response)?;
Err(format!("nouser-nous: {}", err.error).into())
}
/// Card del propio engine (kind=Ente). Es el "ser" que produce y
/// administra Mónadas; aparece en brahman-status junto a sus Mónadas.
fn build_engine_card() -> brahman_card::Card {
@@ -0,0 +1,25 @@
[package]
name = "nouser-nous-mock"
version.workspace = true
edition.workspace = true
rust-version.workspace = true
license.workspace = true
authors.workspace = true
publish.workspace = true
description = "Nouser — Nous mock determinístico: implementa el contrato nouser-nous con pseudo-embeddings de Phase C. Stand-in para tests y para `BRAHMAN_BROKER_CONTEXT=test`."
[dependencies]
brahman-card = { path = "../../../core/brahman-card" }
brahman-sidecar = { path = "../../../shared/brahman-sidecar" }
nouser-card = { path = "../card" }
nouser-core = { path = "../core" }
nouser-nous = { path = "../nous" }
serde_json = { workspace = true }
tokio = { workspace = true }
tracing = { workspace = true }
tracing-subscriber = { workspace = true }
ulid = { workspace = true }
[[bin]]
name = "nouser-nous-mock"
path = "src/main.rs"
+238
View File
@@ -0,0 +1,238 @@
//! `nouser-nous-mock` — proveedor de embeddings determinista (sin LLM).
//!
//! Implementa el contrato `nouser-nous` usando los pseudo-embeddings
//! de Phase C (`nouser_core::embed`). Sirve como:
//!
//! - **Mock para tests**: en `BRAHMAN_BROKER_CONTEXT=test`, el
//! `priority_offset` per-contexto declarado en su Card lo prioriza
//! sobre cualquier proveedor real.
//! - **Bootstrap**: hasta que llegue el LLM real (Phase D futura), el
//! sistema funciona end-to-end con embeddings determinísticos.
//!
//! ## Vida del proceso
//!
//! 1. Sidecarea a brahman-init declarando una Card con flow output
//! `embed-result:json` y flow input `embed-request:json`. Su
//! `priority_contexts.test = { priority_offset: +1 }` lo prioriza
//! cuando el broker corre bajo contexto test.
//! 2. Bind del Unix socket en `$NOUSER_NOUS_SOCKET` (default
//! `$XDG_RUNTIME_DIR/nouser-nous.sock`).
//! 3. Loop: accept → read line JSON → process → write line JSON → close.
//! 4. Cada request se loggea (info) — útil para verificar que el
//! consumidor está usando este proveedor.
use std::collections::BTreeMap;
use std::path::PathBuf;
use std::time::{Instant, SystemTime, UNIX_EPOCH};
use brahman_card::{
ulid::Ulid, Card, CardKind, ContextBias, Flow, Flows, Lifecycle, Payload, Priority,
Supervision, TypeRef,
};
use nouser_card::FileEntry;
use nouser_core::embed;
use nouser_nous::{
transport, EmbedFilePayload, EmbedRequest, EmbedResponse, EmbedTextPayload, ErrorResponse,
PingResponse, RequestKind, FLOW_EMBED_REQUEST, FLOW_EMBED_RESULT, FLOW_TYPE_NAME,
};
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
use tokio::net::{UnixListener, UnixStream};
use tracing::{info, warn};
const MODEL_ID: &str = "mock-pseudo-32d";
#[tokio::main(flavor = "current_thread")]
async fn main() -> std::io::Result<()> {
init_tracing();
// 1. Sidecar al brahman-init.
let card = build_card();
info!(label = %card.label, "publicando Card al brahman-init");
brahman_sidecar::spawn(card);
// 2. Bind del socket Nous.
let sock_path = transport::default_socket_path();
if sock_path.exists() {
std::fs::remove_file(&sock_path)?;
}
if let Some(parent) = sock_path.parent() {
std::fs::create_dir_all(parent)?;
}
let listener = UnixListener::bind(&sock_path)?;
info!(socket = %sock_path.display(), "nouser-nous-mock escuchando");
// 3. Accept loop.
loop {
let (stream, _addr) = listener.accept().await?;
tokio::spawn(async move {
if let Err(e) = handle_conn(stream).await {
warn!(error = %e, "conn falló");
}
});
}
}
fn init_tracing() {
tracing_subscriber::fmt()
.with_env_filter(
tracing_subscriber::EnvFilter::try_from_default_env()
.unwrap_or_else(|_| "info".into()),
)
.with_target(false)
.compact()
.init();
}
/// Card que el mock anuncia al brahman-init. Es kind=Ente (un proceso),
/// con flujos JSON y bias de prioridad para contexto `test`.
fn build_card() -> Card {
let mut priority_contexts = BTreeMap::new();
priority_contexts.insert(
"test".into(),
ContextBias {
pin_to: None,
// En contexto test, este mock gana sobre cualquier real-nous.
priority_offset: 1,
},
);
Card {
schema_version: brahman_card::CARD_SCHEMA_VERSION,
id: Ulid::new(),
label: "nouser.nous_mock".into(),
payload: Payload::Virtual,
supervision: Supervision::Delegate,
lifecycle: Lifecycle::Daemon,
priority: Priority::Normal,
kind: CardKind::Ente,
flow: Flows {
input: vec![Flow {
name: FLOW_EMBED_REQUEST.into(),
ty: TypeRef::Primitive {
name: FLOW_TYPE_NAME.into(),
},
pin_to: None,
}],
output: vec![Flow {
name: FLOW_EMBED_RESULT.into(),
ty: TypeRef::Primitive {
name: FLOW_TYPE_NAME.into(),
},
pin_to: None,
}],
},
priority_contexts,
..Default::default()
}
}
/// Procesa una conexión single-shot: lee una línea JSON, despacha,
/// escribe una línea JSON, cierra.
async fn handle_conn(stream: UnixStream) -> std::io::Result<()> {
let mut reader = BufReader::new(stream);
let mut line = String::new();
let n = reader.read_line(&mut line).await?;
if n == 0 {
return Ok(());
}
let req: EmbedRequest = match serde_json::from_str(&line) {
Ok(r) => r,
Err(e) => {
return write_error(reader.into_inner(), format!("JSON inválido: {e}")).await;
}
};
let started = Instant::now();
let result = match req.kind {
RequestKind::EmbedFile => handle_embed_file(req.payload, started),
RequestKind::EmbedText => handle_embed_text(req.payload, started),
RequestKind::Ping => handle_ping(),
};
let mut stream = reader.into_inner();
match result {
Ok(payload) => {
stream.write_all(payload.as_bytes()).await?;
stream.write_all(b"\n").await?;
}
Err(msg) => {
return write_error(stream, msg).await;
}
}
stream.shutdown().await?;
Ok(())
}
fn handle_embed_file(payload: serde_json::Value, started: Instant) -> Result<String, String> {
let p: EmbedFilePayload =
serde_json::from_value(payload).map_err(|e| format!("payload inválido: {e}"))?;
info!(path = %p.path, "embed_file");
let file = FileEntry {
id: nouser_card::FileId::from(Ulid::new()),
path: PathBuf::from(p.path),
content_hash: None,
size: p.size,
mtime_ms: p.mtime_ms,
extension: p.extension,
};
let v = embed::embed(&file);
let resp = EmbedResponse {
embedding: v.to_vec(),
model: MODEL_ID.into(),
elapsed_ms: started.elapsed().as_millis() as u64,
};
serde_json::to_string(&resp).map_err(|e| format!("encode: {e}"))
}
fn handle_embed_text(payload: serde_json::Value, started: Instant) -> Result<String, String> {
let p: EmbedTextPayload =
serde_json::from_value(payload).map_err(|e| format!("payload inválido: {e}"))?;
info!(text_len = p.text.len(), "embed_text");
// Mock: tratamos el texto como un "stem" sintético y rellenamos el
// resto del vector con ceros. No es semánticamente útil, pero respeta
// la forma para que el cliente no se rompa.
let synthetic = FileEntry {
id: nouser_card::FileId::from(Ulid::new()),
path: PathBuf::from(format!("synthetic://{}", p.text)),
content_hash: None,
size: p.text.len() as u64,
mtime_ms: now_ms(),
extension: Some("text".into()),
};
let v = embed::embed(&synthetic);
let resp = EmbedResponse {
embedding: v.to_vec(),
model: MODEL_ID.into(),
elapsed_ms: started.elapsed().as_millis() as u64,
};
serde_json::to_string(&resp).map_err(|e| format!("encode: {e}"))
}
fn handle_ping() -> Result<String, String> {
let resp = PingResponse {
model: MODEL_ID.into(),
embed_dim: embed::EMBED_DIM as u32,
};
serde_json::to_string(&resp).map_err(|e| format!("encode: {e}"))
}
async fn write_error(mut stream: UnixStream, msg: String) -> std::io::Result<()> {
warn!(error = %msg, "respuesta de error");
let resp = ErrorResponse { error: msg };
let json = serde_json::to_string(&resp).unwrap_or_else(|_| "{\"error\":\"encode\"}".into());
stream.write_all(json.as_bytes()).await?;
stream.write_all(b"\n").await?;
stream.shutdown().await?;
Ok(())
}
fn now_ms() -> u64 {
SystemTime::now()
.duration_since(UNIX_EPOCH)
.map(|d| d.as_millis() as u64)
.unwrap_or(0)
}
+14
View File
@@ -0,0 +1,14 @@
[package]
name = "nouser-nous"
version.workspace = true
edition.workspace = true
rust-version.workspace = true
license.workspace = true
authors.workspace = true
publish.workspace = true
description = "Nouser — protocolo Nous: contrato JSON line-delimited entre nouser-core y los proveedores de embeddings (mock o LLM real)."
[dependencies]
serde = { workspace = true }
serde_json = { workspace = true }
thiserror = { workspace = true }
+182
View File
@@ -0,0 +1,182 @@
//! `nouser-nous` — el contrato del proveedor de embeddings.
//!
//! Define el wire-format compartido entre `nouser-core` (consumidor) y
//! cualquier implementación de Nous (mock determinista o LLM real). El
//! protocolo es **line-delimited JSON** sobre Unix socket: cada conexión
//! envía una request, recibe una response, y cierra. Single-shot por
//! conexión, igual al admin de brahman.
//!
//! ## Contrato
//!
//! ```text
//! C → S: {"kind":"embed_file","payload":{...}}\n
//! S → C: {"embedding":[...],"model":"mock-pseudo-32d","elapsed_ms":1}\n
//! ```
//!
//! En caso de error:
//!
//! ```text
//! S → C: {"error":"unsupported kind"}\n
//! ```
//!
//! ## Por qué un crate aparte
//!
//! El consumidor (nouser-core) y el proveedor (nouser-nous-mock,
//! nouser-nous-real) deben acordar en types EXACTOS. Tener el contrato
//! en su crate evita que cada lado declare structs paralelos que se
//! desincronizan. Si bumpeás el wire, bumpeás aquí.
//!
//! ## Swap por priority_contexts
//!
//! Cuando existan dos proveedores (mock-nous y real-nous), ambos declaran
//! el mismo `flow.output: { name: "embed-result", type: ... }` y
//! `flow.input: "embed-request"`. El broker brahman los matchea contra
//! los consumidores; el `priority_offset` per-contexto del Card hace que
//! mock-nous gane en `test` y real-nous en `prod`. nouser-core sólo
//! consume el flow, sin saber cuál implementación corre.
#![forbid(unsafe_code)]
#![warn(rust_2018_idioms)]
use serde::{Deserialize, Serialize};
use thiserror::Error;
// =====================================================================
// Wire types
// =====================================================================
/// Request al proveedor Nous.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EmbedRequest {
pub kind: RequestKind,
pub payload: serde_json::Value,
}
/// Tipo de request. El payload se interpreta según el kind.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum RequestKind {
/// payload = `EmbedFilePayload` (path + metadata mínima).
EmbedFile,
/// payload = `EmbedTextPayload` (string libre).
EmbedText,
/// payload = `{}`. Devuelve `PingResponse`.
Ping,
}
/// Payload para `EmbedFile`. Es la información mínima que el proveedor
/// necesita para producir un embedding de archivo determinista.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EmbedFilePayload {
pub path: String,
pub extension: Option<String>,
pub size: u64,
/// `mtime` en ms desde UNIX_EPOCH.
pub mtime_ms: u64,
}
/// Payload para `EmbedText`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EmbedTextPayload {
pub text: String,
}
/// Response exitosa con un embedding.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EmbedResponse {
/// Vector. Su longitud depende del modelo (mock=32, llama=384, etc.).
pub embedding: Vec<f32>,
/// Identificador del modelo que produjo el embedding (útil para logs
/// y para invalidar caches al cambiar de proveedor).
pub model: String,
/// Tiempo de cómputo en ms (proveedor lo reporta).
pub elapsed_ms: u64,
}
/// Response a Ping. Útil para health-checks y discovery.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PingResponse {
pub model: String,
pub embed_dim: u32,
}
/// Error retornado por el proveedor en lugar de una response normal.
#[derive(Debug, Clone, Serialize, Deserialize, Error)]
#[error("nous: {error}")]
pub struct ErrorResponse {
pub error: String,
}
// =====================================================================
// Transport
// =====================================================================
pub mod transport {
use std::path::PathBuf;
/// Variable de entorno para sobreescribir la ruta del socket.
pub const SOCKET_ENV: &str = "NOUSER_NOUS_SOCKET";
/// Nombre por default del socket dentro del runtime dir.
pub const SOCKET_NAME: &str = "nouser-nous.sock";
/// Ruta canónica al socket de Nous.
pub fn default_socket_path() -> PathBuf {
if let Ok(p) = std::env::var(SOCKET_ENV) {
return PathBuf::from(p);
}
let base = std::env::var_os("XDG_RUNTIME_DIR")
.map(PathBuf::from)
.unwrap_or_else(std::env::temp_dir);
base.join(SOCKET_NAME)
}
}
// =====================================================================
// Names compartidos para el broker brahman
// =====================================================================
/// Nombre del flow output del proveedor (entrada del consumidor).
pub const FLOW_EMBED_RESULT: &str = "embed-result";
/// Nombre del flow input del proveedor (salida del consumidor).
pub const FLOW_EMBED_REQUEST: &str = "embed-request";
/// Tipo del flow: el wire es JSON serializado, así que el TypeRef
/// declarado en la Card es `primitive::json`.
pub const FLOW_TYPE_NAME: &str = "json";
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn request_roundtrip_json() {
let req = EmbedRequest {
kind: RequestKind::EmbedFile,
payload: serde_json::to_value(EmbedFilePayload {
path: "/x/y.rs".into(),
extension: Some("rs".into()),
size: 1024,
mtime_ms: 1_700_000_000_000,
})
.unwrap(),
};
let s = serde_json::to_string(&req).unwrap();
let parsed: EmbedRequest = serde_json::from_str(&s).unwrap();
assert_eq!(parsed.kind, RequestKind::EmbedFile);
}
#[test]
fn response_roundtrip() {
let resp = EmbedResponse {
embedding: vec![0.1, 0.2, 0.3],
model: "mock-pseudo-32d".into(),
elapsed_ms: 1,
};
let s = serde_json::to_string(&resp).unwrap();
let parsed: EmbedResponse = serde_json::from_str(&s).unwrap();
assert_eq!(parsed.model, "mock-pseudo-32d");
assert_eq!(parsed.embedding.len(), 3);
}
}