Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
188 changes: 167 additions & 21 deletions golem-common/src/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,16 @@ pub struct Cache<K, PV, V, E> {
full_cache_eviction: FullCacheEvictionMode,
background_handle: Arc<Mutex<Option<JoinHandle<()>>>>,
name: &'static str,
/// Test-only seam: when set, awaited inside the full-cache eviction between
/// snapshotting the entries to keep and committing the new size, so a test
/// can deterministically interleave a concurrent insert at that point.
#[cfg(test)]
evict_interleave: Arc<Mutex<Option<EvictInterleaveHook>>>,
}

#[cfg(test)]
type EvictInterleaveHook = Arc<dyn Fn() -> Pin<Box<dyn Future<Output = ()> + Send>> + Send + Sync>;

pub trait SimpleCache<K, V, E> {
fn get_or_insert_simple<F>(&self, key: &K, f: F) -> impl Future<Output = Result<V, E>>
where
Expand Down Expand Up @@ -141,6 +149,8 @@ impl<
full_cache_eviction,
background_handle: Arc::new(Mutex::new(None)),
name,
#[cfg(test)]
evict_interleave: Arc::new(Mutex::new(None)),
};

if let Some(capacity) = capacity {
Expand Down Expand Up @@ -177,6 +187,20 @@ impl<
cache
}

/// Test-only: installs a hook awaited inside full-cache eviction between
/// computing the surviving entry set and committing the new size, so a test
/// can deterministically interleave a concurrent insert at that point.
#[cfg(test)]
fn set_evict_interleave(&self, hook: EvictInterleaveHook) {
*self.evict_interleave.lock().unwrap() = Some(hook);
}

/// Test-only: removes the eviction interleave hook.
#[cfg(test)]
fn clear_evict_interleave(&self) {
*self.evict_interleave.lock().unwrap() = None;
}

