Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 4 additions & 6 deletions oximeter/collector/src/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -788,10 +788,7 @@ mod tests {
async fn run_n_collections(collector: &OximeterAgent, id: Uuid, n: u64) {
let mut details_rx = details_watcher(collector, id);
for i in 1..=n {
// The first collection happens automatically, so skip over it here.
if i > 1 {
force_collect(collector, id);
}
force_collect(collector, id);
wait_for_details(
&mut details_rx,
|details| details.n_collections + details.n_failures >= i,
Expand Down Expand Up @@ -1101,8 +1098,9 @@ mod tests {

// We don't manipulate time manually here, since this is pretty short
// and we want to assert things about the actual timing in the test
// below. The first collection is triggered by the interval timer,
// which fires once immediately.
// below. We force the first collection to avoid waiting for its
// jittered offset.
force_collect(&collector, id);
wait_for_condition(
|| async {
// We need to check if the server has had a collection
Expand Down
44 changes: 30 additions & 14 deletions oximeter/collector/src/collection_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use tokio::sync::watch;
use tokio::time::Instant;
use tokio::time::Interval;
use tokio::time::MissedTickBehavior;
use tokio::time::interval;
use tokio::time::interval_at;
use uuid::Uuid;

/// Error returned when a forced collection fails.
Expand Down Expand Up @@ -624,17 +624,12 @@ impl CollectionTask {
};

// Construct self-collection statistics and our collection times.
//
// If we miss a tick, say because the results sink is full when we try
// to pass off our collection result, we'll delay the next tick rather
// than burst to catch up.
let stats = self_stats::CollectionTaskStats::new(collector, &producer);
let mut collection_timer = interval(producer.interval);
collection_timer.set_missed_tick_behavior(MissedTickBehavior::Delay);
let mut self_collection_timer =
interval(self_stats::COLLECTION_INTERVAL);
self_collection_timer
.set_missed_tick_behavior(MissedTickBehavior::Delay);

let now = Instant::now();
let collection_timer = make_collection_timer(now, producer.interval);
let self_collection_timer =
make_collection_timer(now, self_stats::COLLECTION_INTERVAL);
let self_ = Self {
log,
producer_details_tx,
Expand Down Expand Up @@ -721,9 +716,8 @@ impl CollectionTask {
self.producer_details_tx
.send_modify(|details| details.update(&new_info));
self.stats.update(&new_info);
self.collection_timer = interval(new_info.interval);
self.collection_timer
.set_missed_tick_behavior(MissedTickBehavior::Delay);
self.collection_timer =
make_collection_timer(Instant::now(), new_info.interval);
}

/// Handle a single message from the task handle.
Expand Down Expand Up @@ -928,3 +922,25 @@ impl CollectionTask {
TaskAction::Continue(())
}
}

// Make a timer for a collection task.
//
// The returned timer first fires at `now + jitter` and then every `interval`
// after that, where `jitter` is a random whole number of milliseconds drawn
// from `0..interval`.
//
// We inject this random jitter at the start of the interval to avoid a
// thundering herd. If many collection tasks with the same interval, or
// intervals with a common multiple, are started at the same time, they tick in
// lockstep and can overwhelm the database batcher queue and drop samples, even
// if the volume per unit time isn't particularly high.
//
// Note: `interval_at` is documented to panic on a zero period, so `interval`
// must be non-zero.
fn make_collection_timer(now: Instant, interval: Duration) -> Interval {
    // Exclusive upper bound for the jitter, in whole milliseconds.
    //
    // `Duration::as_millis()` returns a `u128`, so convert with saturation
    // rather than a truncating `as` cast, and only then clamp to at least 1.
    // Clamping before the cast would let a very large interval wrap around to
    // 0, producing the empty range `0..0`, which `random_range` rejects. The
    // lower clamp also covers sub-millisecond intervals, which simply get no
    // jitter.
    let jitter_bound_ms =
        u64::try_from(interval.as_millis()).unwrap_or(u64::MAX).max(1);
    let jitter =
        Duration::from_millis(rand::random_range(0..jitter_bound_ms));

    let mut timer = interval_at(now + jitter, interval);

    // If we miss a tick, say because the results sink is full when we try
    // to pass off our collection result, we'll delay the next tick rather
    // than burst to catch up.
    timer.set_missed_tick_behavior(MissedTickBehavior::Delay);

    timer
}
Loading