diff --git a/golem-common/src/cache.rs b/golem-common/src/cache.rs index 24ab268da4..acc07089b5 100644 --- a/golem-common/src/cache.rs +++ b/golem-common/src/cache.rs @@ -47,8 +47,22 @@ pub struct Cache { full_cache_eviction: FullCacheEvictionMode, background_handle: Arc>>>, name: &'static str, + /// Test-only seam: when set, awaited inside the full-cache eviction between + /// snapshotting the entries to keep and committing the new size, so a test + /// can deterministically interleave a concurrent insert at that point. + #[cfg(test)] + evict_interleave: Arc>>, + /// Test-only seam: when set, awaited inside the full-cache eviction after + /// snapshotting the entries to keep but before retaining the map, so a test + /// can deterministically interleave concurrent evictions with stale + /// snapshots. + #[cfg(test)] + evict_before_retain_interleave: Arc>>, } +#[cfg(test)] +type EvictInterleaveHook = Arc Pin + Send>> + Send + Sync>; + pub trait SimpleCache { fn get_or_insert_simple(&self, key: &K, f: F) -> impl Future> where @@ -141,6 +155,10 @@ impl< full_cache_eviction, background_handle: Arc::new(Mutex::new(None)), name, + #[cfg(test)] + evict_interleave: Arc::new(Mutex::new(None)), + #[cfg(test)] + evict_before_retain_interleave: Arc::new(Mutex::new(None)), }; if let Some(capacity) = capacity { @@ -177,6 +195,33 @@ impl< cache } + /// Test-only: installs a hook awaited inside full-cache eviction between + /// computing the surviving entry set and committing the new size, so a test + /// can deterministically interleave a concurrent insert at that point. + #[cfg(test)] + fn set_evict_interleave(&self, hook: EvictInterleaveHook) { + *self.evict_interleave.lock().unwrap() = Some(hook); + } + + /// Test-only: removes the eviction interleave hook. + #[cfg(test)] + fn clear_evict_interleave(&self) { + *self.evict_interleave.lock().unwrap() = None; + } + + /// Test-only: installs a hook awaited inside full-cache eviction after the + /// surviving entry set is computed but before the map is retained. + #[cfg(test)] + fn set_evict_before_retain_interleave(&self, hook: EvictInterleaveHook) { + *self.evict_before_retain_interleave.lock().unwrap() = Some(hook); + } + + /// Test-only: removes the pre-retain eviction interleave hook. + #[cfg(test)] + fn clear_evict_before_retain_interleave(&self) { + *self.evict_before_retain_interleave.lock().unwrap() = None; + } + /// Tries to get a cached value for the given key. If the value is missing or is pending, it returns None. pub async fn try_get(&self, key: &K) -> Option { let result = self @@ -234,7 +279,7 @@ impl< { let mut eviction_needed = false; let result = { - let own_id = self.state.last_id.fetch_add(1, Ordering::SeqCst); + let own_id = self.state.last_id.fetch_add(1, Ordering::Relaxed); let result = self.get_or_add_as_pending(key, own_id, f1).await?; match result { Item::Pending { @@ -257,11 +302,12 @@ impl< }, ) .await; - let old_count = self.state.count.fetch_add(1, Ordering::SeqCst); + let old_count = self.state.count.fetch_add(1, Ordering::Relaxed); + let new_count = old_count.saturating_add(1); - record_cache_size(self.name, old_count.saturating_add(1)); + record_cache_size(self.name, new_count); - if Some(old_count) == self.capacity { + if self.capacity.is_some_and(|capacity| new_count > capacity) { eviction_needed = true; } } else { @@ -314,7 +360,7 @@ impl< F1: FnOnce() -> PV, F2: FnOnce(&PV) -> Pin> + Send>> + Send + 'static, { - let own_id = self.state.last_id.fetch_add(1, Ordering::SeqCst); + let own_id = self.state.last_id.fetch_add(1, Ordering::Relaxed); let result = self.get_or_add_as_pending(key, own_id, f1).await?; match result { Item::Pending { @@ -346,11 +392,15 @@ impl< ) .await; let old_count = - self_clone.state.count.fetch_add(1, Ordering::SeqCst); + self_clone.state.count.fetch_add(1, Ordering::Relaxed); + let new_count = old_count.saturating_add(1); - record_cache_size(self_clone.name, old_count.saturating_add(1)); + record_cache_size(self_clone.name, new_count); - if Some(old_count) == self_clone.capacity { + if self_clone + .capacity + .is_some_and(|capacity| new_count > capacity) + { eviction_needed = true; } } else { @@ -397,7 +447,7 @@ impl< F2: FnOnce(&PV) -> Pin> + Send>> + Send + 'static, { { - let own_id = self.state.last_id.fetch_add(1, Ordering::SeqCst); + let own_id = self.state.last_id.fetch_add(1, Ordering::Relaxed); let result = self.get_or_add_as_pending(key, own_id, f1).await?; match result { Item::Pending { @@ -429,11 +479,15 @@ impl< ) .await; let old_count = - self_clone.state.count.fetch_add(1, Ordering::SeqCst); + self_clone.state.count.fetch_add(1, Ordering::Relaxed); + let new_count = old_count.saturating_add(1); - record_cache_size(self_clone.name, old_count.saturating_add(1)); + record_cache_size(self_clone.name, new_count); - if Some(old_count) == self_clone.capacity { + if self_clone + .capacity + .is_some_and(|capacity| new_count > capacity) + { self_clone.evict().await; } } else { @@ -490,7 +544,7 @@ impl< pub async fn remove(&self, key: &K) { let removed = self.state.items.remove_async(key).await.is_some(); if removed { - let count = self.state.count.fetch_sub(1, Ordering::SeqCst); + let count = self.state.count.fetch_sub(1, Ordering::Relaxed); record_cache_size(self.name, count.saturating_sub(1)); } } @@ -530,7 +584,7 @@ impl< if let Some(state) = weak_state.upgrade() { let removed = state.items.remove_sync(&key).is_some(); if removed { - let count = state.count.fetch_sub(1, Ordering::SeqCst); + let count = state.count.fetch_sub(1, Ordering::Relaxed); record_cache_size(name, count.saturating_sub(1)); } } @@ -559,43 +613,107 @@ impl< } async fn evict_least_recently_used(&self, count: usize) { - let mut keys_to_keep = vec![]; + let mut cached = vec![]; self.state .items .iter_async(|key, value| { if let Item::Cached { last_access, .. } = value { - keys_to_keep.push((key.clone(), last_access.elapsed().as_millis())) + cached.push((key.clone(), last_access.elapsed().as_millis())) } true }) .await; - keys_to_keep.sort_by_key(|(_, v)| *v); - keys_to_keep.truncate(keys_to_keep.len().saturating_sub(count)); - let keys_to_keep: HashSet<&K> = keys_to_keep.iter().map(|(k, _)| k).collect(); + // Sort most-recently-used first (smallest elapsed first) so truncating + // the tail drops the oldest entries and keeps the newest. + cached.sort_by_key(|(_, elapsed)| *elapsed); + + // Keep at most `cached_len - count` entries, and never more than the + // configured capacity, so an over-capacity cache is always trimmed back + // down to the bound regardless of how far it overshot. + let cached_len = cached.len(); + let mut keep = cached_len.saturating_sub(count); + if let Some(capacity) = self.capacity { + keep = keep.min(capacity); + } + cached.truncate(keep); + + #[cfg(test)] + { + let hook = self.evict_before_retain_interleave.lock().unwrap().clone(); + if let Some(hook) = hook { + hook().await; + } + } + + let keys_to_keep: HashSet<&K> = cached.iter().map(|(k, _)| k).collect(); + let removed = Arc::new(std::sync::atomic::AtomicUsize::new(0)); + let removed_in_retain = removed.clone(); self.state .items .retain_async(|k, v| match v { - Item::Cached { .. } => keys_to_keep.contains(k), + Item::Cached { .. } => { + let keep = keys_to_keep.contains(k); + if !keep { + removed_in_retain.fetch_add(1, Ordering::Relaxed); + } + keep + } Item::Pending { .. } => true, }) .await; - self.state.count.store(keys_to_keep.len(), Ordering::SeqCst); - record_cache_size(self.name, keys_to_keep.len()); + + // Test-only seam: let a test interleave a concurrent insert here, after + // the surviving set has been computed but before the size is committed, + // to deterministically exercise the count race. + #[cfg(test)] + { + let hook = self.evict_interleave.lock().unwrap().clone(); + if let Some(hook) = hook { + hook().await; + } + } + + // Decrement by the number of cached entries this retain actually + // removed rather than by the stale snapshot's expected removal count. + // A blind store would clobber concurrent insert increments, and a + // snapshot-derived decrement would double-subtract when concurrent + // evictions try to remove the same entries. + let removed = removed.load(Ordering::Relaxed); + let new_count = self + .state + .count + .fetch_sub(removed, Ordering::Relaxed) + .saturating_sub(removed); + record_cache_size(self.name, new_count); } async fn evict_older_than(&self, ttl: Duration) { + let removed = Arc::new(std::sync::atomic::AtomicUsize::new(0)); + let removed_in_retain = removed.clone(); self.state .items .retain_async(|_, item| match item { - Item::Cached { last_access, .. } => last_access.elapsed() < ttl, + Item::Cached { last_access, .. } => { + let keep = last_access.elapsed() < ttl; + if !keep { + removed_in_retain.fetch_add(1, Ordering::Relaxed); + } + keep + } Item::Pending { .. } => true, }) .await; - let count = self.state.items.len(); - self.state.count.store(count, Ordering::SeqCst); - record_cache_size(self.name, count); + // Decrement by the number of cached entries actually removed rather than + // overwriting the counter, so concurrent insert increments are not lost. + let removed = removed.load(Ordering::Relaxed); + let new_count = self + .state + .count + .fetch_sub(removed, Ordering::Relaxed) + .saturating_sub(removed); + record_cache_size(self.name, new_count); } async fn update_last_access(&self, key: &K) { @@ -1316,6 +1434,218 @@ mod tests { assert!(cache.contains_key(&3).await); } + #[test] + async fn concurrent_lru_evictions_subtract_only_actual_removals() { + let capacity = 4usize; + let cache = bounded_cache(capacity, "concurrent_lru_evictions"); + + for i in 0..6u64 { + cache + .state + .items + .upsert_async( + i, + Item::Cached { + value: i, + last_access: Instant::now(), + }, + ) + .await; + tokio::time::sleep(Duration::from_millis(1)).await; + } + cache.state.count.store(6, Ordering::Relaxed); + + let arrived = Arc::new(std::sync::atomic::AtomicUsize::new(0)); + let arrived_for_hook = arrived.clone(); + cache.set_evict_before_retain_interleave(Arc::new(move || { + let arrived = arrived_for_hook.clone(); + Box::pin(async move { + arrived.fetch_add(1, Ordering::SeqCst); + for _ in 0..1000 { + if arrived.load(Ordering::SeqCst) >= 2 { + return; + } + tokio::time::sleep(Duration::from_millis(1)).await; + } + panic!("concurrent evictions did not both reach the pre-retain seam"); + }) + })); + + let evictions = vec![ + { + let cache = cache.clone(); + tokio::spawn(async move { cache.evict().await }) + }, + { + let cache = cache.clone(); + tokio::spawn(async move { cache.evict().await }) + }, + ]; + + tokio::time::timeout(Duration::from_secs(5), join_all(evictions)) + .await + .expect("concurrent evictions timed out"); + cache.clear_evict_before_retain_interleave(); + + assert_eq!(cache.iter().await.len(), capacity); + + cache + .get_or_insert_simple(&100u64, || async move { Ok(100) }) + .await + .unwrap(); + + let size = cache.iter().await.len(); + assert!( + size <= capacity, + "cache with capacity {capacity} grew to {size} cached entries after concurrent evictions; \ + count drifted below the real cached population" + ); + } + + #[test] + async fn capacity_holds_when_insert_races_eviction() { + // Deterministically reproduces the production count race. Capacity + // eviction snapshots the surviving entry set and then *blindly stores* + // that as the new size. If an insert lands between the snapshot and the + // store, its increment is clobbered, so the cache's notion of its own + // size drifts below the real number of cached entries. Because eviction + // is only triggered when an insert observes the size exactly equal to + // capacity, once the size has drifted below capacity the trigger is + // never hit again and the cache grows without bound. + // + // A test seam pauses eviction at exactly that window so the race is + // forced every run rather than relying on timing. + let capacity = 4usize; + let cache = bounded_cache(capacity, "capacity_race"); + + // Fill exactly to capacity. + for i in 0..capacity as u64 { + cache + .get_or_insert_simple(&i, || async move { Ok(i) }) + .await + .unwrap(); + } + assert_eq!(cache.iter().await.len(), capacity); + + // Arrange for a concurrent insert to fire while eviction is paused at the + // seam (after computing survivors, before committing the size). The hook + // runs once. + let cache_for_hook = cache.clone(); + let fired = Arc::new(std::sync::atomic::AtomicBool::new(false)); + let fired_clone = fired.clone(); + cache.set_evict_interleave(Arc::new(move || { + let cache = cache_for_hook.clone(); + let fired = fired_clone.clone(); + Box::pin(async move { + if !fired.swap(true, Ordering::SeqCst) { + // A unique insert completing inside the eviction window: its + // count increment will be clobbered by the eviction's store. + cache + .get_or_insert_simple(&1000u64, || async move { Ok(1000) }) + .await + .unwrap(); + } + }) + })); + + // This insert crosses capacity and triggers the (now racing) eviction. + cache + .get_or_insert_simple(&100u64, || async move { Ok(100) }) + .await + .unwrap(); + cache.clear_evict_interleave(); + + // After the clobber, insert more unique keys. With the bug, the size has + // drifted below capacity so the eviction trigger is never hit again and + // the real cached population grows unbounded past capacity. + for k in 0..50u64 { + cache + .get_or_insert_simple(&(2000 + k), || async move { Ok(2000 + k) }) + .await + .unwrap(); + } + + let size = cache.iter().await.len(); + assert!( + size <= capacity, + "cache with capacity {capacity} grew to {size} cached entries after an insert raced \ + eviction; the capacity bound is not being enforced" + ); + } + + #[test] + async fn spawned_capacity_holds_when_insert_races_eviction() { + let capacity = 4usize; + let cache = bounded_cache(capacity, "spawned_capacity_race"); + + for i in 0..capacity as u64 { + cache + .get_or_insert_simple_spawned(&i, move || async move { Ok(i) }) + .await + .unwrap(); + } + assert_eq!(cache.iter().await.len(), capacity); + + let completed = Arc::new(std::sync::atomic::AtomicUsize::new(0)); + let completed_for_hook = completed.clone(); + let fired = Arc::new(std::sync::atomic::AtomicBool::new(false)); + let fired_for_hook = fired.clone(); + cache.set_evict_interleave(Arc::new(move || { + let completed = completed_for_hook.clone(); + let fired = fired_for_hook.clone(); + Box::pin(async move { + if !fired.swap(true, Ordering::SeqCst) { + for _ in 0..1000 { + if completed.load(Ordering::SeqCst) >= 2 { + return; + } + tokio::time::sleep(Duration::from_millis(1)).await; + } + panic!("spawned inserts did not complete while eviction was interleaved"); + } + }) + })); + + let release = Arc::new(tokio::sync::Notify::new()); + let tasks: Vec<_> = [100u64, 101u64] + .into_iter() + .map(|key| { + let cache = cache.clone(); + let release = release.clone(); + let completed = completed.clone(); + tokio::spawn(async move { + let result = cache + .get_or_insert_simple_spawned(&key, move || async move { + release.notified().await; + Ok(key) + }) + .await; + completed.fetch_add(1, Ordering::SeqCst); + result + }) + }) + .collect(); + + tokio::time::sleep(Duration::from_millis(20)).await; + release.notify_waiters(); + + let results = tokio::time::timeout(Duration::from_secs(5), join_all(tasks)) + .await + .expect("spawned inserts timed out"); + for result in results { + result.unwrap().unwrap(); + } + + cache.clear_evict_interleave(); + + let size = cache.iter().await.len(); + assert!( + size <= capacity, + "spawned cache with capacity {capacity} grew to {size} cached entries after inserts raced \ + eviction; the capacity bound is not being enforced" + ); + } + #[test] async fn background_eviction_older_than_ttl() { let cache: Cache = Cache::new( diff --git a/golem-debugging-service/config/debug-worker-executor.sample.env b/golem-debugging-service/config/debug-worker-executor.sample.env index 4349e54ebe..9593b20889 100644 --- a/golem-debugging-service/config/debug-worker-executor.sample.env +++ b/golem-debugging-service/config/debug-worker-executor.sample.env @@ -18,6 +18,7 @@ GOLEM__BLOB_STORAGE__TYPE="LocalFileSystem" GOLEM__BLOB_STORAGE__CONFIG__ROOT="../data/blob_storage" GOLEM__COMPILED_COMPONENT_SERVICE__TYPE="Enabled" GOLEM__COMPONENT_CACHE__MAX_CAPACITY=32 +GOLEM__COMPONENT_CACHE__MAX_CONCURRENT_COMPILATIONS=16 GOLEM__COMPONENT_CACHE__MAX_METADATA_CAPACITY=16384 GOLEM__COMPONENT_CACHE__MAX_RESOLVED_COMPONENT_CAPACITY=1024 GOLEM__COMPONENT_CACHE__TIME_TO_IDLE="12h" @@ -193,6 +194,7 @@ GOLEM__BLOB_STORAGE__TYPE="LocalFileSystem" GOLEM__BLOB_STORAGE__CONFIG__ROOT="../data/blob_storage" GOLEM__COMPILED_COMPONENT_SERVICE__TYPE="Enabled" GOLEM__COMPONENT_CACHE__MAX_CAPACITY=32 +GOLEM__COMPONENT_CACHE__MAX_CONCURRENT_COMPILATIONS=16 GOLEM__COMPONENT_CACHE__MAX_METADATA_CAPACITY=16384 GOLEM__COMPONENT_CACHE__MAX_RESOLVED_COMPONENT_CAPACITY=1024 GOLEM__COMPONENT_CACHE__TIME_TO_IDLE="12h" diff --git a/golem-debugging-service/config/debug-worker-executor.toml b/golem-debugging-service/config/debug-worker-executor.toml index 01a81fd83a..ac6cd1b48a 100644 --- a/golem-debugging-service/config/debug-worker-executor.toml +++ b/golem-debugging-service/config/debug-worker-executor.toml @@ -39,6 +39,7 @@ type = "Enabled" [component_cache] max_capacity = 32 +max_concurrent_compilations = 16 max_metadata_capacity = 16384 max_resolved_component_capacity = 1024 time_to_idle = "12h" @@ -309,6 +310,7 @@ without_time = false # # [component_cache] # max_capacity = 32 +# max_concurrent_compilations = 16 # max_metadata_capacity = 16384 # max_resolved_component_capacity = 1024 # time_to_idle = "12h" diff --git a/golem-worker-executor/config/worker-executor.sample.env b/golem-worker-executor/config/worker-executor.sample.env index 914e348b8b..eb85248427 100644 --- a/golem-worker-executor/config/worker-executor.sample.env +++ b/golem-worker-executor/config/worker-executor.sample.env @@ -20,6 +20,7 @@ GOLEM__BLOB_STORAGE__TYPE="LocalFileSystem" GOLEM__BLOB_STORAGE__CONFIG__ROOT="../data/blob_storage" GOLEM__COMPILED_COMPONENT_SERVICE__TYPE="Enabled" GOLEM__COMPONENT_CACHE__MAX_CAPACITY=32 +GOLEM__COMPONENT_CACHE__MAX_CONCURRENT_COMPILATIONS=16 GOLEM__COMPONENT_CACHE__MAX_METADATA_CAPACITY=16384 GOLEM__COMPONENT_CACHE__MAX_RESOLVED_COMPONENT_CAPACITY=1024 GOLEM__COMPONENT_CACHE__TIME_TO_IDLE="12h" @@ -254,6 +255,7 @@ GOLEM__BLOB_STORAGE__CONFIG__RETRIES__MIN_DELAY="100ms" GOLEM__BLOB_STORAGE__CONFIG__RETRIES__MULTIPLIER=3.0 GOLEM__COMPILED_COMPONENT_SERVICE__TYPE="Enabled" GOLEM__COMPONENT_CACHE__MAX_CAPACITY=32 +GOLEM__COMPONENT_CACHE__MAX_CONCURRENT_COMPILATIONS=16 GOLEM__COMPONENT_CACHE__MAX_METADATA_CAPACITY=16384 GOLEM__COMPONENT_CACHE__MAX_RESOLVED_COMPONENT_CAPACITY=1024 GOLEM__COMPONENT_CACHE__TIME_TO_IDLE="12h" @@ -467,6 +469,7 @@ GOLEM__AGENT_WEBHOOKS_SERVICE__USE_HTTPS_FOR_WEBHOOK_URL=true GOLEM__BLOB_STORAGE__TYPE="InMemory" GOLEM__COMPILED_COMPONENT_SERVICE__TYPE="Enabled" GOLEM__COMPONENT_CACHE__MAX_CAPACITY=32 +GOLEM__COMPONENT_CACHE__MAX_CONCURRENT_COMPILATIONS=16 GOLEM__COMPONENT_CACHE__MAX_METADATA_CAPACITY=16384 GOLEM__COMPONENT_CACHE__MAX_RESOLVED_COMPONENT_CAPACITY=1024 GOLEM__COMPONENT_CACHE__TIME_TO_IDLE="12h" diff --git a/golem-worker-executor/config/worker-executor.toml b/golem-worker-executor/config/worker-executor.toml index d0690a79d3..ac7e95c090 100644 --- a/golem-worker-executor/config/worker-executor.toml +++ b/golem-worker-executor/config/worker-executor.toml @@ -41,6 +41,7 @@ type = "Enabled" [component_cache] max_capacity = 32 +max_concurrent_compilations = 16 max_metadata_capacity = 16384 max_resolved_component_capacity = 1024 time_to_idle = "12h" @@ -399,6 +400,7 @@ without_time = false # # [component_cache] # max_capacity = 32 +# max_concurrent_compilations = 16 # max_metadata_capacity = 16384 # max_resolved_component_capacity = 1024 # time_to_idle = "12h" @@ -730,6 +732,7 @@ without_time = false # # [component_cache] # max_capacity = 32 +# max_concurrent_compilations = 16 # max_metadata_capacity = 16384 # max_resolved_component_capacity = 1024 # time_to_idle = "12h" diff --git a/golem-worker-executor/src/services/compilation_limiter.rs b/golem-worker-executor/src/services/compilation_limiter.rs new file mode 100644 index 0000000000..9e9152995a --- /dev/null +++ b/golem-worker-executor/src/services/compilation_limiter.rs @@ -0,0 +1,148 @@ +// Copyright 2024-2026 Golem Cloud +// +// Licensed under the Golem Source License v1.1 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://license.golem.cloud/LICENSE +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::future::Future; +use std::sync::Arc; +use tokio::sync::Semaphore; + +/// Bounds the number of concurrent local component compile fallback attempts. +/// +/// Worker admission accounts for guest linear memory and a shared +/// per-component-revision charge derived from registry metadata. It does not +/// reserve the transient working set used when the worker-executor in-process +/// component cache and compiled artifact store both miss. That fallback buffers +/// the raw component bytes, compiles them with Wasmtime, serializes the compiled +/// artifact, and writes it to the compiled artifact store. +/// +/// This limiter caps how many fallback attempts run at the same time; callers +/// beyond the limit wait for a permit. +/// A limit of `0` is treated as unlimited: the gate is bypassed entirely. +#[derive(Clone)] +pub struct CompilationLimiter { + semaphore: Option>, +} + +impl CompilationLimiter { + pub fn new(max_concurrent: usize) -> Self { + let semaphore = if max_concurrent == 0 { + None + } else { + Some(Arc::new(Semaphore::new(max_concurrent))) + }; + Self { semaphore } + } + + /// Runs `f` while holding a compilation permit, waiting for one if the + /// limiter is at its concurrency limit. With an unlimited limiter the + /// future runs immediately with no gating. + pub async fn run(&self, f: F) -> T + where + F: Future, + { + let _permit = match &self.semaphore { + Some(semaphore) => Some( + semaphore + .clone() + .acquire_owned() + .await + .expect("compilation limiter semaphore closed"), + ), + None => None, + }; + f.await + } +} + +#[cfg(test)] +mod tests { + use super::*; + use futures::future::join_all; + use std::sync::atomic::{AtomicUsize, Ordering}; + use std::time::Duration; + use test_r::test; + + #[test] + async fn limits_concurrent_compilations_to_the_configured_maximum() { + let limit = 4usize; + let bursts = 200usize; + let limiter = CompilationLimiter::new(limit); + + let current = Arc::new(AtomicUsize::new(0)); + let observed_max = Arc::new(AtomicUsize::new(0)); + + let futs: Vec<_> = (0..bursts) + .map(|_| { + let limiter = limiter.clone(); + let current = current.clone(); + let observed_max = observed_max.clone(); + async move { + limiter + .run(async { + let now = current.fetch_add(1, Ordering::SeqCst) + 1; + observed_max.fetch_max(now, Ordering::SeqCst); + tokio::time::sleep(Duration::from_millis(5)).await; + current.fetch_sub(1, Ordering::SeqCst); + }) + .await; + } + }) + .collect(); + + tokio::time::timeout(Duration::from_secs(30), join_all(futs)) + .await + .expect("gated compilations timed out"); + + let peak = observed_max.load(Ordering::SeqCst); + assert!( + peak <= limit, + "compilation limiter with limit {limit} allowed {peak} concurrent compilations" + ); + } + + #[test] + async fn unlimited_limiter_does_not_gate() { + let limiter = CompilationLimiter::new(0); + let current = Arc::new(AtomicUsize::new(0)); + let observed_max = Arc::new(AtomicUsize::new(0)); + let bursts = 50usize; + + let futs: Vec<_> = (0..bursts) + .map(|_| { + let limiter = limiter.clone(); + let current = current.clone(); + let observed_max = observed_max.clone(); + async move { + limiter + .run(async { + let now = current.fetch_add(1, Ordering::SeqCst) + 1; + observed_max.fetch_max(now, Ordering::SeqCst); + tokio::time::sleep(Duration::from_millis(5)).await; + current.fetch_sub(1, Ordering::SeqCst); + }) + .await; + } + }) + .collect(); + + tokio::time::timeout(Duration::from_secs(30), join_all(futs)) + .await + .expect("ungated compilations timed out"); + + let peak = observed_max.load(Ordering::SeqCst); + assert!( + peak > 1, + "unlimited limiter should allow concurrent execution, observed max {peak}" + ); + } +} diff --git a/golem-worker-executor/src/services/component.rs b/golem-worker-executor/src/services/component.rs index 0cd81670b6..ec711e9677 100644 --- a/golem-worker-executor/src/services/component.rs +++ b/golem-worker-executor/src/services/component.rs @@ -14,6 +14,7 @@ use super::golem_config::ComponentCacheConfig; use crate::metrics::component::record_compilation_time; +use crate::services::compilation_limiter::CompilationLimiter; use async_trait::async_trait; use golem_common::SafeDisplay; use golem_common::cache::SimpleCache; @@ -102,6 +103,7 @@ pub fn configured( cache_config.max_capacity, cache_config.max_metadata_capacity, cache_config.time_to_idle, + cache_config.max_concurrent_compilations, compiled_component_service, )) } @@ -112,6 +114,7 @@ pub struct ComponentServiceDefault { current_component_metadata_cache: Cache, compiled_component_service: Arc, registry_client: Arc, + compilation_limiter: CompilationLimiter, } impl ComponentServiceDefault { @@ -120,6 +123,7 @@ impl ComponentServiceDefault { max_component_capacity: usize, max_metadata_capacity: usize, time_to_idle: Duration, + max_concurrent_compilations: usize, compiled_component_service: Arc, ) -> Self { Self { @@ -134,6 +138,7 @@ impl ComponentServiceDefault { time_to_idle, ), compiled_component_service, + compilation_limiter: CompilationLimiter::new(max_concurrent_compilations), } } } @@ -152,6 +157,7 @@ impl ComponentService for ComponentServiceDefault { }; let engine = engine.clone(); let compiled_component_service = self.compiled_component_service.clone(); + let compilation_limiter = self.compilation_limiter.clone(); let metadata = self .get_metadata(component_id, Some(component_revision)) .await?; @@ -176,52 +182,71 @@ impl ComponentService for ComponentServiceDefault { match component { Some(component) => Ok(component), None => { - let bytes = self - .registry_client - .download_component(component_id, component_revision) - .await - .map_err(|e| WorkerExecutorError::ComponentDownloadFailed { - component_id, - component_revision, - reason: e.to_safe_string(), - })?; - - let start = Instant::now(); - let span = info_span!("Loading WASM component"); - let component = spawn_blocking(move || { - let _enter = span.enter(); - wasmtime::component::Component::from_binary(&engine, &bytes) - .map_err(|e| WorkerExecutorError::ComponentParseFailed { - component_id, - component_revision, - reason: format!("{e}"), + // Bound the local compile fallback working set when + // both the in-process cache and compiled artifact + // store miss. + compilation_limiter + .run(async move { + let bytes = self + .registry_client + .download_component(component_id, component_revision) + .await + .map_err(|e| { + WorkerExecutorError::ComponentDownloadFailed { + component_id, + component_revision, + reason: e.to_safe_string(), + } + })?; + + let start = Instant::now(); + let span = info_span!("Loading WASM component"); + let component = spawn_blocking(move || { + let _enter = span.enter(); + wasmtime::component::Component::from_binary(&engine, &bytes) + .map_err(|e| { + WorkerExecutorError::ComponentParseFailed { + component_id, + component_revision, + reason: format!("{e}"), + } + }) }) - }) - .await - .map_err(|join_err| { - WorkerExecutorError::unknown(join_err.to_string()) - })??; - let end = Instant::now(); - - let compilation_time = end.duration_since(start); - record_compilation_time(compilation_time); - debug!( - "Compiled {} in {}ms", - component_id, - compilation_time.as_millis(), - ); - - let result = compiled_component_service - .put(environment_id, component_id, component_revision, &component) - .await; - - match result { - Ok(_) => Ok(component), - Err(err) => { - warn!("Failed to upload compiled component {:?}: {}", key, err); - Ok(component) - } - } + .await + .map_err(|join_err| { + WorkerExecutorError::unknown(join_err.to_string()) + })??; + let end = Instant::now(); + + let compilation_time = end.duration_since(start); + record_compilation_time(compilation_time); + debug!( + "Compiled {} in {}ms", + component_id, + compilation_time.as_millis(), + ); + + let result = compiled_component_service + .put( + environment_id, + component_id, + component_revision, + &component, + ) + .await; + + match result { + Ok(_) => Ok(component), + Err(err) => { + warn!( + "Failed to upload compiled component {:?}: {}", + key, err + ); + Ok(component) + } + } + }) + .await } } }) @@ -703,6 +728,7 @@ mod tests { 1, 8, Duration::from_secs(60), + 0, Arc::new(CompiledComponentServiceDisabled::new()), ); @@ -732,6 +758,7 @@ mod tests { 1, 8, Duration::from_secs(60), + 0, Arc::new(CompiledComponentServiceDisabled::new()), ); @@ -764,6 +791,7 @@ mod tests { 1, 8, Duration::from_secs(60), + 0, Arc::new(CompiledComponentServiceDisabled::new()), ); diff --git a/golem-worker-executor/src/services/golem_config.rs b/golem-worker-executor/src/services/golem_config.rs index 5b52b98f44..b7448a73c2 100644 --- a/golem-worker-executor/src/services/golem_config.rs +++ b/golem-worker-executor/src/services/golem_config.rs @@ -1247,6 +1247,11 @@ pub struct ComponentCacheConfig { pub max_capacity: usize, pub max_metadata_capacity: usize, pub max_resolved_component_capacity: usize, + /// Maximum number of local component compile fallback attempts allowed to + /// run concurrently on this executor. This bounds the transient working set + /// used after both the in-process cache and compiled artifact store miss. + /// `0` means unlimited. + pub max_concurrent_compilations: usize, #[serde(with = "humantime_serde")] pub time_to_idle: Duration, } @@ -1265,6 +1270,11 @@ impl SafeDisplay for ComponentCacheConfig { "max resolved component capacity: {}", self.max_resolved_component_capacity ); + let _ = writeln!( + &mut result, + "max concurrent compilations: {}", + self.max_concurrent_compilations + ); let _ = writeln!(&mut result, "time to idle: {:?}", self.time_to_idle); result } @@ -1819,6 +1829,7 @@ impl Default for ComponentCacheConfig { max_capacity: 32, max_metadata_capacity: 16384, max_resolved_component_capacity: 1024, + max_concurrent_compilations: 16, time_to_idle: Duration::from_secs(12 * 60 * 60), } } diff --git a/golem-worker-executor/src/services/mod.rs b/golem-worker-executor/src/services/mod.rs index 2f5b2d210f..b155bf3fa2 100644 --- a/golem-worker-executor/src/services/mod.rs +++ b/golem-worker-executor/src/services/mod.rs @@ -16,6 +16,7 @@ pub mod active_workers; pub mod agent_types; pub mod agent_webhooks; pub mod blob_store; +pub mod compilation_limiter; pub mod component; pub mod direct_invocation_auth; pub mod environment_state;