/// Tries to get a cached value for the given key. If the value is missing or is pending, it returns None.
pub async fn try_get(&self, key: &K) -> Option<V> {
let result = self
Expand Down Expand Up @@ -234,7 +258,7 @@ impl<
{
let mut eviction_needed = false;
let result = {
let own_id = self.state.last_id.fetch_add(1, Ordering::SeqCst);
let own_id = self.state.last_id.fetch_add(1, Ordering::Relaxed);

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This agent review comment seems valid:

1. get_or_insert_spawned still has the unfixed trigger + clobber-prone behavior — and a real bounded cache uses it.
The PR fixed the count race in get_or_insert and get_or_insert_pending (switched to new_count > capacity + Relaxed), but get_or_insert_spawned was left untouched:

let own_id = self.state.last_id.fetch_add(1, Ordering::SeqCst);          // line 342
...
let old_count = self_clone.state.count.fetch_add(1, Ordering::SeqCst);   // line 374
record_cache_size(self_clone.name, old_count.saturating_add(1));
if Some(old_count) == self_clone.capacity {                              // line 378 — the exact bug being fixed
    eviction_needed = true;
}

This path is reached via get_or_insert_simple_spawned, which worker_read_only_cache uses — a capacity-bounded cache (Some(cache_capacity), default 256, LeastRecentlyUsed(1)):

  • bounded cache creation: worker/mod.rs:440
  • usage: worker/mod.rs:1096

Worse: the PR replaced the eviction's count.store(survivors) with a relative fetch_sub, so count is no longer reset to the true survivor count. Under main's old code, the == capacity trigger could self-heal because every eviction reset the counter; now drift in this path is permanent, so once the read-only cache's count skips past capacity it can stop evicting and grow unbounded — exactly the failure the PR's own test description warns about.

Fix: apply the same change to get_or_insert_spawned (lines 374–378): let new_count = old_count.saturating_add(1); and if self_clone.capacity.is_some_and(|c| new_count > c). Also reconcile the SeqCst/Relaxed decision here. Consider adding a get_or_insert_simple_spawned capacity test, since the new race test only covers the non-spawned path.

let result = self.get_or_add_as_pending(key, own_id, f1).await?;
match result {
Item::Pending {
Expand All @@ -257,11 +281,12 @@ impl<
},
)
.await;
let old_count = self.state.count.fetch_add(1, Ordering::SeqCst);
let old_count = self.state.count.fetch_add(1, Ordering::Relaxed);
let new_count = old_count.saturating_add(1);

record_cache_size(self.name, old_count.saturating_add(1));
record_cache_size(self.name, new_count);

if Some(old_count) == self.capacity {
if self.capacity.is_some_and(|capacity| new_count > capacity) {
eviction_needed = true;
}
} else {
Expand Down Expand Up @@ -397,7 +422,7 @@ impl<
F2: FnOnce(&PV) -> Pin<Box<dyn Future<Output = Result<V, E>> + Send>> + Send + 'static,
{
{
let own_id = self.state.last_id.fetch_add(1, Ordering::SeqCst);
let own_id = self.state.last_id.fetch_add(1, Ordering::Relaxed);
let result = self.get_or_add_as_pending(key, own_id, f1).await?;
match result {
Item::Pending {
Expand Down Expand Up @@ -429,11 +454,15 @@ impl<
)
.await;
let old_count =
self_clone.state.count.fetch_add(1, Ordering::SeqCst);
self_clone.state.count.fetch_add(1, Ordering::Relaxed);
let new_count = old_count.saturating_add(1);

record_cache_size(self_clone.name, old_count.saturating_add(1));
record_cache_size(self_clone.name, new_count);

if Some(old_count) == self_clone.capacity {
if self_clone
.capacity
.is_some_and(|capacity| new_count > capacity)
{
self_clone.evict().await;
}
} else {
Expand Down Expand Up @@ -490,7 +519,7 @@ impl<
pub async fn remove(&self, key: &K) {
let removed = self.state.items.remove_async(key).await.is_some();
if removed {
let count = self.state.count.fetch_sub(1, Ordering::SeqCst);
let count = self.state.count.fetch_sub(1, Ordering::Relaxed);
record_cache_size(self.name, count.saturating_sub(1));
}
}
Expand Down Expand Up @@ -530,7 +559,7 @@ impl<
if let Some(state) = weak_state.upgrade() {
let removed = state.items.remove_sync(&key).is_some();
if removed {
let count = state.count.fetch_sub(1, Ordering::SeqCst);
let count = state.count.fetch_sub(1, Ordering::Relaxed);
record_cache_size(name, count.saturating_sub(1));
}
}
Expand Down Expand Up @@ -559,20 +588,31 @@ impl<
}

async fn evict_least_recently_used(&self, count: usize) {
let mut keys_to_keep = vec![];
let mut cached = vec![];
self.state
.items
.iter_async(|key, value| {
if let Item::Cached { last_access, .. } = value {
keys_to_keep.push((key.clone(), last_access.elapsed().as_millis()))
cached.push((key.clone(), last_access.elapsed().as_millis()))
}
true
})
.await;

keys_to_keep.sort_by_key(|(_, v)| *v);
keys_to_keep.truncate(keys_to_keep.len().saturating_sub(count));
let keys_to_keep: HashSet<&K> = keys_to_keep.iter().map(|(k, _)| k).collect();
// Sort most-recently-used first (smallest elapsed first) so truncating
// the tail drops the oldest entries and keeps the newest.
cached.sort_by_key(|(_, elapsed)| *elapsed);

// Keep at most `cached_len - count` entries, and never more than the
// configured capacity, so an over-capacity cache is always trimmed back
// down to the bound regardless of how far it overshot.
let cached_len = cached.len();
let mut keep = cached_len.saturating_sub(count);
if let Some(capacity) = self.capacity {
keep = keep.min(capacity);
}
cached.truncate(keep);
let keys_to_keep: HashSet<&K> = cached.iter().map(|(k, _)| k).collect();

self.state
.items
Expand All @@ -581,21 +621,56 @@ impl<
Item::Pending { .. } => true,
})
.await;
self.state.count.store(keys_to_keep.len(), Ordering::SeqCst);
record_cache_size(self.name, keys_to_keep.len());

// Test-only seam: let a test interleave a concurrent insert here, after
// the surviving set has been computed but before the size is committed,
// to deterministically exercise the count race.
#[cfg(test)]
{
let hook = self.evict_interleave.lock().unwrap().clone();
if let Some(hook) = hook {
hook().await;
}
}

// Decrement by the number of cached entries actually removed rather than
// overwriting the counter. A blind store would clobber the increments of
// inserts that completed concurrently with this eviction, drifting the
// size below the real count and disabling the capacity trigger.
let removed = cached_len.saturating_sub(keep);
let new_count = self
.state
.count
.fetch_sub(removed, Ordering::Relaxed)
.saturating_sub(removed);
record_cache_size(self.name, new_count);
}

async fn evict_older_than(&self, ttl: Duration) {
let removed = Arc::new(std::sync::atomic::AtomicUsize::new(0));
let removed_in_retain = removed.clone();
self.state
.items
.retain_async(|_, item| match item {
Item::Cached { last_access, .. } => last_access.elapsed() < ttl,
Item::Cached { last_access, .. } => {
let keep = last_access.elapsed() < ttl;
if !keep {
removed_in_retain.fetch_add(1, Ordering::Relaxed);
}
keep
}
Item::Pending { .. } => true,
})
.await;
let count = self.state.items.len();
self.state.count.store(count, Ordering::SeqCst);
record_cache_size(self.name, count);
// Decrement by the number of cached entries actually removed rather than
// overwriting the counter, so concurrent insert increments are not lost.
let removed = removed.load(Ordering::Relaxed);
let new_count = self
.state
.count
.fetch_sub(removed, Ordering::Relaxed)
.saturating_sub(removed);
record_cache_size(self.name, new_count);
}

async fn update_last_access(&self, key: &K) {
Expand Down Expand Up @@ -1316,6 +1391,77 @@ mod tests {
assert!(cache.contains_key(&3).await);
}

#[test(flavor = "multi_thread", worker_threads = 4)]

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

test-r does not have such attributes (all test runs in a shared multi-threaded tokio runtime)

async fn capacity_holds_when_insert_races_eviction() {
// Deterministically reproduces the production count race. Capacity
// eviction snapshots the surviving entry set and then *blindly stores*
// that as the new size. If an insert lands between the snapshot and the
// store, its increment is clobbered, so the cache's notion of its own
// size drifts below the real number of cached entries. Because eviction
// is only triggered when an insert observes the size exactly equal to
// capacity, once the size has drifted below capacity the trigger is
// never hit again and the cache grows without bound.
//
// A test seam pauses eviction at exactly that window so the race is
// forced every run rather than relying on timing.
let capacity = 4usize;
let cache = bounded_cache(capacity, "capacity_race");

// Fill exactly to capacity.
for i in 0..capacity as u64 {
cache
.get_or_insert_simple(&i, || async move { Ok(i) })
.await
.unwrap();
}
assert_eq!(cache.iter().await.len(), capacity);

// Arrange for a concurrent insert to fire while eviction is paused at the
// seam (after computing survivors, before committing the size). The hook
// runs once.
let cache_for_hook = cache.clone();
let fired = Arc::new(std::sync::atomic::AtomicBool::new(false));
let fired_clone = fired.clone();
cache.set_evict_interleave(Arc::new(move || {
let cache = cache_for_hook.clone();
let fired = fired_clone.clone();
Box::pin(async move {
if !fired.swap(true, Ordering::SeqCst) {
// A unique insert completing inside the eviction window: its
// count increment will be clobbered by the eviction's store.
cache
.get_or_insert_simple(&1000u64, || async move { Ok(1000) })
.await
.unwrap();
}
})
}));

// This insert crosses capacity and triggers the (now racing) eviction.
cache
.get_or_insert_simple(&100u64, || async move { Ok(100) })
.await
.unwrap();
cache.clear_evict_interleave();

// After the clobber, insert more unique keys. With the bug, the size has
// drifted below capacity so the eviction trigger is never hit again and
// the real cached population grows unbounded past capacity.
for k in 0..50u64 {
cache
.get_or_insert_simple(&(2000 + k), || async move { Ok(2000 + k) })
.await
.unwrap();
}

let size = cache.iter().await.len();
assert!(
size <= capacity,
"cache with capacity {capacity} grew to {size} cached entries after an insert raced \
eviction; the capacity bound is not being enforced"
);
}

#[test]
async fn background_eviction_older_than_ttl() {
let cache: Cache<u64, (), u64, String> = Cache::new(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ GOLEM__BLOB_STORAGE__TYPE="LocalFileSystem"
GOLEM__BLOB_STORAGE__CONFIG__ROOT="../data/blob_storage"
GOLEM__COMPILED_COMPONENT_SERVICE__TYPE="Enabled"
GOLEM__COMPONENT_CACHE__MAX_CAPACITY=32
GOLEM__COMPONENT_CACHE__MAX_CONCURRENT_COMPILATIONS=16
GOLEM__COMPONENT_CACHE__MAX_METADATA_CAPACITY=16384
GOLEM__COMPONENT_CACHE__MAX_RESOLVED_COMPONENT_CAPACITY=1024
GOLEM__COMPONENT_CACHE__TIME_TO_IDLE="12h"
Expand Down Expand Up @@ -191,6 +192,7 @@ GOLEM__BLOB_STORAGE__TYPE="LocalFileSystem"
GOLEM__BLOB_STORAGE__CONFIG__ROOT="../data/blob_storage"
GOLEM__COMPILED_COMPONENT_SERVICE__TYPE="Enabled"
GOLEM__COMPONENT_CACHE__MAX_CAPACITY=32
GOLEM__COMPONENT_CACHE__MAX_CONCURRENT_COMPILATIONS=16
GOLEM__COMPONENT_CACHE__MAX_METADATA_CAPACITY=16384
GOLEM__COMPONENT_CACHE__MAX_RESOLVED_COMPONENT_CAPACITY=1024
GOLEM__COMPONENT_CACHE__TIME_TO_IDLE="12h"
Expand Down
2 changes: 2 additions & 0 deletions golem-debugging-service/config/debug-worker-executor.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ type = "Enabled"

[component_cache]
max_capacity = 32
max_concurrent_compilations = 16
max_metadata_capacity = 16384
max_resolved_component_capacity = 1024
time_to_idle = "12h"
Expand Down Expand Up @@ -307,6 +308,7 @@ without_time = false
#
# [component_cache]
# max_capacity = 32
# max_concurrent_compilations = 16
# max_metadata_capacity = 16384
# max_resolved_component_capacity = 1024
# time_to_idle = "12h"
Expand Down
3 changes: 3 additions & 0 deletions golem-worker-executor/config/worker-executor.sample.env
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ GOLEM__BLOB_STORAGE__TYPE="LocalFileSystem"
GOLEM__BLOB_STORAGE__CONFIG__ROOT="../data/blob_storage"
GOLEM__COMPILED_COMPONENT_SERVICE__TYPE="Enabled"
GOLEM__COMPONENT_CACHE__MAX_CAPACITY=32
GOLEM__COMPONENT_CACHE__MAX_CONCURRENT_COMPILATIONS=16
GOLEM__COMPONENT_CACHE__MAX_METADATA_CAPACITY=16384
GOLEM__COMPONENT_CACHE__MAX_RESOLVED_COMPONENT_CAPACITY=1024
GOLEM__COMPONENT_CACHE__TIME_TO_IDLE="12h"
Expand Down Expand Up @@ -252,6 +253,7 @@ GOLEM__BLOB_STORAGE__CONFIG__RETRIES__MIN_DELAY="100ms"
GOLEM__BLOB_STORAGE__CONFIG__RETRIES__MULTIPLIER=3.0
GOLEM__COMPILED_COMPONENT_SERVICE__TYPE="Enabled"
GOLEM__COMPONENT_CACHE__MAX_CAPACITY=32
GOLEM__COMPONENT_CACHE__MAX_CONCURRENT_COMPILATIONS=16
GOLEM__COMPONENT_CACHE__MAX_METADATA_CAPACITY=16384
GOLEM__COMPONENT_CACHE__MAX_RESOLVED_COMPONENT_CAPACITY=1024
GOLEM__COMPONENT_CACHE__TIME_TO_IDLE="12h"
Expand Down Expand Up @@ -463,6 +465,7 @@ GOLEM__AGENT_WEBHOOKS_SERVICE__USE_HTTPS_FOR_WEBHOOK_URL=true
GOLEM__BLOB_STORAGE__TYPE="InMemory"
GOLEM__COMPILED_COMPONENT_SERVICE__TYPE="Enabled"
GOLEM__COMPONENT_CACHE__MAX_CAPACITY=32
GOLEM__COMPONENT_CACHE__MAX_CONCURRENT_COMPILATIONS=16
GOLEM__COMPONENT_CACHE__MAX_METADATA_CAPACITY=16384
GOLEM__COMPONENT_CACHE__MAX_RESOLVED_COMPONENT_CAPACITY=1024
GOLEM__COMPONENT_CACHE__TIME_TO_IDLE="12h"
Expand Down
3 changes: 3 additions & 0 deletions golem-worker-executor/config/worker-executor.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ type = "Enabled"

[component_cache]
max_capacity = 32
max_concurrent_compilations = 16
max_metadata_capacity = 16384
max_resolved_component_capacity = 1024
time_to_idle = "12h"
Expand Down Expand Up @@ -397,6 +398,7 @@ without_time = false
#
# [component_cache]
# max_capacity = 32
# max_concurrent_compilations = 16
# max_metadata_capacity = 16384
# max_resolved_component_capacity = 1024
# time_to_idle = "12h"
Expand Down Expand Up @@ -726,6 +728,7 @@ without_time = false
#
# [component_cache]
# max_capacity = 32
# max_concurrent_compilations = 16
# max_metadata_capacity = 16384
# max_resolved_component_capacity = 1024
# time_to_idle = "12h"
Expand Down
Loading
Loading