feat(sidecar): Phase B-3 — SidecarPool consolida sidecars en un runtime
Antes: cada spawn(card) creaba un thread + tokio runtime propio.
Para módulos con muchas sesiones (nouser daemon con 50+ Mónadas)
eso es 50 threads + 50 runtimes. Ahora: un thread + un runtime
tokio current_thread que hostea N tasks de sidecar.
API nueva (aditiva, no rompe spawn/spawn_with_handle):
let pool = SidecarPool::new()?;
pool.spawn(card1);
pool.spawn(card2);
pool.spawn_conscious(card_with_wit, wit);
pool.spawn_with_config(custom_config);
// pool drop = todas las sesiones cierran.
run_client se hace pública para que el pool pueda enqueuar tasks
externos al runtime con handle.spawn(run_client(config)).
nouser daemon migrado al pool. Verificación con ps -L:
$ ps -L -p $(pidof nouser)
LWP CMD
28817 nouser # main thread
28819 brahman-sidecar # pool thread (todas las sesiones)
Antes serían 6+ LWP (1 main + N sesiones). Ahora 2 fijos sin
importar cuántas Mónadas se publiquen.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -6,6 +6,34 @@ ratio/diff ver `git show <sha>`.
|
|||||||
|
|
||||||
## 2026-05-08
|
## 2026-05-08
|
||||||
|
|
||||||
|
### feat(sidecar): Phase B-3 — SidecarPool consolida en un runtime
|
||||||
|
Antes: cada `spawn(card)` creaba un thread + tokio runtime propio.
|
||||||
|
Para módulos que publican muchas sesiones (nouser daemon con 50+
|
||||||
|
Mónadas) eso es 50 threads + 50 runtimes. Ahora: **un thread + un
|
||||||
|
runtime tokio current_thread** que hostea N tasks de sidecar.
|
||||||
|
|
||||||
|
API nueva (aditiva, no rompe `spawn`/`spawn_with_handle`):
|
||||||
|
|
||||||
|
let pool = SidecarPool::new()?;
|
||||||
|
pool.spawn(card1);
|
||||||
|
pool.spawn(card2);
|
||||||
|
pool.spawn_conscious(card_wit, wit);
|
||||||
|
pool.spawn_with_config(SidecarConfig::new(c).with_wit(w));
|
||||||
|
// pool drop = todas las sesiones cierran.
|
||||||
|
|
||||||
|
`run_client` se hace pública para que el pool pueda enqueuar tasks
|
||||||
|
externos al runtime con `handle.spawn(run_client(config))`.
|
||||||
|
|
||||||
|
`nouser daemon` migrado al pool. Verificación con `ps -L`:
|
||||||
|
|
||||||
|
$ ps -L -p $(pidof nouser)
|
||||||
|
LWP CMD
|
||||||
|
28817 nouser # main thread
|
||||||
|
28819 brahman-sidecar # pool thread (todas las sesiones)
|
||||||
|
|
||||||
|
Antes serían 6+ LWP (1 main + N sesiones); ahora 2 fijos sin importar
|
||||||
|
cuántas Mónadas se publiquen.
|
||||||
|
|
||||||
### feat: Crossreferencia — Card.references como grafo del fractal
|
### feat: Crossreferencia — Card.references como grafo del fractal
|
||||||
Las Cards ahora declaran sus relaciones con otras Cards. El Engine
|
Las Cards ahora declaran sus relaciones con otras Cards. El Engine
|
||||||
posee Mónadas; las Mónadas declaran que son poseídas por el Engine.
|
posee Mónadas; las Mónadas declaran que son poseídas por el Engine.
|
||||||
|
|||||||
@@ -164,6 +164,11 @@ fn cmd_json(args: &[String]) -> Cmd {
|
|||||||
fn cmd_daemon(args: &[String]) -> Cmd {
|
fn cmd_daemon(args: &[String]) -> Cmd {
|
||||||
let dir = require_dir(args)?;
|
let dir = require_dir(args)?;
|
||||||
|
|
||||||
|
// Pool consolidado: 1 thread + 1 tokio runtime para TODAS las
|
||||||
|
// sesiones (engine + N mónadas). Antes era 1 thread por sesión.
|
||||||
|
let pool = brahman_sidecar::SidecarPool::new()
|
||||||
|
.map_err(|e| format!("crear pool: {e}"))?;
|
||||||
|
|
||||||
// 1. El propio engine se presenta como Ente.
|
// 1. El propio engine se presenta como Ente.
|
||||||
let engine_card = build_engine_card();
|
let engine_card = build_engine_card();
|
||||||
let engine_id = engine_card.id;
|
let engine_id = engine_card.id;
|
||||||
@@ -172,7 +177,7 @@ fn cmd_daemon(args: &[String]) -> Cmd {
|
|||||||
"nouser daemon: publicando engine '{}' (kind=Ente, id={})",
|
"nouser daemon: publicando engine '{}' (kind=Ente, id={})",
|
||||||
engine_label, engine_id
|
engine_label, engine_id
|
||||||
);
|
);
|
||||||
brahman_sidecar::spawn(engine_card);
|
pool.spawn(engine_card);
|
||||||
|
|
||||||
// 2. Scan y cluster.
|
// 2. Scan y cluster.
|
||||||
let (db, n_files) = run_scan(&dir)?;
|
let (db, n_files) = run_scan(&dir)?;
|
||||||
@@ -184,10 +189,9 @@ fn cmd_daemon(args: &[String]) -> Cmd {
|
|||||||
);
|
);
|
||||||
|
|
||||||
// 3. Cada Mónada se presenta como Card de tipo Data, declarando
|
// 3. Cada Mónada se presenta como Card de tipo Data, declarando
|
||||||
// su relación OwnedBy con el engine. La UI puede entonces
|
// su relación OwnedBy con el engine. Todas comparten el runtime
|
||||||
// cruzar referencias para reconstruir el grafo
|
// del pool — sin overhead de N threads.
|
||||||
// "nouser_engine posee Mónada X" sin lookup adicional.
|
let mut count = 0usize;
|
||||||
let mut handles = Vec::with_capacity(db.monad_count());
|
|
||||||
for monad in db.monads() {
|
for monad in db.monads() {
|
||||||
let mut card = monad.to_brahman_card();
|
let mut card = monad.to_brahman_card();
|
||||||
card.references.push(brahman_card::CardReference {
|
card.references.push(brahman_card::CardReference {
|
||||||
@@ -195,23 +199,17 @@ fn cmd_daemon(args: &[String]) -> Cmd {
|
|||||||
target_id: engine_id,
|
target_id: engine_id,
|
||||||
target_label: engine_label.clone(),
|
target_label: engine_label.clone(),
|
||||||
});
|
});
|
||||||
match brahman_sidecar::spawn_with_handle(brahman_sidecar::SidecarConfig::new(card)) {
|
pool.spawn(card);
|
||||||
Ok(h) => handles.push(h),
|
count += 1;
|
||||||
Err(e) => eprintln!(
|
|
||||||
"nouser daemon: falló sidecar para mónada '{}': {e}",
|
|
||||||
monad.label
|
|
||||||
),
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
eprintln!(
|
eprintln!(
|
||||||
"nouser daemon: {} sidecars activos (1 ente + {} data). Ctrl-C para terminar.",
|
"nouser daemon: 1 ente + {} mónadas en pool consolidado. Ctrl-C para terminar.",
|
||||||
handles.len() + 1,
|
count
|
||||||
handles.len()
|
|
||||||
);
|
);
|
||||||
|
|
||||||
// 4. Park: el proceso principal queda dormido, los sidecars siguen
|
// 4. Park: el thread del pool sigue vivo mientras `pool` exista.
|
||||||
// pingueando en sus threads.
|
|
||||||
std::thread::park();
|
std::thread::park();
|
||||||
|
drop(pool);
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -16,6 +16,7 @@
|
|||||||
#![forbid(unsafe_code)]
|
#![forbid(unsafe_code)]
|
||||||
#![warn(rust_2018_idioms)]
|
#![warn(rust_2018_idioms)]
|
||||||
|
|
||||||
|
use std::sync::mpsc;
|
||||||
use std::thread::JoinHandle;
|
use std::thread::JoinHandle;
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
|
|
||||||
@@ -79,6 +80,85 @@ pub fn spawn_with_handle(config: SidecarConfig) -> std::io::Result<JoinHandle<()
|
|||||||
.spawn(move || run_thread(config))
|
.spawn(move || run_thread(config))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// =====================================================================
|
||||||
|
// SidecarPool — un solo runtime tokio compartido por N sesiones
|
||||||
|
// =====================================================================
|
||||||
|
|
||||||
|
/// Pool consolidado: un único thread con un runtime tokio
|
||||||
|
/// `current_thread` que hostea N tasks de sidecar simultáneas.
|
||||||
|
///
|
||||||
|
/// Para módulos con muchas sesiones (p. ej. `nouser daemon` publicando
|
||||||
|
/// 50+ Mónadas), evita el costo de tener un thread+runtime por cada
|
||||||
|
/// sesión.
|
||||||
|
///
|
||||||
|
/// **API**:
|
||||||
|
/// - `SidecarPool::new()` crea el pool (spawn del thread runtime).
|
||||||
|
/// - `pool.spawn(card)` añade una sesión sin WIT.
|
||||||
|
/// - `pool.spawn_conscious(card, wit)` añade una sesión con WIT.
|
||||||
|
/// - `pool.spawn_with_config(config)` para configuración custom.
|
||||||
|
///
|
||||||
|
/// El pool se mantiene vivo mientras exista. Si el `SidecarPool`
|
||||||
|
/// se dropea, el thread interno termina y todas las sesiones cierran.
|
||||||
|
pub struct SidecarPool {
|
||||||
|
handle: tokio::runtime::Handle,
|
||||||
|
_thread: JoinHandle<()>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl SidecarPool {
|
||||||
|
/// Crea un pool nuevo. Bloquea hasta que el runtime esté listo.
|
||||||
|
pub fn new() -> std::io::Result<Self> {
|
||||||
|
let (handle_tx, handle_rx) = mpsc::sync_channel::<tokio::runtime::Handle>(0);
|
||||||
|
let thread = std::thread::Builder::new()
|
||||||
|
.name("brahman-sidecar-pool".into())
|
||||||
|
.spawn(move || {
|
||||||
|
let rt = match tokio::runtime::Builder::new_current_thread()
|
||||||
|
.enable_io()
|
||||||
|
.enable_time()
|
||||||
|
.build()
|
||||||
|
{
|
||||||
|
Ok(rt) => rt,
|
||||||
|
Err(e) => {
|
||||||
|
warn!(error = %e, "tokio runtime falló — pool muerto");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
if handle_tx.send(rt.handle().clone()).is_err() {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
// Mantenemos el runtime vivo mientras existan tasks.
|
||||||
|
rt.block_on(std::future::pending::<()>());
|
||||||
|
})?;
|
||||||
|
let handle = handle_rx
|
||||||
|
.recv()
|
||||||
|
.map_err(|_| std::io::Error::other("pool runtime no respondió"))?;
|
||||||
|
Ok(Self {
|
||||||
|
handle,
|
||||||
|
_thread: thread,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Añade una sesión agnóstica al pool (sin WIT).
|
||||||
|
pub fn spawn(&self, card: Card) {
|
||||||
|
self.spawn_with_config(SidecarConfig::new(card));
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Añade una sesión consciente (con WIT) al pool.
|
||||||
|
pub fn spawn_conscious(&self, card: Card, wit: WitInterface) {
|
||||||
|
self.spawn_with_config(SidecarConfig::new(card).with_wit(wit));
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Añade una sesión con configuración custom.
|
||||||
|
pub fn spawn_with_config(&self, config: SidecarConfig) {
|
||||||
|
self.handle.spawn(run_client(config));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Default for SidecarPool {
|
||||||
|
fn default() -> Self {
|
||||||
|
Self::new().expect("falló SidecarPool::new")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
fn run_thread(config: SidecarConfig) {
|
fn run_thread(config: SidecarConfig) {
|
||||||
let rt = match tokio::runtime::Builder::new_current_thread()
|
let rt = match tokio::runtime::Builder::new_current_thread()
|
||||||
.enable_io()
|
.enable_io()
|
||||||
@@ -94,7 +174,9 @@ fn run_thread(config: SidecarConfig) {
|
|||||||
rt.block_on(run_client(config));
|
rt.block_on(run_client(config));
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn run_client(config: SidecarConfig) {
|
/// Bucle async del sidecar. Público para que `SidecarPool` lo use vía
|
||||||
|
/// `handle.spawn(run_client(...))` desde código externo al runtime.
|
||||||
|
pub async fn run_client(config: SidecarConfig) {
|
||||||
let path = transport::default_socket_path();
|
let path = transport::default_socket_path();
|
||||||
let conscious = config.wit.is_some();
|
let conscious = config.wit.is_some();
|
||||||
let mut client = match Client::connect_with(&path, config.card, config.wit).await {
|
let mut client = match Client::connect_with(&path, config.card, config.wit).await {
|
||||||
|
|||||||
Reference in New Issue
Block a user