diff --git a/CHANGELOG.md b/CHANGELOG.md index 9f1a9ce..b4726d3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,48 @@ ratio/diff ver `git show `. ## 2026-05-09 +### feat(nouser+sidecar): watcher con debounce + re-publish al broker +Cierra las dos limitaciones del watcher previo: ya no spamea N veces por +una sola edición, y el broker ve los cambios estructurales en lugar de +quedarse con manifests congelados al arranque. + + $ nouser daemon /tmp/x & + $ touch /tmp/x/src/a.rs /tmp/x/src/b.rs /tmp/x/src/c.rs + # daemon log (un solo batch, no 9 reacciones): + [watcher] ⚙ batch: 6 path(s) coalescidos → re-scan + [watcher] ✦ x/src nace (3 miembros, lens=Code) + [watcher] ⌃ delta: 1 nuevas, 0 refrescadas, 0 cerradas — 3 sesiones vivas + +Mecánica del debounce (150ms): +- `spawn_fs_watcher` arma dos threads: **dispatcher** filtra eventos + notify Create/Modify/Remove a un canal de paths; **coordinator** + mantiene `HashMap` y dispara batch sólo cuando + todos los paths llevan ≥150ms quietos. +- Un `:w` típico de vim (~5 eventos por archivo) colapsa a 1 batch. + +Mecánica del re-publish: +- `SidecarPool` ahora trackea `HashMap` indexado + por `Card.id`. Llamar `pool.spawn(card)` con un id ya presente + aborta la sesión previa y abre una nueva — `spawn` se vuelve + idempotente: re-publicar una Mónada cuya composición cambió + refresca su sesión en el broker sin dejar zombies. +- Nueva API `pool.drop_session(id)` para cerrar una sesión + explícitamente cuando una Mónada desaparece (directorio quedó + bajo `min_files` o se borró). +- `pool.live_sessions()` para introspección/logs. +- `process_change_batch` re-scanea + re-clusteriza con hidratación, + diffea contra prior_monads, y para cada Mónada decide: + - removida → `drop_session` + - nueva → `spawn` con ✦ + - composición cambió (members o centroid distintos) → `spawn` con ↻ + - idéntica → no-op + +Trade-off aceptado: re-scan global por batch (no incremental). Es +O(N archivos) por evento y para árboles típicos (<10k) cae en +<100ms. Optimizar a re-cluster parcial cuando duela. + +Tests: workspace completo verde. + ### feat(nouser): notify watcher — el sistema reacciona en tiempo real El daemon ahora monta un `notify::recommended_watcher` recursivo sobre el directorio. Cada `Create`/`Modify` de archivo regular diff --git a/crates/modules/nouser/core/src/bin/nouser.rs b/crates/modules/nouser/core/src/bin/nouser.rs index b58a360..62d4d38 100644 --- a/crates/modules/nouser/core/src/bin/nouser.rs +++ b/crates/modules/nouser/core/src/bin/nouser.rs @@ -175,8 +175,9 @@ fn cmd_json(args: &[String]) -> Cmd { fn cmd_daemon(args: &[String]) -> Cmd { let dir = require_dir(args)?; - let pool = brahman_sidecar::SidecarPool::new() - .map_err(|e| format!("crear pool: {e}"))?; + let pool = std::sync::Arc::new( + brahman_sidecar::SidecarPool::new().map_err(|e| format!("crear pool: {e}"))?, + ); // 1. Engine como Ente. let engine_card = build_engine_card(); @@ -271,14 +272,21 @@ fn cmd_daemon(args: &[String]) -> Cmd { scanned_count, newly_spawned ); - // Watcher: cada cambio en el árbol dispara un cálculo de - // atracción. Esto vuelve "vivo" al sistema — `vim archivo.rs` - // produce inmediatamente "→ atraído a brahman-handshake/src 0.91". + // Watcher: cada cambio en el árbol — coalescido con debounce de + // 150ms — dispara un re-scan + re-cluster del directorio y + // re-publica al broker las Mónadas afectadas (drop + spawn por id, + // gracias al replace en `SidecarPool::spawn`). let db_shared = std::sync::Arc::new(std::sync::Mutex::new(db)); - let _watcher = match spawn_fs_watcher(dir.clone(), db_shared.clone()) { + let _watcher = match spawn_fs_watcher( + dir.clone(), + db_shared.clone(), + pool.clone(), + engine_id, + engine_label.clone(), + ) { Ok(w) => { eprintln!( - "nouser daemon: watcher activo en {} — Ctrl-C para terminar.", + "nouser daemon: watcher activo en {} (debounce 150ms, re-publish on) — Ctrl-C para terminar.", dir.display() ); Some(w) @@ -297,26 +305,51 @@ fn cmd_daemon(args: &[String]) -> Cmd { Ok(()) } -/// Watcher de filesystem: por cada Create/Modify en el árbol, -/// computa el embedding del archivo y reporta a qué Mónada se -/// atrae. No re-publica ni muta el broker — sólo observa y narra. -/// La invalidación selectiva queda como work futuro. +/// Ventana de debounce: notify dispara Create+Modify(+) por cada +/// edición; sin coalescer veríamos N reacciones por un solo `:w`. +/// 150ms es generoso para editores típicos (vim/code) y mantiene el +/// feedback "vivo" para el usuario. +const WATCHER_DEBOUNCE_MS: u64 = 150; + +/// Watcher de filesystem con debounce + re-publish al broker. +/// +/// Pipeline: +/// +/// 1. **notify** dispara eventos crudos a un canal interno. +/// 2. **dispatcher**: filtra a Create/Modify/Remove de paths bajo +/// `dir`, descarta el resto, reenvía al canal de debounce. +/// 3. **coordinator**: mantiene un `HashMap`. +/// Cada vez que el canal queda en silencio durante +/// `WATCHER_DEBOUNCE_MS`, agrupa los paths cuya última actividad +/// superó la ventana y los procesa en **un solo batch**. +/// 4. **process_change_batch**: re-scan + re-cluster hidratado + +/// diff vs DB + `pool.drop_session` para Mónadas desaparecidas +/// + `pool.spawn` para Mónadas nuevas o con composición distinta. +/// `pool.spawn` reemplaza la sesión previa con el mismo `Card.id`, +/// así que el broker ve el manifest fresco sin sesiones huérfanas. fn spawn_fs_watcher( dir: std::path::PathBuf, db: std::sync::Arc>, + pool: std::sync::Arc, + engine_id: brahman_card::ulid::Ulid, + engine_label: String, ) -> Result> { use notify::{Event, EventKind, RecursiveMode, Watcher}; - let (tx, rx) = std::sync::mpsc::channel::>(); + let (notify_tx, notify_rx) = std::sync::mpsc::channel::>(); let mut watcher = notify::recommended_watcher(move |res| { - let _ = tx.send(res); + let _ = notify_tx.send(res); })?; watcher.watch(&dir, RecursiveMode::Recursive)?; + let (path_tx, path_rx) = std::sync::mpsc::channel::(); + + // Dispatcher: notify → filtro → canal de paths. + let dispatch_dir = dir.clone(); std::thread::Builder::new() - .name("nouser-watcher".into()) + .name("nouser-watcher-dispatch".into()) .spawn(move || { - for res in rx { + for res in notify_rx { let event = match res { Ok(e) => e, Err(e) => { @@ -324,92 +357,180 @@ fn spawn_fs_watcher( continue; } }; - // Sólo reaccionamos a Create/Modify de archivos - // regulares; renames / removes los manejará un re-scan. + // Create/Modify viven; Remove también nos importa + // (puede colapsar Mónadas). let interesting = matches!( event.kind, - EventKind::Create(_) | EventKind::Modify(_) + EventKind::Create(_) | EventKind::Modify(_) | EventKind::Remove(_) ); if !interesting { continue; } - for path in &event.paths { - let metadata = match std::fs::metadata(path) { - Ok(m) if m.is_file() => m, - _ => continue, - }; - react_to_change(path, &metadata, &db); + for path in event.paths { + if !path.starts_with(&dispatch_dir) { + continue; + } + let _ = path_tx.send(path); } } })?; + // Coordinator: debounce + batch dispatch. + let coord_dir = dir; + std::thread::Builder::new() + .name("nouser-watcher-coord".into()) + .spawn(move || { + let debounce = std::time::Duration::from_millis(WATCHER_DEBOUNCE_MS); + let mut pending: std::collections::HashMap< + std::path::PathBuf, + std::time::Instant, + > = std::collections::HashMap::new(); + loop { + match path_rx.recv_timeout(debounce) { + Ok(path) => { + pending.insert(path, std::time::Instant::now()); + } + Err(std::sync::mpsc::RecvTimeoutError::Timeout) => {} + Err(std::sync::mpsc::RecvTimeoutError::Disconnected) => break, + } + let now = std::time::Instant::now(); + let due: Vec = pending + .iter() + .filter(|(_, t)| now.duration_since(**t) >= debounce) + .map(|(p, _)| p.clone()) + .collect(); + if due.is_empty() { + continue; + } + for p in &due { + pending.remove(p); + } + process_change_batch( + &due, + &coord_dir, + &db, + &pool, + engine_id, + &engine_label, + ); + } + })?; + Ok(watcher) } -fn react_to_change( - path: &std::path::Path, - metadata: &std::fs::Metadata, +/// Procesa un batch de paths cambiados: re-scanea el árbol, re-clusteriza +/// con hidratación, y propaga el delta de Mónadas al broker. +/// +/// El re-scan global es deliberado: el clustering por directorio es global +/// por diseño, así que un cambio en `src/foo.rs` puede mover Mónadas en +/// `src/` sin tocar `tests/`. Coste O(N archivos), aceptable para +/// directorios típicos (<10k archivos). Optimizar a re-cluster parcial +/// cuando duela. +fn process_change_batch( + paths: &[std::path::PathBuf], + dir: &std::path::Path, db: &std::sync::Arc>, + pool: &std::sync::Arc, + engine_id: brahman_card::ulid::Ulid, + engine_label: &str, ) { - let mtime_ms = metadata - .modified() - .ok() - .and_then(|t| t.duration_since(std::time::UNIX_EPOCH).ok()) - .map(|d| d.as_millis() as u64) - .unwrap_or(0); - let target = nouser_card::FileEntry { - id: nouser_card::FileId::from(nouser_card::ulid::Ulid::new()), - path: path.to_path_buf(), - content_hash: None, - size: metadata.len(), - mtime_ms, - extension: path - .extension() - .and_then(|s| s.to_str()) - .map(|s| s.to_lowercase()), - }; - let v = embed::embed(&target); + eprintln!( + "[watcher] ⚙ batch: {} path(s) coalescidos → re-scan", + paths.len() + ); - let db_lock = match db.lock() { + let files = match scanner::scan_directory(dir, &scanner::ScanConfig::default()) { + Ok(f) => f, + Err(e) => { + eprintln!("[watcher] re-scan falló: {e}"); + return; + } + }; + + let mut db_lock = match db.lock() { Ok(g) => g, - Err(_) => return, // mutex envenenado, salimos silenciosos + Err(_) => { + eprintln!("[watcher] mutex envenenado — abortando batch"); + return; + } }; - // Filtramos por modelo coincidente (mismo cuidado que cmd_attract). - let best = db_lock - .monads() - .filter(|m| !m.centroid.is_empty()) - .filter(|m| { - m.centroid_model - .as_deref() - .map(|id| id == embed::MODEL_ID) - .unwrap_or(false) - }) - .map(|m| (m, embed::attraction_score(&v, m))) - .max_by(|a, b| a.1.partial_cmp(&b.1).unwrap_or(std::cmp::Ordering::Equal)); + let prior_monads: Vec = db_lock.monads().cloned().collect(); + let prior_ref: &db::MonadDb = &db_lock; + let monads = cluster::by_directory_hydrated(&files, min_files(), Some(prior_ref)); - match best { - Some((m, score)) if score >= embed::DEFAULT_ATTRACTION_THRESHOLD => { + let prior_ids: std::collections::BTreeSet<_> = + prior_monads.iter().map(|m| m.id).collect(); + let new_ids: std::collections::BTreeSet<_> = monads.iter().map(|m| m.id).collect(); + + // Mónadas que ya no existen (directorio quedó por debajo de + // min_files o fue removido): cerramos su sesión en el broker. + let mut removed = 0usize; + for id in prior_ids.difference(&new_ids) { + pool.drop_session(*id); + removed += 1; + if let Some(prev) = prior_monads.iter().find(|m| &m.id == id) { eprintln!( - "[watcher] 🧲 {} → {} ({:.4})", - path.display(), - m.label, - score + "[watcher] ✖ {} ({}) desapareció — sesión cerrada", + &id.to_string()[..8], + prev.label ); } - Some((m, score)) => { - eprintln!( - "[watcher] · {} → {} (mejor, {:.4} < umbral {:.4})", - path.display(), - m.label, - score, - embed::DEFAULT_ATTRACTION_THRESHOLD - ); - } - None => { - eprintln!("[watcher] {} (ninguna Mónada con centroide compatible)", path.display()); - } } + + // Mónadas nuevas o cuya composición cambió (members/centroid): + // (re)spawn — el pool reemplaza la sesión previa con el mismo id. + let mut respawned = 0usize; + let mut fresh = 0usize; + for monad in &monads { + let prev = prior_monads.iter().find(|m| m.id == monad.id); + let is_new = prev.is_none(); + let changed = match prev { + Some(p) => p.members != monad.members || p.centroid != monad.centroid, + None => true, + }; + if !changed { + continue; + } + let mut card = monad.to_brahman_card(); + card.references.push(brahman_card::CardReference { + kind: brahman_card::RelationshipKind::OwnedBy, + target_id: engine_id, + target_label: engine_label.to_string(), + }); + pool.spawn(card); + if is_new { + fresh += 1; + eprintln!( + "[watcher] ✦ {} nace ({} miembros, lens={:?})", + monad.label, monad.cardinality, monad.dominant_lens + ); + } else { + respawned += 1; + let prev = prev.unwrap(); + let delta_members = monad.members.len() as i64 - prev.members.len() as i64; + eprintln!( + "[watcher] ↻ {} refresh ({} miembros, Δ={:+})", + monad.label, monad.cardinality, delta_members + ); + } + } + + if removed == 0 && fresh == 0 && respawned == 0 { + eprintln!("[watcher] (sin cambios estructurales tras re-cluster)"); + } else { + eprintln!( + "[watcher] ⌃ delta: {} nuevas, {} refrescadas, {} cerradas — {} sesiones vivas", + fresh, + respawned, + removed, + pool.live_sessions() + ); + } + + db_lock.ingest_files(files); + db_lock.replace_monads(monads); } fn cmd_attract(args: &[String]) -> Cmd { diff --git a/crates/shared/brahman-sidecar/src/lib.rs b/crates/shared/brahman-sidecar/src/lib.rs index d6f3763..5fcf099 100644 --- a/crates/shared/brahman-sidecar/src/lib.rs +++ b/crates/shared/brahman-sidecar/src/lib.rs @@ -16,12 +16,14 @@ #![forbid(unsafe_code)] #![warn(rust_2018_idioms)] -use std::sync::mpsc; +use std::collections::HashMap; +use std::sync::{mpsc, Arc, Mutex}; use std::thread::JoinHandle; use std::time::Duration; -use brahman_card::{Card, WitInterface}; +use brahman_card::{ulid::Ulid, Card, WitInterface}; use brahman_handshake::{client::Client, transport}; +use tokio::task::AbortHandle; use tracing::{info, warn}; /// Período entre pings al Init. @@ -101,6 +103,11 @@ pub fn spawn_with_handle(config: SidecarConfig) -> std::io::Result>>, _thread: JoinHandle<()>, } @@ -133,6 +140,7 @@ impl SidecarPool { .map_err(|_| std::io::Error::other("pool runtime no respondió"))?; Ok(Self { handle, + sessions: Arc::new(Mutex::new(HashMap::new())), _thread: thread, }) } @@ -148,8 +156,36 @@ impl SidecarPool { } /// Añade una sesión con configuración custom. + /// + /// Si ya existía una sesión para el mismo `Card.id`, la previa + /// se aborta antes de spawnear la nueva. Esto hace `spawn` + /// idempotente respecto al id: re-publicar una Mónada cuya + /// composición cambió "refresca" la sesión en el broker. pub fn spawn_with_config(&self, config: SidecarConfig) { - self.handle.spawn(run_client(config)); + let card_id = config.card.id; + let join = self.handle.spawn(run_client(config)); + let abort = join.abort_handle(); + if let Ok(mut sessions) = self.sessions.lock() { + if let Some(prev) = sessions.insert(card_id, abort) { + prev.abort(); + } + } + } + + /// Cierra explícitamente la sesión asociada a `card_id`. No-op si + /// no había sesión registrada. + pub fn drop_session(&self, card_id: Ulid) { + if let Ok(mut sessions) = self.sessions.lock() { + if let Some(abort) = sessions.remove(&card_id) { + abort.abort(); + } + } + } + + /// Cantidad actual de sesiones vivas (estimada — puede haber + /// drift transitorio entre abort y limpieza). + pub fn live_sessions(&self) -> usize { + self.sessions.lock().map(|s| s.len()).unwrap_or(0) } }