feat(shipote): audit log persistente + HTTP gateway (fase S)
- Daemon escribe append-only a $XDG_STATE_HOME/shipote/audit.log además del tracing. Single-line: ts=<ms> uid=<peer> action=<verb> <detail>. Rotación simple a .log.1 al pasar 1 MiB. - shipote-gateway: TCP listener 127.0.0.1:7378 default. POST /rpc traduce JSON ↔ postcard contra daemon socket. GET / health text. HTTP parser ad-hoc (~70 LOC), sin dep de hyper/axum. Sin auth — bind a localhost + SHIPOTE_TRUST_ANYONE=1 en prod. E2E: curl --noproxy '*' POST /rpc → "Pong", Health JSON, Capabilities JSON. Audit log persiste mutaciones con uid del peer. 85 tests pasan (features nuevos son binarios, no library mods). Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
Generated
+12
@@ -9661,6 +9661,18 @@ dependencies = [
|
|||||||
"toml 0.8.23",
|
"toml 0.8.23",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "shipote-gateway"
|
||||||
|
version = "0.1.0"
|
||||||
|
dependencies = [
|
||||||
|
"anyhow",
|
||||||
|
"serde_json",
|
||||||
|
"shipote-protocol",
|
||||||
|
"tokio",
|
||||||
|
"tracing",
|
||||||
|
"tracing-subscriber",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "shipote-protocol"
|
name = "shipote-protocol"
|
||||||
version = "0.1.0"
|
version = "0.1.0"
|
||||||
|
|||||||
@@ -109,6 +109,7 @@ members = [
|
|||||||
"crates/apps/brahman-demo",
|
"crates/apps/brahman-demo",
|
||||||
"crates/apps/shipote-daemon",
|
"crates/apps/shipote-daemon",
|
||||||
"crates/apps/shipote-cli",
|
"crates/apps/shipote-cli",
|
||||||
|
"crates/apps/shipote-gateway",
|
||||||
"crates/apps/shipote-shell",
|
"crates/apps/shipote-shell",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
|||||||
@@ -297,6 +297,44 @@ async fn handle_client(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Path canónico del audit log: `$XDG_STATE_HOME/shipote/audit.log` o
|
||||||
|
/// fallback `$HOME/.local/state/shipote/audit.log`.
|
||||||
|
fn default_audit_log_path() -> std::path::PathBuf {
|
||||||
|
if let Ok(state) = std::env::var("XDG_STATE_HOME") {
|
||||||
|
return std::path::PathBuf::from(state).join("shipote/audit.log");
|
||||||
|
}
|
||||||
|
if let Ok(home) = std::env::var("HOME") {
|
||||||
|
return std::path::PathBuf::from(home).join(".local/state/shipote/audit.log");
|
||||||
|
}
|
||||||
|
std::path::PathBuf::from("/tmp/shipote-audit.log")
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Cap del audit log antes de rotar a `audit.log.1`. 1 MiB.
|
||||||
|
const AUDIT_LOG_MAX_BYTES: u64 = 1 << 20;
|
||||||
|
|
||||||
|
/// Append + rotate (mueve a `.1` si supera el cap). Append-only, sin
|
||||||
|
/// reordenar. Sync: cada line, fsync no — el log es defensive, no
|
||||||
|
/// transactional.
|
||||||
|
fn append_audit_line(path: &std::path::Path, line: &str) -> std::io::Result<()> {
|
||||||
|
use std::io::Write;
|
||||||
|
// Rotar si pasa el cap.
|
||||||
|
if let Ok(meta) = std::fs::metadata(path) {
|
||||||
|
if meta.len() >= AUDIT_LOG_MAX_BYTES {
|
||||||
|
let rotated = path.with_extension("log.1");
|
||||||
|
let _ = std::fs::rename(path, &rotated);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if let Some(parent) = path.parent() {
|
||||||
|
let _ = std::fs::create_dir_all(parent);
|
||||||
|
}
|
||||||
|
let mut f = std::fs::OpenOptions::new()
|
||||||
|
.create(true)
|
||||||
|
.append(true)
|
||||||
|
.open(path)?;
|
||||||
|
writeln!(f, "{line}")?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
/// Loguea cada mutación con target="audit" y el peer uid. Reads (ping,
|
/// Loguea cada mutación con target="audit" y el peer uid. Reads (ping,
|
||||||
/// list, stats) se omiten para no inundar el log.
|
/// list, stats) se omiten para no inundar el log.
|
||||||
fn audit_request(peer_uid: u32, req: &Request) {
|
fn audit_request(peer_uid: u32, req: &Request) {
|
||||||
@@ -330,6 +368,17 @@ fn audit_request(peer_uid: u32, req: &Request) {
|
|||||||
| Request::Capabilities => return,
|
| Request::Capabilities => return,
|
||||||
};
|
};
|
||||||
info!(target: "audit", uid = peer_uid, action, detail = %detail, "audit");
|
info!(target: "audit", uid = peer_uid, action, detail = %detail, "audit");
|
||||||
|
// Append a file. Failure no es fatal — sólo se pierde la entry.
|
||||||
|
let ts = std::time::SystemTime::now()
|
||||||
|
.duration_since(std::time::UNIX_EPOCH)
|
||||||
|
.map(|d| d.as_millis())
|
||||||
|
.unwrap_or(0);
|
||||||
|
let line = format!("ts={ts} uid={peer_uid} action={action} {detail}");
|
||||||
|
let path = default_audit_log_path();
|
||||||
|
if let Err(e) = append_audit_line(&path, &line) {
|
||||||
|
// Sólo loguear si el filesystem está roto. No reportar al cliente.
|
||||||
|
tracing::debug!(?e, path = %path.display(), "audit file write failed");
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn dispatch(
|
async fn dispatch(
|
||||||
|
|||||||
@@ -0,0 +1,21 @@
|
|||||||
|
[package]
|
||||||
|
name = "shipote-gateway"
|
||||||
|
version.workspace = true
|
||||||
|
edition.workspace = true
|
||||||
|
rust-version.workspace = true
|
||||||
|
license.workspace = true
|
||||||
|
authors.workspace = true
|
||||||
|
publish.workspace = true
|
||||||
|
description = "HTTP/JSON gateway para shipote — traduce JSON ↔ postcard contra el admin socket."
|
||||||
|
|
||||||
|
[[bin]]
|
||||||
|
name = "shipote-gateway"
|
||||||
|
path = "src/main.rs"
|
||||||
|
|
||||||
|
[dependencies]
|
||||||
|
shipote-protocol = { path = "../../modules/shipote/shipote-protocol" }
|
||||||
|
anyhow = { workspace = true }
|
||||||
|
serde_json = { workspace = true }
|
||||||
|
tokio = { workspace = true }
|
||||||
|
tracing = { workspace = true }
|
||||||
|
tracing-subscriber = { workspace = true }
|
||||||
@@ -0,0 +1,168 @@
|
|||||||
|
//! `shipote-gateway` — HTTP/JSON adapter para el daemon.
|
||||||
|
//!
|
||||||
|
//! Acepta `POST /rpc` con body JSON serializado como `shipote_protocol::Request`,
|
||||||
|
//! hace round-trip al admin socket via postcard, devuelve `Response` como JSON.
|
||||||
|
//!
|
||||||
|
//! Diseñado para clients no-Rust (curl, Python, web app) que no pueden
|
||||||
|
//! hablar postcard directo. NO es un proxy completo — sólo translation
|
||||||
|
//! layer del protocolo.
|
||||||
|
//!
|
||||||
|
//! Sin dep de axum/hyper: HTTP parser ad-hoc, suficiente para 1 endpoint.
|
||||||
|
|
||||||
|
use shipote_protocol::{default_socket_path, read_frame, write_frame, Request, Response};
|
||||||
|
use std::sync::Arc;
|
||||||
|
use tokio::io::{AsyncReadExt, AsyncWriteExt};
|
||||||
|
use tokio::net::{TcpListener, TcpStream, UnixStream};
|
||||||
|
use tracing::{info, warn};
|
||||||
|
|
||||||
|
const DEFAULT_LISTEN: &str = "127.0.0.1:7378";
|
||||||
|
|
||||||
|
#[tokio::main]
|
||||||
|
async fn main() -> anyhow::Result<()> {
|
||||||
|
init_tracing();
|
||||||
|
let listen = std::env::var("SHIPOTE_GATEWAY_LISTEN").unwrap_or_else(|_| DEFAULT_LISTEN.into());
|
||||||
|
let daemon_sock = Arc::new(default_socket_path());
|
||||||
|
let listener = TcpListener::bind(&listen).await?;
|
||||||
|
info!(listen = %listen, daemon = %daemon_sock.display(), "shipote-gateway listening");
|
||||||
|
|
||||||
|
loop {
|
||||||
|
match listener.accept().await {
|
||||||
|
Ok((stream, peer)) => {
|
||||||
|
let sock = daemon_sock.clone();
|
||||||
|
tokio::spawn(async move {
|
||||||
|
if let Err(e) = handle_http(stream, sock).await {
|
||||||
|
warn!(?e, ?peer, "request error");
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
warn!(?e, "accept failed");
|
||||||
|
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn handle_http(mut stream: TcpStream, daemon_sock: Arc<std::path::PathBuf>) -> anyhow::Result<()> {
|
||||||
|
// Parser HTTP mínimo: read hasta `\r\n\r\n`, parsear request line +
|
||||||
|
// Content-Length, después leer body exacto.
|
||||||
|
let mut buf = Vec::with_capacity(4096);
|
||||||
|
let mut tmp = [0u8; 4096];
|
||||||
|
let header_end;
|
||||||
|
loop {
|
||||||
|
let n = stream.read(&mut tmp).await?;
|
||||||
|
if n == 0 {
|
||||||
|
return Ok(()); // closed
|
||||||
|
}
|
||||||
|
buf.extend_from_slice(&tmp[..n]);
|
||||||
|
if let Some(pos) = find_double_crlf(&buf) {
|
||||||
|
header_end = pos + 4;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
if buf.len() > 64 * 1024 {
|
||||||
|
return write_error(&mut stream, 413, "headers too large").await;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
let header_str = std::str::from_utf8(&buf[..header_end - 4])?;
|
||||||
|
let mut lines = header_str.lines();
|
||||||
|
let request_line = lines.next().unwrap_or("");
|
||||||
|
let mut parts = request_line.split_whitespace();
|
||||||
|
let method = parts.next().unwrap_or("");
|
||||||
|
let path = parts.next().unwrap_or("");
|
||||||
|
let mut content_length: usize = 0;
|
||||||
|
for line in lines {
|
||||||
|
if let Some(v) = line.strip_prefix("Content-Length:").or_else(|| line.strip_prefix("content-length:")) {
|
||||||
|
content_length = v.trim().parse().unwrap_or(0);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Rutas:
|
||||||
|
if method == "GET" && (path == "/" || path == "/health") {
|
||||||
|
return write_text(&mut stream, 200, "shipote-gateway ok\n").await;
|
||||||
|
}
|
||||||
|
if method != "POST" || path != "/rpc" {
|
||||||
|
return write_error(&mut stream, 404, "use POST /rpc").await;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Leer body.
|
||||||
|
let mut body = buf[header_end..].to_vec();
|
||||||
|
while body.len() < content_length {
|
||||||
|
let n = stream.read(&mut tmp).await?;
|
||||||
|
if n == 0 {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
body.extend_from_slice(&tmp[..n]);
|
||||||
|
}
|
||||||
|
body.truncate(content_length);
|
||||||
|
|
||||||
|
// Parsear JSON → Request.
|
||||||
|
let req: Request = match serde_json::from_slice(&body) {
|
||||||
|
Ok(r) => r,
|
||||||
|
Err(e) => return write_error(&mut stream, 400, &format!("bad json: {e}")).await,
|
||||||
|
};
|
||||||
|
|
||||||
|
// Round-trip al daemon.
|
||||||
|
let resp = match round_trip_daemon(&daemon_sock, &req).await {
|
||||||
|
Ok(r) => r,
|
||||||
|
Err(e) => return write_error(&mut stream, 502, &format!("daemon: {e}")).await,
|
||||||
|
};
|
||||||
|
|
||||||
|
// Serializar Response → JSON.
|
||||||
|
let body_json = serde_json::to_vec(&resp)?;
|
||||||
|
write_response(&mut stream, 200, "application/json", &body_json).await
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn round_trip_daemon(sock: &std::path::Path, req: &Request) -> anyhow::Result<Response> {
|
||||||
|
let mut stream = UnixStream::connect(sock).await?;
|
||||||
|
write_frame(&mut stream, req).await?;
|
||||||
|
let resp: Response = read_frame(&mut stream).await?;
|
||||||
|
Ok(resp)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn find_double_crlf(buf: &[u8]) -> Option<usize> {
|
||||||
|
buf.windows(4).position(|w| w == b"\r\n\r\n")
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn write_response(
|
||||||
|
stream: &mut TcpStream,
|
||||||
|
code: u16,
|
||||||
|
content_type: &str,
|
||||||
|
body: &[u8],
|
||||||
|
) -> anyhow::Result<()> {
|
||||||
|
let status = match code {
|
||||||
|
200 => "OK",
|
||||||
|
400 => "Bad Request",
|
||||||
|
404 => "Not Found",
|
||||||
|
413 => "Payload Too Large",
|
||||||
|
502 => "Bad Gateway",
|
||||||
|
_ => "Unknown",
|
||||||
|
};
|
||||||
|
let head = format!(
|
||||||
|
"HTTP/1.1 {code} {status}\r\n\
|
||||||
|
Content-Type: {content_type}\r\n\
|
||||||
|
Content-Length: {}\r\n\
|
||||||
|
Connection: close\r\n\
|
||||||
|
\r\n",
|
||||||
|
body.len()
|
||||||
|
);
|
||||||
|
stream.write_all(head.as_bytes()).await?;
|
||||||
|
stream.write_all(body).await?;
|
||||||
|
stream.flush().await?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn write_text(stream: &mut TcpStream, code: u16, body: &str) -> anyhow::Result<()> {
|
||||||
|
write_response(stream, code, "text/plain", body.as_bytes()).await
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn write_error(stream: &mut TcpStream, code: u16, msg: &str) -> anyhow::Result<()> {
|
||||||
|
let body = serde_json::json!({ "error": msg }).to_string();
|
||||||
|
write_response(stream, code, "application/json", body.as_bytes()).await
|
||||||
|
}
|
||||||
|
|
||||||
|
fn init_tracing() {
|
||||||
|
use tracing_subscriber::{fmt, EnvFilter};
|
||||||
|
let filter = EnvFilter::try_from_env("SHIPOTE_GATEWAY_LOG").unwrap_or_else(|_| EnvFilter::new("info"));
|
||||||
|
fmt().with_env_filter(filter).init();
|
||||||
|
}
|
||||||
@@ -67,6 +67,7 @@
|
|||||||
- **`crates/apps/shipote-daemon`** — long-running. Escucha admin socket, ejecuta el dispatch, reaper periódico.
|
- **`crates/apps/shipote-daemon`** — long-running. Escucha admin socket, ejecuta el dispatch, reaper periódico.
|
||||||
- **`crates/apps/shipote-cli`** — `shipote` (clap). Para administración + scripting.
|
- **`crates/apps/shipote-cli`** — `shipote` (clap). Para administración + scripting.
|
||||||
- **`crates/apps/shipote-shell`** — GUI con `yahweh_launcher`. Dashboard: estado, workspaces, comandos, flows, quotas, sparkline.
|
- **`crates/apps/shipote-shell`** — GUI con `yahweh_launcher`. Dashboard: estado, workspaces, comandos, flows, quotas, sparkline.
|
||||||
|
- **`crates/apps/shipote-gateway`** — HTTP/JSON adapter. `POST /rpc` con body JSON `Request` → traduce a postcard, round-trip al daemon, retorna JSON. Default `127.0.0.1:7378`.
|
||||||
|
|
||||||
### Consumidores del discerner (externos a shipote pero usándolo)
|
### Consumidores del discerner (externos a shipote pero usándolo)
|
||||||
- **`crates/modules/ui_engine/libs/providers/fs`** — `FileDataProvider` ahora puebla `mime_type` con `DiscernPipeline`.
|
- **`crates/modules/ui_engine/libs/providers/fs`** — `FileDataProvider` ahora puebla `mime_type` con `DiscernPipeline`.
|
||||||
|
|||||||
Reference in New Issue
Block a user