feat(nouser): Phase D-2 — proveedor Nous real (LLM) detrás de feature

Cierra el ciclo del módulo Nous: existe un proveedor que produce
embeddings reales con un modelo LLM, mientras que `cargo build` sin
features sigue siendo liviano (no descarga ni compila ML deps).

Crate nuevo crates/modules/nouser/nous-real con dos modos según feature:

- Sin feature (default): stub.
  cargo build -p nouser-nous-real (~10s, sin ML deps).
  Bin arranca, sidecarea a brahman-init declarando la Card,
  escucha en el socket Nous, rechaza requests con un ErrorResponse
  explicativo: "compilado sin la feature embeddings, rebuild con
  cargo build -p nouser-nous-real --features embeddings".
  cargo build --workspace SIGUE siendo limpio.

- Con --features embeddings: real.
  Pulls fastembed = "4" → ort 2.0.0-rc.9 (ONNX Runtime con binarios
  descargados por Cargo) + tokenizers 0.21 + ~30 transitive deps.
  Compila en ~50s.
  Modelo default: all-MiniLM-L6-v2 (384-d, descargado a
  ~/.cache/fastembed la primera vez).
  EmbedText: pasa el texto al modelo → vector 384-d.
  EmbedFile: lee primeros 8KiB UTF-8 lossy, embed como texto.
  Ping: devuelve model_id + embed_dim reales.

Card declara label "nouser.nous_real" + priority_contexts.prod = +1.
En contexto prod gana sobre el mock; en test el mock gana por su +1
en test. Sin contexto, empate alfabético.

Validación end-to-end con modelo real:
  $ ente-zero & nouser-nous-real &
  $ python3 socket-probe '{"kind":"embed_text","payload":{"text":"..."}}'
    model: real-fastembed-allMiniLML6V2-384d
    elapsed_ms: 8
    embed_dim: 384

Tradeoff: dim mock (32) vs real (384) son incompatibles. Cambiar
proveedor invalida centroides cacheados — documentar "limpiar DB al
swap".

Workspace state:
- cargo build --workspace limpio sin features (no ML deps pulled).
- cargo build -p nouser-nous-real --features embeddings funciona.
- 0 errores, 0 warnings en ambos modos.

Pendientes para D-3 / futuro:
- Discovery de socket: el consumer hoy usa NOUSER_NOUS_SOCKET hardcoded.
  Para que el broker elija real vs mock per-contexto, falta o un campo
  socket en el MatchEvent o un broker query "dame socket de session X".
- Coexistencia: ambos providers compiten por el mismo socket path por
  default. Parametrizarlos cuando se quiera correrlos juntos.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Sergio
