Fix Ray deduplicator shared state #978

Open
macroguo-ghy wants to merge 4 commits into datajuicer:main from macroguo-ghy:codex/fix-ray-dedup-state-971

@macroguo-ghy macroguo-ghy commented May 14, 2026

Contributor

Summary

  • Share Ray deduplicator backend state across map_batches tasks for a single execution.
  • Prepare Ray actor-backed dedup sets before serializing the operator into Ray tasks without recreating existing actor handles.
  • Materialize Ray basic deduplicator stats for all stateful backends, including Redis, before later actions can re-run the lazy stats stage.
  • Add regression coverage for document deduplication across Ray blocks, repeated executions, Redis materialization signaling, and actor handle reuse.
  • Fix Ray test helper conversion by using RayDataset.to_list() instead of iterating RayDataset directly.

Fixes #971.

Validation

python3 -m pytest tests/ops/deduplicator/test_ray_document_deduplicator.py -q

Result: 9 passed, 10 warnings in 69.43s.

@macroguo-ghy macroguo-ghy changed the title from "[codex] Fix Ray deduplicator shared state" to "Fix Ray deduplicator shared state" May 14, 2026

@gemini-code-assist gemini-code-assist Bot left a comment

Contributor

Code Review

This pull request introduces a mechanism to handle stateful operators within Ray datasets by allowing operators to trigger dataset materialization after execution. This change specifically addresses potential issues in deduplication where Ray's lazy re-execution could lead to incorrect results due to persistent state in actors or external backends. The feedback highlights that the RedisBackend should also trigger this materialization to prevent similar state conflicts and suggests refactoring the actor initialization logic to eliminate code duplication.
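The re-execution problem described above can be illustrated without Ray. The following is a minimal sketch, not the project's code: `SeenSet` and `LazyDataset` are hypothetical stand-ins for a persistent dedup backend and a lazily evaluated dataset.

```python
class SeenSet:
    """Stand-in for an actor- or Redis-backed dedup set that outlives one pass."""

    def __init__(self):
        self._seen = set()

    def is_unique(self, key):
        # Returns True only the first time a key is observed.
        if key in self._seen:
            return False
        self._seen.add(key)
        return True


class LazyDataset:
    """Toy lazy dataset: the filter re-runs on every read unless materialized."""

    def __init__(self, rows, predicate=None):
        self._rows = rows
        self._predicate = predicate

    def filter(self, predicate):
        return LazyDataset(self._rows, predicate)

    def materialize(self):
        # Run the pending filter once and keep only the resulting rows.
        return LazyDataset(self.to_list())

    def to_list(self):
        if self._predicate is None:
            return list(self._rows)
        return [r for r in self._rows if self._predicate(r)]


rows = ["a", "b", "a", "c", "b"]

# Lazy: the second read re-runs the filter against an already-populated set.
lazy = LazyDataset(rows).filter(SeenSet().is_unique)
first_read = lazy.to_list()
second_read = lazy.to_list()

# Materialized: the filter runs once, so later reads are stable.
stable = LazyDataset(rows).filter(SeenSet().is_unique).materialize()
stable_first = stable.to_list()
stable_second = stable.to_list()
```

In this toy model the second lazy read drops every row, because the persistent set already contains all keys. Materializing right after the stateful stage avoids that, at the cost of holding the filtered rows in memory.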

Comment thread data_juicer/ops/deduplicator/ray_basic_deduplicator.py Outdated
Comment thread data_juicer/ops/deduplicator/ray_basic_deduplicator.py Outdated
@macroguo-ghy macroguo-ghy marked this pull request as ready for review May 17, 2026 17:28
@cmgzn cmgzn requested review from Dludora and fengrui-z May 22, 2026 07:39
@fengrui-z

Collaborator

Strength

  • Correct direction. Eagerly creating dedup actors on the driver and letting Ray pickle the handles to every worker is the canonical pattern for shared stateful operators — the fix targets the actual root cause.

Risks

  1. Loss of lazy autoscale. Actor count is now locked in at planning time based on current cluster_resources(), breaking ActorBackend's original deferred-creation design.

  2. materialize() buffers the full post-dedup dataset in the object store. The pipeline previously streamed, so this is a real memory cost on large datasets and is worth noting in the PR description.

  3. Silently overrides ray_execution_mode='actor' for dedup ops. Downgrades to task mode without a log line; users may wonder why their config "didn't take effect."

  4. Hook is informal. Underscore-prefixed and getattr-probed instead of declared on Filter. Consider promoting to a formal Filter.prepare_for_ray_map_batches() -> bool API for future reuse.

  5. Repeated-read regression test depends on Ray's scheduling and didn't reproduce the original failure locally. A unit test that directly asserts materialize() was called would be more robust across Ray versions.
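For risk 5, a scheduling-independent test could look roughly like the sketch below. It uses `unittest.mock` from the standard library. `run_op` and the `needs_materialize` attribute are hypothetical placeholders for whatever the executor and operator actually expose in this PR.

```python
from unittest.mock import MagicMock


def run_op(dataset, op):
    """Hypothetical executor step: apply the op, then materialize if it asks."""
    result = dataset.map_batches(op.process_batched)
    if op.needs_materialize:
        result = result.materialize()
    return result


def check_materialize_called():
    dataset = MagicMock(name="dataset")
    op = MagicMock(name="dedup_op", needs_materialize=True)

    out = run_op(dataset, op)

    # The mapped dataset must be materialized exactly once, with no arguments.
    mapped = dataset.map_batches.return_value
    mapped.materialize.assert_called_once_with()
    return out is mapped.materialize.return_value


def check_materialize_skipped():
    dataset = MagicMock(name="dataset")
    op = MagicMock(name="stateless_op", needs_materialize=False)

    out = run_op(dataset, op)

    # Stateless ops should keep the lazy, streaming result.
    mapped = dataset.map_batches.return_value
    mapped.materialize.assert_not_called()
    return out is mapped
```

Because the dataset is mocked, the assertions do not depend on how Ray schedules blocks or on the Ray version.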

Recommendation

Approve and merge after noting the materialize() memory cost and lost lazy-autoscale in the PR description. Optionally: promote the hook to a formal Filter API.
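A formal hook might be shaped like the sketch below. Only the name `prepare_for_ray_map_batches` and its `bool` return type come from the suggestion above. The class bodies, the meaning assigned to the return value (True means "materialize afterwards"), and the `plan` helper are assumptions for illustration.

```python
class Filter:
    """Simplified stand-in for the real Filter base class."""

    def prepare_for_ray_map_batches(self) -> bool:
        # Default: nothing to prepare, and the result can stay lazy.
        return False


class StatefulDedup(Filter):
    """Hypothetical dedup op holding shared backend state."""

    def __init__(self):
        self.actors_created = 0
        self._handles = None

    def prepare_for_ray_map_batches(self) -> bool:
        # Create the shared handles once; repeated calls reuse them.
        if self._handles is None:
            self._handles = ["handle-0", "handle-1"]  # placeholder for actor handles
            self.actors_created += 1
        return True


def plan(op: Filter) -> str:
    """The caller uses the declared method instead of probing with getattr."""
    return "materialize" if op.prepare_for_ray_map_batches() else "stream"


dedup = StatefulDedup()
first_plan = plan(dedup)
second_plan = plan(dedup)
plain_plan = plan(Filter())
```

Declaring the method on the base class gives every operator a default and makes the contract visible to type checkers, so the executor no longer needs an underscore-prefixed attribute lookup.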

When a dedup operator has ray_execution_mode='actor' but gets
downgraded to task mode to preserve shared dedup state, emit an
info log so users understand why their config was overridden.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
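The behavior described in this commit message can be sketched as follows. This is an illustration only: `resolve_execution_mode` and its parameters are hypothetical names, and the standard-library `logging` module is used here instead of whatever logger the project uses.

```python
import logging

logger = logging.getLogger("ray_dedup_sketch")


def resolve_execution_mode(configured_mode: str, is_shared_state_dedup: bool) -> str:
    """Return the mode to run with, logging when the user's choice is overridden."""
    if is_shared_state_dedup and configured_mode == "actor":
        logger.info(
            "ray_execution_mode='actor' was overridden: running in task mode "
            "to preserve shared dedup state."
        )
        return "task"
    return configured_mode


downgraded = resolve_execution_mode("actor", is_shared_state_dedup=True)
unchanged_task = resolve_execution_mode("task", is_shared_state_dedup=True)
unchanged_actor = resolve_execution_mode("actor", is_shared_state_dedup=False)
```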

Development

Successfully merging this pull request may close these issues.

RayBasicDeduplicator lazy-loading strategy prevents global deduplication

2 participants