feat(verbo): verbo-daemon — embeddings compartidos entre procesos

Daemon que carga un Provider una vez y lo sirve sobre socket Unix;
DaemonClient lo consume desde otro proceso implementando el trait
Provider (indistinguible de un backend local). Multi-instancia: un
daemon por modelo, cada uno en su socket. Frames postcard con
prefijo de largo. 8 tests (wire + integración real sobre socket).

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
sergio
2026-05-20 16:25:56 +00:00
parent cbca62f8f1
commit 649ca02d4d
8 changed files with 430 additions and 0 deletions
@@ -0,0 +1,18 @@
[package]
name = "verbo-daemon"
version.workspace = true
edition.workspace = true
license.workspace = true
authors.workspace = true
publish.workspace = true
description = "verbo — daemon multi-instancia: sirve cualquier Provider sobre un socket Unix; el DaemonClient lo consume desde otro proceso. Un modelo cargado, N procesos."
[dependencies]
verbo-core = { path = "../verbo-core" }
async-trait = { workspace = true }
tokio = { workspace = true }
serde = { workspace = true }
postcard = { workspace = true }
[dev-dependencies]
verbo-mock = { path = "../verbo-mock" }
@@ -0,0 +1,81 @@
//! El cliente: consume un daemon presentándose como un `Provider`.
//!
//! Un [`DaemonClient`] implementa `verbo_core::Provider`, así que
//! cualquier consumidor (`fana-semantic`, `badu`, `chasqui`) lo usa sin
//! saber que el modelo vive en otro proceso. Cada llamada es un
//! round-trip independiente: sin estado de conexión que reparar.
use std::path::{Path, PathBuf};
use async_trait::async_trait;
use tokio::net::UnixStream;
use verbo_core::{EmbedError, EmbeddingVector, ModelId, Provider};
use crate::wire::{read_frame, write_frame, Request, Response};
/// Cliente de un [`crate::Daemon`]. Se comporta como un `Provider`
/// local — los consumidores no notan que el modelo es remoto.
pub struct DaemonClient {
path: PathBuf,
model: ModelId,
}
impl DaemonClient {
/// Conecta a un daemon y hace el handshake del modelo. El `ModelId`
/// queda cacheado: marca los vectores y nunca cambia en vida del
/// daemon.
pub async fn connect(path: impl AsRef<Path>) -> Result<Self, EmbedError> {
let path = path.as_ref().to_path_buf();
let model = match round_trip(&path, &Request::ModelId).await? {
Response::ModelId(m) => m,
other => return Err(unexpected(other)),
};
Ok(Self { path, model })
}
}
/// Mapea una respuesta fuera de contrato a un `EmbedError`.
fn unexpected(r: Response) -> EmbedError {
match r {
Response::Error(e) => EmbedError::Backend(e),
_ => EmbedError::Backend("respuesta del daemon verbo inesperada".into()),
}
}
/// Un round-trip completo: conecta, manda el request, lee la respuesta.
async fn round_trip(path: &Path, req: &Request) -> Result<Response, EmbedError> {
let mut stream = UnixStream::connect(path)
.await
.map_err(|e| EmbedError::Backend(format!("conexión al daemon verbo: {e}")))?;
write_frame(&mut stream, req)
.await
.map_err(|e| EmbedError::Backend(format!("envío al daemon verbo: {e}")))?;
match read_frame::<_, Response>(&mut stream).await {
Ok(Some(resp)) => Ok(resp),
Ok(None) => Err(EmbedError::Backend(
"el daemon verbo cerró la conexión sin responder".into(),
)),
Err(e) => Err(EmbedError::Backend(format!("lectura del daemon verbo: {e}"))),
}
}
#[async_trait]
impl Provider for DaemonClient {
fn model_id(&self) -> &ModelId {
&self.model
}
async fn embed(&self, text: &str) -> Result<EmbeddingVector, EmbedError> {
match round_trip(&self.path, &Request::Embed(text.to_string())).await? {
Response::Embed(v) => Ok(v),
other => Err(unexpected(other)),
}
}
async fn embed_batch(&self, texts: &[String]) -> Result<Vec<EmbeddingVector>, EmbedError> {
match round_trip(&self.path, &Request::EmbedBatch(texts.to_vec())).await? {
Response::EmbedBatch(v) => Ok(v),
other => Err(unexpected(other)),
}
}
}
@@ -0,0 +1,34 @@
//! `verbo-daemon` — embeddings compartidos entre procesos.
//!
//! El problema: cada proceso que quiera embeddings cargaría su propia
//! copia del modelo (cientos de MB de RAM, descargas duplicadas). La
//! solución: un [`Daemon`] carga el modelo una vez y lo sirve sobre un
//! socket Unix; cada proceso usa un [`DaemonClient`] que, por
//! implementar `verbo_core::Provider`, es indistinguible de un backend
//! local.
//!
//! ```text
//! ┌── proceso A ──┐ ┌── proceso B ──┐ ┌── proceso C ──┐
//! │ DaemonClient │ │ DaemonClient │ │ DaemonClient │
//! └───────┬───────┘ └───────┬───────┘ └───────┬───────┘
//! └───────── socket Unix ─────────────────┘
//! │
//! ┌─────────┴─────────┐
//! │ Daemon (Arc<P>) │ ← un modelo en RAM
//! └───────────────────┘
//! ```
//!
//! **Multi-instancia**: para servir varios modelos a la vez se levanta
//! un daemon por modelo, cada uno en su socket — el daemon es agnóstico
//! del backend (sirve cualquier `Provider`: `verbo-mock`, un backend
//! Cohere, uno BGE local).
#![forbid(unsafe_code)]
mod client;
mod server;
mod wire;
pub use client::DaemonClient;
pub use server::Daemon;
pub use wire::{Request, Response};
@@ -0,0 +1,84 @@
//! El daemon: sirve un `Provider` sobre un socket Unix.
//!
//! Un modelo se carga una vez en memoria del daemon; N procesos lo
//! consumen vía [`crate::DaemonClient`]. Para coexistencia multi-modelo
//! se levanta un daemon por modelo, cada uno en su propio socket —
//! convención operativa, no de código.
use std::path::{Path, PathBuf};
use std::sync::Arc;
use tokio::net::{UnixListener, UnixStream};
use verbo_core::Provider;
use crate::wire::{read_frame, write_frame, Request, Response};
/// Daemon de embeddings ligado a un socket Unix.
pub struct Daemon {
listener: UnixListener,
path: PathBuf,
}
impl Daemon {
/// Bindea el socket Unix en `path`. Si quedó un socket huérfano de
/// una corrida anterior, se remueve antes de bindear.
pub fn bind(path: impl AsRef<Path>) -> std::io::Result<Self> {
let path = path.as_ref().to_path_buf();
let _ = std::fs::remove_file(&path);
let listener = UnixListener::bind(&path)?;
Ok(Self { listener, path })
}
/// Ruta del socket que este daemon escucha.
pub fn path(&self) -> &Path {
&self.path
}
/// Atiende conexiones para siempre, sirviendo `provider`. Cada
/// conexión corre en su propia task; el provider se comparte por
/// `Arc` — un modelo, muchos clientes concurrentes.
pub async fn serve<P: Provider + 'static>(self, provider: Arc<P>) -> std::io::Result<()> {
loop {
let (stream, _) = self.listener.accept().await?;
let provider = provider.clone();
tokio::spawn(async move {
// Una conexión muerta no debe tumbar el daemon.
let _ = handle_conn(stream, provider).await;
});
}
}
}
impl Drop for Daemon {
fn drop(&mut self) {
// Sin esto el socket Unix queda como archivo huérfano.
let _ = std::fs::remove_file(&self.path);
}
}
/// Bucle de una conexión: lee requests hasta EOF, responde cada uno.
async fn handle_conn<P: Provider>(
mut stream: UnixStream,
provider: Arc<P>,
) -> std::io::Result<()> {
while let Some(req) = read_frame::<_, Request>(&mut stream).await? {
let resp = dispatch(&*provider, req).await;
write_frame(&mut stream, &resp).await?;
}
Ok(())
}
/// Traduce un `Request` a una llamada al provider y arma el `Response`.
async fn dispatch<P: Provider>(provider: &P, req: Request) -> Response {
match req {
Request::ModelId => Response::ModelId(provider.model_id().clone()),
Request::Embed(text) => match provider.embed(&text).await {
Ok(v) => Response::Embed(v),
Err(e) => Response::Error(e.to_string()),
},
Request::EmbedBatch(texts) => match provider.embed_batch(&texts).await {
Ok(v) => Response::EmbedBatch(v),
Err(e) => Response::Error(e.to_string()),
},
}
}
@@ -0,0 +1,97 @@
//! Protocolo de cable del daemon — frames postcard con prefijo de largo.
//!
//! Cada mensaje va como `u32` little-endian (largo) + bytes postcard.
//! Es el mismo encuadre que usa el resto de brahman para sockets.
use serde::de::DeserializeOwned;
use serde::{Deserialize, Serialize};
use std::io::{self, ErrorKind};
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
use verbo_core::{EmbeddingVector, ModelId};
/// Tope de tamaño de un frame (8 MiB). Un lote grande de embeddings
/// cabe holgado; cualquier cosa mayor se trata como frame corrupto.
const MAX_FRAME: usize = 8 * 1024 * 1024;
/// Petición del cliente al daemon.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum Request {
/// Handshake: pide la identidad del modelo servido.
ModelId,
/// Embebe un texto.
Embed(String),
/// Embebe un lote en un solo round-trip.
EmbedBatch(Vec<String>),
}
/// Respuesta del daemon al cliente.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum Response {
ModelId(ModelId),
Embed(EmbeddingVector),
EmbedBatch(Vec<EmbeddingVector>),
/// El backend falló; el texto es el `Display` del `EmbedError`.
Error(String),
}
/// Serializa `msg` y lo escribe como frame con prefijo de largo.
pub async fn write_frame<W, T>(w: &mut W, msg: &T) -> io::Result<()>
where
W: AsyncWrite + Unpin,
T: Serialize,
{
let bytes = postcard::to_stdvec(msg)
.map_err(|e| io::Error::new(ErrorKind::InvalidData, e))?;
if bytes.len() > MAX_FRAME {
return Err(io::Error::new(ErrorKind::InvalidData, "frame demasiado grande"));
}
w.write_all(&(bytes.len() as u32).to_le_bytes()).await?;
w.write_all(&bytes).await?;
w.flush().await?;
Ok(())
}
/// Lee un frame y lo deserializa. `Ok(None)` si el peer cerró limpio
/// antes de empezar un frame nuevo (EOF esperado).
pub async fn read_frame<R, T>(r: &mut R) -> io::Result<Option<T>>
where
R: AsyncRead + Unpin,
T: DeserializeOwned,
{
let mut len_buf = [0u8; 4];
match r.read_exact(&mut len_buf).await {
Ok(_) => {}
Err(e) if e.kind() == ErrorKind::UnexpectedEof => return Ok(None),
Err(e) => return Err(e),
}
let len = u32::from_le_bytes(len_buf) as usize;
if len > MAX_FRAME {
return Err(io::Error::new(ErrorKind::InvalidData, "frame demasiado grande"));
}
let mut buf = vec![0u8; len];
r.read_exact(&mut buf).await?;
let msg = postcard::from_bytes(&buf)
.map_err(|e| io::Error::new(ErrorKind::InvalidData, e))?;
Ok(Some(msg))
}
#[cfg(test)]
mod tests {
use super::*;
#[tokio::test]
async fn frame_roundtrips_through_a_buffer() {
let mut buf: Vec<u8> = Vec::new();
write_frame(&mut buf, &Request::Embed("hola".into())).await.unwrap();
let mut cursor = std::io::Cursor::new(buf);
let got: Request = read_frame(&mut cursor).await.unwrap().unwrap();
assert!(matches!(got, Request::Embed(t) if t == "hola"));
}
#[tokio::test]
async fn empty_stream_reads_as_none() {
let mut cursor = std::io::Cursor::new(Vec::<u8>::new());
let got: Option<Request> = read_frame(&mut cursor).await.unwrap();
assert!(got.is_none());
}
}
@@ -0,0 +1,103 @@
//! Pruebas de integración: un daemon real sobre socket Unix + clientes.
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::Arc;
use verbo_core::Provider;
use verbo_daemon::{Daemon, DaemonClient};
use verbo_mock::MockProvider;
/// Ruta de socket única por test — evita choques entre tests paralelos.
fn unique_socket() -> std::path::PathBuf {
static N: AtomicU32 = AtomicU32::new(0);
let n = N.fetch_add(1, Ordering::Relaxed);
std::env::temp_dir().join(format!("verbo-d-{}-{n}.sock", std::process::id()))
}
/// Levanta un daemon sirviendo un `MockProvider` y devuelve su ruta + el
/// handle de la task (para abortarla al final).
fn spawn_daemon(dim: usize) -> (std::path::PathBuf, tokio::task::JoinHandle<()>) {
let path = unique_socket();
let daemon = Daemon::bind(&path).expect("bind");
let provider = Arc::new(MockProvider::new(dim));
let handle = tokio::spawn(async move {
let _ = daemon.serve(provider).await;
});
(path, handle)
}
#[tokio::test]
async fn client_embed_matches_direct_provider() {
let (path, handle) = spawn_daemon(32);
let client = DaemonClient::connect(&path).await.expect("connect");
let over_socket = client.embed("texto de prueba").await.unwrap();
let direct = MockProvider::new(32).embed("texto de prueba").await.unwrap();
// El daemon no debe alterar el vector: byte a byte igual al directo.
assert_eq!(over_socket.values, direct.values);
assert_eq!(over_socket.model, direct.model);
handle.abort();
}
#[tokio::test]
async fn handshake_reports_model_id() {
let (path, handle) = spawn_daemon(384);
let client = DaemonClient::connect(&path).await.expect("connect");
assert_eq!(client.model_id().dimension, 384);
handle.abort();
}
#[tokio::test]
async fn batch_over_socket_matches_individual() {
let (path, handle) = spawn_daemon(16);
let client = DaemonClient::connect(&path).await.expect("connect");
let texts = vec!["uno".to_string(), "dos".to_string(), "tres".to_string()];
let batch = client.embed_batch(&texts).await.unwrap();
assert_eq!(batch.len(), 3);
let single = client.embed("dos").await.unwrap();
assert_eq!(batch[1].values, single.values);
handle.abort();
}
#[tokio::test]
async fn many_requests_on_one_client() {
// El cliente hace round-trip por llamada: varias llamadas seguidas
// sobre el mismo cliente deben funcionar sin estado corrupto.
let (path, handle) = spawn_daemon(8);
let client = DaemonClient::connect(&path).await.expect("connect");
for word in ["a", "bb", "ccc", "a"] {
let v = client.embed(word).await.unwrap();
assert_eq!(v.values.len(), 8);
}
// Mismo texto → mismo vector incluso tras otras llamadas.
let first = client.embed("a").await.unwrap();
let again = client.embed("a").await.unwrap();
assert_eq!(first.values, again.values);
handle.abort();
}
#[tokio::test]
async fn two_clients_share_one_daemon() {
let (path, handle) = spawn_daemon(24);
let a = DaemonClient::connect(&path).await.expect("connect a");
let b = DaemonClient::connect(&path).await.expect("connect b");
let va = a.embed("compartido").await.unwrap();
let vb = b.embed("compartido").await.unwrap();
// Dos procesos, un modelo: el mismo texto da el mismo vector.
assert_eq!(va.values, vb.values);
handle.abort();
}
#[tokio::test]
async fn connect_to_missing_daemon_errors() {
let path = unique_socket(); // nunca se bindeó
let result = DaemonClient::connect(&path).await;
assert!(result.is_err());
}