feat(nouser+sidecar): watcher con debounce 150ms + re-publish al broker

Cierra los dos pendientes documentados en 487c457: el spam de eventos
duplicados de notify y la falta de propagación al broker cuando una
Mónada cambia composición.

SidecarPool ahora es idempotente respecto a Card.id: spawn rastrea un
HashMap<Ulid, AbortHandle> y aborta la sesión previa si el id ya
existía. Nuevo drop_session(id) para cerrar Mónadas que desaparecen y
live_sessions() para introspección.

Watcher reorganizado en dos threads: dispatcher filtra notify a un
canal de paths; coordinator agrupa con HashMap<PathBuf, Instant> y
dispara batch sólo cuando todos llevan ≥150ms quietos. Cada batch
re-scanea + re-clusteriza con hidratación + diffea contra prior:
removidas → drop_session, nuevas o con composición distinta → spawn
(que reemplaza la sesión previa). Re-scan global por batch es
deliberado y O(N archivos) — aceptable hasta que duela.
This commit is contained in:
Sergio
2026-05-09 01:37:39 +00:00
parent 487c457e5b
commit 2725d6a297
3 changed files with 279 additions and 80 deletions
+198 -77
View File
@@ -175,8 +175,9 @@ fn cmd_json(args: &[String]) -> Cmd {
fn cmd_daemon(args: &[String]) -> Cmd {
let dir = require_dir(args)?;
let pool = brahman_sidecar::SidecarPool::new()
.map_err(|e| format!("crear pool: {e}"))?;
let pool = std::sync::Arc::new(
brahman_sidecar::SidecarPool::new().map_err(|e| format!("crear pool: {e}"))?,
);
// 1. Engine como Ente.
let engine_card = build_engine_card();
@@ -271,14 +272,21 @@ fn cmd_daemon(args: &[String]) -> Cmd {
scanned_count, newly_spawned
);
// Watcher: cada cambio en el árbol dispara un cálculo de
// atracción. Esto vuelve "vivo" al sistema — `vim archivo.rs`
// produce inmediatamente "→ atraído a brahman-handshake/src 0.91".
// Watcher: cada cambio en el árbol — coalescido con debounce de
// 150ms — dispara un re-scan + re-cluster del directorio y
// re-publica al broker las Mónadas afectadas (drop + spawn por id,
// gracias al replace en `SidecarPool::spawn`).
let db_shared = std::sync::Arc::new(std::sync::Mutex::new(db));
let _watcher = match spawn_fs_watcher(dir.clone(), db_shared.clone()) {
let _watcher = match spawn_fs_watcher(
dir.clone(),
db_shared.clone(),
pool.clone(),
engine_id,
engine_label.clone(),
) {
Ok(w) => {
eprintln!(
"nouser daemon: watcher activo en {} — Ctrl-C para terminar.",
"nouser daemon: watcher activo en {} (debounce 150ms, re-publish on) — Ctrl-C para terminar.",
dir.display()
);
Some(w)
@@ -297,26 +305,51 @@ fn cmd_daemon(args: &[String]) -> Cmd {
Ok(())
}
/// Watcher de filesystem: por cada Create/Modify en el árbol,
/// computa el embedding del archivo y reporta a qué Mónada se
/// atrae. No re-publica ni muta el broker — sólo observa y narra.
/// La invalidación selectiva queda como work futuro.
/// Ventana de debounce: notify dispara Create+Modify(+) por cada
/// edición; sin coalescer veríamos N reacciones por un solo `:w`.
/// 150ms es generoso para editores típicos (vim/code) y mantiene el
/// feedback "vivo" para el usuario.
const WATCHER_DEBOUNCE_MS: u64 = 150;
/// Watcher de filesystem con debounce + re-publish al broker.
///
/// Pipeline:
///
/// 1. **notify** dispara eventos crudos a un canal interno.
/// 2. **dispatcher**: filtra a Create/Modify/Remove de paths bajo
/// `dir`, descarta el resto, reenvía al canal de debounce.
/// 3. **coordinator**: mantiene un `HashMap<PathBuf, Instant>`.
/// Cada vez que el canal queda en silencio durante
/// `WATCHER_DEBOUNCE_MS`, agrupa los paths cuya última actividad
/// superó la ventana y los procesa en **un solo batch**.
/// 4. **process_change_batch**: re-scan + re-cluster hidratado +
/// diff vs DB + `pool.drop_session` para Mónadas desaparecidas
/// + `pool.spawn` para Mónadas nuevas o con composición distinta.
/// `pool.spawn` reemplaza la sesión previa con el mismo `Card.id`,
/// así que el broker ve el manifest fresco sin sesiones huérfanas.
fn spawn_fs_watcher(
dir: std::path::PathBuf,
db: std::sync::Arc<std::sync::Mutex<db::MonadDb>>,
pool: std::sync::Arc<brahman_sidecar::SidecarPool>,
engine_id: brahman_card::ulid::Ulid,
engine_label: String,
) -> Result<notify::RecommendedWatcher, Box<dyn std::error::Error>> {
use notify::{Event, EventKind, RecursiveMode, Watcher};
let (tx, rx) = std::sync::mpsc::channel::<notify::Result<Event>>();
let (notify_tx, notify_rx) = std::sync::mpsc::channel::<notify::Result<Event>>();
let mut watcher = notify::recommended_watcher(move |res| {
let _ = tx.send(res);
let _ = notify_tx.send(res);
})?;
watcher.watch(&dir, RecursiveMode::Recursive)?;
let (path_tx, path_rx) = std::sync::mpsc::channel::<std::path::PathBuf>();
// Dispatcher: notify → filtro → canal de paths.
let dispatch_dir = dir.clone();
std::thread::Builder::new()
.name("nouser-watcher".into())
.name("nouser-watcher-dispatch".into())
.spawn(move || {
for res in rx {
for res in notify_rx {
let event = match res {
Ok(e) => e,
Err(e) => {
@@ -324,92 +357,180 @@ fn spawn_fs_watcher(
continue;
}
};
// Sólo reaccionamos a Create/Modify de archivos
// regulares; renames / removes los manejará un re-scan.
// Create/Modify viven; Remove también nos importa
// (puede colapsar Mónadas).
let interesting = matches!(
event.kind,
EventKind::Create(_) | EventKind::Modify(_)
EventKind::Create(_) | EventKind::Modify(_) | EventKind::Remove(_)
);
if !interesting {
continue;
}
for path in &event.paths {
let metadata = match std::fs::metadata(path) {
Ok(m) if m.is_file() => m,
_ => continue,
};
react_to_change(path, &metadata, &db);
for path in event.paths {
if !path.starts_with(&dispatch_dir) {
continue;
}
let _ = path_tx.send(path);
}
}
})?;
// Coordinator: debounce + batch dispatch.
let coord_dir = dir;
std::thread::Builder::new()
.name("nouser-watcher-coord".into())
.spawn(move || {
let debounce = std::time::Duration::from_millis(WATCHER_DEBOUNCE_MS);
let mut pending: std::collections::HashMap<
std::path::PathBuf,
std::time::Instant,
> = std::collections::HashMap::new();
loop {
match path_rx.recv_timeout(debounce) {
Ok(path) => {
pending.insert(path, std::time::Instant::now());
}
Err(std::sync::mpsc::RecvTimeoutError::Timeout) => {}
Err(std::sync::mpsc::RecvTimeoutError::Disconnected) => break,
}
let now = std::time::Instant::now();
let due: Vec<std::path::PathBuf> = pending
.iter()
.filter(|(_, t)| now.duration_since(**t) >= debounce)
.map(|(p, _)| p.clone())
.collect();
if due.is_empty() {
continue;
}
for p in &due {
pending.remove(p);
}
process_change_batch(
&due,
&coord_dir,
&db,
&pool,
engine_id,
&engine_label,
);
}
})?;
Ok(watcher)
}
fn react_to_change(
path: &std::path::Path,
metadata: &std::fs::Metadata,
/// Procesa un batch de paths cambiados: re-scanea el árbol, re-clusteriza
/// con hidratación, y propaga el delta de Mónadas al broker.
///
/// El re-scan global es deliberado: el clustering por directorio es global
/// por diseño, así que un cambio en `src/foo.rs` puede mover Mónadas en
/// `src/` sin tocar `tests/`. Coste O(N archivos), aceptable para
/// directorios típicos (<10k archivos). Optimizar a re-cluster parcial
/// cuando duela.
fn process_change_batch(
paths: &[std::path::PathBuf],
dir: &std::path::Path,
db: &std::sync::Arc<std::sync::Mutex<db::MonadDb>>,
pool: &std::sync::Arc<brahman_sidecar::SidecarPool>,
engine_id: brahman_card::ulid::Ulid,
engine_label: &str,
) {
let mtime_ms = metadata
.modified()
.ok()
.and_then(|t| t.duration_since(std::time::UNIX_EPOCH).ok())
.map(|d| d.as_millis() as u64)
.unwrap_or(0);
let target = nouser_card::FileEntry {
id: nouser_card::FileId::from(nouser_card::ulid::Ulid::new()),
path: path.to_path_buf(),
content_hash: None,
size: metadata.len(),
mtime_ms,
extension: path
.extension()
.and_then(|s| s.to_str())
.map(|s| s.to_lowercase()),
};
let v = embed::embed(&target);
eprintln!(
"[watcher] ⚙ batch: {} path(s) coalescidos → re-scan",
paths.len()
);
let db_lock = match db.lock() {
let files = match scanner::scan_directory(dir, &scanner::ScanConfig::default()) {
Ok(f) => f,
Err(e) => {
eprintln!("[watcher] re-scan falló: {e}");
return;
}
};
let mut db_lock = match db.lock() {
Ok(g) => g,
Err(_) => return, // mutex envenenado, salimos silenciosos
Err(_) => {
eprintln!("[watcher] mutex envenenado — abortando batch");
return;
}
};
// Filtramos por modelo coincidente (mismo cuidado que cmd_attract).
let best = db_lock
.monads()
.filter(|m| !m.centroid.is_empty())
.filter(|m| {
m.centroid_model
.as_deref()
.map(|id| id == embed::MODEL_ID)
.unwrap_or(false)
})
.map(|m| (m, embed::attraction_score(&v, m)))
.max_by(|a, b| a.1.partial_cmp(&b.1).unwrap_or(std::cmp::Ordering::Equal));
let prior_monads: Vec<nouser_card::MonadManifest> = db_lock.monads().cloned().collect();
let prior_ref: &db::MonadDb = &db_lock;
let monads = cluster::by_directory_hydrated(&files, min_files(), Some(prior_ref));
match best {
Some((m, score)) if score >= embed::DEFAULT_ATTRACTION_THRESHOLD => {
let prior_ids: std::collections::BTreeSet<_> =
prior_monads.iter().map(|m| m.id).collect();
let new_ids: std::collections::BTreeSet<_> = monads.iter().map(|m| m.id).collect();
// Mónadas que ya no existen (directorio quedó por debajo de
// min_files o fue removido): cerramos su sesión en el broker.
let mut removed = 0usize;
for id in prior_ids.difference(&new_ids) {
pool.drop_session(*id);
removed += 1;
if let Some(prev) = prior_monads.iter().find(|m| &m.id == id) {
eprintln!(
"[watcher] 🧲 {}{} ({:.4})",
path.display(),
m.label,
score
"[watcher] {} ({}) desapareció — sesión cerrada",
&id.to_string()[..8],
prev.label
);
}
Some((m, score)) => {
eprintln!(
"[watcher] · {}{} (mejor, {:.4} < umbral {:.4})",
path.display(),
m.label,
score,
embed::DEFAULT_ATTRACTION_THRESHOLD
);
}
None => {
eprintln!("[watcher] {} (ninguna Mónada con centroide compatible)", path.display());
}
}
// Mónadas nuevas o cuya composición cambió (members/centroid):
// (re)spawn — el pool reemplaza la sesión previa con el mismo id.
let mut respawned = 0usize;
let mut fresh = 0usize;
for monad in &monads {
let prev = prior_monads.iter().find(|m| m.id == monad.id);
let is_new = prev.is_none();
let changed = match prev {
Some(p) => p.members != monad.members || p.centroid != monad.centroid,
None => true,
};
if !changed {
continue;
}
let mut card = monad.to_brahman_card();
card.references.push(brahman_card::CardReference {
kind: brahman_card::RelationshipKind::OwnedBy,
target_id: engine_id,
target_label: engine_label.to_string(),
});
pool.spawn(card);
if is_new {
fresh += 1;
eprintln!(
"[watcher] ✦ {} nace ({} miembros, lens={:?})",
monad.label, monad.cardinality, monad.dominant_lens
);
} else {
respawned += 1;
let prev = prev.unwrap();
let delta_members = monad.members.len() as i64 - prev.members.len() as i64;
eprintln!(
"[watcher] ↻ {} refresh ({} miembros, Δ={:+})",
monad.label, monad.cardinality, delta_members
);
}
}
if removed == 0 && fresh == 0 && respawned == 0 {
eprintln!("[watcher] (sin cambios estructurales tras re-cluster)");
} else {
eprintln!(
"[watcher] ⌃ delta: {} nuevas, {} refrescadas, {} cerradas — {} sesiones vivas",
fresh,
respawned,
removed,
pool.live_sessions()
);
}
db_lock.ingest_files(files);
db_lock.replace_monads(monads);
}
fn cmd_attract(args: &[String]) -> Cmd {