feat(sandokan-remote): B1.4 — RemoteEngine vía SSH socket-forward
Opción B: RemoteEngine orquesta en un host remoto tunelando el wire
del daemon sobre un canal SSH direct-streamlocal hacia el sandokan.sock
remoto. El protocolo es idéntico al de DaemonEngine (postcard
length-prefixed) — sólo cambia el transporte, así que read_frame/
write_frame se reusan tal cual.
- brahman-ssh-multiplex: + SshSession::forward_unix — abre un canal
direct-streamlocal y devuelve su ChannelStream (AsyncRead+AsyncWrite).
- sandokan-daemon: protocol ahora pub, exporta read_frame/write_frame.
- sandokan-remote: RemoteEngine { SshSession + remote_socket }.
connect() o with_session(); cada operación abre un canal nuevo
(multiplexado sobre la conexión maestra).
- sandokan umbrella re-exporta RemoteEngine.
Completa Fase B: sandokan tiene Local + Daemon + Remote + auto().
cargo check --workspace verde. RemoteEngine necesita un host remoto
con `sandokan daemon` para validación runtime (sin unit test).
Opción A (text-parse del CLI por compat) queda pendiente por decisión
del usuario.
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
@@ -0,0 +1,16 @@
|
||||
[package]
|
||||
name = "sandokan-remote"
|
||||
version.workspace = true
|
||||
edition.workspace = true
|
||||
license.workspace = true
|
||||
authors.workspace = true
|
||||
publish.workspace = true
|
||||
description = "sandokan RemoteEngine — orquesta en un host remoto tunelando el wire del daemon sobre un canal SSH direct-streamlocal."
|
||||
|
||||
[dependencies]
|
||||
sandokan-core = { path = "../sandokan-core" }
|
||||
sandokan-daemon = { path = "../sandokan-daemon" }
|
||||
sandokan-lifecycle = { path = "../sandokan-lifecycle" }
|
||||
brahman-ssh-multiplex = { path = "../../protocol/brahman-ssh-multiplex" }
|
||||
async-trait = { workspace = true }
|
||||
ulid = { workspace = true }
|
||||
@@ -0,0 +1,112 @@
|
||||
//! `sandokan-remote` — `RemoteEngine`: orquesta en un host remoto.
|
||||
//!
|
||||
//! Misma técnica que `DaemonEngine` pero el transporte es un canal SSH
|
||||
//! `direct-streamlocal` hacia el `sandokan.sock` del host remoto. El wire
|
||||
//! es idéntico (postcard length-prefixed) — sólo cambia el túnel, así
|
||||
//! que el código de protocolo se reusa tal cual.
|
||||
//!
|
||||
//! Requiere que el host remoto corra `sandokan daemon` escuchando en
|
||||
//! `remote_socket`.
|
||||
|
||||
#![forbid(unsafe_code)]
|
||||
|
||||
use async_trait::async_trait;
|
||||
use brahman_ssh_multiplex::{SshConfig, SshSession};
|
||||
use sandokan_core::{Engine, EngineError, ExecHandle, Intent, TelemetryFrame};
|
||||
use sandokan_daemon::{read_frame, write_frame, DaemonRequest, DaemonResponse};
|
||||
use sandokan_lifecycle::LifecycleState;
|
||||
use std::time::Duration;
|
||||
use ulid::Ulid;
|
||||
|
||||
/// Engine que delega a un daemon sandokan en un host remoto, tunelando
|
||||
/// el wire sobre SSH. La sesión SSH maestra se mantiene; cada operación
|
||||
/// abre un canal `direct-streamlocal` nuevo (multiplexado, barato).
|
||||
pub struct RemoteEngine {
|
||||
session: SshSession,
|
||||
remote_socket: String,
|
||||
}
|
||||
|
||||
impl RemoteEngine {
|
||||
/// Conecta por SSH al host y prepara el túnel al socket del daemon.
|
||||
pub async fn connect(
|
||||
ssh: &SshConfig,
|
||||
remote_socket: impl Into<String>,
|
||||
) -> Result<Self, EngineError> {
|
||||
let session = SshSession::connect(ssh)
|
||||
.await
|
||||
.map_err(|e| EngineError::Transport(format!("ssh connect: {e}")))?;
|
||||
Ok(Self { session, remote_socket: remote_socket.into() })
|
||||
}
|
||||
|
||||
/// Construye un `RemoteEngine` sobre una `SshSession` ya establecida
|
||||
/// (permite compartir la conexión maestra con otros consumidores).
|
||||
pub fn with_session(session: SshSession, remote_socket: impl Into<String>) -> Self {
|
||||
Self { session, remote_socket: remote_socket.into() }
|
||||
}
|
||||
|
||||
async fn roundtrip(&self, req: DaemonRequest) -> Result<DaemonResponse, EngineError> {
|
||||
let mut stream = self
|
||||
.session
|
||||
.forward_unix(&self.remote_socket)
|
||||
.await
|
||||
.map_err(|e| EngineError::Transport(format!("ssh forward: {e}")))?;
|
||||
write_frame(&mut stream, &req)
|
||||
.await
|
||||
.map_err(|e| EngineError::Transport(format!("send: {e}")))?;
|
||||
read_frame::<_, DaemonResponse>(&mut stream)
|
||||
.await
|
||||
.map_err(|e| EngineError::Transport(format!("recv: {e}")))
|
||||
}
|
||||
}
|
||||
|
||||
/// Un response que no corresponde al request enviado.
|
||||
fn mismatch() -> EngineError {
|
||||
EngineError::Transport("respuesta remota no coincide con el request".into())
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl Engine for RemoteEngine {
|
||||
async fn run(&self, intent: Intent) -> Result<ExecHandle, EngineError> {
|
||||
match self.roundtrip(DaemonRequest::Run(intent)).await? {
|
||||
DaemonResponse::Ran(h) => Ok(h),
|
||||
DaemonResponse::Err(e) => Err(e),
|
||||
_ => Err(mismatch()),
|
||||
}
|
||||
}
|
||||
|
||||
async fn stop(&self, card_id: Ulid, grace: Duration) -> Result<(), EngineError> {
|
||||
let req = DaemonRequest::Stop {
|
||||
card_id,
|
||||
grace_ms: grace.as_millis() as u64,
|
||||
};
|
||||
match self.roundtrip(req).await? {
|
||||
DaemonResponse::Stopped => Ok(()),
|
||||
DaemonResponse::Err(e) => Err(e),
|
||||
_ => Err(mismatch()),
|
||||
}
|
||||
}
|
||||
|
||||
async fn list(&self) -> Result<Vec<ExecHandle>, EngineError> {
|
||||
match self.roundtrip(DaemonRequest::List).await? {
|
||||
DaemonResponse::Listed(v) => Ok(v),
|
||||
DaemonResponse::Err(e) => Err(e),
|
||||
_ => Err(mismatch()),
|
||||
}
|
||||
}
|
||||
|
||||
async fn status(&self, card_id: Ulid) -> Result<LifecycleState, EngineError> {
|
||||
match self.roundtrip(DaemonRequest::Status { card_id }).await? {
|
||||
DaemonResponse::Status(s) => Ok(s),
|
||||
DaemonResponse::Err(e) => Err(e),
|
||||
_ => Err(mismatch()),
|
||||
}
|
||||
}
|
||||
|
||||
async fn telemetry(&self, card_id: Ulid) -> Result<TelemetryFrame, EngineError> {
|
||||
match self.roundtrip(DaemonRequest::Telemetry { card_id }).await? {
|
||||
DaemonResponse::Telemetry(t) => Ok(t),
|
||||
DaemonResponse::Err(e) => Err(e),
|
||||
_ => Err(mismatch()),
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user