2026-05-08 19:08:27 +00:00
parent b3c3c00cf2
commit 11fc95629c
23 changed files with 31943 additions and 22 deletions
@@ -0,0 +1,35 @@
[package]
name = "nouser-nous-real"
version.workspace = true
edition.workspace = true
rust-version.workspace = true
license.workspace = true
authors.workspace = true
publish.workspace = true
description = "Nouser — proveedor Nous con LLM real (text-embedding via ONNX). El soporte AI vive detrás del feature `embeddings`; sin él, este crate compila como stub mínimo."
[features]
# Sin features = stub que arranca y rechaza requests. Compila en
# segundos, sin descargar nada.
default = []
# Con feature embeddings: pulls fastembed + ONNX Runtime descargado.
# Modelo default: all-MiniLM-L6-v2 (384-d, ~80MB descargado al primer
# run y cacheado).
embeddings = ["dep:fastembed"]
[dependencies]
brahman-card = { path = "../../../core/brahman-card" }
brahman-sidecar = { path = "../../../shared/brahman-sidecar" }
nouser-nous = { path = "../nous" }
serde_json = { workspace = true }
tokio = { workspace = true }
tracing = { workspace = true }
tracing-subscriber = { workspace = true }
ulid = { workspace = true }
# Opcional: gateado por feature `embeddings`.
fastembed = { version = "4", optional = true }
[[bin]]
name = "nouser-nous-real"
path = "src/main.rs"
@@ -0,0 +1,151 @@
//! Modo embeddings: usa fastembed-rs (ONNX Runtime) para producir
//! vectores reales de text-embedding.
//!
//! Modelo default: `all-MiniLM-L6-v2` (384-d). Se descarga al primer
//! arranque a `~/.cache/fastembed` y queda cacheado.
//!
//! ## Mapeo del contrato
//!
//! - `EmbedText`: pasa el texto al modelo, devuelve el vector 384-d.
//! - `EmbedFile`: lee hasta los primeros 8 KiB del archivo, los
//! interpreta como UTF-8 con replacement-char, y los embeda como
//! texto. Para archivos binarios el resultado no es semánticamente
//! útil — caller decide qué hacer.
//! - `Ping`: devuelve `model_id` y `embed_dim` reales.
use std::fs::File;
use std::io::Read;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Instant;
use fastembed::{EmbeddingModel, InitOptions, TextEmbedding};
use nouser_nous::{
EmbedFilePayload, EmbedRequest, EmbedResponse, EmbedTextPayload, ErrorResponse, PingResponse,
RequestKind,
};
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
use tokio::net::UnixStream;
use tracing::{info, warn};
const MAX_FILE_BYTES: usize = 8192;
/// Backend concreto: posee el modelo cargado.
pub struct Backend {
model: TextEmbedding,
}
impl Backend {
pub fn init() -> Result<Self, String> {
info!("cargando modelo all-MiniLM-L6-v2 (puede descargar ~80MB la primera vez)");
let opts = InitOptions::new(EmbeddingModel::AllMiniLML6V2)
.with_show_download_progress(true);
let model = TextEmbedding::try_new(opts).map_err(|e| format!("fastembed init: {e}"))?;
info!("modelo listo");
Ok(Self { model })
}
fn embed_one(&self, text: &str) -> Result<Vec<f32>, String> {
let out = self
.model
.embed(vec![text], None)
.map_err(|e| format!("embed: {e}"))?;
out.into_iter()
.next()
.ok_or_else(|| "fastembed devolvió 0 vectores".to_string())
}
}
pub async fn handle_conn(stream: UnixStream, backend: Arc<Backend>) -> std::io::Result<()> {
let mut reader = BufReader::new(stream);
let mut line = String::new();
let n = reader.read_line(&mut line).await?;
if n == 0 {
return Ok(());
}
let req: EmbedRequest = match serde_json::from_str(&line) {
Ok(r) => r,
Err(e) => {
return write_error(reader.into_inner(), format!("JSON inválido: {e}")).await;
}
};
let started = Instant::now();
let result = match req.kind {
RequestKind::EmbedFile => handle_file(req.payload, &backend, started),
RequestKind::EmbedText => handle_text(req.payload, &backend, started),
RequestKind::Ping => handle_ping(),
};
let mut stream = reader.into_inner();
match result {
Ok(json) => {
stream.write_all(json.as_bytes()).await?;
stream.write_all(b"\n").await?;
}
Err(msg) => return write_error(stream, msg).await,
}
stream.shutdown().await?;
Ok(())
}
fn handle_text(
payload: serde_json::Value,
backend: &Backend,
started: Instant,
) -> Result<String, String> {
let p: EmbedTextPayload =
serde_json::from_value(payload).map_err(|e| format!("payload: {e}"))?;
info!(text_len = p.text.len(), "embed_text");
let v = backend.embed_one(&p.text)?;
let resp = EmbedResponse {
embedding: v,
model: super::model_id().to_string(),
elapsed_ms: started.elapsed().as_millis() as u64,
};
serde_json::to_string(&resp).map_err(|e| format!("encode: {e}"))
}
fn handle_file(
payload: serde_json::Value,
backend: &Backend,
started: Instant,
) -> Result<String, String> {
let p: EmbedFilePayload =
serde_json::from_value(payload).map_err(|e| format!("payload: {e}"))?;
info!(path = %p.path, "embed_file (lee contenido)");
let path = PathBuf::from(&p.path);
let mut file = File::open(&path).map_err(|e| format!("abrir archivo: {e}"))?;
let mut buf = vec![0u8; MAX_FILE_BYTES];
let n = file.read(&mut buf).map_err(|e| format!("leer archivo: {e}"))?;
buf.truncate(n);
let text = String::from_utf8_lossy(&buf).to_string();
let v = backend.embed_one(&text)?;
let resp = EmbedResponse {
embedding: v,
model: super::model_id().to_string(),
elapsed_ms: started.elapsed().as_millis() as u64,
};
serde_json::to_string(&resp).map_err(|e| format!("encode: {e}"))
}
fn handle_ping() -> Result<String, String> {
let resp = PingResponse {
model: super::model_id().to_string(),
embed_dim: super::embed_dim(),
};
serde_json::to_string(&resp).map_err(|e| format!("encode: {e}"))
}
async fn write_error(mut stream: UnixStream, msg: String) -> std::io::Result<()> {
warn!(error = %msg, "respuesta de error");
let resp = ErrorResponse { error: msg };
let json = serde_json::to_string(&resp).unwrap_or_else(|_| "{\"error\":\"encode\"}".into());
stream.write_all(json.as_bytes()).await?;
stream.write_all(b"\n").await?;
stream.shutdown().await?;
Ok(())
}
+182
View File
@@ -0,0 +1,182 @@
//! `nouser-nous-real` — proveedor Nous con LLM real (gated por feature).
//!
//! ## Build modes
//!
//! - `cargo build -p nouser-nous-real`
//! Compila como **stub**: bin que arranca, sidecarea al brahman-init
//! pero rechaza toda request con un error explicando que falta la
//! feature. Útil para que `cargo build --workspace` no requiera ML
//! deps.
//!
//! - `cargo build -p nouser-nous-real --features embeddings`
//! Compila con `fastembed` + ONNX Runtime descargado por Cargo.
//! Modelo default: `all-MiniLM-L6-v2` (384-d, ~80 MB descargado al
//! primer run y cacheado en `~/.cache/fastembed`).
//!
//! ## Diseño
//!
//! Mismo contrato wire que `nouser-nous-mock` (`nouser-nous` crate). La
//! diferencia operativa: real produce 384-d con semantic content
//! (text-embedding del modelo); mock produce 32-d con metadata-hashing.
//! No son intercambiables a media-deployment — los centroides de
//! Mónadas calculadas con uno NO matchean con el otro.
//!
//! La Card declara `priority_contexts.prod = { priority_offset: +1 }`,
//! contrapeso del mock que tiene `+1 en test`. Así el broker brahman
//! elige automáticamente:
//! - `BRAHMAN_BROKER_CONTEXT=test` → mock gana.
//! - `BRAHMAN_BROKER_CONTEXT=prod` → real gana.
//! - sin contexto → empate por label alfabético.
#![forbid(unsafe_code)]
use std::collections::BTreeMap;
use brahman_card::{
ulid::Ulid, Card, CardKind, ContextBias, Flow, Flows, Lifecycle, Payload, Priority,
Supervision, TypeRef,
};
use nouser_nous::{transport, FLOW_EMBED_REQUEST, FLOW_EMBED_RESULT, FLOW_TYPE_NAME};
use tokio::net::UnixListener;
use tracing::info;
#[cfg(feature = "embeddings")]
mod embeddings;
#[cfg(not(feature = "embeddings"))]
mod stub;
#[cfg(feature = "embeddings")]
const MODEL_ID: &str = "real-fastembed-allMiniLML6V2-384d";
#[cfg(not(feature = "embeddings"))]
const MODEL_ID: &str = "real-stub-no-feature";
#[cfg(feature = "embeddings")]
const EMBED_DIM: u32 = 384;
#[cfg(not(feature = "embeddings"))]
const EMBED_DIM: u32 = 0;
#[tokio::main(flavor = "current_thread")]
async fn main() -> std::io::Result<()> {
init_tracing();
#[cfg(not(feature = "embeddings"))]
info!(
"nouser-nous-real corriendo en modo STUB (compilá con \
--features embeddings para activar el modelo)"
);
// 1. Sidecar al brahman-init (mismo patrón que el mock).
let card = build_card();
info!(label = %card.label, mode = MODEL_ID, "publicando Card al brahman-init");
brahman_sidecar::spawn(card);
// 2. Bind del socket Nous (mismo path que el mock — son swappable).
let sock_path = transport::default_socket_path();
if sock_path.exists() {
std::fs::remove_file(&sock_path)?;
}
if let Some(parent) = sock_path.parent() {
std::fs::create_dir_all(parent)?;
}
let listener = UnixListener::bind(&sock_path)?;
info!(socket = %sock_path.display(), "nouser-nous-real escuchando");
// 3. Inicializar el modelo (sólo en modo embeddings).
#[cfg(feature = "embeddings")]
let backend = embeddings::Backend::init().map_err(|e| {
std::io::Error::other(format!("init modelo: {e}"))
})?;
#[cfg(feature = "embeddings")]
let backend = std::sync::Arc::new(backend);
// 4. Accept loop.
loop {
let (stream, _addr) = listener.accept().await?;
#[cfg(feature = "embeddings")]
{
let backend = backend.clone();
tokio::spawn(async move {
if let Err(e) = embeddings::handle_conn(stream, backend).await {
tracing::warn!(error = %e, "conn falló");
}
});
}
#[cfg(not(feature = "embeddings"))]
{
tokio::spawn(async move {
if let Err(e) = stub::handle_conn(stream).await {
tracing::warn!(error = %e, "conn falló");
}
});
}
}
}
fn init_tracing() {
tracing_subscriber::fmt()
.with_env_filter(
tracing_subscriber::EnvFilter::try_from_default_env()
.unwrap_or_else(|_| "info".into()),
)
.with_target(false)
.compact()
.init();
}
/// Card que real-nous anuncia. Idéntica al mock excepto por:
/// - label distinto (`nouser.nous_real`) para que coexistan en el broker.
/// - `priority_contexts.prod = +1` (gana en contexto prod).
fn build_card() -> Card {
let mut priority_contexts = BTreeMap::new();
priority_contexts.insert(
"prod".into(),
ContextBias {
pin_to: None,
priority_offset: 1,
},
);
Card {
schema_version: brahman_card::CARD_SCHEMA_VERSION,
id: Ulid::new(),
label: "nouser.nous_real".into(),
payload: Payload::Virtual,
supervision: Supervision::Delegate,
lifecycle: Lifecycle::Daemon,
priority: Priority::Normal,
kind: CardKind::Ente,
flow: Flows {
input: vec![Flow {
name: FLOW_EMBED_REQUEST.into(),
ty: TypeRef::Primitive {
name: FLOW_TYPE_NAME.into(),
},
pin_to: None,
}],
output: vec![Flow {
name: FLOW_EMBED_RESULT.into(),
ty: TypeRef::Primitive {
name: FLOW_TYPE_NAME.into(),
},
pin_to: None,
}],
},
priority_contexts,
..Default::default()
}
}
// Helpers compartidos. Anotados allow(dead_code) porque en stub mode
// algunos quedan sin uso pero los queremos disponibles consistentemente.
#[allow(dead_code)]
pub(crate) fn model_id() -> &'static str {
MODEL_ID
}
#[allow(dead_code)]
pub(crate) fn embed_dim() -> u32 {
EMBED_DIM
}
@@ -0,0 +1,36 @@
//! Modo stub: arranca el bin pero rechaza las requests con un error
//! que explica que falta la feature `embeddings`.
use nouser_nous::{EmbedRequest, ErrorResponse};
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
use tokio::net::UnixStream;
use tracing::warn;
pub async fn handle_conn(stream: UnixStream) -> std::io::Result<()> {
let mut reader = BufReader::new(stream);
let mut line = String::new();
let n = reader.read_line(&mut line).await?;
if n == 0 {
return Ok(());
}
// Parseamos para validar la forma; igual rechazamos.
let _: Result<EmbedRequest, _> = serde_json::from_str(&line);
warn!("rechazando request en modo stub (feature `embeddings` ausente)");
let resp = ErrorResponse {
error: format!(
"nouser-nous-real compilado sin la feature `embeddings`. \
Rebuild con: cargo build -p nouser-nous-real --features embeddings"
),
};
let mut stream = reader.into_inner();
let payload = serde_json::to_string(&resp).unwrap_or_else(|_| {
"{\"error\":\"stub mode and serialization failed\"}".to_string()
});
stream.write_all(payload.as_bytes()).await?;
stream.write_all(b"\n").await?;
stream.shutdown().await?;
Ok(())
}