feat(shipote): CPU% + pipeline live-tail + replay por bytes (fase J)
- CPU% derivado server-side entre samples (WorkspaceState.last_cpu_sample). 100% = 1 core saturado. Primer sample devuelve None (sin baseline). - shipote pipeline run --tail: tras lanzar, suscribe al primer flow_socket y vuelca bytes hasta EOF. Auto-implica --tap. - DiscernPolicy.replay_bytes: cap adicional por bytes para el replay buffer del FlowChannel. evict_for_incoming considera el chunk entrante para que post-push el buffer NUNCA exceda los caps. - shipote-shell: stats history extiende sparkline con %CPU. 80 tests pasan (ente-incarnate 16, nouser-core 27, shipote-card 8, shipote-core 21, shipote-discern 5, yahweh-provider-fs 3). Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
@@ -216,6 +216,12 @@ pub struct DiscernPolicy {
|
||||
/// querés que los consumidores tardíos vean toda la salida.
|
||||
#[serde(default = "default_replay_chunks")]
|
||||
pub replay_chunks: usize,
|
||||
/// Tope adicional por **bytes** acumulados en el replay buffer. Lo
|
||||
/// que se exceda primero (chunks o bytes) drop-ea el chunk más viejo.
|
||||
/// `0` = sin tope por bytes (sólo aplica `replay_chunks`). Útil para
|
||||
/// productores con chunks de tamaño variable.
|
||||
#[serde(default)]
|
||||
pub replay_bytes: usize,
|
||||
}
|
||||
|
||||
impl Default for DiscernPolicy {
|
||||
@@ -224,6 +230,7 @@ impl Default for DiscernPolicy {
|
||||
sample_bytes: default_sample_bytes(),
|
||||
enrich_producer: default_true(),
|
||||
replay_chunks: default_replay_chunks(),
|
||||
replay_bytes: 0,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -45,16 +45,33 @@ pub const DEFAULT_REPLAY_CHUNKS: usize = 32;
|
||||
pub struct FlowChannel {
|
||||
sender: broadcast::Sender<Arc<Vec<u8>>>,
|
||||
replay: Arc<Mutex<VecDeque<Arc<Vec<u8>>>>>,
|
||||
replay_cap: usize,
|
||||
replay_caps: ReplayCaps,
|
||||
socket_path: PathBuf,
|
||||
_accept_handle: AbortOnDrop,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Copy)]
|
||||
pub struct ReplayCaps {
|
||||
/// Máximo de chunks retenidos.
|
||||
pub chunks: usize,
|
||||
/// Máximo de bytes (sumando len de chunks). `0` = sin tope.
|
||||
pub bytes: usize,
|
||||
}
|
||||
|
||||
impl ReplayCaps {
|
||||
pub fn chunks_only(chunks: usize) -> Self {
|
||||
Self { chunks: chunks.max(1), bytes: 0 }
|
||||
}
|
||||
pub fn new(chunks: usize, bytes: usize) -> Self {
|
||||
Self { chunks: chunks.max(1), bytes }
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct FlowSender {
|
||||
sender: broadcast::Sender<Arc<Vec<u8>>>,
|
||||
replay: Arc<Mutex<VecDeque<Arc<Vec<u8>>>>>,
|
||||
replay_cap: usize,
|
||||
replay_caps: ReplayCaps,
|
||||
}
|
||||
|
||||
impl FlowSender {
|
||||
@@ -62,17 +79,37 @@ impl FlowSender {
|
||||
/// el broadcast::send retorna Err pero igual guardamos en replay
|
||||
/// (subscribers tarde verán los chunks pasados).
|
||||
pub fn send(&self, data: Arc<Vec<u8>>) {
|
||||
let cap = self.replay_cap;
|
||||
let incoming = data.len();
|
||||
let caps = self.replay_caps;
|
||||
if let Ok(mut g) = self.replay.lock() {
|
||||
if g.len() >= cap {
|
||||
g.pop_front();
|
||||
}
|
||||
evict_for_incoming(&mut g, caps, incoming);
|
||||
g.push_back(data.clone());
|
||||
}
|
||||
let _ = self.sender.send(data);
|
||||
}
|
||||
}
|
||||
|
||||
/// Evict los chunks más viejos para hacer espacio a un chunk entrante de
|
||||
/// `incoming` bytes — el buffer post-push queda dentro de los caps.
|
||||
fn evict_for_incoming(buf: &mut VecDeque<Arc<Vec<u8>>>, caps: ReplayCaps, incoming: usize) {
|
||||
// 1) chunks: dejar lugar para 1 más.
|
||||
while buf.len() + 1 > caps.chunks {
|
||||
if buf.pop_front().is_none() {
|
||||
break;
|
||||
}
|
||||
}
|
||||
// 2) bytes (si está activado).
|
||||
if caps.bytes > 0 {
|
||||
let mut current: usize = buf.iter().map(|a| a.len()).sum();
|
||||
while current + incoming > caps.bytes {
|
||||
match buf.pop_front() {
|
||||
Some(c) => current = current.saturating_sub(c.len()),
|
||||
None => break,
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl std::fmt::Debug for FlowChannel {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
f.debug_struct("FlowChannel")
|
||||
@@ -86,11 +123,14 @@ impl FlowChannel {
|
||||
/// Crea un FlowChannel atado al path `socket_path`. Si el path ya
|
||||
/// existe, lo borra antes de bind (asume restart limpio).
|
||||
pub fn new(socket_path: PathBuf) -> std::io::Result<Self> {
|
||||
Self::with_replay_cap(socket_path, DEFAULT_REPLAY_CHUNKS)
|
||||
Self::with_replay_caps(socket_path, ReplayCaps::chunks_only(DEFAULT_REPLAY_CHUNKS))
|
||||
}
|
||||
|
||||
pub fn with_replay_cap(socket_path: PathBuf, replay_cap: usize) -> std::io::Result<Self> {
|
||||
let cap = replay_cap.max(1);
|
||||
pub fn with_replay_cap(socket_path: PathBuf, chunks: usize) -> std::io::Result<Self> {
|
||||
Self::with_replay_caps(socket_path, ReplayCaps::chunks_only(chunks))
|
||||
}
|
||||
|
||||
pub fn with_replay_caps(socket_path: PathBuf, caps: ReplayCaps) -> std::io::Result<Self> {
|
||||
if socket_path.exists() {
|
||||
let _ = std::fs::remove_file(&socket_path);
|
||||
}
|
||||
@@ -100,7 +140,7 @@ impl FlowChannel {
|
||||
let listener = UnixListener::bind(&socket_path)?;
|
||||
let (tx, _rx_unused) = broadcast::channel::<Arc<Vec<u8>>>(BROADCAST_CAP);
|
||||
let replay: Arc<Mutex<VecDeque<Arc<Vec<u8>>>>> =
|
||||
Arc::new(Mutex::new(VecDeque::with_capacity(cap)));
|
||||
Arc::new(Mutex::new(VecDeque::with_capacity(caps.chunks)));
|
||||
let tx_for_accept = tx.clone();
|
||||
let replay_for_accept = replay.clone();
|
||||
let path_for_log = socket_path.clone();
|
||||
@@ -153,21 +193,21 @@ impl FlowChannel {
|
||||
Ok(Self {
|
||||
sender: tx,
|
||||
replay,
|
||||
replay_cap: cap,
|
||||
replay_caps: caps,
|
||||
socket_path,
|
||||
_accept_handle: AbortOnDrop(join.abort_handle()),
|
||||
})
|
||||
}
|
||||
|
||||
/// Push un chunk al channel. Si no hay subscribers, drop silencioso.
|
||||
/// Siempre se guarda en el replay buffer (con cap rotation).
|
||||
/// Siempre se guarda en el replay buffer (con cap rotation por chunks
|
||||
/// y opcionalmente por bytes).
|
||||
pub fn send(&self, data: Vec<u8>) {
|
||||
let incoming = data.len();
|
||||
let arc = Arc::new(data);
|
||||
let cap = self.replay_cap;
|
||||
let caps = self.replay_caps;
|
||||
if let Ok(mut g) = self.replay.lock() {
|
||||
if g.len() >= cap {
|
||||
g.pop_front();
|
||||
}
|
||||
evict_for_incoming(&mut g, caps, incoming);
|
||||
g.push_back(arc.clone());
|
||||
}
|
||||
let _ = self.sender.send(arc);
|
||||
@@ -184,7 +224,7 @@ impl FlowChannel {
|
||||
FlowSender {
|
||||
sender: self.sender.clone(),
|
||||
replay: self.replay.clone(),
|
||||
replay_cap: self.replay_cap,
|
||||
replay_caps: self.replay_caps,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -303,6 +343,47 @@ mod tests {
|
||||
assert!(s.contains("chunk-3"), "got: {s:?}");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn replay_evicts_by_bytes_cap() {
|
||||
let tmp = tempfile::tempdir().unwrap();
|
||||
let path = tmp.path().join("flow.sock");
|
||||
// chunks=100 (no limita), bytes=20: deberíamos retener sólo los
|
||||
// últimos chunks cuyos bytes sumen ≤ 20.
|
||||
let ch = FlowChannel::with_replay_caps(path.clone(), ReplayCaps::new(100, 20)).unwrap();
|
||||
ch.send(b"AAAAAAAA".to_vec()); // 8 bytes
|
||||
ch.send(b"BBBBBBBB".to_vec()); // 8 → total 16
|
||||
ch.send(b"CCCCCCCC".to_vec()); // 8 → total 24 > 20, evict A → 16
|
||||
ch.send(b"DDDDDDDD".to_vec()); // 8 → total 24 > 20, evict B → 16
|
||||
|
||||
let path_clone = path.clone();
|
||||
let task = tokio::spawn(async move {
|
||||
let mut stream = UnixStream::connect(&path_clone).await.unwrap();
|
||||
let mut buf = vec![0u8; 64];
|
||||
let mut total = Vec::new();
|
||||
for _ in 0..20 {
|
||||
let n = stream.read(&mut buf).await.unwrap();
|
||||
if n == 0 {
|
||||
break;
|
||||
}
|
||||
total.extend_from_slice(&buf[..n]);
|
||||
if total.len() >= 16 {
|
||||
break;
|
||||
}
|
||||
}
|
||||
total
|
||||
});
|
||||
let got = tokio::time::timeout(std::time::Duration::from_secs(2), task)
|
||||
.await
|
||||
.expect("timeout")
|
||||
.unwrap();
|
||||
let s = String::from_utf8_lossy(&got);
|
||||
// Sólo C y D (los más viejos A y B fueron evicted).
|
||||
assert!(!s.contains("AAAA"), "should have evicted A: {s:?}");
|
||||
assert!(!s.contains("BBBB"), "should have evicted B: {s:?}");
|
||||
assert!(s.contains("CCCC"), "should keep C: {s:?}");
|
||||
assert!(s.contains("DDDD"), "should keep D: {s:?}");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn drop_removes_socket() {
|
||||
let tmp = tempfile::tempdir().unwrap();
|
||||
|
||||
@@ -48,6 +48,9 @@ pub struct WorkspaceState {
|
||||
pub root_card: Card,
|
||||
pub commands: HashMap<Ulid, CommandState>,
|
||||
pub started: Instant,
|
||||
/// Última muestra de `(wall_instant, cpu_usec)` usada para calcular
|
||||
/// `cpu_percent` en la próxima medición. None hasta el primer measure.
|
||||
pub last_cpu_sample: Option<(Instant, u64)>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
@@ -354,9 +357,12 @@ impl WorkspaceManager {
|
||||
/// comandos vivos. Lee `/proc/<pid>/` directamente; si el spec declara
|
||||
/// `soma.cgroup.path`, también intenta el cgroup (más preciso, incluye
|
||||
/// descendants).
|
||||
///
|
||||
/// `cpu_percent` se calcula entre samples consecutivos. Necesita ≥2
|
||||
/// llamadas para tener un valor (la primera siempre retorna `None`).
|
||||
pub async fn workspace_stats(&self, id: WorkspaceId) -> Option<stats::WorkspaceStats> {
|
||||
let g = self.inner.lock().await;
|
||||
let ws = g.workspaces.get(&id)?;
|
||||
let mut g = self.inner.lock().await;
|
||||
let ws = g.workspaces.get_mut(&id)?;
|
||||
let alive: Vec<i32> = ws
|
||||
.commands
|
||||
.values()
|
||||
@@ -367,8 +373,6 @@ impl WorkspaceManager {
|
||||
let cgroup_path = if ws.spec.soma.cgroup.path.is_empty() {
|
||||
None
|
||||
} else {
|
||||
// resolve_cgroup_path está en ente_incarnate, pero acá basta
|
||||
// con el path absoluto bajo /sys/fs/cgroup. Resolución gruesa.
|
||||
Some(std::path::PathBuf::from(format!(
|
||||
"/sys/fs/cgroup{}",
|
||||
ws.spec.soma.cgroup.path
|
||||
@@ -376,6 +380,20 @@ impl WorkspaceManager {
|
||||
};
|
||||
let mut s = stats::measure(&alive, cgroup_path.as_deref(), ws.started);
|
||||
s.commands_total = total;
|
||||
|
||||
// CPU%: diff entre el sample actual y el previo, dividido por
|
||||
// wall time. 100% = 1 core saturado. >100% = varios cores.
|
||||
let now = Instant::now();
|
||||
if let Some(cpu_now) = s.cpu_usec {
|
||||
if let Some((prev_t, prev_cpu)) = ws.last_cpu_sample {
|
||||
let dt_us = now.duration_since(prev_t).as_micros() as u64;
|
||||
let d_cpu = cpu_now.saturating_sub(prev_cpu);
|
||||
if dt_us > 0 {
|
||||
s.cpu_percent = Some(100.0 * d_cpu as f32 / dt_us as f32);
|
||||
}
|
||||
}
|
||||
ws.last_cpu_sample = Some((now, cpu_now));
|
||||
}
|
||||
Some(s)
|
||||
}
|
||||
|
||||
@@ -403,6 +421,7 @@ impl WorkspaceManager {
|
||||
root_card: card,
|
||||
commands: HashMap::new(),
|
||||
started: Instant::now(),
|
||||
last_cpu_sample: None,
|
||||
};
|
||||
self.inner.lock().await.workspaces.insert(id, state);
|
||||
info!(%id, ?ttl, "workspace created");
|
||||
|
||||
@@ -211,7 +211,10 @@ pub async fn run_pipeline(
|
||||
i
|
||||
);
|
||||
let socket = crate::flow_channel::default_flow_socket_path(&id);
|
||||
match crate::flow_channel::FlowChannel::with_replay_cap(socket.clone(), spec.discern.replay_chunks) {
|
||||
match crate::flow_channel::FlowChannel::with_replay_caps(
|
||||
socket.clone(),
|
||||
crate::flow_channel::ReplayCaps::new(spec.discern.replay_chunks, spec.discern.replay_bytes),
|
||||
) {
|
||||
Ok(fc) => {
|
||||
senders_per_edge.push(Some(fc.sender_handle()));
|
||||
paths_per_edge.push(Some(socket));
|
||||
@@ -693,6 +696,7 @@ mod tests {
|
||||
sample_bytes: 4096,
|
||||
enrich_producer: true,
|
||||
replay_chunks: 32,
|
||||
replay_bytes: 0,
|
||||
},
|
||||
};
|
||||
let disc = Arc::new(DiscernPipeline::default_pipeline());
|
||||
|
||||
@@ -23,6 +23,9 @@ pub struct WorkspaceStats {
|
||||
pub rss_peak_bytes: Option<u64>,
|
||||
/// Tiempo CPU acumulado en microsegundos. `None` si no se pudo medir.
|
||||
pub cpu_usec: Option<u64>,
|
||||
/// %CPU instantáneo derivado entre dos samples consecutivos. `None`
|
||||
/// en el primer sample (no hay baseline). `100.0` = 1 core saturado.
|
||||
pub cpu_percent: Option<f32>,
|
||||
/// Fuente del dato: "proc" | "cgroup" | "mixed".
|
||||
pub source: String,
|
||||
/// Wall-clock uptime del workspace en milisegundos.
|
||||
@@ -67,6 +70,7 @@ pub fn measure(
|
||||
rss_bytes: rss,
|
||||
rss_peak_bytes: rss_peak,
|
||||
cpu_usec: cpu,
|
||||
cpu_percent: None, // El caller lo rellena con el diff vs prev sample.
|
||||
source,
|
||||
uptime_ms: workspace_started.elapsed().as_millis() as u64,
|
||||
}
|
||||
|
||||
@@ -216,6 +216,8 @@ pub struct WorkspaceStatsInfo {
|
||||
#[serde(default)]
|
||||
pub rss_peak_bytes: Option<u64>,
|
||||
pub cpu_usec: Option<u64>,
|
||||
#[serde(default)]
|
||||
pub cpu_percent: Option<f32>,
|
||||
pub source: String,
|
||||
pub uptime_ms: u64,
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user