From 2f05d861b6669ebcb7730039f0a1c3210fef7901 Mon Sep 17 00:00:00 2001 From: Josh Carp Date: Wed, 1 Jul 2026 11:38:44 -0400 Subject: [PATCH] Oximeter: jitter collection intervals. As of this writing, the oximeter collector's database batcher can queue at most 100_000 samples. In realistic conditions, it can receive about 80_000 metrics at once: each mgs producer sends about 40_000 metrics per collection on a full rack, and with the two mgs producers sampled with the same interval and offset, we receive all their collected metrics at roughly the same time. As a result, even when overall metrics volume isn't particularly high, we can drop samples when too many arrive at once. To mitigate thundering herds of samples, this patch introduces a jittered offset to each collection task timer, offsetting by 0..interval. This doesn't save us from having to size the collector queue properly, but spreads out load over time on average. Part of #10552. --- oximeter/collector/src/agent.rs | 10 +++--- oximeter/collector/src/collection_task.rs | 44 +++++++++++++++-------- 2 files changed, 34 insertions(+), 20 deletions(-) diff --git a/oximeter/collector/src/agent.rs b/oximeter/collector/src/agent.rs index 94c932e751c..97594ff46d1 100644 --- a/oximeter/collector/src/agent.rs +++ b/oximeter/collector/src/agent.rs @@ -788,10 +788,7 @@ mod tests { async fn run_n_collections(collector: &OximeterAgent, id: Uuid, n: u64) { let mut details_rx = details_watcher(collector, id); for i in 1..=n { - // The first collection happens automatically, so skip over it here. - if i > 1 { - force_collect(collector, id); - } + force_collect(collector, id); wait_for_details( &mut details_rx, |details| details.n_collections + details.n_failures >= i, @@ -1101,8 +1098,9 @@ mod tests { // We don't manipulate time manually here, since this is pretty short // and we want to assert things about the actual timing in the test - // below. The first collection is triggered by the interval timer, - // which fires once immediately. + // below. We force the first collection to avoid waiting for its + // jittered offset. + force_collect(&collector, id); wait_for_condition( || async { // We need to check if the server has had a collection diff --git a/oximeter/collector/src/collection_task.rs b/oximeter/collector/src/collection_task.rs index 9fc55a8a461..2f96bc8e0da 100644 --- a/oximeter/collector/src/collection_task.rs +++ b/oximeter/collector/src/collection_task.rs @@ -32,7 +32,7 @@ use tokio::sync::watch; use tokio::time::Instant; use tokio::time::Interval; use tokio::time::MissedTickBehavior; -use tokio::time::interval; +use tokio::time::interval_at; use uuid::Uuid; /// Error returned when a forced collection fails. @@ -624,17 +624,12 @@ impl CollectionTask { }; // Construct self-collection statistics and our collection times. - // - // If we miss a tick, say because the results sink is full when we try - // to pass off our collection result, we'll delay the next tick rather - // than burst to catch up. let stats = self_stats::CollectionTaskStats::new(collector, &producer); - let mut collection_timer = interval(producer.interval); - collection_timer.set_missed_tick_behavior(MissedTickBehavior::Delay); - let mut self_collection_timer = - interval(self_stats::COLLECTION_INTERVAL); - self_collection_timer - .set_missed_tick_behavior(MissedTickBehavior::Delay); + + let now = Instant::now(); + let collection_timer = make_collection_timer(now, producer.interval); + let self_collection_timer = + make_collection_timer(now, self_stats::COLLECTION_INTERVAL); let self_ = Self { log, producer_details_tx, @@ -721,9 +716,8 @@ impl CollectionTask { self.producer_details_tx .send_modify(|details| details.update(&new_info)); self.stats.update(&new_info); - self.collection_timer = interval(new_info.interval); - self.collection_timer - .set_missed_tick_behavior(MissedTickBehavior::Delay); + self.collection_timer = + make_collection_timer(Instant::now(), new_info.interval); } /// Handle a single message from the task handle. @@ -928,3 +922,25 @@ impl CollectionTask { TaskAction::Continue(()) } } + +// Make a timer for a collection task. +// +// We inject random jitter from `0..interval` at the start of the interval to +// avoid a thundering herd. If many collection tasks with the same interval, or +// intervals with a common multiple, are started at the same time, they tick in +// lockstep and can overwhelm the database batcher queue and drop samples, even +// if the volume per unit time isn't particularly high. +fn make_collection_timer(now: Instant, interval: Duration) -> Interval { + let jitter = Duration::from_millis(rand::random_range( + 0..interval.as_millis().max(1) as u64, + )); + + let mut timer = interval_at(now + jitter, interval); + + // If we miss a tick, say because the results sink is full when we try + // to pass off our collection result, we'll delay the next tick rather + // than burst to catch up. + timer.set_missed_tick_behavior(MissedTickBehavior::Delay); + + timer +}