From f57c61fe3eba10b00b5a701ce2f4526e96de0650 Mon Sep 17 00:00:00 2001 From: sergio Date: Wed, 20 May 2026 21:01:56 +0000 Subject: [PATCH] =?UTF-8?q?feat(mirada):=20mirada-link=20=E2=80=94=20trans?= =?UTF-8?q?porte=20Cerebro=E2=86=94Cuerpo=20del=20compositor?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Link sobre socket Unix: hilo lector de fondo + canal mpsc para sondeo no bloqueante. BrainLink/BodyLink, connected_pair (socketpair), connect/listen por ruta; Drop cierra el socket y propaga EOF. 7 tests. Co-Authored-By: Claude Opus 4.7 --- Cargo.lock | 8 + Cargo.toml | 1 + crates/modules/mirada/mirada-link/Cargo.toml | 12 + crates/modules/mirada/mirada-link/src/lib.rs | 250 +++++++++++++++++++ 4 files changed, 271 insertions(+) create mode 100644 crates/modules/mirada/mirada-link/Cargo.toml create mode 100644 crates/modules/mirada/mirada-link/src/lib.rs diff --git a/Cargo.lock b/Cargo.lock index 38d5968..6a5c778 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7405,6 +7405,14 @@ dependencies = [ "serde", ] +[[package]] +name = "mirada-link" +version = "0.1.0" +dependencies = [ + "mirada-protocol", + "serde", +] + [[package]] name = "mirada-protocol" version = "0.1.0" diff --git a/Cargo.toml b/Cargo.toml index c1f2ea7..96c70f1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -163,6 +163,7 @@ members = [ "crates/modules/mirada/mirada-layout", "crates/modules/mirada/mirada-protocol", "crates/modules/mirada/mirada-brain", + "crates/modules/mirada/mirada-link", # ============================================================ # modules/nakui/ — ERP matemático (categórico) diff --git a/crates/modules/mirada/mirada-link/Cargo.toml b/crates/modules/mirada/mirada-link/Cargo.toml new file mode 100644 index 0000000..e294d03 --- /dev/null +++ b/crates/modules/mirada/mirada-link/Cargo.toml @@ -0,0 +1,12 @@ +[package] +name = "mirada-link" +version.workspace = true +edition.workspace = true +license.workspace = true +authors.workspace = true +publish.workspace = true +description = "mirada — transporte Cerebro↔Cuerpo del compositor: el canal de socket Unix que mueve BrainCommand y BodyEvent sobre el marco postcard de mirada-protocol." + +[dependencies] +mirada-protocol = { path = "../mirada-protocol" } +serde = { workspace = true } diff --git a/crates/modules/mirada/mirada-link/src/lib.rs b/crates/modules/mirada/mirada-link/src/lib.rs new file mode 100644 index 0000000..3e494cf --- /dev/null +++ b/crates/modules/mirada/mirada-link/src/lib.rs @@ -0,0 +1,250 @@ +//! `mirada-link` — el transporte Cerebro↔Cuerpo del compositor. +//! +//! [`mirada_protocol`] define *qué* se dice (los enums y el marco de +//! cable); este crate define *cómo viaja*: un socket Unix con un hilo +//! lector de fondo que entrega los mensajes recibidos por un canal, para +//! que el dueño del [`Link`] sólo tenga que sondear sin bloquearse. +//! +//! Los dos procesos usan el mismo tipo, parametrizado al revés: +//! +//! - El Cerebro tiene un [`BrainLink`]: envía [`BrainCommand`], recibe +//! [`BodyEvent`]. +//! - El Cuerpo tiene un [`BodyLink`]: envía [`BodyEvent`], recibe +//! [`BrainCommand`]. +//! +//! Para arrancar el par hay tres caminos: [`connected_pair`] (un +//! `socketpair`, ideal para heredar un fd al lanzar al hijo o para +//! tests), [`Link::connect`] (conectar a una ruta) y [`Link::listen`] +//! (escuchar en una ruta y aceptar una conexión). + +#![forbid(unsafe_code)] + +use std::io::{self, BufReader}; +use std::marker::PhantomData; +use std::net::Shutdown; +use std::os::unix::net::{UnixListener, UnixStream}; +use std::path::Path; +use std::sync::mpsc::{self, Receiver}; +use std::thread; + +use serde::de::DeserializeOwned; +use serde::Serialize; + +use mirada_protocol::{read_frame, write_frame, BodyEvent, BrainCommand}; + +/// El extremo del Cerebro: envía [`BrainCommand`], recibe [`BodyEvent`]. +pub type BrainLink = Link; + +/// El extremo del Cuerpo: envía [`BodyEvent`], recibe [`BrainCommand`]. +pub type BodyLink = Link; + +/// Un extremo del canal: envía mensajes de tipo `Out` y recibe `In`. +/// +/// La escritura es síncrona sobre el socket; la lectura la hace un hilo +/// de fondo que deposita lo recibido en un canal interno. Al soltar el +/// `Link` se cierra el socket, lo que termina el hilo lector propio y le +/// señala EOF al otro extremo. +pub struct Link { + writer: UnixStream, + incoming: Receiver, + _out: PhantomData, +} + +impl Link +where + Out: Serialize, + In: DeserializeOwned + Send + 'static, +{ + /// Construye un `Link` sobre un socket ya conectado. + pub fn from_stream(stream: UnixStream) -> io::Result { + let reader = stream.try_clone()?; + let (tx, rx) = mpsc::channel(); + thread::spawn(move || { + let mut r = BufReader::new(reader); + // Lee marcos hasta EOF limpio o error de socket. + while let Ok(Some(msg)) = read_frame::<_, In>(&mut r) { + if tx.send(msg).is_err() { + break; // el dueño soltó el Link + } + } + }); + Ok(Self { writer: stream, incoming: rx, _out: PhantomData }) + } + + /// Conecta a un socket Unix en `path` (lado cliente). + pub fn connect>(path: P) -> io::Result { + Self::from_stream(UnixStream::connect(path)?) + } + + /// Escucha en `path` y bloquea hasta aceptar una conexión (lado + /// servidor). El socket de escucha se cierra tras el primer cliente. + pub fn listen>(path: P) -> io::Result { + let listener = UnixListener::bind(path)?; + let (stream, _) = listener.accept()?; + Self::from_stream(stream) + } + + /// Envía un mensaje. Falla si el otro extremo cerró el canal. + pub fn send(&mut self, msg: &Out) -> io::Result<()> { + write_frame(&mut self.writer, msg) + } + + /// Recoge un mensaje si hay alguno pendiente, sin bloquear. + pub fn try_recv(&self) -> Option { + self.incoming.try_recv().ok() + } + + /// Vacía todos los mensajes pendientes — un tick del bucle de eventos. + pub fn drain(&self) -> Vec { + self.incoming.try_iter().collect() + } + + /// Bloquea hasta recibir un mensaje. Devuelve `None` si el otro + /// extremo cerró el canal. + pub fn recv(&self) -> Option { + self.incoming.recv().ok() + } +} + +impl Drop for Link { + fn drop(&mut self) { + // Cierra la conexión: el hilo lector propio recibe EOF y termina, + // y el otro extremo ve EOF en su próxima lectura. + let _ = self.writer.shutdown(Shutdown::Both); + } +} + +/// Crea un par Cerebro↔Cuerpo conectado en memoria, con un `socketpair`. +/// +/// Es el camino de los tests y también el del despliegue real cuando el +/// Cerebro lanza al Cuerpo como proceso hijo y le hereda un extremo. +pub fn connected_pair() -> io::Result<(BrainLink, BodyLink)> { + let (a, b) = UnixStream::pair()?; + Ok((Link::from_stream(a)?, Link::from_stream(b)?)) +} + +#[cfg(test)] +mod tests { + use super::*; + use mirada_protocol::{Rect, WindowPlacement}; + use std::time::Duration; + + fn place(id: u64) -> BrainCommand { + BrainCommand::Place(vec![WindowPlacement { + id, + rect: Rect::new(0, 0, 800, 600), + visible: true, + focused: true, + }]) + } + + #[test] + fn brain_command_reaches_the_body() { + let (mut brain, body) = connected_pair().unwrap(); + brain.send(&place(1)).unwrap(); + // Da un instante al hilo lector. + for _ in 0..100 { + if let Some(cmd) = body.try_recv() { + assert_eq!(cmd, place(1)); + return; + } + thread::sleep(Duration::from_millis(2)); + } + panic!("el comando no llegó al Cuerpo"); + } + + #[test] + fn body_event_reaches_the_brain() { + let (brain, mut body) = connected_pair().unwrap(); + let ev = BodyEvent::Keybind("Super+Return".into()); + body.send(&ev).unwrap(); + assert_eq!(brain.recv(), Some(ev)); + } + + #[test] + fn many_messages_keep_their_order() { + let (brain, mut body) = connected_pair().unwrap(); + for id in 0..20 { + body.send(&BodyEvent::WindowClosed { id }).unwrap(); + } + for id in 0..20 { + assert_eq!(brain.recv(), Some(BodyEvent::WindowClosed { id })); + } + } + + #[test] + fn drain_collects_everything_pending() { + let (mut brain, body) = connected_pair().unwrap(); + for id in 1..=5 { + brain.send(&place(id)).unwrap(); + } + // Espera a que el hilo lector encole los cinco. + let mut got = Vec::new(); + for _ in 0..100 { + got.extend(body.drain()); + if got.len() == 5 { + break; + } + thread::sleep(Duration::from_millis(2)); + } + assert_eq!(got.len(), 5); + } + + #[test] + fn dropping_one_end_closes_the_other() { + let (brain, body) = connected_pair().unwrap(); + drop(body); + // Sin nadie al otro lado, recv termina con None en vez de colgarse. + assert_eq!(brain.recv(), None); + } + + #[test] + fn sending_into_a_closed_link_errors() { + let (mut brain, body) = connected_pair().unwrap(); + drop(body); + // La primera escritura puede pasar al búfer del socket; alguna + // de ellas acaba fallando con tubería rota. + let mut errored = false; + for id in 0..1000 { + if brain.send(&place(id)).is_err() { + errored = true; + break; + } + } + assert!(errored, "se esperaba un error de tubería rota"); + } + + #[test] + fn connect_and_listen_round_trip_over_a_path() { + let dir = std::env::temp_dir(); + let path = dir.join(format!("mirada-link-test-{}.sock", std::process::id())); + let _ = std::fs::remove_file(&path); + + let server_path = path.clone(); + let server = thread::spawn(move || { + let mut link: BodyLink = Link::listen(&server_path).unwrap(); + link.send(&BodyEvent::OutputAdded { id: 0, width: 1920, height: 1080 }) + .unwrap(); + // Mantén vivo el extremo hasta que el cliente lea. + link.recv() + }); + + // Espera a que el servidor publique el socket. + let mut brain: Option = None; + for _ in 0..200 { + if let Ok(l) = Link::connect(&path) { + brain = Some(l); + break; + } + thread::sleep(Duration::from_millis(2)); + } + let mut brain = brain.expect("no se pudo conectar al servidor"); + assert_eq!( + brain.recv(), + Some(BodyEvent::OutputAdded { id: 0, width: 1920, height: 1080 }) + ); + brain.send(&BrainCommand::Shutdown).unwrap(); + assert_eq!(server.join().unwrap(), Some(BrainCommand::Shutdown)); + let _ = std::fs::remove_file(&path); + } +}