From 6596c8127173465026fd24de9e53140a80d5dd44 Mon Sep 17 00:00:00 2001 From: sergio Date: Mon, 11 May 2026 17:16:11 +0000 Subject: [PATCH] feat(shipote): audit log persistente + HTTP gateway (fase S) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Daemon escribe append-only a $XDG_STATE_HOME/shipote/audit.log además del tracing. Single-line: ts= uid= action= . Rotación simple a .log.1 al pasar 1 MiB. - shipote-gateway: TCP listener 127.0.0.1:7378 default. POST /rpc traduce JSON ↔ postcard contra daemon socket. GET / health text. HTTP parser ad-hoc (~70 LOC), sin dep de hyper/axum. Sin auth — bind a localhost + SHIPOTE_TRUST_ANYONE=1 en prod. E2E: curl --noproxy '*' POST /rpc → "Pong", Health JSON, Capabilities JSON. Audit log persiste mutaciones con uid del peer. 85 tests pasan (features nuevos son binarios, no library mods). Co-Authored-By: Claude Opus 4.7 --- Cargo.lock | 12 ++ Cargo.toml | 1 + crates/apps/shipote-daemon/src/main.rs | 49 ++++++ crates/apps/shipote-gateway/Cargo.toml | 21 +++ crates/apps/shipote-gateway/src/main.rs | 168 ++++++++++++++++++++ crates/modules/shipote/docs/ARCHITECTURE.md | 1 + 6 files changed, 252 insertions(+) create mode 100644 crates/apps/shipote-gateway/Cargo.toml create mode 100644 crates/apps/shipote-gateway/src/main.rs diff --git a/Cargo.lock b/Cargo.lock index 1d597cc..a0f255d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -9661,6 +9661,18 @@ dependencies = [ "toml 0.8.23", ] +[[package]] +name = "shipote-gateway" +version = "0.1.0" +dependencies = [ + "anyhow", + "serde_json", + "shipote-protocol", + "tokio", + "tracing", + "tracing-subscriber", +] + [[package]] name = "shipote-protocol" version = "0.1.0" diff --git a/Cargo.toml b/Cargo.toml index 11e2a1d..454787c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -109,6 +109,7 @@ members = [ "crates/apps/brahman-demo", "crates/apps/shipote-daemon", "crates/apps/shipote-cli", + "crates/apps/shipote-gateway", "crates/apps/shipote-shell", ] diff --git a/crates/apps/shipote-daemon/src/main.rs b/crates/apps/shipote-daemon/src/main.rs index 5709b64..d4fa0b6 100644 --- a/crates/apps/shipote-daemon/src/main.rs +++ b/crates/apps/shipote-daemon/src/main.rs @@ -297,6 +297,44 @@ async fn handle_client( } } +/// Path canónico del audit log: `$XDG_STATE_HOME/shipote/audit.log` o +/// fallback `$HOME/.local/state/shipote/audit.log`. +fn default_audit_log_path() -> std::path::PathBuf { + if let Ok(state) = std::env::var("XDG_STATE_HOME") { + return std::path::PathBuf::from(state).join("shipote/audit.log"); + } + if let Ok(home) = std::env::var("HOME") { + return std::path::PathBuf::from(home).join(".local/state/shipote/audit.log"); + } + std::path::PathBuf::from("/tmp/shipote-audit.log") +} + +/// Cap del audit log antes de rotar a `audit.log.1`. 1 MiB. +const AUDIT_LOG_MAX_BYTES: u64 = 1 << 20; + +/// Append + rotate (mueve a `.1` si supera el cap). Append-only, sin +/// reordenar. Sync: cada line, fsync no — el log es defensive, no +/// transactional. +fn append_audit_line(path: &std::path::Path, line: &str) -> std::io::Result<()> { + use std::io::Write; + // Rotar si pasa el cap. + if let Ok(meta) = std::fs::metadata(path) { + if meta.len() >= AUDIT_LOG_MAX_BYTES { + let rotated = path.with_extension("log.1"); + let _ = std::fs::rename(path, &rotated); + } + } + if let Some(parent) = path.parent() { + let _ = std::fs::create_dir_all(parent); + } + let mut f = std::fs::OpenOptions::new() + .create(true) + .append(true) + .open(path)?; + writeln!(f, "{line}")?; + Ok(()) +} + /// Loguea cada mutación con target="audit" y el peer uid. Reads (ping, /// list, stats) se omiten para no inundar el log. fn audit_request(peer_uid: u32, req: &Request) { @@ -330,6 +368,17 @@ fn audit_request(peer_uid: u32, req: &Request) { | Request::Capabilities => return, }; info!(target: "audit", uid = peer_uid, action, detail = %detail, "audit"); + // Append a file. Failure no es fatal — sólo se pierde la entry. + let ts = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .map(|d| d.as_millis()) + .unwrap_or(0); + let line = format!("ts={ts} uid={peer_uid} action={action} {detail}"); + let path = default_audit_log_path(); + if let Err(e) = append_audit_line(&path, &line) { + // Sólo loguear si el filesystem está roto. No reportar al cliente. + tracing::debug!(?e, path = %path.display(), "audit file write failed"); + } } async fn dispatch( diff --git a/crates/apps/shipote-gateway/Cargo.toml b/crates/apps/shipote-gateway/Cargo.toml new file mode 100644 index 0000000..8a10a91 --- /dev/null +++ b/crates/apps/shipote-gateway/Cargo.toml @@ -0,0 +1,21 @@ +[package] +name = "shipote-gateway" +version.workspace = true +edition.workspace = true +rust-version.workspace = true +license.workspace = true +authors.workspace = true +publish.workspace = true +description = "HTTP/JSON gateway para shipote — traduce JSON ↔ postcard contra el admin socket." + +[[bin]] +name = "shipote-gateway" +path = "src/main.rs" + +[dependencies] +shipote-protocol = { path = "../../modules/shipote/shipote-protocol" } +anyhow = { workspace = true } +serde_json = { workspace = true } +tokio = { workspace = true } +tracing = { workspace = true } +tracing-subscriber = { workspace = true } diff --git a/crates/apps/shipote-gateway/src/main.rs b/crates/apps/shipote-gateway/src/main.rs new file mode 100644 index 0000000..77a00d8 --- /dev/null +++ b/crates/apps/shipote-gateway/src/main.rs @@ -0,0 +1,168 @@ +//! `shipote-gateway` — HTTP/JSON adapter para el daemon. +//! +//! Acepta `POST /rpc` con body JSON serializado como `shipote_protocol::Request`, +//! hace round-trip al admin socket via postcard, devuelve `Response` como JSON. +//! +//! Diseñado para clients no-Rust (curl, Python, web app) que no pueden +//! hablar postcard directo. NO es un proxy completo — sólo translation +//! layer del protocolo. +//! +//! Sin dep de axum/hyper: HTTP parser ad-hoc, suficiente para 1 endpoint. + +use shipote_protocol::{default_socket_path, read_frame, write_frame, Request, Response}; +use std::sync::Arc; +use tokio::io::{AsyncReadExt, AsyncWriteExt}; +use tokio::net::{TcpListener, TcpStream, UnixStream}; +use tracing::{info, warn}; + +const DEFAULT_LISTEN: &str = "127.0.0.1:7378"; + +#[tokio::main] +async fn main() -> anyhow::Result<()> { + init_tracing(); + let listen = std::env::var("SHIPOTE_GATEWAY_LISTEN").unwrap_or_else(|_| DEFAULT_LISTEN.into()); + let daemon_sock = Arc::new(default_socket_path()); + let listener = TcpListener::bind(&listen).await?; + info!(listen = %listen, daemon = %daemon_sock.display(), "shipote-gateway listening"); + + loop { + match listener.accept().await { + Ok((stream, peer)) => { + let sock = daemon_sock.clone(); + tokio::spawn(async move { + if let Err(e) = handle_http(stream, sock).await { + warn!(?e, ?peer, "request error"); + } + }); + } + Err(e) => { + warn!(?e, "accept failed"); + tokio::time::sleep(std::time::Duration::from_millis(100)).await; + } + } + } +} + +async fn handle_http(mut stream: TcpStream, daemon_sock: Arc) -> anyhow::Result<()> { + // Parser HTTP mínimo: read hasta `\r\n\r\n`, parsear request line + + // Content-Length, después leer body exacto. + let mut buf = Vec::with_capacity(4096); + let mut tmp = [0u8; 4096]; + let header_end; + loop { + let n = stream.read(&mut tmp).await?; + if n == 0 { + return Ok(()); // closed + } + buf.extend_from_slice(&tmp[..n]); + if let Some(pos) = find_double_crlf(&buf) { + header_end = pos + 4; + break; + } + if buf.len() > 64 * 1024 { + return write_error(&mut stream, 413, "headers too large").await; + } + } + + let header_str = std::str::from_utf8(&buf[..header_end - 4])?; + let mut lines = header_str.lines(); + let request_line = lines.next().unwrap_or(""); + let mut parts = request_line.split_whitespace(); + let method = parts.next().unwrap_or(""); + let path = parts.next().unwrap_or(""); + let mut content_length: usize = 0; + for line in lines { + if let Some(v) = line.strip_prefix("Content-Length:").or_else(|| line.strip_prefix("content-length:")) { + content_length = v.trim().parse().unwrap_or(0); + } + } + + // Rutas: + if method == "GET" && (path == "/" || path == "/health") { + return write_text(&mut stream, 200, "shipote-gateway ok\n").await; + } + if method != "POST" || path != "/rpc" { + return write_error(&mut stream, 404, "use POST /rpc").await; + } + + // Leer body. + let mut body = buf[header_end..].to_vec(); + while body.len() < content_length { + let n = stream.read(&mut tmp).await?; + if n == 0 { + break; + } + body.extend_from_slice(&tmp[..n]); + } + body.truncate(content_length); + + // Parsear JSON → Request. + let req: Request = match serde_json::from_slice(&body) { + Ok(r) => r, + Err(e) => return write_error(&mut stream, 400, &format!("bad json: {e}")).await, + }; + + // Round-trip al daemon. + let resp = match round_trip_daemon(&daemon_sock, &req).await { + Ok(r) => r, + Err(e) => return write_error(&mut stream, 502, &format!("daemon: {e}")).await, + }; + + // Serializar Response → JSON. + let body_json = serde_json::to_vec(&resp)?; + write_response(&mut stream, 200, "application/json", &body_json).await +} + +async fn round_trip_daemon(sock: &std::path::Path, req: &Request) -> anyhow::Result { + let mut stream = UnixStream::connect(sock).await?; + write_frame(&mut stream, req).await?; + let resp: Response = read_frame(&mut stream).await?; + Ok(resp) +} + +fn find_double_crlf(buf: &[u8]) -> Option { + buf.windows(4).position(|w| w == b"\r\n\r\n") +} + +async fn write_response( + stream: &mut TcpStream, + code: u16, + content_type: &str, + body: &[u8], +) -> anyhow::Result<()> { + let status = match code { + 200 => "OK", + 400 => "Bad Request", + 404 => "Not Found", + 413 => "Payload Too Large", + 502 => "Bad Gateway", + _ => "Unknown", + }; + let head = format!( + "HTTP/1.1 {code} {status}\r\n\ + Content-Type: {content_type}\r\n\ + Content-Length: {}\r\n\ + Connection: close\r\n\ + \r\n", + body.len() + ); + stream.write_all(head.as_bytes()).await?; + stream.write_all(body).await?; + stream.flush().await?; + Ok(()) +} + +async fn write_text(stream: &mut TcpStream, code: u16, body: &str) -> anyhow::Result<()> { + write_response(stream, code, "text/plain", body.as_bytes()).await +} + +async fn write_error(stream: &mut TcpStream, code: u16, msg: &str) -> anyhow::Result<()> { + let body = serde_json::json!({ "error": msg }).to_string(); + write_response(stream, code, "application/json", body.as_bytes()).await +} + +fn init_tracing() { + use tracing_subscriber::{fmt, EnvFilter}; + let filter = EnvFilter::try_from_env("SHIPOTE_GATEWAY_LOG").unwrap_or_else(|_| EnvFilter::new("info")); + fmt().with_env_filter(filter).init(); +} diff --git a/crates/modules/shipote/docs/ARCHITECTURE.md b/crates/modules/shipote/docs/ARCHITECTURE.md index f4f4781..ced31d7 100644 --- a/crates/modules/shipote/docs/ARCHITECTURE.md +++ b/crates/modules/shipote/docs/ARCHITECTURE.md @@ -67,6 +67,7 @@ - **`crates/apps/shipote-daemon`** — long-running. Escucha admin socket, ejecuta el dispatch, reaper periódico. - **`crates/apps/shipote-cli`** — `shipote` (clap). Para administración + scripting. - **`crates/apps/shipote-shell`** — GUI con `yahweh_launcher`. Dashboard: estado, workspaces, comandos, flows, quotas, sparkline. +- **`crates/apps/shipote-gateway`** — HTTP/JSON adapter. `POST /rpc` con body JSON `Request` → traduce a postcard, round-trip al daemon, retorna JSON. Default `127.0.0.1:7378`. ### Consumidores del discerner (externos a shipote pero usándolo) - **`crates/modules/ui_engine/libs/providers/fs`** — `FileDataProvider` ahora puebla `mime_type` con `DiscernPipeline`.