[Data] [Core] [3/n] Wire BlockRefCounter into Operators #64191
Signed-off-by: Sirui Huang <ray.huang@anyscale.com>
Code Review
This pull request introduces a centralized BlockRefCounter to track object-store memory usage per operator via Ray Core callbacks, integrating it across various physical operators and the streaming executor. The feedback focuses on enhancing the robustness of this counter: first, by replacing the set-based tracking of registered IDs with a dictionary to handle duplicate registrations and prevent memory leaks; second, by skipping tracking for zero-sized blocks to reduce overhead; third, by initializing the counter to a default instance in PhysicalOperator to avoid potential AttributeErrors in tests; and finally, by adding proper type annotations to the overridden start methods across all operator subclasses to ensure type safety.
Pull request overview
This PR introduces a centralized BlockRefCounter to track per-operator object store memory via Ray Core out-of-scope callbacks, wiring it through operator startup and the streaming executor lifecycle.
Changes:
- Add BlockRefCounter implementation + comprehensive unit/integration tests and Bazel target.
- Plumb a shared counter from ResourceManager into PhysicalOperator.start() and into DataOpTask so blocks are accounted once metadata is available.
- Avoid double-counting for AllToAll bulk transforms that forward input refs unchanged; clear accounting on executor shutdown after queues are drained.
Reviewed changes
Copilot reviewed 16 out of 16 changed files in this pull request and generated 3 comments.
| File | Description |
|---|---|
| python/ray/data/tests/test_streaming_executor.py | Adds a helper to construct DataOpTask with default BlockRefCounter/producer_id for existing tests. |
| python/ray/data/tests/test_operators.py | Updates operator tests to call start() explicitly where required. |
| python/ray/data/tests/test_block_ref_counter.py | New test suite for BlockRefCounter accounting, clear(), thread-safety, and lifecycle integration. |
| python/ray/data/BUILD.bazel | Registers the new test_block_ref_counter Bazel py_test. |
| python/ray/data/_internal/execution/streaming_executor.py | Starts all operators with a shared counter and clears the counter at shutdown after draining queues. |
| python/ray/data/_internal/execution/streaming_executor_state.py | Stops starting operators during topology construction (executor does it after ResourceManager exists). |
| python/ray/data/_internal/execution/resource_manager.py | Constructs and exposes an executor-wide BlockRefCounter. |
| python/ray/data/_internal/execution/operators/union_operator.py | Updates start() signature to accept/forward block_ref_counter. |
| python/ray/data/_internal/execution/operators/output_splitter.py | Updates start() signature to accept/forward block_ref_counter. |
| python/ray/data/_internal/execution/operators/map_operator.py | Updates start() signature; plumbs counter + producer id into DataOpTask. |
| python/ray/data/_internal/execution/operators/input_data_buffer.py | Updates start() signature to accept/forward block_ref_counter. |
| python/ray/data/_internal/execution/operators/hash_shuffle.py | Updates start() signature; plumbs counter + producer id into finalization DataOpTask. |
| python/ray/data/_internal/execution/operators/base_physical_operator.py | Prevents double-counting in AllToAll by skipping forwarded input refs. |
| python/ray/data/_internal/execution/operators/actor_pool_map_operator.py | Updates start() signature to accept/forward block_ref_counter. |
| python/ray/data/_internal/execution/interfaces/physical_operator.py | Extends PhysicalOperator.start() and DataOpTask to accept a shared counter + producer id; DataOpTask accounts blocks when metadata is ready. |
| python/ray/data/_internal/execution/block_ref_counter.py | Adds the BlockRefCounter implementation using Ray Core out-of-scope callbacks. |
    block_ref_counter: Optional[BlockRefCounter],
    producer_id: str,
An alternative implementation would be to make callers responsible for updating the reference counter in output_ready_callback, rather than making it DataOpTask's responsibility.
The advantage is that block_ref_counter would be passed through fewer layers and the DataOpTask interface would be simpler, though I imagine it might also introduce a moderate amount of duplication.
Will defer to you on what's cleaner.
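To make the trade-off concrete, here is a minimal sketch of the callback-based alternative. ToyCounter, ToyTask, make_task, and the (block_id, size_bytes) callback signature are hypothetical stand-ins for illustration, not the real Ray Data classes or signatures; only the names on_block_produced and output_ready_callback come from this PR.

```python
from typing import Callable, Dict, List


class ToyCounter:
    """Hypothetical stand-in for BlockRefCounter: tracks bytes per producer."""

    def __init__(self) -> None:
        self.bytes_by_producer: Dict[str, int] = {}

    def on_block_produced(self, producer_id: str, size_bytes: int) -> None:
        current = self.bytes_by_producer.get(producer_id, 0)
        self.bytes_by_producer[producer_id] = current + size_bytes


class ToyTask:
    """Hypothetical stand-in for DataOpTask that knows nothing about the counter."""

    def __init__(self, output_ready_callback: Callable[[str, int], None]) -> None:
        self._output_ready_callback = output_ready_callback

    def emit(self, block_id: str, size_bytes: int) -> None:
        # The task only reports that an output is ready.
        self._output_ready_callback(block_id, size_bytes)


def make_task(
    counter: ToyCounter,
    producer_id: str,
    downstream: Callable[[str, int], None],
) -> ToyTask:
    """The caller (an operator) wraps its own callback to do the accounting."""

    def callback(block_id: str, size_bytes: int) -> None:
        counter.on_block_produced(producer_id, size_bytes)
        downstream(block_id, size_bytes)

    return ToyTask(callback)


counter = ToyCounter()
received: List[str] = []
task = make_task(counter, "map_op", lambda block_id, size: received.append(block_id))
task.emit("block-0", 128)
task.emit("block-1", 64)
```

In this shape the task constructor takes no counter or producer id, but every operator that creates tasks would need its own wrapper, which is the duplication mentioned above.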
    self,
    task_index: int,
    streaming_gen: ObjectRefGenerator,
    block_ref_counter: Optional[BlockRefCounter],
When can this ever be None? We only create DataOpTasks once execution has already started, so I feel like we shouldn't allow it
    def start(
        self,
        options: ExecutionOptions,
        block_ref_counter: Optional[BlockRefCounter] = None,
Similar comment -- can this ever actually be None? We control when start gets called, and we make it so that start always gets called with a non-None value.
Making this just block_ref_counter: BlockRefCounter would make the code easier to reason about
    @property
    def block_ref_counter(self) -> BlockRefCounter:
        """The centralized block reference counter for this executor."""
        return self._block_ref_counter
We can probably simplify the dataflow here.
Currently, it's like:
- Executor creates ResourceManager
- ResourceManager constructs counter
- Executor gets counter from ResourceManager
- Executor passes counter from ResourceManager to Operators
I think it'd be clearer as
- Executor constructs counter
- Executor passes counter to ResourceManager as constructor dependency
- Executor passes counter to operators in start
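The proposed ownership order can be sketched as follows. All classes here (ToyCounter, ToyResourceManager, ToyOperator, ToyExecutor) are hypothetical stand-ins with simplified constructors, not the real Ray Data interfaces.

```python
from typing import List


class ToyCounter:
    """Hypothetical stand-in for BlockRefCounter."""


class ToyResourceManager:
    """Receives the counter as a constructor dependency; no public property needed."""

    def __init__(self, counter: ToyCounter) -> None:
        self._counter = counter


class ToyOperator:
    """Hypothetical stand-in for PhysicalOperator."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.counter = None

    def start(self, counter: ToyCounter) -> None:
        self.counter = counter


class ToyExecutor:
    """The executor owns the counter and hands it to everything that needs it."""

    def __init__(self, operators: List[ToyOperator]) -> None:
        self._counter = ToyCounter()  # 1. Executor constructs the counter.
        self._resource_manager = ToyResourceManager(self._counter)  # 2. Injected.
        for op in operators:
            op.start(self._counter)  # 3. Passed to operators in start().


ops = [ToyOperator("read"), ToyOperator("map")]
executor = ToyExecutor(ops)
```

Under these assumptions every component shares one instance, and the executor never has to read the counter back from the resource manager.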
If we do this, we can also avoid expanding the ResourceManager interface with the block_ref_counter property.
    kwargs.setdefault("block_ref_counter", BlockRefCounter())
    kwargs.setdefault("producer_id", "test_op")
Here and below -- I think the tests would be clearer if we just inlined these parameters and didn't introduce the _make_data_op_task layer of indirection
    # outputs (e.g., map outputs for map-reduce).
    output_buffer, self._stats = self._bulk_fn(self._input_buffer.to_list(), ctx)

    # Snapshot input refs before calling bulk_fn. Some bulk_fns (e.g.
Would it make sense/be simpler if we made on_block_produced idempotent?
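As a rough illustration of what idempotent registration could look like, here is a toy counter keyed by block id. The class name, the string block ids, and the (block_id, producer_id, size_bytes) signature are assumptions made for this sketch; the real on_block_produced signature may differ.

```python
from typing import Dict, Tuple


class IdempotentToyCounter:
    """Hypothetical counter where re-registering a block is a no-op."""

    def __init__(self) -> None:
        # block_id -> (producer_id, size_bytes)
        self._registered: Dict[str, Tuple[str, int]] = {}

    def on_block_produced(
        self, block_id: str, producer_id: str, size_bytes: int
    ) -> bool:
        """Register a block once; return False if it was already tracked."""
        if block_id in self._registered:
            return False
        self._registered[block_id] = (producer_id, size_bytes)
        return True

    def usage(self, producer_id: str) -> int:
        """Total bytes currently attributed to one producer."""
        return sum(
            size for owner, size in self._registered.values() if owner == producer_id
        )


counter = IdempotentToyCounter()
counter.on_block_produced("block-0", "read_op", 100)
# A bulk function that forwards its input unchanged reports the same block again.
counter.on_block_produced("block-0", "shuffle_op", 100)
counter.on_block_produced("block-1", "shuffle_op", 50)
```

With this design the caller would not need to snapshot input refs, since a forwarded ref is simply ignored on the second report. The sketch keeps the block attributed to its first producer; whether that attribution is the desired one for forwarded refs is a separate design question.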
Description
Wires BlockRefCounter into every operator and the streaming executor. ResourceManager gains the counter as an attribute in this PR, but _estimate_object_store_memory_usage is left unchanged so that the scheduling-visible change stays isolated.
Implementation
- physical_operator.py. PhysicalOperator.start() now accepts block_ref_counter (replaces the set_block_ref_counter() used in the prototype, per review). DataOpTask.on_data_ready calls on_block_produced once both the block ref and its metadata are available.
- base_physical_operator.py. AllToAllOperator.all_inputs_done skips on_block_produced for output refs that are unchanged from the input, avoiding double-counting when bulk_fn forwards refs unchanged (e.g. randomize_blocks).
- streaming_executor_state.py / streaming_executor.py. Operators are no longer started inside build_streaming_topology. The executor starts them after ResourceManager is constructed so the shared counter can be passed in. block_ref_counter.clear() is called at shutdown after queues are drained.
- resource_manager.py. Constructs BlockRefCounter() and exposes it via a block_ref_counter property.
Tests
- test_operators.py: updated to remove set_block_ref_counter calls; adds op.start(ExecutionOptions()) where missing.
- test_streaming_executor.py: _make_data_op_task helper supplies a default block_ref_counter and producer_id so existing DataOpTask tests keep working.
Related issues
Depends on #64157 (BlockRefCounter implementation).
Related to #63601 (prototype), #63074 (previous manual BlockRefCounter).