feat(sandokan-daemon): B1.3 — DaemonEngine + protocolo wire
DaemonEngine: implementación del trait Engine que delega a otro proceso vía Unix socket. Materializa el patrón horizontal de sandokan (el binario que arranca primero expone el engine; los demás se le suman). - protocol.rs — DaemonRequest/DaemonResponse (espejan los métodos de Engine) + framing postcard length-prefixed (u32 LE + bytes), con MAX_FRAME 16 MiB defensivo. - client.rs — DaemonEngine: stateless, un round-trip por llamada; is_reachable() para el probe de auto(). - server.rs — serve(engine, socket): envuelve cualquier Engine, una task por conexión, multi-request por conexión. EngineError ahora es Serialize/Deserialize (viaja por el wire); NotFound se propaga tipado a través del socket. 1 test de integración: roundtrip real DaemonEngine ↔ serve ↔ LocalEngine (list vacío + NotFound propagado). cargo check --workspace verde. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
Generated
+16
@@ -10139,6 +10139,22 @@ dependencies = [
|
|||||||
"ulid",
|
"ulid",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "sandokan-daemon"
|
||||||
|
version = "0.1.0"
|
||||||
|
dependencies = [
|
||||||
|
"async-trait",
|
||||||
|
"postcard",
|
||||||
|
"sandokan-core",
|
||||||
|
"sandokan-lifecycle",
|
||||||
|
"sandokan-local",
|
||||||
|
"serde",
|
||||||
|
"tempfile",
|
||||||
|
"tokio",
|
||||||
|
"tracing",
|
||||||
|
"ulid",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "sandokan-lifecycle"
|
name = "sandokan-lifecycle"
|
||||||
version = "0.1.0"
|
version = "0.1.0"
|
||||||
|
|||||||
@@ -37,6 +37,7 @@ members = [
|
|||||||
"crates/runtime/sandokan-lifecycle",
|
"crates/runtime/sandokan-lifecycle",
|
||||||
"crates/runtime/sandokan-core",
|
"crates/runtime/sandokan-core",
|
||||||
"crates/runtime/sandokan-local",
|
"crates/runtime/sandokan-local",
|
||||||
|
"crates/runtime/sandokan-daemon",
|
||||||
|
|
||||||
# ============================================================
|
# ============================================================
|
||||||
# compat/ — Shims D-Bus para correr software systemd-aware
|
# compat/ — Shims D-Bus para correr software systemd-aware
|
||||||
|
|||||||
@@ -1,10 +1,14 @@
|
|||||||
//! Errores del orquestador.
|
//! Errores del orquestador.
|
||||||
|
|
||||||
|
use serde::{Deserialize, Serialize};
|
||||||
use ulid::Ulid;
|
use ulid::Ulid;
|
||||||
|
|
||||||
/// Falla de una operación del `Engine`. Las impls concretas mapean sus
|
/// Falla de una operación del `Engine`. Las impls concretas mapean sus
|
||||||
/// errores internos (encarnación, IPC, SSH) a estas variantes.
|
/// errores internos (encarnación, IPC, SSH) a estas variantes.
|
||||||
#[derive(Debug, thiserror::Error)]
|
///
|
||||||
|
/// Es `Serialize`/`Deserialize` porque viaja por el wire del
|
||||||
|
/// `DaemonEngine` (postcard sobre Unix socket).
|
||||||
|
#[derive(Debug, Clone, thiserror::Error, Serialize, Deserialize)]
|
||||||
pub enum EngineError {
|
pub enum EngineError {
|
||||||
/// No existe ninguna entidad activa con ese `card_id`.
|
/// No existe ninguna entidad activa con ese `card_id`.
|
||||||
#[error("card `{0}` no encontrada")]
|
#[error("card `{0}` no encontrada")]
|
||||||
|
|||||||
@@ -0,0 +1,22 @@
|
|||||||
|
[package]
|
||||||
|
name = "sandokan-daemon"
|
||||||
|
description = "DaemonEngine: orquestador sandokan delegado a otro proceso vía Unix socket (protocolo postcard length-prefixed) + loop servidor."
|
||||||
|
version.workspace = true
|
||||||
|
edition.workspace = true
|
||||||
|
license.workspace = true
|
||||||
|
authors.workspace = true
|
||||||
|
publish.workspace = true
|
||||||
|
|
||||||
|
[dependencies]
|
||||||
|
sandokan-core = { path = "../sandokan-core" }
|
||||||
|
sandokan-lifecycle = { path = "../sandokan-lifecycle" }
|
||||||
|
async-trait = { workspace = true }
|
||||||
|
tokio = { workspace = true }
|
||||||
|
postcard = { workspace = true }
|
||||||
|
serde = { workspace = true }
|
||||||
|
ulid = { workspace = true }
|
||||||
|
tracing = { workspace = true }
|
||||||
|
|
||||||
|
[dev-dependencies]
|
||||||
|
sandokan-local = { path = "../sandokan-local" }
|
||||||
|
tempfile = { workspace = true }
|
||||||
@@ -0,0 +1,96 @@
|
|||||||
|
//! `DaemonEngine` — cliente que implementa `Engine` sobre el wire.
|
||||||
|
|
||||||
|
use crate::protocol::{read_frame, write_frame, DaemonRequest, DaemonResponse};
|
||||||
|
use async_trait::async_trait;
|
||||||
|
use sandokan_core::{Engine, EngineError, ExecHandle, Intent, TelemetryFrame};
|
||||||
|
use sandokan_lifecycle::LifecycleState;
|
||||||
|
use std::path::PathBuf;
|
||||||
|
use std::time::Duration;
|
||||||
|
use tokio::net::UnixStream;
|
||||||
|
use ulid::Ulid;
|
||||||
|
|
||||||
|
/// Engine que delega cada operación a un daemon vía Unix socket.
|
||||||
|
///
|
||||||
|
/// Stateless: cada llamada abre una conexión, hace un round-trip y la
|
||||||
|
/// cierra. Simple y robusto; si el daemon no está, las llamadas fallan
|
||||||
|
/// con `EngineError::Transport`.
|
||||||
|
pub struct DaemonEngine {
|
||||||
|
socket_path: PathBuf,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl DaemonEngine {
|
||||||
|
/// Crea un cliente apuntando al socket dado.
|
||||||
|
pub fn new(socket_path: impl Into<PathBuf>) -> Self {
|
||||||
|
Self { socket_path: socket_path.into() }
|
||||||
|
}
|
||||||
|
|
||||||
|
/// `true` si el socket existe y acepta conexiones ahora mismo.
|
||||||
|
pub async fn is_reachable(&self) -> bool {
|
||||||
|
UnixStream::connect(&self.socket_path).await.is_ok()
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Un round-trip: conecta, envía el request, lee el response.
|
||||||
|
async fn roundtrip(&self, req: DaemonRequest) -> Result<DaemonResponse, EngineError> {
|
||||||
|
let mut stream = UnixStream::connect(&self.socket_path)
|
||||||
|
.await
|
||||||
|
.map_err(|e| EngineError::Transport(format!("connect: {e}")))?;
|
||||||
|
write_frame(&mut stream, &req)
|
||||||
|
.await
|
||||||
|
.map_err(|e| EngineError::Transport(format!("send: {e}")))?;
|
||||||
|
read_frame::<_, DaemonResponse>(&mut stream)
|
||||||
|
.await
|
||||||
|
.map_err(|e| EngineError::Transport(format!("recv: {e}")))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Un response que no corresponde al request enviado.
|
||||||
|
fn mismatch() -> EngineError {
|
||||||
|
EngineError::Transport("respuesta del daemon no coincide con el request".into())
|
||||||
|
}
|
||||||
|
|
||||||
|
#[async_trait]
|
||||||
|
impl Engine for DaemonEngine {
|
||||||
|
async fn run(&self, intent: Intent) -> Result<ExecHandle, EngineError> {
|
||||||
|
match self.roundtrip(DaemonRequest::Run(intent)).await? {
|
||||||
|
DaemonResponse::Ran(h) => Ok(h),
|
||||||
|
DaemonResponse::Err(e) => Err(e),
|
||||||
|
_ => Err(mismatch()),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn stop(&self, card_id: Ulid, grace: Duration) -> Result<(), EngineError> {
|
||||||
|
let req = DaemonRequest::Stop {
|
||||||
|
card_id,
|
||||||
|
grace_ms: grace.as_millis() as u64,
|
||||||
|
};
|
||||||
|
match self.roundtrip(req).await? {
|
||||||
|
DaemonResponse::Stopped => Ok(()),
|
||||||
|
DaemonResponse::Err(e) => Err(e),
|
||||||
|
_ => Err(mismatch()),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn list(&self) -> Result<Vec<ExecHandle>, EngineError> {
|
||||||
|
match self.roundtrip(DaemonRequest::List).await? {
|
||||||
|
DaemonResponse::Listed(v) => Ok(v),
|
||||||
|
DaemonResponse::Err(e) => Err(e),
|
||||||
|
_ => Err(mismatch()),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn status(&self, card_id: Ulid) -> Result<LifecycleState, EngineError> {
|
||||||
|
match self.roundtrip(DaemonRequest::Status { card_id }).await? {
|
||||||
|
DaemonResponse::Status(s) => Ok(s),
|
||||||
|
DaemonResponse::Err(e) => Err(e),
|
||||||
|
_ => Err(mismatch()),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn telemetry(&self, card_id: Ulid) -> Result<TelemetryFrame, EngineError> {
|
||||||
|
match self.roundtrip(DaemonRequest::Telemetry { card_id }).await? {
|
||||||
|
DaemonResponse::Telemetry(t) => Ok(t),
|
||||||
|
DaemonResponse::Err(e) => Err(e),
|
||||||
|
_ => Err(mismatch()),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,19 @@
|
|||||||
|
//! sandokan-daemon — `DaemonEngine` + loop servidor.
|
||||||
|
//!
|
||||||
|
//! Permite que el orquestador corra en un proceso y otros lo consuman
|
||||||
|
//! sin reimplementar la lógica: el `DaemonEngine` (cliente) implementa
|
||||||
|
//! el trait [`sandokan_core::Engine`] enviando requests postcard
|
||||||
|
//! length-prefixed sobre un Unix socket; [`serve`] corre el lado
|
||||||
|
//! servidor envolviendo cualquier `Engine` (típicamente un `LocalEngine`).
|
||||||
|
//!
|
||||||
|
//! Es la pieza que materializa el patrón horizontal de sandokan: el
|
||||||
|
//! primer binario que arranca gana el socket y expone el engine; los
|
||||||
|
//! demás se le suman como `DaemonEngine`.
|
||||||
|
|
||||||
|
mod client;
|
||||||
|
mod protocol;
|
||||||
|
mod server;
|
||||||
|
|
||||||
|
pub use client::DaemonEngine;
|
||||||
|
pub use protocol::{DaemonRequest, DaemonResponse};
|
||||||
|
pub use server::serve;
|
||||||
@@ -0,0 +1,76 @@
|
|||||||
|
//! Protocolo wire del daemon: requests/responses + framing.
|
||||||
|
//!
|
||||||
|
//! Encoding: postcard. Framing: prefijo de longitud `u32` little-endian
|
||||||
|
//! seguido de los bytes postcard. Mismo patrón que el wire de shuma.
|
||||||
|
|
||||||
|
use sandokan_core::{EngineError, ExecHandle, Intent, TelemetryFrame};
|
||||||
|
use sandokan_lifecycle::LifecycleState;
|
||||||
|
use serde::{Deserialize, Serialize};
|
||||||
|
use tokio::io::{AsyncReadExt, AsyncWriteExt};
|
||||||
|
use ulid::Ulid;
|
||||||
|
|
||||||
|
/// Request del cliente al daemon. Espeja los métodos de `Engine`.
|
||||||
|
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||||
|
pub enum DaemonRequest {
|
||||||
|
Run(Intent),
|
||||||
|
Stop { card_id: Ulid, grace_ms: u64 },
|
||||||
|
List,
|
||||||
|
Status { card_id: Ulid },
|
||||||
|
Telemetry { card_id: Ulid },
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Response del daemon al cliente. Una variante por resultado posible.
|
||||||
|
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||||
|
pub enum DaemonResponse {
|
||||||
|
Ran(ExecHandle),
|
||||||
|
Stopped,
|
||||||
|
Listed(Vec<ExecHandle>),
|
||||||
|
Status(LifecycleState),
|
||||||
|
Telemetry(TelemetryFrame),
|
||||||
|
Err(EngineError),
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Límite defensivo de tamaño de frame (16 MiB). Un Intent con una Card
|
||||||
|
/// grande sigue cabiendo; protege contra frames corruptos/maliciosos.
|
||||||
|
pub const MAX_FRAME: u32 = 16 * 1024 * 1024;
|
||||||
|
|
||||||
|
/// Escribe un valor serializable como frame length-prefixed.
|
||||||
|
pub async fn write_frame<W, T>(w: &mut W, value: &T) -> std::io::Result<()>
|
||||||
|
where
|
||||||
|
W: AsyncWriteExt + Unpin,
|
||||||
|
T: Serialize,
|
||||||
|
{
|
||||||
|
let bytes = postcard::to_stdvec(value)
|
||||||
|
.map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?;
|
||||||
|
if bytes.len() as u64 > MAX_FRAME as u64 {
|
||||||
|
return Err(std::io::Error::new(
|
||||||
|
std::io::ErrorKind::InvalidData,
|
||||||
|
"frame excede MAX_FRAME",
|
||||||
|
));
|
||||||
|
}
|
||||||
|
w.write_all(&(bytes.len() as u32).to_le_bytes()).await?;
|
||||||
|
w.write_all(&bytes).await?;
|
||||||
|
w.flush().await?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Lee un frame length-prefixed y lo deserializa.
|
||||||
|
pub async fn read_frame<R, T>(r: &mut R) -> std::io::Result<T>
|
||||||
|
where
|
||||||
|
R: AsyncReadExt + Unpin,
|
||||||
|
T: for<'de> Deserialize<'de>,
|
||||||
|
{
|
||||||
|
let mut len_buf = [0u8; 4];
|
||||||
|
r.read_exact(&mut len_buf).await?;
|
||||||
|
let len = u32::from_le_bytes(len_buf);
|
||||||
|
if len > MAX_FRAME {
|
||||||
|
return Err(std::io::Error::new(
|
||||||
|
std::io::ErrorKind::InvalidData,
|
||||||
|
"frame entrante excede MAX_FRAME",
|
||||||
|
));
|
||||||
|
}
|
||||||
|
let mut buf = vec![0u8; len as usize];
|
||||||
|
r.read_exact(&mut buf).await?;
|
||||||
|
postcard::from_bytes(&buf)
|
||||||
|
.map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))
|
||||||
|
}
|
||||||
@@ -0,0 +1,123 @@
|
|||||||
|
//! Loop servidor: envuelve cualquier `Engine` y lo expone por Unix socket.
|
||||||
|
|
||||||
|
use crate::protocol::{read_frame, write_frame, DaemonRequest, DaemonResponse};
|
||||||
|
use sandokan_core::Engine;
|
||||||
|
use std::path::Path;
|
||||||
|
use std::sync::Arc;
|
||||||
|
use std::time::Duration;
|
||||||
|
use tokio::net::{UnixListener, UnixStream};
|
||||||
|
|
||||||
|
/// Sirve `engine` en `socket_path` hasta que el future se cancele.
|
||||||
|
///
|
||||||
|
/// Si el socket ya existe (daemon previo que no limpió), se borra antes
|
||||||
|
/// de bind. Cada conexión se atiende en su propia task; una conexión
|
||||||
|
/// puede mandar múltiples requests secuenciales.
|
||||||
|
pub async fn serve<E>(engine: Arc<E>, socket_path: &Path) -> std::io::Result<()>
|
||||||
|
where
|
||||||
|
E: Engine + 'static,
|
||||||
|
{
|
||||||
|
if socket_path.exists() {
|
||||||
|
let _ = std::fs::remove_file(socket_path);
|
||||||
|
}
|
||||||
|
let listener = UnixListener::bind(socket_path)?;
|
||||||
|
tracing::info!(socket = %socket_path.display(), "sandokan-daemon escuchando");
|
||||||
|
|
||||||
|
loop {
|
||||||
|
let (stream, _addr) = listener.accept().await?;
|
||||||
|
let engine = Arc::clone(&engine);
|
||||||
|
tokio::spawn(async move {
|
||||||
|
if let Err(e) = handle_conn(stream, engine).await {
|
||||||
|
tracing::debug!(error = %e, "conexión terminada");
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Atiende una conexión: lee requests hasta EOF, responde cada uno.
|
||||||
|
async fn handle_conn<E>(mut stream: UnixStream, engine: Arc<E>) -> std::io::Result<()>
|
||||||
|
where
|
||||||
|
E: Engine,
|
||||||
|
{
|
||||||
|
loop {
|
||||||
|
let req: DaemonRequest = match read_frame(&mut stream).await {
|
||||||
|
Ok(r) => r,
|
||||||
|
// EOF limpio = el cliente cerró; no es error.
|
||||||
|
Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => return Ok(()),
|
||||||
|
Err(e) => return Err(e),
|
||||||
|
};
|
||||||
|
let resp = dispatch(&*engine, req).await;
|
||||||
|
write_frame(&mut stream, &resp).await?;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Traduce un request a la llamada `Engine` correspondiente.
|
||||||
|
async fn dispatch<E: Engine>(engine: &E, req: DaemonRequest) -> DaemonResponse {
|
||||||
|
match req {
|
||||||
|
DaemonRequest::Run(intent) => match engine.run(intent).await {
|
||||||
|
Ok(h) => DaemonResponse::Ran(h),
|
||||||
|
Err(e) => DaemonResponse::Err(e),
|
||||||
|
},
|
||||||
|
DaemonRequest::Stop { card_id, grace_ms } => {
|
||||||
|
match engine.stop(card_id, Duration::from_millis(grace_ms)).await {
|
||||||
|
Ok(()) => DaemonResponse::Stopped,
|
||||||
|
Err(e) => DaemonResponse::Err(e),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
DaemonRequest::List => match engine.list().await {
|
||||||
|
Ok(v) => DaemonResponse::Listed(v),
|
||||||
|
Err(e) => DaemonResponse::Err(e),
|
||||||
|
},
|
||||||
|
DaemonRequest::Status { card_id } => match engine.status(card_id).await {
|
||||||
|
Ok(s) => DaemonResponse::Status(s),
|
||||||
|
Err(e) => DaemonResponse::Err(e),
|
||||||
|
},
|
||||||
|
DaemonRequest::Telemetry { card_id } => match engine.telemetry(card_id).await {
|
||||||
|
Ok(t) => DaemonResponse::Telemetry(t),
|
||||||
|
Err(e) => DaemonResponse::Err(e),
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod tests {
|
||||||
|
use super::*;
|
||||||
|
use sandokan_core::{Engine, EngineError};
|
||||||
|
use sandokan_local::LocalEngine;
|
||||||
|
use ulid::Ulid;
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn roundtrip_list_and_notfound() {
|
||||||
|
let dir = tempfile::tempdir().unwrap();
|
||||||
|
let sock = dir.path().join("sandokan.sock");
|
||||||
|
|
||||||
|
let engine = Arc::new(LocalEngine::new());
|
||||||
|
let sock_srv = sock.clone();
|
||||||
|
let srv = tokio::spawn(async move {
|
||||||
|
let _ = serve(engine, &sock_srv).await;
|
||||||
|
});
|
||||||
|
|
||||||
|
// Espera a que el socket esté listo.
|
||||||
|
for _ in 0..50 {
|
||||||
|
if sock.exists() {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
tokio::time::sleep(Duration::from_millis(10)).await;
|
||||||
|
}
|
||||||
|
|
||||||
|
let client = crate::DaemonEngine::new(sock.clone());
|
||||||
|
assert!(client.is_reachable().await);
|
||||||
|
|
||||||
|
// list() sobre engine vacío → vacío.
|
||||||
|
let listed = client.list().await.expect("list");
|
||||||
|
assert!(listed.is_empty());
|
||||||
|
|
||||||
|
// status() de un id desconocido → NotFound propagado por el wire.
|
||||||
|
let unknown = Ulid::new();
|
||||||
|
match client.status(unknown).await {
|
||||||
|
Err(EngineError::NotFound(id)) => assert_eq!(id, unknown),
|
||||||
|
other => panic!("esperaba NotFound, fue {other:?}"),
|
||||||
|
}
|
||||||
|
|
||||||
|
srv.abort();
|
||||||
|
}
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user