diff --git a/Cargo.lock b/Cargo.lock index c403b21..1b8d2fe 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -13274,6 +13274,18 @@ dependencies = [ "thiserror 2.0.18", ] +[[package]] +name = "verbo-daemon" +version = "0.1.0" +dependencies = [ + "async-trait", + "postcard", + "serde", + "tokio", + "verbo-core", + "verbo-mock", +] + [[package]] name = "verbo-mock" version = "0.1.0" diff --git a/Cargo.toml b/Cargo.toml index fa87bfa..4b27245 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -117,6 +117,7 @@ members = [ # ============================================================ "crates/modules/verbo/verbo-core", "crates/modules/verbo/verbo-mock", + "crates/modules/verbo/verbo-daemon", # ============================================================ # modules/nakui/ — ERP matemático (categórico) diff --git a/crates/modules/verbo/verbo-daemon/Cargo.toml b/crates/modules/verbo/verbo-daemon/Cargo.toml new file mode 100644 index 0000000..c636afd --- /dev/null +++ b/crates/modules/verbo/verbo-daemon/Cargo.toml @@ -0,0 +1,18 @@ +[package] +name = "verbo-daemon" +version.workspace = true +edition.workspace = true +license.workspace = true +authors.workspace = true +publish.workspace = true +description = "verbo — daemon multi-instancia: sirve cualquier Provider sobre un socket Unix; el DaemonClient lo consume desde otro proceso. Un modelo cargado, N procesos." + +[dependencies] +verbo-core = { path = "../verbo-core" } +async-trait = { workspace = true } +tokio = { workspace = true } +serde = { workspace = true } +postcard = { workspace = true } + +[dev-dependencies] +verbo-mock = { path = "../verbo-mock" } diff --git a/crates/modules/verbo/verbo-daemon/src/client.rs b/crates/modules/verbo/verbo-daemon/src/client.rs new file mode 100644 index 0000000..39ac897 --- /dev/null +++ b/crates/modules/verbo/verbo-daemon/src/client.rs @@ -0,0 +1,81 @@ +//! El cliente: consume un daemon presentándose como un `Provider`. +//! +//! Un [`DaemonClient`] implementa `verbo_core::Provider`, así que +//! cualquier consumidor (`fana-semantic`, `badu`, `chasqui`) lo usa sin +//! saber que el modelo vive en otro proceso. Cada llamada es un +//! round-trip independiente: sin estado de conexión que reparar. + +use std::path::{Path, PathBuf}; + +use async_trait::async_trait; +use tokio::net::UnixStream; +use verbo_core::{EmbedError, EmbeddingVector, ModelId, Provider}; + +use crate::wire::{read_frame, write_frame, Request, Response}; + +/// Cliente de un [`crate::Daemon`]. Se comporta como un `Provider` +/// local — los consumidores no notan que el modelo es remoto. +pub struct DaemonClient { + path: PathBuf, + model: ModelId, +} + +impl DaemonClient { + /// Conecta a un daemon y hace el handshake del modelo. El `ModelId` + /// queda cacheado: marca los vectores y nunca cambia en vida del + /// daemon. + pub async fn connect(path: impl AsRef) -> Result { + let path = path.as_ref().to_path_buf(); + let model = match round_trip(&path, &Request::ModelId).await? { + Response::ModelId(m) => m, + other => return Err(unexpected(other)), + }; + Ok(Self { path, model }) + } +} + +/// Mapea una respuesta fuera de contrato a un `EmbedError`. +fn unexpected(r: Response) -> EmbedError { + match r { + Response::Error(e) => EmbedError::Backend(e), + _ => EmbedError::Backend("respuesta del daemon verbo inesperada".into()), + } +} + +/// Un round-trip completo: conecta, manda el request, lee la respuesta. +async fn round_trip(path: &Path, req: &Request) -> Result { + let mut stream = UnixStream::connect(path) + .await + .map_err(|e| EmbedError::Backend(format!("conexión al daemon verbo: {e}")))?; + write_frame(&mut stream, req) + .await + .map_err(|e| EmbedError::Backend(format!("envío al daemon verbo: {e}")))?; + match read_frame::<_, Response>(&mut stream).await { + Ok(Some(resp)) => Ok(resp), + Ok(None) => Err(EmbedError::Backend( + "el daemon verbo cerró la conexión sin responder".into(), + )), + Err(e) => Err(EmbedError::Backend(format!("lectura del daemon verbo: {e}"))), + } +} + +#[async_trait] +impl Provider for DaemonClient { + fn model_id(&self) -> &ModelId { + &self.model + } + + async fn embed(&self, text: &str) -> Result { + match round_trip(&self.path, &Request::Embed(text.to_string())).await? { + Response::Embed(v) => Ok(v), + other => Err(unexpected(other)), + } + } + + async fn embed_batch(&self, texts: &[String]) -> Result, EmbedError> { + match round_trip(&self.path, &Request::EmbedBatch(texts.to_vec())).await? { + Response::EmbedBatch(v) => Ok(v), + other => Err(unexpected(other)), + } + } +} diff --git a/crates/modules/verbo/verbo-daemon/src/lib.rs b/crates/modules/verbo/verbo-daemon/src/lib.rs new file mode 100644 index 0000000..5178f59 --- /dev/null +++ b/crates/modules/verbo/verbo-daemon/src/lib.rs @@ -0,0 +1,34 @@ +//! `verbo-daemon` — embeddings compartidos entre procesos. +//! +//! El problema: cada proceso que quiera embeddings cargaría su propia +//! copia del modelo (cientos de MB de RAM, descargas duplicadas). La +//! solución: un [`Daemon`] carga el modelo una vez y lo sirve sobre un +//! socket Unix; cada proceso usa un [`DaemonClient`] que, por +//! implementar `verbo_core::Provider`, es indistinguible de un backend +//! local. +//! +//! ```text +//! ┌── proceso A ──┐ ┌── proceso B ──┐ ┌── proceso C ──┐ +//! │ DaemonClient │ │ DaemonClient │ │ DaemonClient │ +//! └───────┬───────┘ └───────┬───────┘ └───────┬───────┘ +//! └───────── socket Unix ─────────────────┘ +//! │ +//! ┌─────────┴─────────┐ +//! │ Daemon (Arc

) │ ← un modelo en RAM +//! └───────────────────┘ +//! ``` +//! +//! **Multi-instancia**: para servir varios modelos a la vez se levanta +//! un daemon por modelo, cada uno en su socket — el daemon es agnóstico +//! del backend (sirve cualquier `Provider`: `verbo-mock`, un backend +//! Cohere, uno BGE local). + +#![forbid(unsafe_code)] + +mod client; +mod server; +mod wire; + +pub use client::DaemonClient; +pub use server::Daemon; +pub use wire::{Request, Response}; diff --git a/crates/modules/verbo/verbo-daemon/src/server.rs b/crates/modules/verbo/verbo-daemon/src/server.rs new file mode 100644 index 0000000..b3c8686 --- /dev/null +++ b/crates/modules/verbo/verbo-daemon/src/server.rs @@ -0,0 +1,84 @@ +//! El daemon: sirve un `Provider` sobre un socket Unix. +//! +//! Un modelo se carga una vez en memoria del daemon; N procesos lo +//! consumen vía [`crate::DaemonClient`]. Para coexistencia multi-modelo +//! se levanta un daemon por modelo, cada uno en su propio socket — +//! convención operativa, no de código. + +use std::path::{Path, PathBuf}; +use std::sync::Arc; + +use tokio::net::{UnixListener, UnixStream}; +use verbo_core::Provider; + +use crate::wire::{read_frame, write_frame, Request, Response}; + +/// Daemon de embeddings ligado a un socket Unix. +pub struct Daemon { + listener: UnixListener, + path: PathBuf, +} + +impl Daemon { + /// Bindea el socket Unix en `path`. Si quedó un socket huérfano de + /// una corrida anterior, se remueve antes de bindear. + pub fn bind(path: impl AsRef) -> std::io::Result { + let path = path.as_ref().to_path_buf(); + let _ = std::fs::remove_file(&path); + let listener = UnixListener::bind(&path)?; + Ok(Self { listener, path }) + } + + /// Ruta del socket que este daemon escucha. + pub fn path(&self) -> &Path { + &self.path + } + + /// Atiende conexiones para siempre, sirviendo `provider`. Cada + /// conexión corre en su propia task; el provider se comparte por + /// `Arc` — un modelo, muchos clientes concurrentes. + pub async fn serve(self, provider: Arc

) -> std::io::Result<()> { + loop { + let (stream, _) = self.listener.accept().await?; + let provider = provider.clone(); + tokio::spawn(async move { + // Una conexión muerta no debe tumbar el daemon. + let _ = handle_conn(stream, provider).await; + }); + } + } +} + +impl Drop for Daemon { + fn drop(&mut self) { + // Sin esto el socket Unix queda como archivo huérfano. + let _ = std::fs::remove_file(&self.path); + } +} + +/// Bucle de una conexión: lee requests hasta EOF, responde cada uno. +async fn handle_conn( + mut stream: UnixStream, + provider: Arc

, +) -> std::io::Result<()> { + while let Some(req) = read_frame::<_, Request>(&mut stream).await? { + let resp = dispatch(&*provider, req).await; + write_frame(&mut stream, &resp).await?; + } + Ok(()) +} + +/// Traduce un `Request` a una llamada al provider y arma el `Response`. +async fn dispatch(provider: &P, req: Request) -> Response { + match req { + Request::ModelId => Response::ModelId(provider.model_id().clone()), + Request::Embed(text) => match provider.embed(&text).await { + Ok(v) => Response::Embed(v), + Err(e) => Response::Error(e.to_string()), + }, + Request::EmbedBatch(texts) => match provider.embed_batch(&texts).await { + Ok(v) => Response::EmbedBatch(v), + Err(e) => Response::Error(e.to_string()), + }, + } +} diff --git a/crates/modules/verbo/verbo-daemon/src/wire.rs b/crates/modules/verbo/verbo-daemon/src/wire.rs new file mode 100644 index 0000000..f2ca4a5 --- /dev/null +++ b/crates/modules/verbo/verbo-daemon/src/wire.rs @@ -0,0 +1,97 @@ +//! Protocolo de cable del daemon — frames postcard con prefijo de largo. +//! +//! Cada mensaje va como `u32` little-endian (largo) + bytes postcard. +//! Es el mismo encuadre que usa el resto de brahman para sockets. + +use serde::de::DeserializeOwned; +use serde::{Deserialize, Serialize}; +use std::io::{self, ErrorKind}; +use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}; +use verbo_core::{EmbeddingVector, ModelId}; + +/// Tope de tamaño de un frame (8 MiB). Un lote grande de embeddings +/// cabe holgado; cualquier cosa mayor se trata como frame corrupto. +const MAX_FRAME: usize = 8 * 1024 * 1024; + +/// Petición del cliente al daemon. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub enum Request { + /// Handshake: pide la identidad del modelo servido. + ModelId, + /// Embebe un texto. + Embed(String), + /// Embebe un lote en un solo round-trip. + EmbedBatch(Vec), +} + +/// Respuesta del daemon al cliente. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub enum Response { + ModelId(ModelId), + Embed(EmbeddingVector), + EmbedBatch(Vec), + /// El backend falló; el texto es el `Display` del `EmbedError`. + Error(String), +} + +/// Serializa `msg` y lo escribe como frame con prefijo de largo. +pub async fn write_frame(w: &mut W, msg: &T) -> io::Result<()> +where + W: AsyncWrite + Unpin, + T: Serialize, +{ + let bytes = postcard::to_stdvec(msg) + .map_err(|e| io::Error::new(ErrorKind::InvalidData, e))?; + if bytes.len() > MAX_FRAME { + return Err(io::Error::new(ErrorKind::InvalidData, "frame demasiado grande")); + } + w.write_all(&(bytes.len() as u32).to_le_bytes()).await?; + w.write_all(&bytes).await?; + w.flush().await?; + Ok(()) +} + +/// Lee un frame y lo deserializa. `Ok(None)` si el peer cerró limpio +/// antes de empezar un frame nuevo (EOF esperado). +pub async fn read_frame(r: &mut R) -> io::Result> +where + R: AsyncRead + Unpin, + T: DeserializeOwned, +{ + let mut len_buf = [0u8; 4]; + match r.read_exact(&mut len_buf).await { + Ok(_) => {} + Err(e) if e.kind() == ErrorKind::UnexpectedEof => return Ok(None), + Err(e) => return Err(e), + } + let len = u32::from_le_bytes(len_buf) as usize; + if len > MAX_FRAME { + return Err(io::Error::new(ErrorKind::InvalidData, "frame demasiado grande")); + } + let mut buf = vec![0u8; len]; + r.read_exact(&mut buf).await?; + let msg = postcard::from_bytes(&buf) + .map_err(|e| io::Error::new(ErrorKind::InvalidData, e))?; + Ok(Some(msg)) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[tokio::test] + async fn frame_roundtrips_through_a_buffer() { + let mut buf: Vec = Vec::new(); + write_frame(&mut buf, &Request::Embed("hola".into())).await.unwrap(); + let mut cursor = std::io::Cursor::new(buf); + let got: Request = read_frame(&mut cursor).await.unwrap().unwrap(); + assert!(matches!(got, Request::Embed(t) if t == "hola")); + } + + #[tokio::test] + async fn empty_stream_reads_as_none() { + let mut cursor = std::io::Cursor::new(Vec::::new()); + let got: Option = read_frame(&mut cursor).await.unwrap(); + assert!(got.is_none()); + } +} diff --git a/crates/modules/verbo/verbo-daemon/tests/roundtrip.rs b/crates/modules/verbo/verbo-daemon/tests/roundtrip.rs new file mode 100644 index 0000000..939b375 --- /dev/null +++ b/crates/modules/verbo/verbo-daemon/tests/roundtrip.rs @@ -0,0 +1,103 @@ +//! Pruebas de integración: un daemon real sobre socket Unix + clientes. + +use std::sync::atomic::{AtomicU32, Ordering}; +use std::sync::Arc; + +use verbo_core::Provider; +use verbo_daemon::{Daemon, DaemonClient}; +use verbo_mock::MockProvider; + +/// Ruta de socket única por test — evita choques entre tests paralelos. +fn unique_socket() -> std::path::PathBuf { + static N: AtomicU32 = AtomicU32::new(0); + let n = N.fetch_add(1, Ordering::Relaxed); + std::env::temp_dir().join(format!("verbo-d-{}-{n}.sock", std::process::id())) +} + +/// Levanta un daemon sirviendo un `MockProvider` y devuelve su ruta + el +/// handle de la task (para abortarla al final). +fn spawn_daemon(dim: usize) -> (std::path::PathBuf, tokio::task::JoinHandle<()>) { + let path = unique_socket(); + let daemon = Daemon::bind(&path).expect("bind"); + let provider = Arc::new(MockProvider::new(dim)); + let handle = tokio::spawn(async move { + let _ = daemon.serve(provider).await; + }); + (path, handle) +} + +#[tokio::test] +async fn client_embed_matches_direct_provider() { + let (path, handle) = spawn_daemon(32); + let client = DaemonClient::connect(&path).await.expect("connect"); + + let over_socket = client.embed("texto de prueba").await.unwrap(); + let direct = MockProvider::new(32).embed("texto de prueba").await.unwrap(); + + // El daemon no debe alterar el vector: byte a byte igual al directo. + assert_eq!(over_socket.values, direct.values); + assert_eq!(over_socket.model, direct.model); + + handle.abort(); +} + +#[tokio::test] +async fn handshake_reports_model_id() { + let (path, handle) = spawn_daemon(384); + let client = DaemonClient::connect(&path).await.expect("connect"); + assert_eq!(client.model_id().dimension, 384); + handle.abort(); +} + +#[tokio::test] +async fn batch_over_socket_matches_individual() { + let (path, handle) = spawn_daemon(16); + let client = DaemonClient::connect(&path).await.expect("connect"); + + let texts = vec!["uno".to_string(), "dos".to_string(), "tres".to_string()]; + let batch = client.embed_batch(&texts).await.unwrap(); + assert_eq!(batch.len(), 3); + + let single = client.embed("dos").await.unwrap(); + assert_eq!(batch[1].values, single.values); + + handle.abort(); +} + +#[tokio::test] +async fn many_requests_on_one_client() { + // El cliente hace round-trip por llamada: varias llamadas seguidas + // sobre el mismo cliente deben funcionar sin estado corrupto. + let (path, handle) = spawn_daemon(8); + let client = DaemonClient::connect(&path).await.expect("connect"); + for word in ["a", "bb", "ccc", "a"] { + let v = client.embed(word).await.unwrap(); + assert_eq!(v.values.len(), 8); + } + // Mismo texto → mismo vector incluso tras otras llamadas. + let first = client.embed("a").await.unwrap(); + let again = client.embed("a").await.unwrap(); + assert_eq!(first.values, again.values); + handle.abort(); +} + +#[tokio::test] +async fn two_clients_share_one_daemon() { + let (path, handle) = spawn_daemon(24); + let a = DaemonClient::connect(&path).await.expect("connect a"); + let b = DaemonClient::connect(&path).await.expect("connect b"); + + let va = a.embed("compartido").await.unwrap(); + let vb = b.embed("compartido").await.unwrap(); + // Dos procesos, un modelo: el mismo texto da el mismo vector. + assert_eq!(va.values, vb.values); + + handle.abort(); +} + +#[tokio::test] +async fn connect_to_missing_daemon_errors() { + let path = unique_socket(); // nunca se bindeó + let result = DaemonClient::connect(&path).await; + assert!(result.is_err()); +}