Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions python/ray/data/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -1369,6 +1369,20 @@ py_test(
],
)

py_test(
name = "test_block_ref_counter",
size = "small",
srcs = ["tests/test_block_ref_counter.py"],
tags = [
"exclusive",
"team:data",
],
deps = [
":conftest",
"//:ray_lib",
],
)

py_test(
name = "test_map_operator",
size = "medium",
Expand Down
72 changes: 72 additions & 0 deletions python/ray/data/_internal/execution/block_ref_counter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
import threading
from collections import defaultdict
from typing import Dict

import ray
import ray._private.worker


class BlockRefCounter:
"""Tracks object-store memory usage per operator via Ray Core callbacks.

The callback fires when:
- All Python ObjectRefs wrapping the block's ObjectID are garbage-collected, AND
- All Ray tasks that received the block as an argument have completed.
"""

def __init__(self):
# Object ID binaries of currently live blocks; used by _on_object_freed
# to distinguish a racing clear() from a real callback.
self._registered_ids: set[bytes] = set()
Comment thread
rayhhome marked this conversation as resolved.
# (producer_id -> total live bytes); maintained incrementally for O(1) reads.
self._bytes_by_producer: Dict[str, int] = defaultdict(int)
self._lock = threading.Lock()

def on_block_produced(
self,
block_ref: "ray.ObjectRef",
size_bytes: int,
producer_id: str,
) -> None:
"""Register a block and attribute its memory to producer_id.

Registers a Ray Core out-of-scope callback so that when all references
to block_ref are gone the bytes are automatically removed from the
producer's usage.
"""
id_binary = block_ref.binary()
Comment thread
rayhhome marked this conversation as resolved.
with self._lock:
self._registered_ids.add(id_binary)
self._bytes_by_producer[producer_id] += size_bytes

Comment thread
rayhhome marked this conversation as resolved.
def _on_object_freed(id_bytes: bytes) -> None:
with self._lock:
if id_bytes not in self._registered_ids:
# Already cleared (e.g. by clear()), nothing to do.
return
self._registered_ids.discard(id_bytes)
self._bytes_by_producer[producer_id] -= size_bytes
if self._bytes_by_producer[producer_id] == 0:
del self._bytes_by_producer[producer_id]
Comment thread
rayhhome marked this conversation as resolved.

core_worker = ray._private.worker.global_worker.core_worker # type: ignore[attr-defined]
registered = core_worker.add_object_out_of_scope_callback(
block_ref, _on_object_freed
)
if not registered:
_on_object_freed(id_binary)

def get_object_store_memory_usage(self, producer_id: str) -> int:
"""Total bytes of live blocks attributed to producer_id."""
with self._lock:
return self._bytes_by_producer.get(producer_id, 0)

def clear(self) -> None:
"""Reset all accounting, e.g. on executor shutdown.

Any previously registered Ray Core callbacks firing after clear()
will be silently ignored because _registered_ids is empty.
"""
with self._lock:
self._registered_ids.clear()
self._bytes_by_producer.clear()
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
ActorPoolInfo,
AutoscalingActorPool,
)
from ray.data._internal.execution.block_ref_counter import BlockRefCounter
from ray.data._internal.execution.interfaces.execution_options import (
ExecutionOptions,
ExecutionResources,
Expand Down Expand Up @@ -134,6 +135,8 @@ def __init__(
self,
task_index: int,
streaming_gen: ObjectRefGenerator,
block_ref_counter: BlockRefCounter,
producer_id: str,
Comment on lines +138 to +139

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

An alternative implementation could be to make callers responsible for updating the reference counter with output_ready_callback rather than making it DataOpTask's responsibility.

Advantage is that it would reduce the amount we pass through block_ref_counter and simplify the DataOpTask interface, though I imagine it might also introduce a moderate amount of duplication.

Will defer you to about what's cleaner

output_ready_callback: Callable[[RefBundle], None] = lambda bundle: None,
task_done_callback: TaskDoneCallbackType = lambda exc, worker_stats, driver_stats: None,
Comment thread
rayhhome marked this conversation as resolved.
block_ready_callback: Callable[
Expand All @@ -149,6 +152,9 @@ def __init__(
Args:
task_index: Index of the task. Used for callbacks.
streaming_gen: The streaming generator of this task. It should yield blocks.
block_ref_counter: The centralized block reference counter. on_block_produced
is called for each block yielded by this task.
producer_id: The id of the operator that produces the blocks from this task.
output_ready_callback: The callback to call when a new RefBundle is output
from the generator.
task_done_callback: The callback to call when the task is done.
Expand All @@ -171,6 +177,8 @@ def __init__(
self._block_ready_callback = block_ready_callback
self._metadata_ready_callback = metadata_ready_callback
self._operator_name = operator_name
self._block_ref_counter: BlockRefCounter = block_ref_counter
self._producer_id: str = producer_id

# If the generator hasn't produced block metadata yet, or if the block metadata
# object isn't available after we get a reference, we need store the pending
Expand Down Expand Up @@ -292,6 +300,9 @@ def on_data_ready(self, max_bytes_to_read: Optional[int]) -> int:
meta_with_schema_bytes
)
meta = meta_with_schema.metadata
self._block_ref_counter.on_block_produced(
self._pending_block_ref, meta.size_bytes or 0, self._producer_id
)
self._output_ready_callback(
RefBundle(
[BlockEntry(self._pending_block_ref, meta)],
Expand Down Expand Up @@ -444,6 +455,7 @@ def __init__(
self._id = str(uuid.uuid4())
# Initialize metrics after data_context is set
self._metrics = OpRuntimeMetrics(self)
self._block_ref_counter: Optional[BlockRefCounter] = None
Comment thread
rayhhome marked this conversation as resolved.

def __reduce__(self):
raise ValueError("Operator is not serializable.")
Expand Down Expand Up @@ -743,12 +755,21 @@ def num_output_splits(self) -> int:
"""
return self._num_output_splits

def start(self, options: ExecutionOptions) -> None:
def start(
self,
options: ExecutionOptions,
block_ref_counter: Optional[BlockRefCounter] = None,

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similar comment -- can this ever actually be None? We control when start gets called, and we make it so that start always gets called with a non-None value.

Making this just block_ref_counter: BlockRefCounter would make the code easier to reason about

) -> None:
"""Called by the executor when execution starts for an operator.

Args:
options: The global options used for the overall execution.
block_ref_counter: The executor-wide shared counter for tracking
object-store memory. If omitted, a fresh per-operator counter is used.
"""
self._block_ref_counter = (
block_ref_counter if block_ref_counter is not None else BlockRefCounter()
)
Comment thread
rayhhome marked this conversation as resolved.
Outdated
Comment thread
rayhhome marked this conversation as resolved.
Outdated
self._started = True

def can_add_input(self) -> bool:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -265,9 +265,9 @@ def _apply_default_actor_task_remote_args(

return ray_actor_task_remote_args

def start(self, options: ExecutionOptions):
def start(self, options: ExecutionOptions, block_ref_counter=None):
Comment thread
rayhhome marked this conversation as resolved.
Outdated
self._actor_locality_enabled = options.actor_locality_enabled
super().start(options)
super().start(options, block_ref_counter)

self._actor_cls = ray.remote(**self._ray_remote_args)(self._map_worker_cls)
self._actor_pool.scale(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,9 +183,23 @@ def all_inputs_done(self) -> None:
)
# NOTE: We don't account object store memory use from intermediate `bulk_fn`
# outputs (e.g., map outputs for map-reduce).
output_buffer, self._stats = self._bulk_fn(self._input_buffer.to_list(), ctx)

# Snapshot input refs before calling bulk_fn. Some bulk_fns (e.g.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it make sense/be simpler if we made on_block_produced idempotent?

# randomize_blocks) forward input ObjectRefs unchanged to the output.
# We only call on_block_produced for genuinely new refs to avoid
# double-counting; forwarded refs stay attributed to their original producer.
input_bundles = self._input_buffer.to_list()
input_refs = {entry.ref for bundle in input_bundles for entry in bundle.blocks}
output_buffer, self._stats = self._bulk_fn(input_bundles, ctx)
self._output_buffer = FIFOBundleQueue(output_buffer)

for bundle in output_buffer:
for entry in bundle.blocks:
if entry.ref not in input_refs:
self._block_ref_counter.on_block_produced(
entry.ref, entry.metadata.size_bytes or 0, self.id
)

while self._input_buffer.has_next():
refs = self._input_buffer.get_next()
self._metrics.on_input_dequeued(refs, input_index=0)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -657,8 +657,8 @@ def __init__(
self._reduce_bar = None
self._reduce_metrics = OpRuntimeMetrics(self)

def start(self, options: ExecutionOptions) -> None:
super().start(options)
def start(self, options: ExecutionOptions, block_ref_counter=None) -> None:
Comment thread
rayhhome marked this conversation as resolved.
Outdated
super().start(options, block_ref_counter)

self._aggregator_pool.start()

Expand Down Expand Up @@ -1007,6 +1007,8 @@ def _on_aggregation_done(
ExecutionResources.from_resource_dict(finalize_task_resource_bundle)
),
operator_name=self.name,
block_ref_counter=self._block_ref_counter,
producer_id=self.id,
)
self._finalizing_tasks[partition_id] = data_task

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ def __init__(
self._input_data_index = 0
self.mark_execution_finished()

def start(self, options: ExecutionOptions) -> None:
def start(self, options: ExecutionOptions, block_ref_counter=None) -> None:
Comment thread
rayhhome marked this conversation as resolved.
Outdated
if not self._is_input_initialized:
self._input_data = self._input_data_factory(
self.target_max_block_size_override
Expand All @@ -57,7 +57,7 @@ def start(self, options: ExecutionOptions) -> None:
# so we record input metrics here
for bundle in self._input_data:
self._metrics.on_input_received(bundle)
super().start(options)
super().start(options, block_ref_counter)

def has_next(self) -> bool:
return self._input_data_index < len(self._input_data)
Expand Down
12 changes: 8 additions & 4 deletions python/ray/data/_internal/execution/operators/map_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -481,8 +481,8 @@ def create(
else:
raise ValueError(f"Unsupported execution strategy {compute_strategy}")

def start(self, options: "ExecutionOptions"):
super().start(options)
def start(self, options: "ExecutionOptions", block_ref_counter=None):
Comment thread
rayhhome marked this conversation as resolved.
Outdated
super().start(options, block_ref_counter)
# Create output queue with desired ordering semantics.
if options.preserve_order:
self._output_queue = ReorderingBundleQueue()
Expand Down Expand Up @@ -655,8 +655,12 @@ def _task_done_callback(
data_task = DataOpTask(
task_index,
gen,
lambda output: _output_ready_callback(task_index, output),
functools.partial(_task_done_callback, task_index),
self._block_ref_counter,
self.id,
output_ready_callback=lambda output: _output_ready_callback(
task_index, output
),
task_done_callback=functools.partial(_task_done_callback, task_index),
operator_name=self.name,
)
self._metrics.on_task_submitted(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,13 +124,13 @@ def num_output_rows_total(self) -> Optional[int]:
# The total number of rows is the same as the number of input rows.
return self.input_dependencies[0].num_output_rows_total()

def start(self, options: ExecutionOptions) -> None:
def start(self, options: ExecutionOptions, block_ref_counter=None) -> None:
Comment thread
rayhhome marked this conversation as resolved.
Outdated
if options.preserve_order:
# If preserve_order is set, we need to ignore locality hints to ensure determinism.
self._locality_hints = None
self._max_buffer_size = 0

super().start(options)
super().start(options, block_ref_counter)

def throttling_disabled(self) -> bool:
"""Disables resource-based throttling.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,12 +59,12 @@ def _input_queues(self) -> List["BaseBundleQueue"]:
def _output_queues(self) -> List["BaseBundleQueue"]:
return [self._output_buffer]

def start(self, options: ExecutionOptions):
def start(self, options: ExecutionOptions, block_ref_counter=None):
Comment thread
rayhhome marked this conversation as resolved.
Outdated
# Whether to preserve deterministic ordering of output blocks.
# When True, blocks are emitted in round-robin order across inputs,
# ensuring the same input always produces the same output order.
self._preserve_order = options.preserve_order
super().start(options)
super().start(options, block_ref_counter)

def num_outputs_total(self) -> Optional[int]:
num_outputs = 0
Expand Down
8 changes: 8 additions & 0 deletions python/ray/data/_internal/execution/resource_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

from ray._common.utils import env_bool, env_float
from ray.data._internal.execution import create_resource_allocator
from ray.data._internal.execution.block_ref_counter import BlockRefCounter
from ray.data._internal.execution.interfaces.execution_options import (
ExecutionOptions,
ExecutionResources,
Expand Down Expand Up @@ -137,6 +138,8 @@ def __init__(
# operator's output usage.
self._output_operator = terminal_operator_from_topology(topology)

self._block_ref_counter = BlockRefCounter()

self._op_resource_allocator: Optional[
"OpResourceAllocator"
] = create_resource_allocator(self, data_context)
Expand Down Expand Up @@ -168,6 +171,11 @@ def get_external_consumer_bytes(self) -> int:
"""Get the bytes buffered by external consumers."""
return self._external_consumer_bytes

@property
def block_ref_counter(self) -> BlockRefCounter:
"""The centralized block reference counter for this executor."""
return self._block_ref_counter

Comment on lines +174 to +178

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can probably simplify the dataflow here.

Image

Currently, it's like:

  1. Executor creates ResourceManager
  2. ResourceManager constructs counter
  3. Executor gets counter from ResourceManager
  4. Executor passes counter from ResourceManager to Operators

I think it'd be clearer as

Image
  1. Executor constructs counter
  2. Executor passes counter to ResourceManager as constructor dependency
  3. Executor passes counter to operators in start

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we do this, we can also avoid expanding the ResourceManager interface with the block_ref_coutner proeprty

def _estimate_object_store_memory_usage(
self, op: "PhysicalOperator", state: "OpState"
) -> int:
Expand Down
7 changes: 7 additions & 0 deletions python/ray/data/_internal/execution/streaming_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,10 @@ def execute(
self._data_context,
)

counter = self._resource_manager.block_ref_counter
for op in self._topology:
op.start(self._options, counter)

# Constructed once per executor (not per scheduling iteration) so the
# guard's idle-detection state accumulates across scheduling iterations.
self._output_backpressure_guard = OutputBackpressureGuard(
Expand Down Expand Up @@ -332,6 +336,9 @@ def shutdown(self, force: bool, exception: Optional[Exception] = None):
op.shutdown(timer, force=force)

self._clear_topology_queues_post_shutdown(force, exception)
# Queues have been drained; any remaining Ray Core callbacks that fire
# after this point should be no-ops.
self._resource_manager.block_ref_counter.clear()

min_ = round(timer.min(), 3)
max_ = round(timer.max(), 3)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -575,7 +575,6 @@ def setup_state(op: PhysicalOperator) -> OpState:
# Create state.
op_state = OpState(op, inqueues)
topology[op] = op_state
op.start(options)
return op_state

setup_state(dag)
Expand Down
Loading
Loading