Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 65 additions & 3 deletions packages/nx/src/native/metrics/collector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use anyhow::Result;
#[cfg(test)]
use crossbeam_channel::Receiver;
use crossbeam_channel::{Sender, unbounded};
use dashmap::DashMap;
use dashmap::{DashMap, DashSet};
use napi::{Env, JsFunction};
use napi_derive::napi;
use parking_lot::Mutex;
Expand Down Expand Up @@ -80,6 +80,8 @@ struct CollectionRunner {
main_cli_pid: Arc<Mutex<Option<i32>>>,
main_cli_subprocess_pids: Arc<DashMap<i32, Option<String>>>,
daemon_pid: Arc<Mutex<Option<i32>>>,
/// PIDs registered since last baseline refresh (lock-free concurrent set)
pids_needing_baseline: Arc<DashSet<i32>>,

// Collection infrastructure
system: Arc<Mutex<System>>,
Expand All @@ -104,6 +106,7 @@ impl CollectionRunner {
main_cli_pid: Arc::clone(&collector.main_cli_pid),
main_cli_subprocess_pids: Arc::clone(&collector.main_cli_subprocess_pids),
daemon_pid: Arc::clone(&collector.daemon_pid),
pids_needing_baseline: Arc::clone(&collector.pids_needing_baseline),
system: Arc::clone(&collector.system),
config: collector.config.clone(),
process_metadata_map: Arc::clone(&collector.process_metadata_map),
Expand All @@ -112,9 +115,43 @@ impl CollectionRunner {
}
}

/// Establish CPU baselines for all processes when there are processes needing a baseline.
/// Uses bulk refresh to keep all process timing in sync (avoids per-PID refresh bug).
/// Returns true if baseline was actually performed, false if skipped (nothing to baseline).
fn run_baseline_if_needed(&self) -> bool {
if self.pids_needing_baseline.is_empty() {
return false;
}

trace!("New processes need baseline, running bulk CPU refresh");

{
let mut sys = self.system.lock();
sys.refresh_processes_specifics(
sysinfo::ProcessesToUpdate::All,
false, // don't remove dead processes; collection handles cleanup
ProcessRefreshKind::nothing().with_cpu(),
);
} // release system lock before clearing set

// Clear the set: bulk refresh establishes baselines for all processes
self.pids_needing_baseline.clear();
trace!("Baseline refresh complete");
true
}

/// Run the collection loop
fn run(self) {
let interval = Duration::from_millis(self.config.collection_interval_ms);
// sysinfo's MINIMUM_CPU_UPDATE_INTERVAL + 50ms safety buffer
let baseline_offset = sysinfo::MINIMUM_CPU_UPDATE_INTERVAL + Duration::from_millis(50);
let post_collection_sleep = interval - baseline_offset;

// First iteration: baseline if needed, then wait before first collection
if self.should_collect.load(Ordering::Acquire) && self.run_baseline_if_needed() {
// Sleep to allow CPU to be calculated correctly for the baselined processes
self.sleep_with_early_exit(baseline_offset);
}

while self.should_collect.load(Ordering::Acquire) {
// Collect current metrics and send to main collector thread
Expand All @@ -123,8 +160,15 @@ impl CollectionRunner {
.map(|result| self.send_metrics(result))
.ok();

// Sleep in small chunks so thread can exit quickly on shutdown
self.sleep_with_early_exit(interval);
// Sleep after collection, before baseline
self.sleep_with_early_exit(post_collection_sleep);
if !self.should_collect.load(Ordering::Acquire) {
break;
}

self.run_baseline_if_needed();
// Sleep until next collection (offset)
self.sleep_with_early_exit(baseline_offset);
}

self.is_collecting.store(false, Ordering::Release);
Expand Down Expand Up @@ -690,6 +734,10 @@ impl CollectionRunner {
daemon_pid_to_clear,
} = self.refresh_and_collect_metrics();

// Collection's bulk refresh established CPU baselines for all processes,
// so clear the tracking set to avoid redundant baseline refreshes
self.pids_needing_baseline.clear();

// Now that system lock is released, clear daemon PID if needed
// This avoids holding system lock while acquiring daemon_pid lock
if let Some(_pid) = daemon_pid_to_clear {
Expand Down Expand Up @@ -786,6 +834,8 @@ pub struct ProcessMetricsCollector {
main_cli_subprocess_pids: Arc<DashMap<i32, Option<String>>>,
/// Daemon process PID (can be updated when daemon connects)
daemon_pid: Arc<Mutex<Option<i32>>>,
/// PIDs registered since last baseline refresh (lock-free concurrent set)
pids_needing_baseline: Arc<DashSet<i32>>,
/// Cached CPU core count (set once at initialization)
cpu_cores: u32,
/// Cached total memory in bytes (set once at initialization)
Expand Down Expand Up @@ -837,6 +887,7 @@ impl ProcessMetricsCollector {
main_cli_pid: Arc::new(Mutex::new(None)),
main_cli_subprocess_pids: Arc::new(DashMap::new()),
daemon_pid: Arc::new(Mutex::new(None)),
pids_needing_baseline: Arc::new(DashSet::new()),
cpu_cores,
total_memory,
system: Arc::new(Mutex::new(sys)),
Expand Down Expand Up @@ -1063,6 +1114,8 @@ impl ProcessMetricsCollector {
pub fn register_main_cli_process(&self, pid: i32) {
trace!("Registering main CLI process: pid={}", pid);
*self.main_cli_pid.lock() = Some(pid);
// Track that this PID needs a baseline for accurate first CPU reading
self.pids_needing_baseline.insert(pid);
trace!("Main CLI process registered: pid={}", pid);
}

Expand All @@ -1074,6 +1127,8 @@ impl ProcessMetricsCollector {
pid, alias
);
self.main_cli_subprocess_pids.insert(pid, alias);
// Track that this PID needs a baseline for accurate first CPU reading
self.pids_needing_baseline.insert(pid);
trace!("Main CLI subprocess registered: pid={}", pid);
}

Expand All @@ -1082,6 +1137,8 @@ impl ProcessMetricsCollector {
pub fn register_daemon_process(&self, pid: i32) {
let mut daemon_pid = self.daemon_pid.lock();
*daemon_pid = Some(pid);
// Track that this PID needs a baseline for accurate first CPU reading
self.pids_needing_baseline.insert(pid);
}

/// Register a process for a specific task
Expand All @@ -1094,6 +1151,8 @@ impl ProcessMetricsCollector {
.or_insert_with(|| IndividualTaskRegistration::new(task_id.clone()))
.anchor_pids
.insert(pid);
// Track that this PID needs a baseline for accurate first CPU reading
self.pids_needing_baseline.insert(pid);
trace!("Task process registered: task_id={}, pid={}", task_id, pid);
}

Expand All @@ -1110,6 +1169,8 @@ impl ProcessMetricsCollector {
batch_id.clone(),
BatchRegistration::new(batch_id.clone(), task_ids, pid),
);
// Track that this PID needs a baseline for accurate first CPU reading
self.pids_needing_baseline.insert(pid);
trace!("Batch registered: batch_id={}, pid={}", batch_id, pid);
}

Expand Down Expand Up @@ -1163,6 +1224,7 @@ mod tests {
main_cli_pid: Arc::new(Mutex::new(None)),
main_cli_subprocess_pids: Arc::new(DashMap::new()),
daemon_pid: Arc::new(Mutex::new(None)),
pids_needing_baseline: Arc::new(DashSet::new()),
system: Arc::new(Mutex::new(System::new())),
config: CollectorConfig::default(),
process_metadata_map: Arc::new(DashMap::new()),
Expand Down
Loading