feat(nakui-ui): snapshot/compaction automático del event log al startup

Wirea el snapshot machinery existente en nakui-core (Snapshot,
replay_with_snapshot_into, EventLog::compact_through) al runtime de
la UI. Reduce el costo de boot de O(events) a O(events_post_snapshot).

- Path: sibling del log, extensión `.snap.json`.
- Threshold via `NAKUI_SNAPSHOT_THRESHOLD` env (default 50; 0 = off).
- Helper `maybe_compact_log` captura snapshot + compacta dejando la
  última entry como anchor del cursor (sin anchor, EventLog::open
  vería log vacío y perdería next_seq).
- WAL order: write snap antes de compactar; un crash entre los dos
  da el mismo outcome al próximo boot (replay skipea entries
  cubiertas por snap).
- Fail-soft: snap load/compact errors no son fatales, sólo banner.

5 tests nuevos: derivación del path, dos no-ops bajo threshold, E2E
60 entries → snapshot+compact → reopen+replay → 60 records intactos.

31 tests verdes en nakui-ui. Workspace build verde.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Sergio
2026-05-09 21:37:04 +00:00
parent 70f8c66548
commit 7e2be96a57
2 changed files with 371 additions and 9 deletions
+289 -9
View File
@@ -31,7 +31,9 @@ use gpui::{
Render, SharedString, Window, WindowBounds, WindowOptions,
};
use nakui_core::delta::{FieldOp, FieldPath};
use nakui_core::event_log::{execute_and_log_with_recovery, replay_into, EventLog, LogEntry};
use nakui_core::event_log::{
execute_and_log_with_recovery, replay_with_snapshot_into, EventLog, LogEntry, Snapshot,
};
use nakui_core::executor::Executor;
use nakui_core::store::{MemoryStore, Store};
use nakui_ui_schema::{
@@ -123,24 +125,61 @@ impl MetaUi {
),
};
// Persistencia: abrir/crear el event log y hacer replay al
// store. Path por env `NAKUI_EVENT_LOG`, default
// `./nakui-ui-state.jsonl`. Si abrir o replay falla, el
// runtime sigue en modo in-memory (sin persistencia) y el
// load_error se acumula al banner.
// Persistencia: abrir/crear el event log + opcionalmente un
// snapshot sibling para acortar el replay. Path del log por
// env `NAKUI_EVENT_LOG`, default `./nakui-ui-state.jsonl`. El
// snapshot vive como sibling con extensión `.snap.json`.
//
// Si abrir o replay falla, el runtime sigue en modo in-memory
// (sin persistencia) y el load_error se acumula al banner.
//
// Threshold de auto-compaction via env
// `NAKUI_SNAPSHOT_THRESHOLD` (default 50): después del replay,
// si el log file tiene >= N entries, capturamos un snapshot
// del store actual y compactamos el log. La próxima boot ya
// arranca de snapshot + log corto.
let log_path = std::env::var("NAKUI_EVENT_LOG")
.map(PathBuf::from)
.unwrap_or_else(|_| PathBuf::from("nakui-ui-state.jsonl"));
let snap_path = snapshot_path_for(&log_path);
let snapshot_threshold: usize = std::env::var("NAKUI_SNAPSHOT_THRESHOLD")
.ok()
.and_then(|s| s.parse().ok())
.unwrap_or(50);
let mut store = MemoryStore::new();
let mut initial_toast: Option<SharedString> = None;
// Cargar snapshot (si existe y no falla). Un snapshot
// corrupto no es fatal: caemos a full replay del log.
let snapshot: Option<Snapshot> = match Snapshot::load(&snap_path) {
Ok(s) => s,
Err(e) => {
let msg = format!(
"snapshot {}: {e} — full replay",
snap_path.display()
);
match &load_error {
Some(prev) => {
load_error = Some(SharedString::from(format!("{prev}; {msg}")));
}
None => load_error = Some(SharedString::from(msg)),
}
None
}
};
let event_log = match EventLog::open(&log_path) {
Ok(log) => {
match replay_into(&log, &mut store) {
Ok(mut log) => {
match replay_with_snapshot_into(&log, snapshot.as_ref(), &mut store) {
Ok(()) => {
let n = log.next_seq();
let from_snap = snapshot
.as_ref()
.map(|s| format!(" (snapshot @ seq {})", s.seq))
.unwrap_or_default();
if n > 0 {
initial_toast = Some(SharedString::from(format!(
"log {} cargado: {n} evento(s) replayed",
"log {} cargado: next_seq={n}{from_snap}",
log_path.display()
)));
} else {
@@ -149,6 +188,37 @@ impl MetaUi {
log_path.display()
)));
}
// Auto-compact si pasamos el threshold. No
// fatal — un fallo deja log+snap como están.
match maybe_compact_log(
&mut log,
&snap_path,
&store,
snapshot_threshold,
) {
Ok(Some(msg)) => {
let prev = initial_toast
.map(|t| t.to_string())
.unwrap_or_default();
initial_toast = Some(SharedString::from(format!(
"{prev}; {msg}"
)));
}
Ok(None) => {}
Err(e) => {
let msg = format!("auto-compact: {e}");
match &load_error {
Some(prev) => {
load_error = Some(SharedString::from(format!(
"{prev}; {msg}"
)));
}
None => load_error = Some(SharedString::from(msg)),
}
}
}
Some(Arc::new(Mutex::new(log)))
}
Err(e) => {
@@ -707,6 +777,69 @@ impl CommitOutcome {
}
}
/// Devuelve el path del snapshot sibling para un log dado:
/// `nakui-ui-state.jsonl` → `nakui-ui-state.snap.json`. Mantiene el
/// snapshot junto al log para que un usuario pueda mover la pareja
/// sin desincronizarlos.
fn snapshot_path_for(log_path: &std::path::Path) -> PathBuf {
log_path.with_extension("snap.json")
}
/// Si el log file tiene >= `threshold` entries, captura un snapshot
/// del store actual y compacta el log. Idempotente abajo del threshold.
///
/// Cursor invariant: `EventLog::open` re-deriva `next_seq` del primer
/// entry del archivo. Si compactáramos *todo*, al reabrir el cursor
/// volvería a 0 — el próximo append crashearía con NonMonotonic. Por
/// eso siempre dejamos la última entry como anchor: compactamos sólo
/// hasta `next_seq - 2`. La survivor entry del snap.seq queda en
/// disco pero `replay_with_snapshot_into` la skipea (snap ya cubre
/// hasta snap.seq inclusive), así el costo es 1 línea de log y el
/// resultado del replay es idéntico.
///
/// Orden de operaciones (importa por crash safety):
/// 1. Capturar snapshot en memoria.
/// 2. `Snapshot::write` (atómico via tempfile + fsync + rename).
/// 3. `EventLog::compact_through` (atómico igual).
/// Si (3) falla tras (2) éxito, el próximo boot ve snap@K + log con
/// entries 0..N — `replay_with_snapshot_into` skipea las cubiertas
/// por snap, outcome idéntico.
///
/// Devuelve `Ok(Some(msg))` si compactó, `Ok(None)` si no había
/// nada que hacer, `Err(s)` si snapshot/compact falló.
fn maybe_compact_log(
log: &mut EventLog,
snap_path: &std::path::Path,
store: &MemoryStore,
threshold: usize,
) -> Result<Option<String>, String> {
if threshold == 0 {
return Ok(None);
}
let entry_count = log
.entries()
.map_err(|e| format!("read entries: {e}"))?
.len();
if entry_count < threshold || entry_count < 2 {
// < 2 entries: la regla "dejar 1 como anchor" no permite
// dropear nada útil (sólo el anchor sobreviviría).
// entry_count<threshold también incluye el caso post-compact
// donde sobrevive 1 anchor: idempotente, no rebote infinito.
return Ok(None);
}
let snap_seq = log.next_seq() - 1;
let through = log.next_seq() - 2;
let snap = Snapshot::from_memory_store(store, snap_seq);
snap.write(snap_path)
.map_err(|e| format!("write snapshot {}: {e}", snap_path.display()))?;
log.compact_through(through)
.map_err(|e| format!("compact_through({through}): {e}"))?;
Ok(Some(format!(
"auto-compact: snapshot @ seq {snap_seq}, {} entries dropped (1 anchor kept)",
entry_count - 1
)))
}
/// Calcula el delta entre el record actual y los valores propuestos
/// del form. Devuelve un Map con sólo los campos cuyo valor difiere.
///
@@ -1689,6 +1822,153 @@ mod tests {
assert!(!delta.contains_key("internal_marker"));
}
#[test]
fn snapshot_path_for_replaces_extension() {
use std::path::Path;
assert_eq!(
snapshot_path_for(Path::new("nakui-ui-state.jsonl")),
std::path::PathBuf::from("nakui-ui-state.snap.json"),
);
// Sin extensión: agrega .snap.json.
assert_eq!(
snapshot_path_for(Path::new("/tmp/foo")),
std::path::PathBuf::from("/tmp/foo.snap.json"),
);
}
#[test]
fn maybe_compact_log_below_threshold_noops() {
let tmp = tempfile::NamedTempFile::new().unwrap();
let path = tmp.path().to_path_buf();
drop(tmp);
let snap_path = snapshot_path_for(&path);
let mut log = EventLog::open(&path).unwrap();
for i in 0..5 {
log.append(LogEntry::Seed {
seq: i,
entity: "x".into(),
id: Uuid::new_v4(),
data: json!({"i": i}),
schema_hash: None,
})
.unwrap();
}
let store = MemoryStore::new();
let res = maybe_compact_log(&mut log, &snap_path, &store, 50).unwrap();
assert!(res.is_none(), "5 < 50 → no debería compactar");
assert_eq!(log.entries().unwrap().len(), 5, "log intacto");
assert!(!snap_path.exists(), "no debería haber escrito snapshot");
let _ = std::fs::remove_file(&path);
}
#[test]
fn maybe_compact_log_threshold_zero_noops() {
let tmp = tempfile::NamedTempFile::new().unwrap();
let path = tmp.path().to_path_buf();
drop(tmp);
let snap_path = snapshot_path_for(&path);
let mut log = EventLog::open(&path).unwrap();
for i in 0..3 {
log.append(LogEntry::Seed {
seq: i,
entity: "x".into(),
id: Uuid::new_v4(),
data: json!({"i": i}),
schema_hash: None,
})
.unwrap();
}
let store = MemoryStore::new();
let res = maybe_compact_log(&mut log, &snap_path, &store, 0).unwrap();
assert!(res.is_none(), "threshold 0 = disabled");
let _ = std::fs::remove_file(&path);
}
/// E2E del ciclo completo: write log con N entries por encima del
/// threshold → maybe_compact_log captura snapshot y trunca el log
/// → re-open + replay con snapshot → store final == store que
/// resultaría de full replay sin snapshot.
#[test]
fn maybe_compact_log_then_reopen_preserves_records() {
let tmp = tempfile::NamedTempFile::new().unwrap();
let path = tmp.path().to_path_buf();
drop(tmp);
let snap_path = snapshot_path_for(&path);
// 1. Escribir 60 seeds + popular store en sync.
let mut store = MemoryStore::new();
let mut ids = Vec::new();
let mut log = EventLog::open(&path).unwrap();
for i in 0..60u64 {
let id = Uuid::new_v4();
log.append(LogEntry::Seed {
seq: i,
entity: "row".into(),
id,
data: json!({"i": i}),
schema_hash: None,
})
.unwrap();
store.seed("row", id, json!({"i": i}));
ids.push(id);
}
assert_eq!(log.next_seq(), 60);
assert_eq!(log.entries().unwrap().len(), 60);
// 2. Compactar (60 >= 50).
let res = maybe_compact_log(&mut log, &snap_path, &store, 50).unwrap();
assert!(res.is_some(), "60 >= 50 debe compactar");
let msg = res.unwrap();
assert!(msg.contains("seq 59"), "msg debe incluir el seq: {msg}");
assert!(
msg.contains("59 entries dropped"),
"msg debe reportar 59 dropped (60 - 1 anchor): {msg}"
);
// 3. Verificar: snapshot existe, log queda con 1 entry
// (anchor del cursor), next_seq se preserva (60).
assert!(snap_path.exists(), "snapshot debería existir");
let log_after = EventLog::open(&path).unwrap();
assert_eq!(
log_after.entries().unwrap().len(),
1,
"log debería tener 1 anchor entry tras compact"
);
assert_eq!(
log_after.next_seq(),
60,
"next_seq se preserva via anchor entry"
);
// 4. Re-open + replay desde snapshot → todos los records.
// El anchor entry cae bajo snap.seq así que se skipea.
let snap = Snapshot::load(&snap_path).unwrap().expect("snap loadeable");
assert_eq!(snap.seq, 59);
let mut fresh_store = MemoryStore::new();
replay_with_snapshot_into(&log_after, Some(&snap), &mut fresh_store).unwrap();
for (i, id) in ids.iter().enumerate() {
assert_eq!(
fresh_store.load("row", *id),
Some(json!({"i": i as u64})),
"record {i} debería estar tras snapshot+replay"
);
}
// 5. Idempotencia: segunda corrida del compact con threshold=1
// no hace nada — queda 1 anchor entry, no hay nada útil
// que dropear.
let mut log_reopened = EventLog::open(&path).unwrap();
let res2 =
maybe_compact_log(&mut log_reopened, &snap_path, &fresh_store, 1).unwrap();
assert!(
res2.is_none(),
"post-compact: 1 anchor entry, segundo compact debe ser no-op"
);
let _ = std::fs::remove_file(&path);
let _ = std::fs::remove_file(&snap_path);
}
#[test]
fn resolve_param_strict_number_parses_i64() {
let s = spec("qty", FieldKind::Number, true);