feat(nous-real): cache de embeddings + write-through al CAS de arje

Cierra el ciclo del feedback: el modelo real (fastembed-allMiniLML6V2,
~1-50ms por archivo) era invocado ciegamente en cada re-cluster del
watcher. Ahora se cachea por sha256(bytes-vistos) + model_id, con
write-through al CAS de arje.

Pipeline en handle_file:
1. Lee primeros 8 KiB del archivo (igual que antes).
2. file_sha = ente_cas::sha256_of(buf) — hash de los bytes que el
   modelo *realmente* verá. Garantiza que un archivo creciendo mas
   alla de la ventana sin tocar la cabeza siga sirviendo cache hits.
3. Cache lookup -> HIT: respuesta en us, sin invocar fastembed.
4. MISS: ente_cas::store(&buf) (write-through, no-fatal si falla) ->
   backend.embed_one(text) -> cache.put(...).

Backend de cache: sled local en
$XDG_CACHE_HOME/brahman/nouser-nous-real-embed-cache.sled. Tree
versionado embed_cache_v1; el MODEL_ID viaja en la key, asi que
cambiar de modelo invalida el cache implicitamente. Override por env
NOUSER_NOUS_REAL_CACHE.

Encoding compacto: cada Vec<f32> se serializa como bytes little-endian
(4B por f32, sin overhead). Para 384-d son 1.5 KiB por entry. Decode
tolera bytes corruptos (longitud no-multiplo de 4 -> None, no panic).

Por que sled y no ente-cas directo: el CAS de arje es flat
sha256-keyed; la cache necesita un mapeo (file_sha, model_id) ->
embedding, no expresable como entry CAS. El write-through a CAS queda
como registro consultable + futura GC.

Mock NO se modifica — su embedding pseudo-32d es metadata-hashing puro,
sin costo. Cachearlo seria overhead.

Tests: 5 unitarios verdes (roundtrip, miss, model collision, content
collision, corrupted value). Stub mode (sin feature) sigue compilando
sin tocar cache.
This commit is contained in:
Sergio
2026-05-09 02:57:55 +00:00
parent 79d42aba28
commit b23ddf2980
6 changed files with 334 additions and 7 deletions
@@ -28,6 +28,8 @@ use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
use tokio::net::UnixStream;
use tracing::{info, warn};
use crate::cache::EmbedCache;
const MAX_FILE_BYTES: usize = 8192;
/// Backend concreto: posee el modelo cargado.
@@ -56,7 +58,11 @@ impl Backend {
}
}
pub async fn handle_conn(stream: UnixStream, backend: Arc<Backend>) -> std::io::Result<()> {
pub async fn handle_conn(
stream: UnixStream,
backend: Arc<Backend>,
cache: Option<EmbedCache>,
) -> std::io::Result<()> {
let mut reader = BufReader::new(stream);
let mut line = String::new();
let n = reader.read_line(&mut line).await?;
@@ -73,7 +79,7 @@ pub async fn handle_conn(stream: UnixStream, backend: Arc<Backend>) -> std::io::
let started = Instant::now();
let result = match req.kind {
RequestKind::EmbedFile => handle_file(req.payload, &backend, started),
RequestKind::EmbedFile => handle_file(req.payload, &backend, cache.as_ref(), started),
RequestKind::EmbedText => handle_text(req.payload, &backend, started),
RequestKind::Ping => handle_ping(),
};
@@ -110,23 +116,71 @@ fn handle_text(
fn handle_file(
payload: serde_json::Value,
backend: &Backend,
cache: Option<&EmbedCache>,
started: Instant,
) -> Result<String, String> {
let p: EmbedFilePayload =
serde_json::from_value(payload).map_err(|e| format!("payload: {e}"))?;
info!(path = %p.path, "embed_file (lee contenido)");
let path = PathBuf::from(&p.path);
let mut file = File::open(&path).map_err(|e| format!("abrir archivo: {e}"))?;
let mut buf = vec![0u8; MAX_FILE_BYTES];
let n = file.read(&mut buf).map_err(|e| format!("leer archivo: {e}"))?;
buf.truncate(n);
let text = String::from_utf8_lossy(&buf).to_string();
let model_id = super::model_id();
// Hash de los bytes que el modelo realmente verá. Si el archivo
// crece pasada la ventana MAX_FILE_BYTES sin modificar la cabeza,
// el hash NO cambia — el embedding cacheado sigue siendo válido
// bajo la semántica del proveedor (el modelo nunca vio los bytes
// adicionales). Si la cabeza cambia, el hash cambia y caemos a
// re-embed naturalmente.
let file_sha = ente_cas::sha256_of(&buf);
if let Some(cache) = cache {
if let Some(cached) = cache.get(&file_sha, model_id) {
info!(
path = %p.path,
sha = %ente_cas::hex(&file_sha),
bytes = n,
"embed_file: cache HIT"
);
let resp = EmbedResponse {
embedding: cached,
model: model_id.to_string(),
elapsed_ms: started.elapsed().as_millis() as u64,
};
return serde_json::to_string(&resp).map_err(|e| format!("encode: {e}"));
}
}
info!(
path = %p.path,
sha = %ente_cas::hex(&file_sha),
bytes = n,
"embed_file: cache MISS — invocando modelo"
);
// Write-through al CAS de arje: hacemos la cabeza del archivo
// direccionable por contenido. No es la fuente de verdad para
// el cache (sled lo es) pero deja un registro consultable por
// herramientas como `ente-cas gc` y permite que otros consumers
// resuelvan los bytes por hash.
if let Err(e) = ente_cas::store(&buf) {
// No-fatal: si CAS no escribe, cacheamos el embedding igual.
warn!(error = %e, "ente_cas::store falló (no-fatal)");
}
let text = String::from_utf8_lossy(&buf).to_string();
let v = backend.embed_one(&text)?;
if let Some(cache) = cache {
cache.put(&file_sha, model_id, &v);
}
let resp = EmbedResponse {
embedding: v,
model: super::model_id().to_string(),
model: model_id.to_string(),
elapsed_ms: started.elapsed().as_millis() as u64,
};
serde_json::to_string(&resp).map_err(|e| format!("encode: {e}"))