From 2b0cfa53fdfe2982ffb171bef5514549ba99ba9f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Leosvel=20P=C3=A9rez=20Espinosa?= Date: Mon, 19 Jan 2026 14:38:34 +0100 Subject: [PATCH] fix(core): establish cpu baseline when possible to improve measurement accuracy --- packages/nx/src/native/metrics/collector.rs | 68 ++++++++++++++++++++- 1 file changed, 65 insertions(+), 3 deletions(-) diff --git a/packages/nx/src/native/metrics/collector.rs b/packages/nx/src/native/metrics/collector.rs index dd9a200764b0a..01c5288b44e29 100644 --- a/packages/nx/src/native/metrics/collector.rs +++ b/packages/nx/src/native/metrics/collector.rs @@ -2,7 +2,7 @@ use anyhow::Result; #[cfg(test)] use crossbeam_channel::Receiver; use crossbeam_channel::{Sender, unbounded}; -use dashmap::DashMap; +use dashmap::{DashMap, DashSet}; use napi::{Env, JsFunction}; use napi_derive::napi; use parking_lot::Mutex; @@ -80,6 +80,8 @@ struct CollectionRunner { main_cli_pid: Arc>>, main_cli_subprocess_pids: Arc>>, daemon_pid: Arc>>, + /// PIDs registered since last baseline refresh (lock-free concurrent set) + pids_needing_baseline: Arc>, // Collection infrastructure system: Arc>, @@ -104,6 +106,7 @@ impl CollectionRunner { main_cli_pid: Arc::clone(&collector.main_cli_pid), main_cli_subprocess_pids: Arc::clone(&collector.main_cli_subprocess_pids), daemon_pid: Arc::clone(&collector.daemon_pid), + pids_needing_baseline: Arc::clone(&collector.pids_needing_baseline), system: Arc::clone(&collector.system), config: collector.config.clone(), process_metadata_map: Arc::clone(&collector.process_metadata_map), @@ -112,9 +115,43 @@ impl CollectionRunner { } } + /// Establish CPU baselines for all processes when there are processes needing a baseline. + /// Uses bulk refresh to keep all process timing in sync (avoids per-PID refresh bug). + /// Returns true if baseline was actually performed, false if skipped (nothing to baseline). + fn run_baseline_if_needed(&self) -> bool { + if self.pids_needing_baseline.is_empty() { + return false; + } + + trace!("New processes need baseline, running bulk CPU refresh"); + + { + let mut sys = self.system.lock(); + sys.refresh_processes_specifics( + sysinfo::ProcessesToUpdate::All, + false, // don't remove dead processes; collection handles cleanup + ProcessRefreshKind::nothing().with_cpu(), + ); + } // release system lock before clearing set + + // Clear the set: bulk refresh establishes baselines for all processes + self.pids_needing_baseline.clear(); + trace!("Baseline refresh complete"); + true + } + /// Run the collection loop fn run(self) { let interval = Duration::from_millis(self.config.collection_interval_ms); + // sysinfo's MINIMUM_CPU_UPDATE_INTERVAL + 50ms safety buffer + let baseline_offset = sysinfo::MINIMUM_CPU_UPDATE_INTERVAL + Duration::from_millis(50); + let post_collection_sleep = interval - baseline_offset; + + // First iteration: baseline if needed, then wait before first collection + if self.should_collect.load(Ordering::Acquire) && self.run_baseline_if_needed() { + // Sleep to allow CPU to be calculated correctly for the baselined processes + self.sleep_with_early_exit(baseline_offset); + } while self.should_collect.load(Ordering::Acquire) { // Collect current metrics and send to main collector thread @@ -123,8 +160,15 @@ impl CollectionRunner { .map(|result| self.send_metrics(result)) .ok(); - // Sleep in small chunks so thread can exit quickly on shutdown - self.sleep_with_early_exit(interval); + // Sleep after collection, before baseline + self.sleep_with_early_exit(post_collection_sleep); + if !self.should_collect.load(Ordering::Acquire) { + break; + } + + self.run_baseline_if_needed(); + // Sleep until next collection (offset) + self.sleep_with_early_exit(baseline_offset); } self.is_collecting.store(false, Ordering::Release); @@ -690,6 +734,10 @@ impl CollectionRunner { daemon_pid_to_clear, } = self.refresh_and_collect_metrics(); + // Collection's bulk refresh established CPU baselines for all processes, + // so clear the tracking set to avoid redundant baseline refreshes + self.pids_needing_baseline.clear(); + // Now that system lock is released, clear daemon PID if needed // This avoids holding system lock while acquiring daemon_pid lock if let Some(_pid) = daemon_pid_to_clear { @@ -786,6 +834,8 @@ pub struct ProcessMetricsCollector { main_cli_subprocess_pids: Arc>>, /// Daemon process PID (can be updated when daemon connects) daemon_pid: Arc>>, + /// PIDs registered since last baseline refresh (lock-free concurrent set) + pids_needing_baseline: Arc>, /// Cached CPU core count (set once at initialization) cpu_cores: u32, /// Cached total memory in bytes (set once at initialization) @@ -837,6 +887,7 @@ impl ProcessMetricsCollector { main_cli_pid: Arc::new(Mutex::new(None)), main_cli_subprocess_pids: Arc::new(DashMap::new()), daemon_pid: Arc::new(Mutex::new(None)), + pids_needing_baseline: Arc::new(DashSet::new()), cpu_cores, total_memory, system: Arc::new(Mutex::new(sys)), @@ -1063,6 +1114,8 @@ impl ProcessMetricsCollector { pub fn register_main_cli_process(&self, pid: i32) { trace!("Registering main CLI process: pid={}", pid); *self.main_cli_pid.lock() = Some(pid); + // Track that this PID needs a baseline for accurate first CPU reading + self.pids_needing_baseline.insert(pid); trace!("Main CLI process registered: pid={}", pid); } @@ -1074,6 +1127,8 @@ impl ProcessMetricsCollector { pid, alias ); self.main_cli_subprocess_pids.insert(pid, alias); + // Track that this PID needs a baseline for accurate first CPU reading + self.pids_needing_baseline.insert(pid); trace!("Main CLI subprocess registered: pid={}", pid); } @@ -1082,6 +1137,8 @@ impl ProcessMetricsCollector { pub fn register_daemon_process(&self, pid: i32) { let mut daemon_pid = self.daemon_pid.lock(); *daemon_pid = Some(pid); + // Track that this PID needs a baseline for accurate first CPU reading + self.pids_needing_baseline.insert(pid); } /// Register a process for a specific task @@ -1094,6 +1151,8 @@ impl ProcessMetricsCollector { .or_insert_with(|| IndividualTaskRegistration::new(task_id.clone())) .anchor_pids .insert(pid); + // Track that this PID needs a baseline for accurate first CPU reading + self.pids_needing_baseline.insert(pid); trace!("Task process registered: task_id={}, pid={}", task_id, pid); } @@ -1110,6 +1169,8 @@ impl ProcessMetricsCollector { batch_id.clone(), BatchRegistration::new(batch_id.clone(), task_ids, pid), ); + // Track that this PID needs a baseline for accurate first CPU reading + self.pids_needing_baseline.insert(pid); trace!("Batch registered: batch_id={}, pid={}", batch_id, pid); } @@ -1163,6 +1224,7 @@ mod tests { main_cli_pid: Arc::new(Mutex::new(None)), main_cli_subprocess_pids: Arc::new(DashMap::new()), daemon_pid: Arc::new(Mutex::new(None)), + pids_needing_baseline: Arc::new(DashSet::new()), system: Arc::new(Mutex::new(System::new())), config: CollectorConfig::default(), process_metadata_map: Arc::new(DashMap::new()),