From b7d9d7abd94594597414cb59f27511b3ee01b789 Mon Sep 17 00:00:00 2001 From: sergio Date: Wed, 20 May 2026 14:04:22 +0000 Subject: [PATCH] =?UTF-8?q?feat(sandokan-daemon):=20B1.3=20=E2=80=94=20Dae?= =?UTF-8?q?monEngine=20+=20protocolo=20wire?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit DaemonEngine: implementación del trait Engine que delega a otro proceso vía Unix socket. Materializa el patrón horizontal de sandokan (el binario que arranca primero expone el engine; los demás se le suman). - protocol.rs — DaemonRequest/DaemonResponse (espejan los métodos de Engine) + framing postcard length-prefixed (u32 LE + bytes), con MAX_FRAME 16 MiB defensivo. - client.rs — DaemonEngine: stateless, un round-trip por llamada; is_reachable() para el probe de auto(). - server.rs — serve(engine, socket): envuelve cualquier Engine, una task por conexión, multi-request por conexión. EngineError ahora es Serialize/Deserialize (viaja por el wire); NotFound se propaga tipado a través del socket. 1 test de integración: roundtrip real DaemonEngine ↔ serve ↔ LocalEngine (list vacío + NotFound propagado). cargo check --workspace verde. Co-Authored-By: Claude Opus 4.7 --- Cargo.lock | 16 +++ Cargo.toml | 1 + crates/runtime/sandokan-core/src/error.rs | 6 +- crates/runtime/sandokan-daemon/Cargo.toml | 22 ++++ crates/runtime/sandokan-daemon/src/client.rs | 96 ++++++++++++++ crates/runtime/sandokan-daemon/src/lib.rs | 19 +++ .../runtime/sandokan-daemon/src/protocol.rs | 76 +++++++++++ crates/runtime/sandokan-daemon/src/server.rs | 123 ++++++++++++++++++ 8 files changed, 358 insertions(+), 1 deletion(-) create mode 100644 crates/runtime/sandokan-daemon/Cargo.toml create mode 100644 crates/runtime/sandokan-daemon/src/client.rs create mode 100644 crates/runtime/sandokan-daemon/src/lib.rs create mode 100644 crates/runtime/sandokan-daemon/src/protocol.rs create mode 100644 crates/runtime/sandokan-daemon/src/server.rs diff --git a/Cargo.lock b/Cargo.lock index fc4eb8f..6a0aeff 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -10139,6 +10139,22 @@ dependencies = [ "ulid", ] +[[package]] +name = "sandokan-daemon" +version = "0.1.0" +dependencies = [ + "async-trait", + "postcard", + "sandokan-core", + "sandokan-lifecycle", + "sandokan-local", + "serde", + "tempfile", + "tokio", + "tracing", + "ulid", +] + [[package]] name = "sandokan-lifecycle" version = "0.1.0" diff --git a/Cargo.toml b/Cargo.toml index f8c916e..cfb8ffe 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -37,6 +37,7 @@ members = [ "crates/runtime/sandokan-lifecycle", "crates/runtime/sandokan-core", "crates/runtime/sandokan-local", + "crates/runtime/sandokan-daemon", # ============================================================ # compat/ — Shims D-Bus para correr software systemd-aware diff --git a/crates/runtime/sandokan-core/src/error.rs b/crates/runtime/sandokan-core/src/error.rs index 67a51ce..9a81673 100644 --- a/crates/runtime/sandokan-core/src/error.rs +++ b/crates/runtime/sandokan-core/src/error.rs @@ -1,10 +1,14 @@ //! Errores del orquestador. +use serde::{Deserialize, Serialize}; use ulid::Ulid; /// Falla de una operación del `Engine`. Las impls concretas mapean sus /// errores internos (encarnación, IPC, SSH) a estas variantes. -#[derive(Debug, thiserror::Error)] +/// +/// Es `Serialize`/`Deserialize` porque viaja por el wire del +/// `DaemonEngine` (postcard sobre Unix socket). +#[derive(Debug, Clone, thiserror::Error, Serialize, Deserialize)] pub enum EngineError { /// No existe ninguna entidad activa con ese `card_id`. #[error("card `{0}` no encontrada")] diff --git a/crates/runtime/sandokan-daemon/Cargo.toml b/crates/runtime/sandokan-daemon/Cargo.toml new file mode 100644 index 0000000..2aa3bc5 --- /dev/null +++ b/crates/runtime/sandokan-daemon/Cargo.toml @@ -0,0 +1,22 @@ +[package] +name = "sandokan-daemon" +description = "DaemonEngine: orquestador sandokan delegado a otro proceso vía Unix socket (protocolo postcard length-prefixed) + loop servidor." +version.workspace = true +edition.workspace = true +license.workspace = true +authors.workspace = true +publish.workspace = true + +[dependencies] +sandokan-core = { path = "../sandokan-core" } +sandokan-lifecycle = { path = "../sandokan-lifecycle" } +async-trait = { workspace = true } +tokio = { workspace = true } +postcard = { workspace = true } +serde = { workspace = true } +ulid = { workspace = true } +tracing = { workspace = true } + +[dev-dependencies] +sandokan-local = { path = "../sandokan-local" } +tempfile = { workspace = true } diff --git a/crates/runtime/sandokan-daemon/src/client.rs b/crates/runtime/sandokan-daemon/src/client.rs new file mode 100644 index 0000000..66d4ca5 --- /dev/null +++ b/crates/runtime/sandokan-daemon/src/client.rs @@ -0,0 +1,96 @@ +//! `DaemonEngine` — cliente que implementa `Engine` sobre el wire. + +use crate::protocol::{read_frame, write_frame, DaemonRequest, DaemonResponse}; +use async_trait::async_trait; +use sandokan_core::{Engine, EngineError, ExecHandle, Intent, TelemetryFrame}; +use sandokan_lifecycle::LifecycleState; +use std::path::PathBuf; +use std::time::Duration; +use tokio::net::UnixStream; +use ulid::Ulid; + +/// Engine que delega cada operación a un daemon vía Unix socket. +/// +/// Stateless: cada llamada abre una conexión, hace un round-trip y la +/// cierra. Simple y robusto; si el daemon no está, las llamadas fallan +/// con `EngineError::Transport`. +pub struct DaemonEngine { + socket_path: PathBuf, +} + +impl DaemonEngine { + /// Crea un cliente apuntando al socket dado. + pub fn new(socket_path: impl Into) -> Self { + Self { socket_path: socket_path.into() } + } + + /// `true` si el socket existe y acepta conexiones ahora mismo. + pub async fn is_reachable(&self) -> bool { + UnixStream::connect(&self.socket_path).await.is_ok() + } + + /// Un round-trip: conecta, envía el request, lee el response. + async fn roundtrip(&self, req: DaemonRequest) -> Result { + let mut stream = UnixStream::connect(&self.socket_path) + .await + .map_err(|e| EngineError::Transport(format!("connect: {e}")))?; + write_frame(&mut stream, &req) + .await + .map_err(|e| EngineError::Transport(format!("send: {e}")))?; + read_frame::<_, DaemonResponse>(&mut stream) + .await + .map_err(|e| EngineError::Transport(format!("recv: {e}"))) + } +} + +/// Un response que no corresponde al request enviado. +fn mismatch() -> EngineError { + EngineError::Transport("respuesta del daemon no coincide con el request".into()) +} + +#[async_trait] +impl Engine for DaemonEngine { + async fn run(&self, intent: Intent) -> Result { + match self.roundtrip(DaemonRequest::Run(intent)).await? { + DaemonResponse::Ran(h) => Ok(h), + DaemonResponse::Err(e) => Err(e), + _ => Err(mismatch()), + } + } + + async fn stop(&self, card_id: Ulid, grace: Duration) -> Result<(), EngineError> { + let req = DaemonRequest::Stop { + card_id, + grace_ms: grace.as_millis() as u64, + }; + match self.roundtrip(req).await? { + DaemonResponse::Stopped => Ok(()), + DaemonResponse::Err(e) => Err(e), + _ => Err(mismatch()), + } + } + + async fn list(&self) -> Result, EngineError> { + match self.roundtrip(DaemonRequest::List).await? { + DaemonResponse::Listed(v) => Ok(v), + DaemonResponse::Err(e) => Err(e), + _ => Err(mismatch()), + } + } + + async fn status(&self, card_id: Ulid) -> Result { + match self.roundtrip(DaemonRequest::Status { card_id }).await? { + DaemonResponse::Status(s) => Ok(s), + DaemonResponse::Err(e) => Err(e), + _ => Err(mismatch()), + } + } + + async fn telemetry(&self, card_id: Ulid) -> Result { + match self.roundtrip(DaemonRequest::Telemetry { card_id }).await? { + DaemonResponse::Telemetry(t) => Ok(t), + DaemonResponse::Err(e) => Err(e), + _ => Err(mismatch()), + } + } +} diff --git a/crates/runtime/sandokan-daemon/src/lib.rs b/crates/runtime/sandokan-daemon/src/lib.rs new file mode 100644 index 0000000..4e0d903 --- /dev/null +++ b/crates/runtime/sandokan-daemon/src/lib.rs @@ -0,0 +1,19 @@ +//! sandokan-daemon — `DaemonEngine` + loop servidor. +//! +//! Permite que el orquestador corra en un proceso y otros lo consuman +//! sin reimplementar la lógica: el `DaemonEngine` (cliente) implementa +//! el trait [`sandokan_core::Engine`] enviando requests postcard +//! length-prefixed sobre un Unix socket; [`serve`] corre el lado +//! servidor envolviendo cualquier `Engine` (típicamente un `LocalEngine`). +//! +//! Es la pieza que materializa el patrón horizontal de sandokan: el +//! primer binario que arranca gana el socket y expone el engine; los +//! demás se le suman como `DaemonEngine`. + +mod client; +mod protocol; +mod server; + +pub use client::DaemonEngine; +pub use protocol::{DaemonRequest, DaemonResponse}; +pub use server::serve; diff --git a/crates/runtime/sandokan-daemon/src/protocol.rs b/crates/runtime/sandokan-daemon/src/protocol.rs new file mode 100644 index 0000000..3ca9955 --- /dev/null +++ b/crates/runtime/sandokan-daemon/src/protocol.rs @@ -0,0 +1,76 @@ +//! Protocolo wire del daemon: requests/responses + framing. +//! +//! Encoding: postcard. Framing: prefijo de longitud `u32` little-endian +//! seguido de los bytes postcard. Mismo patrón que el wire de shuma. + +use sandokan_core::{EngineError, ExecHandle, Intent, TelemetryFrame}; +use sandokan_lifecycle::LifecycleState; +use serde::{Deserialize, Serialize}; +use tokio::io::{AsyncReadExt, AsyncWriteExt}; +use ulid::Ulid; + +/// Request del cliente al daemon. Espeja los métodos de `Engine`. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub enum DaemonRequest { + Run(Intent), + Stop { card_id: Ulid, grace_ms: u64 }, + List, + Status { card_id: Ulid }, + Telemetry { card_id: Ulid }, +} + +/// Response del daemon al cliente. Una variante por resultado posible. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub enum DaemonResponse { + Ran(ExecHandle), + Stopped, + Listed(Vec), + Status(LifecycleState), + Telemetry(TelemetryFrame), + Err(EngineError), +} + +/// Límite defensivo de tamaño de frame (16 MiB). Un Intent con una Card +/// grande sigue cabiendo; protege contra frames corruptos/maliciosos. +pub const MAX_FRAME: u32 = 16 * 1024 * 1024; + +/// Escribe un valor serializable como frame length-prefixed. +pub async fn write_frame(w: &mut W, value: &T) -> std::io::Result<()> +where + W: AsyncWriteExt + Unpin, + T: Serialize, +{ + let bytes = postcard::to_stdvec(value) + .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?; + if bytes.len() as u64 > MAX_FRAME as u64 { + return Err(std::io::Error::new( + std::io::ErrorKind::InvalidData, + "frame excede MAX_FRAME", + )); + } + w.write_all(&(bytes.len() as u32).to_le_bytes()).await?; + w.write_all(&bytes).await?; + w.flush().await?; + Ok(()) +} + +/// Lee un frame length-prefixed y lo deserializa. +pub async fn read_frame(r: &mut R) -> std::io::Result +where + R: AsyncReadExt + Unpin, + T: for<'de> Deserialize<'de>, +{ + let mut len_buf = [0u8; 4]; + r.read_exact(&mut len_buf).await?; + let len = u32::from_le_bytes(len_buf); + if len > MAX_FRAME { + return Err(std::io::Error::new( + std::io::ErrorKind::InvalidData, + "frame entrante excede MAX_FRAME", + )); + } + let mut buf = vec![0u8; len as usize]; + r.read_exact(&mut buf).await?; + postcard::from_bytes(&buf) + .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e)) +} diff --git a/crates/runtime/sandokan-daemon/src/server.rs b/crates/runtime/sandokan-daemon/src/server.rs new file mode 100644 index 0000000..1a00ed0 --- /dev/null +++ b/crates/runtime/sandokan-daemon/src/server.rs @@ -0,0 +1,123 @@ +//! Loop servidor: envuelve cualquier `Engine` y lo expone por Unix socket. + +use crate::protocol::{read_frame, write_frame, DaemonRequest, DaemonResponse}; +use sandokan_core::Engine; +use std::path::Path; +use std::sync::Arc; +use std::time::Duration; +use tokio::net::{UnixListener, UnixStream}; + +/// Sirve `engine` en `socket_path` hasta que el future se cancele. +/// +/// Si el socket ya existe (daemon previo que no limpió), se borra antes +/// de bind. Cada conexión se atiende en su propia task; una conexión +/// puede mandar múltiples requests secuenciales. +pub async fn serve(engine: Arc, socket_path: &Path) -> std::io::Result<()> +where + E: Engine + 'static, +{ + if socket_path.exists() { + let _ = std::fs::remove_file(socket_path); + } + let listener = UnixListener::bind(socket_path)?; + tracing::info!(socket = %socket_path.display(), "sandokan-daemon escuchando"); + + loop { + let (stream, _addr) = listener.accept().await?; + let engine = Arc::clone(&engine); + tokio::spawn(async move { + if let Err(e) = handle_conn(stream, engine).await { + tracing::debug!(error = %e, "conexión terminada"); + } + }); + } +} + +/// Atiende una conexión: lee requests hasta EOF, responde cada uno. +async fn handle_conn(mut stream: UnixStream, engine: Arc) -> std::io::Result<()> +where + E: Engine, +{ + loop { + let req: DaemonRequest = match read_frame(&mut stream).await { + Ok(r) => r, + // EOF limpio = el cliente cerró; no es error. + Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => return Ok(()), + Err(e) => return Err(e), + }; + let resp = dispatch(&*engine, req).await; + write_frame(&mut stream, &resp).await?; + } +} + +/// Traduce un request a la llamada `Engine` correspondiente. +async fn dispatch(engine: &E, req: DaemonRequest) -> DaemonResponse { + match req { + DaemonRequest::Run(intent) => match engine.run(intent).await { + Ok(h) => DaemonResponse::Ran(h), + Err(e) => DaemonResponse::Err(e), + }, + DaemonRequest::Stop { card_id, grace_ms } => { + match engine.stop(card_id, Duration::from_millis(grace_ms)).await { + Ok(()) => DaemonResponse::Stopped, + Err(e) => DaemonResponse::Err(e), + } + } + DaemonRequest::List => match engine.list().await { + Ok(v) => DaemonResponse::Listed(v), + Err(e) => DaemonResponse::Err(e), + }, + DaemonRequest::Status { card_id } => match engine.status(card_id).await { + Ok(s) => DaemonResponse::Status(s), + Err(e) => DaemonResponse::Err(e), + }, + DaemonRequest::Telemetry { card_id } => match engine.telemetry(card_id).await { + Ok(t) => DaemonResponse::Telemetry(t), + Err(e) => DaemonResponse::Err(e), + }, + } +} + +#[cfg(test)] +mod tests { + use super::*; + use sandokan_core::{Engine, EngineError}; + use sandokan_local::LocalEngine; + use ulid::Ulid; + + #[tokio::test] + async fn roundtrip_list_and_notfound() { + let dir = tempfile::tempdir().unwrap(); + let sock = dir.path().join("sandokan.sock"); + + let engine = Arc::new(LocalEngine::new()); + let sock_srv = sock.clone(); + let srv = tokio::spawn(async move { + let _ = serve(engine, &sock_srv).await; + }); + + // Espera a que el socket esté listo. + for _ in 0..50 { + if sock.exists() { + break; + } + tokio::time::sleep(Duration::from_millis(10)).await; + } + + let client = crate::DaemonEngine::new(sock.clone()); + assert!(client.is_reachable().await); + + // list() sobre engine vacío → vacío. + let listed = client.list().await.expect("list"); + assert!(listed.is_empty()); + + // status() de un id desconocido → NotFound propagado por el wire. + let unknown = Ulid::new(); + match client.status(unknown).await { + Err(EngineError::NotFound(id)) => assert_eq!(id, unknown), + other => panic!("esperaba NotFound, fue {other:?}"), + } + + srv.abort(); + } +}