diff --git a/CHANGELOG.md b/CHANGELOG.md index 601d8ac..4239500 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,55 @@ ratio/diff ver `git show `. ## 2026-05-09 +### feat(nous-real): cache de embeddings + write-through al CAS de arje +Cierra el ciclo de la crítica del usuario: "Si un archivo no ha +cambiado su hash en el CAS, Nouser ni siquiera debería pedirle al +LLM que re-genere el embedding". El modelo real +(`fastembed-allMiniLML6V2-384d`, ~1-50ms por archivo) era invocado +ciegamente en cada re-cluster del watcher. Ahora se cachea por +`sha256(bytes-vistos) + model_id`. + +Pipeline en `handle_file`: +1. Lee primeros 8 KiB (igual que antes). +2. `file_sha = ente_cas::sha256_of(buf)` — hash de los bytes que el + modelo *realmente* verá (no del archivo completo). Garantiza + que un archivo creciendo más allá de la ventana sin tocar la + cabeza siga sirviendo cache hits. +3. Cache lookup: HIT → respuesta en ~µs. +4. MISS → `ente_cas::store(&buf)` (write-through al CAS de arje, + no-fatal si falla) → `backend.embed_one(text)` → `cache.put(...)`. + +Backend de cache: sled local en +`$XDG_CACHE_HOME/brahman/nouser-nous-real-embed-cache.sled`. Tree +versionado `embed_cache_v1`; el `MODEL_ID` viaja en la key, así que +cambiar de modelo invalida el cache implícitamente. Override por env +`NOUSER_NOUS_REAL_CACHE`. + +Encoding compacto: cada `Vec` se serializa como bytes +little-endian (4B por f32, sin overhead). Para el modelo default +(384-d) son 1.5 KiB por entry. Decode tolera bytes corruptos +(longitud no-múltiplo de 4 → `None`, no panic). + +Por qué sled y no `ente-cas` directo: el CAS de arje es flat +sha256-keyed; la cache necesita un mapeo `(file_sha, model_id) → +embedding`, no expresable como entry CAS. El write-through a CAS +queda como registro consultable + futura GC. + +API: +- `EmbedCache::open()` → abre sled, idempotente. +- `EmbedCache::open_at(dir)` para tests. +- `EmbedCache::get(sha, model)` → `Option>`. +- `EmbedCache::put(sha, model, &[f32])` → no-fatal en error. +- `EmbedCache::len()` → contador para logs (best-effort). + +Mock NO se modifica — su embedding pseudo-32d es metadata-hashing +puro, sin costo. Cachearlo sería overhead. + +Tests: 5 unitarios (`roundtrip_returns_same_vector`, `miss_returns_none`, +`different_models_do_not_collide`, `different_content_different_keys`, +`corrupted_value_returns_none`). Verdes con `--features embeddings`; +stub mode (sin feature) sigue compilando sin tocar cache. + ### chore(nakui): alinear `nakui-core` con `[workspace.package]` y deps compartidas Cleanup de drift de convenciones: `nakui-core` era el único crate del monorepo que mantenía `version = "0.1.0"` / `edition = "2021"` / diff --git a/Cargo.lock b/Cargo.lock index b533e61..0137cc0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6418,9 +6418,12 @@ version = "0.1.0" dependencies = [ "brahman-card", "brahman-sidecar", + "ente-cas", "fastembed", "nouser-nous", "serde_json", + "sled", + "tempfile", "tokio", "tracing", "tracing-subscriber", diff --git a/crates/modules/nouser/nous-real/Cargo.toml b/crates/modules/nouser/nous-real/Cargo.toml index 9de52ae..81fdf93 100644 --- a/crates/modules/nouser/nous-real/Cargo.toml +++ b/crates/modules/nouser/nous-real/Cargo.toml @@ -20,8 +20,10 @@ embeddings = ["dep:fastembed"] [dependencies] brahman-card = { path = "../../../core/brahman-card" } brahman-sidecar = { path = "../../../shared/brahman-sidecar" } +ente-cas = { path = "../../../core/ente-cas" } nouser-nous = { path = "../nous" } serde_json = { workspace = true } +sled = { workspace = true } tokio = { workspace = true } tracing = { workspace = true } tracing-subscriber = { workspace = true } @@ -30,6 +32,9 @@ ulid = { workspace = true } # Opcional: gateado por feature `embeddings`. fastembed = { version = "4", optional = true } +[dev-dependencies] +tempfile = { workspace = true } + [[bin]] name = "nouser-nous-real" path = "src/main.rs" diff --git a/crates/modules/nouser/nous-real/src/cache.rs b/crates/modules/nouser/nous-real/src/cache.rs new file mode 100644 index 0000000..bb5a8ba --- /dev/null +++ b/crates/modules/nouser/nous-real/src/cache.rs @@ -0,0 +1,199 @@ +//! Cache de embeddings keyed por sha256 del contenido + model_id. +//! +//! Razón de existir: el modelo real (`fastembed-allMiniLML6V2`) es +//! caro (1-50 ms por archivo según tamaño y CPU). Cada vez que el +//! daemon de nouser re-publica una Mónada o el watcher dispara un +//! re-cluster por cambio de FS, todos los archivos pasan otra vez +//! por embed. Para árboles de 1000 archivos, eso son segundos +//! desperdiciados re-embedidando contenido que no cambió. +//! +//! ## Diseño +//! +//! - **Cache key**: `sha256(bytes que el modelo realmente vio)` + +//! `MODEL_ID` (string). Usar el sha de los bytes-vistos garantiza +//! que la cache no devuelva un embedding de contenido viejo +//! simplemente porque el path no cambió. +//! - **Cache value**: el `Vec` serializado como bytes +//! little-endian (4 bytes por f32). Compacto, sin overhead de +//! bincode/postcard para datos numéricos puros. +//! - **Backend**: sled, tree único `embed_cache_v1`. Path: +//! `$XDG_CACHE_HOME/brahman/nouser-nous-real-embed-cache.sled`. +//! +//! ## Versionado +//! +//! El nombre del tree (`embed_cache_v1`) es el "schema version" del +//! formato value. Si bumpeamos a (p. ej.) almacenar también el +//! tiempo de cómputo o el ONNX session id, creamos `embed_cache_v2` +//! y el viejo queda como dato muerto que sled puede limpiar. +//! +//! El `MODEL_ID` viaja dentro del key, así que cambiar de modelo +//! invalida implícitamente las entradas viejas (no se accede más +//! a esos keys; sled las mantiene hasta GC manual). + +use std::path::PathBuf; + +/// Wrapper sobre sled::Db con la API justa que necesita `handle_file`. +#[derive(Clone)] +pub struct EmbedCache { + tree: sled::Tree, +} + +impl EmbedCache { + /// Abre (o crea) la cache en su path canónico. El sled::Db queda + /// referenciado por el Tree; mientras `EmbedCache` viva, el DB + /// vive. + pub fn open() -> Result { + let path = default_path(); + if let Some(parent) = path.parent() { + // best-effort: si no podemos crear el dir, sled falla con + // mensaje específico abajo. + let _ = std::fs::create_dir_all(parent); + } + let db = sled::open(&path)?; + let tree = db.open_tree("embed_cache_v1")?; + Ok(Self { tree }) + } + + /// Variante para tests: cache efímera bajo `dir`. + #[cfg(test)] + pub fn open_at(dir: &std::path::Path) -> Result { + let db = sled::open(dir)?; + let tree = db.open_tree("embed_cache_v1")?; + Ok(Self { tree }) + } + + /// Lookup. `None` si miss; `Some(vec)` si hit. + pub fn get(&self, file_sha: &[u8; 32], model_id: &str) -> Option> { + let key = build_key(file_sha, model_id); + let bytes = self.tree.get(&key).ok()??; + decode_embedding(&bytes) + } + + /// Almacena. Errores se loggean pero no propagan — cache miss es + /// recuperable, no querés tirar el embed válido por fallo de I/O + /// de cache. + pub fn put(&self, file_sha: &[u8; 32], model_id: &str, embedding: &[f32]) { + let key = build_key(file_sha, model_id); + let bytes = encode_embedding(embedding); + if let Err(e) = self.tree.insert(key, bytes) { + tracing::warn!(error = %e, "embed-cache put falló (no-fatal)"); + } + } + + /// Cantidad actual de entradas (best-effort para logs). + pub fn len(&self) -> usize { + self.tree.len() + } +} + +/// Path default. Honra `XDG_CACHE_HOME`, cae a `$HOME/.cache`, y de +/// último recurso a `/tmp` (sin persistencia, pero al menos no +/// crashea en entornos minimalistas como CI sin HOME). +fn default_path() -> PathBuf { + if let Ok(p) = std::env::var("NOUSER_NOUS_REAL_CACHE") { + return PathBuf::from(p); + } + let base = std::env::var("XDG_CACHE_HOME") + .ok() + .map(PathBuf::from) + .or_else(|| { + std::env::var("HOME") + .ok() + .map(|h| PathBuf::from(h).join(".cache")) + }) + .unwrap_or_else(std::env::temp_dir); + base.join("brahman").join("nouser-nous-real-embed-cache.sled") +} + +fn build_key(file_sha: &[u8; 32], model_id: &str) -> Vec { + let mut k = Vec::with_capacity(32 + 1 + model_id.len()); + k.extend_from_slice(file_sha); + // separator byte para que prefijos de model_id no choquen con + // bytes del sha (improbable pero barato). + k.push(0xff); + k.extend_from_slice(model_id.as_bytes()); + k +} + +fn encode_embedding(v: &[f32]) -> Vec { + let mut out = Vec::with_capacity(v.len() * 4); + for f in v { + out.extend_from_slice(&f.to_le_bytes()); + } + out +} + +fn decode_embedding(bytes: &[u8]) -> Option> { + if bytes.len() % 4 != 0 { + return None; + } + let mut out = Vec::with_capacity(bytes.len() / 4); + for chunk in bytes.chunks_exact(4) { + out.push(f32::from_le_bytes([chunk[0], chunk[1], chunk[2], chunk[3]])); + } + Some(out) +} + +#[cfg(test)] +mod tests { + use super::*; + + fn sha(s: &[u8]) -> [u8; 32] { + ente_cas::sha256_of(s) + } + + #[test] + fn roundtrip_returns_same_vector() { + let dir = tempfile::tempdir().unwrap(); + let cache = EmbedCache::open_at(dir.path()).unwrap(); + let key = sha(b"hello world"); + let v = vec![0.1f32, -0.5, 1.0, 3.14159]; + cache.put(&key, "real-fastembed-allMiniLML6V2-384d", &v); + let got = cache + .get(&key, "real-fastembed-allMiniLML6V2-384d") + .expect("hit esperado"); + assert_eq!(got, v); + } + + #[test] + fn miss_returns_none() { + let dir = tempfile::tempdir().unwrap(); + let cache = EmbedCache::open_at(dir.path()).unwrap(); + let key = sha(b"never stored"); + assert!(cache.get(&key, "real-fastembed-allMiniLML6V2-384d").is_none()); + } + + #[test] + fn different_models_do_not_collide() { + let dir = tempfile::tempdir().unwrap(); + let cache = EmbedCache::open_at(dir.path()).unwrap(); + let key = sha(b"same content"); + cache.put(&key, "model-a", &[1.0, 2.0]); + cache.put(&key, "model-b", &[7.0, 8.0]); + assert_eq!(cache.get(&key, "model-a").unwrap(), vec![1.0, 2.0]); + assert_eq!(cache.get(&key, "model-b").unwrap(), vec![7.0, 8.0]); + } + + #[test] + fn different_content_different_keys() { + let dir = tempfile::tempdir().unwrap(); + let cache = EmbedCache::open_at(dir.path()).unwrap(); + let k1 = sha(b"abc"); + let k2 = sha(b"abd"); + cache.put(&k1, "m", &[1.0]); + assert!(cache.get(&k2, "m").is_none()); + } + + #[test] + fn corrupted_value_returns_none() { + // Si sled devuelve bytes con length no múltiplo de 4, decode + // debe fallar limpio (None) en vez de panicar. + let dir = tempfile::tempdir().unwrap(); + let cache = EmbedCache::open_at(dir.path()).unwrap(); + let key = sha(b"x"); + // Insertamos manualmente bytes inválidos. + let raw_key = build_key(&key, "m"); + cache.tree.insert(raw_key, &[1u8, 2, 3][..]).unwrap(); + assert!(cache.get(&key, "m").is_none()); + } +} diff --git a/crates/modules/nouser/nous-real/src/embeddings.rs b/crates/modules/nouser/nous-real/src/embeddings.rs index e64b3cc..72a8987 100644 --- a/crates/modules/nouser/nous-real/src/embeddings.rs +++ b/crates/modules/nouser/nous-real/src/embeddings.rs @@ -28,6 +28,8 @@ use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader}; use tokio::net::UnixStream; use tracing::{info, warn}; +use crate::cache::EmbedCache; + const MAX_FILE_BYTES: usize = 8192; /// Backend concreto: posee el modelo cargado. @@ -56,7 +58,11 @@ impl Backend { } } -pub async fn handle_conn(stream: UnixStream, backend: Arc) -> std::io::Result<()> { +pub async fn handle_conn( + stream: UnixStream, + backend: Arc, + cache: Option, +) -> std::io::Result<()> { let mut reader = BufReader::new(stream); let mut line = String::new(); let n = reader.read_line(&mut line).await?; @@ -73,7 +79,7 @@ pub async fn handle_conn(stream: UnixStream, backend: Arc) -> std::io:: let started = Instant::now(); let result = match req.kind { - RequestKind::EmbedFile => handle_file(req.payload, &backend, started), + RequestKind::EmbedFile => handle_file(req.payload, &backend, cache.as_ref(), started), RequestKind::EmbedText => handle_text(req.payload, &backend, started), RequestKind::Ping => handle_ping(), }; @@ -110,23 +116,71 @@ fn handle_text( fn handle_file( payload: serde_json::Value, backend: &Backend, + cache: Option<&EmbedCache>, started: Instant, ) -> Result { let p: EmbedFilePayload = serde_json::from_value(payload).map_err(|e| format!("payload: {e}"))?; - info!(path = %p.path, "embed_file (lee contenido)"); let path = PathBuf::from(&p.path); let mut file = File::open(&path).map_err(|e| format!("abrir archivo: {e}"))?; let mut buf = vec![0u8; MAX_FILE_BYTES]; let n = file.read(&mut buf).map_err(|e| format!("leer archivo: {e}"))?; buf.truncate(n); - let text = String::from_utf8_lossy(&buf).to_string(); + let model_id = super::model_id(); + // Hash de los bytes que el modelo realmente verá. Si el archivo + // crece pasada la ventana MAX_FILE_BYTES sin modificar la cabeza, + // el hash NO cambia — el embedding cacheado sigue siendo válido + // bajo la semántica del proveedor (el modelo nunca vio los bytes + // adicionales). Si la cabeza cambia, el hash cambia y caemos a + // re-embed naturalmente. + let file_sha = ente_cas::sha256_of(&buf); + + if let Some(cache) = cache { + if let Some(cached) = cache.get(&file_sha, model_id) { + info!( + path = %p.path, + sha = %ente_cas::hex(&file_sha), + bytes = n, + "embed_file: cache HIT" + ); + let resp = EmbedResponse { + embedding: cached, + model: model_id.to_string(), + elapsed_ms: started.elapsed().as_millis() as u64, + }; + return serde_json::to_string(&resp).map_err(|e| format!("encode: {e}")); + } + } + + info!( + path = %p.path, + sha = %ente_cas::hex(&file_sha), + bytes = n, + "embed_file: cache MISS — invocando modelo" + ); + + // Write-through al CAS de arje: hacemos la cabeza del archivo + // direccionable por contenido. No es la fuente de verdad para + // el cache (sled lo es) pero deja un registro consultable por + // herramientas como `ente-cas gc` y permite que otros consumers + // resuelvan los bytes por hash. + if let Err(e) = ente_cas::store(&buf) { + // No-fatal: si CAS no escribe, cacheamos el embedding igual. + warn!(error = %e, "ente_cas::store falló (no-fatal)"); + } + + let text = String::from_utf8_lossy(&buf).to_string(); let v = backend.embed_one(&text)?; + + if let Some(cache) = cache { + cache.put(&file_sha, model_id, &v); + } + let resp = EmbedResponse { embedding: v, - model: super::model_id().to_string(), + model: model_id.to_string(), elapsed_ms: started.elapsed().as_millis() as u64, }; serde_json::to_string(&resp).map_err(|e| format!("encode: {e}")) diff --git a/crates/modules/nouser/nous-real/src/main.rs b/crates/modules/nouser/nous-real/src/main.rs index 9aa1692..3bd8f0b 100644 --- a/crates/modules/nouser/nous-real/src/main.rs +++ b/crates/modules/nouser/nous-real/src/main.rs @@ -40,6 +40,8 @@ use nouser_nous::{transport, FLOW_EMBED_REQUEST, FLOW_EMBED_RESULT, FLOW_TYPE_NA use tokio::net::UnixListener; use tracing::info; +#[cfg(feature = "embeddings")] +mod cache; #[cfg(feature = "embeddings")] mod embeddings; #[cfg(not(feature = "embeddings"))] @@ -90,15 +92,30 @@ async fn main() -> std::io::Result<()> { #[cfg(feature = "embeddings")] let backend = std::sync::Arc::new(backend); - // 4. Accept loop. + // 4. Abrir el cache de embeddings (sled local, sha256-keyed). + // Si falla, seguimos sin cache — degrada a "siempre embed". + #[cfg(feature = "embeddings")] + let embed_cache = match cache::EmbedCache::open() { + Ok(c) => { + info!(entries = c.len(), "embed-cache abierto"); + Some(c) + } + Err(e) => { + tracing::warn!(error = %e, "embed-cache no disponible — todas las requests irán al modelo"); + None + } + }; + + // 5. Accept loop. loop { let (stream, _addr) = listener.accept().await?; #[cfg(feature = "embeddings")] { let backend = backend.clone(); + let cache = embed_cache.clone(); tokio::spawn(async move { - if let Err(e) = embeddings::handle_conn(stream, backend).await { + if let Err(e) = embeddings::handle_conn(stream, backend, cache).await { tracing::warn!(error = %e, "conn falló"); } });