From 9ee0902e30d65f716cd54739d98fc2b4e19dc474 Mon Sep 17 00:00:00 2001 From: cyruszhang Date: Mon, 16 Mar 2026 12:43:29 -0700 Subject: [PATCH 01/17] add kaleido constraint --- pyproject.toml | 3 +++ 1 file changed, 3 insertions(+) diff --git a/pyproject.toml b/pyproject.toml index 195d29ec2a..42d1edaef2 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -186,6 +186,9 @@ requires = [ ] build-backend = "hatchling.build" +[tool.uv] +constraint-dependencies = ["kaleido==0.2.1"] + [tool.hatch.version] path = "data_juicer/__init__.py" From f517ff9953294a92690f50deb0e17f81e97b096f Mon Sep 17 00:00:00 2001 From: cyruszhang Date: Mon, 16 Mar 2026 15:59:56 -0700 Subject: [PATCH 02/17] direct concurrency setting --- data_juicer/core/data/ray_dataset.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/data_juicer/core/data/ray_dataset.py b/data_juicer/core/data/ray_dataset.py index 2d8b198565..acca1a8b8f 100644 --- a/data_juicer/core/data/ray_dataset.py +++ b/data_juicer/core/data/ray_dataset.py @@ -237,7 +237,8 @@ def process_batch_arrow(table: pyarrow.Table): try: if op.use_ray_actor(): - compute = get_compute_strategy(op.__class__, concurrency=op.num_proc) + # Use concurrency= directly for better GPU utilization + # (get_compute_strategy may limit parallelism) self.data = self.data.map_batches( op.__class__, fn_args=None, @@ -247,7 +248,7 @@ def process_batch_arrow(table: pyarrow.Table): batch_size=batch_size, num_cpus=op.num_cpus, num_gpus=op.num_gpus, - compute=compute, + concurrency=op.num_proc, batch_format="pyarrow", runtime_env=op.runtime_env, ) @@ -280,7 +281,7 @@ def process_batch_arrow(table: pyarrow.Table): ) cached_columns.add(Fields.stats) if op.use_ray_actor(): - compute = get_compute_strategy(op.__class__, concurrency=op.num_proc) + # Use concurrency= directly for better GPU utilization self.data = self.data.map_batches( op.__class__, fn_args=None, @@ -290,7 +291,7 @@ def process_batch_arrow(table: pyarrow.Table): batch_size=batch_size, num_cpus=op.num_cpus, num_gpus=op.num_gpus, - compute=compute, + concurrency=op.num_proc, batch_format="pyarrow", runtime_env=op.runtime_env, ) From 0f4eee16ae95fd096b4236f668fa39f558f7432a Mon Sep 17 00:00:00 2001 From: cyruszhang Date: Tue, 17 Mar 2026 11:26:33 -0700 Subject: [PATCH 03/17] Add concurrent partition processing with auto GPU detection Partitions now run concurrently as Ray remote tasks instead of sequentially, eliminating GPU idle time between partitions. max_concurrent_partitions defaults to "auto", which detects the number of GPUs in the Ray cluster and sets concurrency accordingly. num_of_partitions is automatically raised to match when too low. - New: concurrency_scoping.py with scope_op_concurrency() utility - New: _process_partitions_concurrent() method using @ray.remote tasks - New: _resolve_max_concurrent() for auto GPU detection - Sequential path preserved for DAG monitoring and single-GPU setups - Tests for scoping, config parsing, auto-inference, and e2e concurrent - perf-test.py simplified: no manual GPU tuning flags needed --- .../core/executor/concurrency_scoping.py | 20 + .../core/executor/ray_executor_partitioned.py | 223 +++- docs/design/parallel_partition_actor_reuse.md | 1116 +++++++++++++++++ perf-test.py | 582 +++++++++ .../executor/test_ray_executor_partitioned.py | 216 ++++ 5 files changed, 2156 insertions(+), 1 deletion(-) create mode 100644 data_juicer/core/executor/concurrency_scoping.py create mode 100644 docs/design/parallel_partition_actor_reuse.md create mode 100644 perf-test.py diff --git a/data_juicer/core/executor/concurrency_scoping.py b/data_juicer/core/executor/concurrency_scoping.py new file mode 100644 index 0000000000..906f8cbe37 --- /dev/null +++ b/data_juicer/core/executor/concurrency_scoping.py @@ -0,0 +1,20 @@ +"""Utility for scoping op concurrency when running partitions concurrently.""" + + +def scope_op_concurrency(op, max_concurrent_partitions: int) -> int: + """Returns the concurrency a single partition should use for this op. + + When multiple partitions run concurrently, each partition should use a + fraction of the total GPU/actor resources to avoid over-subscription. + + Args: + op: An operator instance with ``use_ray_actor()`` and ``num_proc``. + max_concurrent_partitions: How many partitions will run in parallel. + + Returns: + The concurrency value the partition should pass through to + ``map_batches``. + """ + if not op.use_ray_actor() or not op.num_proc or op.num_proc <= 0: + return op.num_proc # CPU ops or auto-mode unchanged + return max(1, op.num_proc // max_concurrent_partitions) diff --git a/data_juicer/core/executor/ray_executor_partitioned.py b/data_juicer/core/executor/ray_executor_partitioned.py index ddcb1fc442..293f3ab077 100644 --- a/data_juicer/core/executor/ray_executor_partitioned.py +++ b/data_juicer/core/executor/ray_executor_partitioned.py @@ -273,16 +273,60 @@ def _configure_partitioning(self): logger.warning("Legacy num_partitions detected, overriding partition configuration") self.partition_mode = mode - self.num_partitions = num_of_partitions self.partition_size = partition_size self.max_size_mb = max_size_mb + # Resolve max_concurrent_partitions. + # "auto" (default) → detect from Ray cluster GPU count, fall back to 1. + # Explicit int → use as-is. + raw_max_conc = ConfigAccessor.get(partition_cfg, "max_concurrent_partitions", "auto") + self.max_concurrent_partitions = self._resolve_max_concurrent(raw_max_conc) + + # Ensure we have at least as many partitions as concurrent slots, + # otherwise some GPUs would sit idle. + if self.max_concurrent_partitions > num_of_partitions: + logger.info( + f"num_of_partitions ({num_of_partitions}) < " + f"max_concurrent_partitions ({self.max_concurrent_partitions}), " + f"raising num_of_partitions to {self.max_concurrent_partitions}" + ) + num_of_partitions = self.max_concurrent_partitions + + self.num_partitions = num_of_partitions + if mode == "manual": logger.info(f"Manual partition mode: using {self.num_partitions} partitions") else: # auto mode logger.info(f"Auto partition mode: will determine optimal partitioning based on data characteristics") logger.info(f"Fallback partition size: {self.partition_size} samples, max {self.max_size_mb} MB") + if self.max_concurrent_partitions > 1: + logger.info( + f"Concurrent partition processing enabled: " + f"max_concurrent_partitions={self.max_concurrent_partitions}" + ) + + @staticmethod + def _resolve_max_concurrent(raw_value) -> int: + """Resolve max_concurrent_partitions from config value. + + * ``"auto"`` → number of GPUs visible to Ray (falls back to 1). + * An explicit int is returned as-is (minimum 1). + """ + if isinstance(raw_value, str) and raw_value.lower() == "auto": + try: + num_gpus = int(ray.cluster_resources().get("GPU", 0)) + except Exception: + num_gpus = 0 + if num_gpus > 1: + logger.info( + f"Auto-detected {num_gpus} GPUs in Ray cluster, " f"setting max_concurrent_partitions={num_gpus}" + ) + return num_gpus + # No GPUs or single GPU → sequential + return 1 + return max(1, int(raw_value)) + def _configure_auto_partitioning(self, dataset, ops): """Configure partitioning using the partition size optimizer for auto mode.""" try: @@ -498,6 +542,10 @@ def _process_with_simple_partitioning(self, dataset: RayDataset, ops: List): f"{partitioning_info.total_rows} total rows" ) + # Branch: concurrent vs sequential partition processing + if self.max_concurrent_partitions > 1: + return self._process_partitions_concurrent(partitions, ops, partitioning_info) + # Process each partition separately with checkpointing logger.info("Processing partitions with checkpointing support...") processed_partitions = [] @@ -541,6 +589,179 @@ def _process_with_simple_partitioning(self, dataset: RayDataset, ops: List): # Return as RayDataset wrapper return RayDataset(merged_dataset, cfg=self.cfg) + def _process_partitions_concurrent(self, partitions, ops, partitioning_info): + """Process partitions concurrently as Ray remote tasks. + + Each partition is submitted as a Ray remote task that independently + loads ops from config, scopes concurrency, and processes data with + its own checkpoint manager. Results are collected and unioned. + """ + max_conc = min(self.max_concurrent_partitions, len(partitions)) + logger.info(f"Processing {len(partitions)} partitions concurrently " f"(max_concurrent_partitions={max_conc})") + + # Serialisable values extracted from self (avoid serialising the executor) + cfg = self.cfg + ckpt_enabled = self.ckpt_manager.checkpoint_enabled + ckpt_strategy = self.ckpt_manager.checkpoint_strategy + ckpt_dir = self.ckpt_manager.ckpt_dir + ckpt_n_ops = getattr(self.ckpt_manager, "checkpoint_n_ops", 1) + ckpt_op_names = getattr(self.ckpt_manager, "checkpoint_op_names", []) + op_fusion_enabled = getattr(cfg, "op_fusion", False) + + @ray.remote + def _process_single_partition_task( + partition_data, + partition_id, + cfg, + max_concurrent_partitions, + ckpt_enabled, + ckpt_strategy, + ckpt_dir, + ckpt_n_ops, + ckpt_op_names, + op_fusion_enabled, + ): + """Ray remote task that processes one partition end-to-end.""" + from loguru import logger as task_logger + + from data_juicer.core.data.ray_dataset import RayDataset + from data_juicer.core.executor.concurrency_scoping import ( + scope_op_concurrency, + ) + from data_juicer.ops import load_ops + from data_juicer.ops.op_fusion import fuse_operators + from data_juicer.utils.ckpt_utils import RayCheckpointManager + + task_logger.info(f"[Partition {partition_id}] Starting remote processing") + + # Re-create ops from config to avoid serialisation issues + task_ops = load_ops(cfg.process) + if op_fusion_enabled: + task_ops = fuse_operators(task_ops) + + # Scope concurrency for each op + for op in task_ops: + op.num_proc = scope_op_concurrency(op, max_concurrent_partitions) + + # Create local checkpoint manager + ckpt_manager = RayCheckpointManager( + ckpt_dir=ckpt_dir, + checkpoint_enabled=ckpt_enabled, + checkpoint_strategy=ckpt_strategy, + checkpoint_n_ops=ckpt_n_ops, + checkpoint_op_names=ckpt_op_names, + ) + + # Check for existing checkpoint + latest_checkpoint = ckpt_manager.find_latest_checkpoint(partition_id) + + # If all ops are already checkpointed, load from checkpoint + if latest_checkpoint and latest_checkpoint[0] >= len(task_ops) - 1: + task_logger.info(f"[Partition {partition_id}] All ops checkpointed, " f"loading from checkpoint") + loaded = ckpt_manager.load_checkpoint( + latest_checkpoint[0], + latest_checkpoint[1], + partition_id, + cfg=cfg, + ) + if loaded is not None: + return loaded.data.materialize() + + # Determine resume point + start_op_idx = 0 + partition_dataset = RayDataset(partition_data, cfg=cfg) + + if latest_checkpoint: + loaded = ckpt_manager.load_checkpoint( + latest_checkpoint[0], + latest_checkpoint[1], + partition_id, + cfg=cfg, + ) + if loaded is not None: + partition_dataset = loaded + start_op_idx = latest_checkpoint[0] + 1 + task_logger.info(f"[Partition {partition_id}] Resuming from op " f"{start_op_idx}") + + # Process ops one-by-one with checkpointing + remaining_ops = task_ops[start_op_idx:] + for rel_idx, op in enumerate(remaining_ops): + abs_idx = start_op_idx + rel_idx + task_logger.info(f"[Partition {partition_id}] Processing op {abs_idx}: " f"{op._name}") + partition_dataset = partition_dataset.process([op]) + + # Checkpoint if needed + if ckpt_manager.should_checkpoint(abs_idx, op._name): + partition_dataset.data = partition_dataset.data.materialize() + ckpt_manager.save_checkpoint( + partition_dataset.data, + abs_idx, + partition_id, + ) + + # Final materialize + partition_dataset.data = partition_dataset.data.materialize() + return partition_dataset.data + + # Submit tasks + futures = {} + for i, partition in enumerate(partitions): + # Check if partition is fully checkpointed before submitting + latest_ckpt = self.ckpt_manager.find_latest_checkpoint(i) + if latest_ckpt and latest_ckpt[0] >= len(ops) - 1: + logger.info(f"Partition {i}: already fully checkpointed, " f"loading from checkpoint") + loaded = self.ckpt_manager.load_checkpoint(latest_ckpt[0], latest_ckpt[1], i, cfg=self.cfg) + if loaded is not None: + futures[i] = loaded.data.materialize() + continue + + self._log_event( + event_type=EventType.PARTITION_START, + message=f"Starting concurrent processing of partition " f"{i + 1}/{len(partitions)}", + partition_id=i, + ) + futures[i] = _process_single_partition_task.remote( + partition, + i, + cfg, + max_conc, + ckpt_enabled, + ckpt_strategy, + ckpt_dir, + ckpt_n_ops, + ckpt_op_names, + op_fusion_enabled, + ) + + # Collect results + processed_partitions = [] + for i in range(len(partitions)): + result = futures[i] + if isinstance(result, ray.ObjectRef): + try: + result = ray.get(result) + logger.info(f"Partition {i}: completed successfully") + except Exception as e: + logger.error(f"Partition {i}: failed with error: {e}") + raise + processed_partitions.append(result) + self._log_event( + event_type=EventType.PARTITION_COMPLETE, + message=f"Completed concurrent processing of partition " f"{i + 1}/{len(partitions)}", + partition_id=i, + ) + + # Union results + logger.info("Merging concurrently processed partitions...") + if len(processed_partitions) == 1: + merged_dataset = processed_partitions[0] + else: + merged_dataset = processed_partitions[0] + for partition in processed_partitions[1:]: + merged_dataset = merged_dataset.union(partition) + + return RayDataset(merged_dataset, cfg=self.cfg) + def _process_with_convergence(self, dataset: RayDataset, ops: List, convergence_points: List[int]): """ Process dataset with convergence support for global operations. diff --git a/docs/design/parallel_partition_actor_reuse.md b/docs/design/parallel_partition_actor_reuse.md new file mode 100644 index 0000000000..f2d2617ab7 --- /dev/null +++ b/docs/design/parallel_partition_actor_reuse.md @@ -0,0 +1,1116 @@ +# Design Doc: Parallel Partition Processing with Actor Reuse + +**Author:** Data-Juicer Team +**Created:** 2026-03-09 +**Updated:** 2026-03-09 +**Status:** Draft +**Branch:** `feat/parallel-partition-actor-reuse` + +--- + +## 1. Problem Statement + +### Current Behavior + +The `PartitionedRayExecutor` processes partitions **sequentially**, creating new GPU actors for each partition: + +``` +Partition 1 → [Create Actors] → [Load Models] → [Process] → [Actors GC'd] +Partition 2 → [Create Actors] → [Load Models] → [Process] → [Actors GC'd] +Partition 3 → [Create Actors] → [Load Models] → [Process] → [Actors GC'd] +``` + +### Problems + +1. **Repeated Model Loading**: Heavy GPU models (e.g., VideoBLIP ~20GB) are loaded N times for N partitions +2. **GPU Idle Time**: GPUs sit idle between partitions during actor teardown/creation +3. **Poor Scalability**: Processing time scales linearly with partition count due to model loading overhead + +### Impact + +For a typical video processing pipeline with 3 GPU operators and 10 partitions: +- Model loading time: ~60s per operator × 3 operators × 10 partitions = **30 minutes of pure overhead** +- This overhead can exceed actual processing time for smaller datasets + +### Root Cause Analysis + +The issue is **not** the partitioning strategy itself. Partitioning provides: +- Memory control +- Checkpoint granularity +- Resume capability + +The issue is **actor lifecycle management**: +- Actors are created per `map_batches()` call +- Each partition triggers a new `map_batches()` call +- Ray garbage collects actors after each partition completes + +--- + +## 2. Proposed Solution + +### Overview + +Implement **shared actor pools with detached lifecycle** that persist across partitions: + +``` +┌─────────────────────────────────────────────────────────────┐ +│ Shared Actor Pool (Detached) │ +│ ┌──────┐ ┌──────┐ ┌──────┐ ┌──────┐ │ +│ │ A0 │ │ A1 │ │ A2 │ │ A3 │ ← Models loaded │ +│ │GPU0 │ │GPU1 │ │GPU2 │ │GPU3 │ ONCE at start │ +│ └──────┘ └──────┘ └──────┘ └──────┘ │ +└─────────────────────────────────────────────────────────────┘ + ↑ ↑ ↑ ↑ + │ │ │ │ + ┌──┴──┐ ┌──┴──┐ ┌──┴──┐ ┌──┴──┐ + │ P0 │ │ P1 │ │ P2 │ │ P3 │ + └─────┘ └─────┘ └─────┘ └─────┘ + +Partitions processed sequentially, actors reused across all +``` + +### Key Design Principles + +1. **Keep Sequential Partition Processing**: Preserves resume capability and checkpoint granularity +2. **Detached Actor Lifecycle**: Actors persist across partitions, models load once +3. **Explicit Pool Management**: Create pools at job start, cleanup at job end +4. **Compatible with Resume**: Skip completed partitions, reuse actors for remaining + +--- + +## 3. Detailed Design + +### 3.1 Architecture + +``` +┌─────────────────────────────────────────────────────────────────────────┐ +│ ActorReusePartitionExecutor │ +├─────────────────────────────────────────────────────────────────────────┤ +│ │ +│ ┌─────────────────────────────────────────────────────────────────┐ │ +│ │ SharedActorPoolManager │ │ +│ │ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │ │ +│ │ │ Pool: NSFW │ │Pool: Aesth. │ │Pool: Caption│ │ │ +│ │ │ 8 actors │ │ 16 actors │ │ 8 actors │ │ │ +│ │ │ num_gpus=1 │ │ num_gpus=0.5│ │ num_gpus=1 │ │ │ +│ │ └─────────────┘ └─────────────┘ └─────────────┘ │ │ +│ └─────────────────────────────────────────────────────────────────┘ │ +│ ↑ │ +│ │ (shared across partitions) │ +│ ↓ │ +│ ┌─────────────────────────────────────────────────────────────────┐ │ +│ │ Sequential Partition Processing │ │ +│ │ │ │ +│ │ for partition_id, partition in enumerate(partitions): │ │ +│ │ if is_complete(partition_id): continue # Resume support │ │ +│ │ result = process_with_shared_actors(partition, ops) │ │ +│ │ checkpoint(partition_id, result) │ │ +│ │ │ │ +│ └─────────────────────────────────────────────────────────────────┘ │ +│ │ +└─────────────────────────────────────────────────────────────────────────┘ +``` + +### 3.2 Execution Flow + +``` +Phase 1: Initialization +───────────────────────── + +Job Start + │ + ▼ +┌─────────────────────┐ +│ Analyze Operators │ Identify GPU operators and their requirements +└──────────┬──────────┘ + │ + ▼ +┌─────────────────────┐ +│ Create Actor Pools │ Create detached actors, load models ONCE +└──────────┬──────────┘ + │ + ▼ +┌─────────────────────┐ +│ Split into Partitions│ +└──────────┬──────────┘ + + +Phase 2: Sequential Processing with Actor Reuse +─────────────────────────────────────────────── + │ + ┌─────┴─────┐ + │ │ + ▼ ▼ +┌─────────┐ ┌─────────┐ +│ P0 │ │ P1 │ ... +└────┬────┘ └────┬────┘ + │ │ + ▼ ▼ +┌─────────────────────────────────────────┐ +│ Shared Actor Pool │ +│ (models already loaded) │ +│ │ +│ Process P0 → Checkpoint P0 │ +│ Process P1 → Checkpoint P1 │ +│ Process P2 → Checkpoint P2 │ +│ ... │ +└─────────────────────────────────────────┘ + + +Phase 3: Cleanup +──────────────── + │ + ▼ +┌─────────────────────┐ +│ Cleanup Actor Pools │ Kill detached actors +└──────────┬──────────┘ + │ + ▼ + Job End +``` + +### 3.3 Resume Flow + +``` +Resume from Crash (Partition 2 was in progress) +────────────────────────────────────────────── + +Checkpoint State: +├── partition_0/ ✓ (complete) +├── partition_1/ ✓ (complete) +├── partition_2/ ✗ (incomplete - no _SUCCESS marker) +└── partition_3/ ✗ (not started) + +Resume: + │ + ▼ +┌─────────────────────┐ +│ Scan Checkpoints │ Find: completed=[0,1], pending=[2,3] +└──────────┬──────────┘ + │ + ▼ +┌─────────────────────┐ +│ Create Actor Pools │ Models load once (fresh start) +└──────────┬──────────┘ + │ + ▼ +┌─────────────────────────────────────────┐ +│ for partition_id in [0, 1, 2, 3]: │ +│ if partition_id in [0, 1]: │ +│ skip (already complete) │ +│ else: │ +│ process with shared actors │ +│ checkpoint │ +└─────────────────────────────────────────┘ + │ + ▼ + Only partitions 2, 3 processed + Models loaded only ONCE for both +``` + +### 3.4 Core Components + +#### 3.4.1 DetachedActorPool + +```python +import ray +from ray.util.actor_pool import ActorPool +from typing import Dict, List, Any, Optional +from dataclasses import dataclass +import uuid + + +@dataclass +class ActorPoolConfig: + """Configuration for an actor pool.""" + op_class: type + op_init_args: tuple + op_init_kwargs: dict + num_gpus: float + num_cpus: float + num_actors: int + pool_id: str + + +class DetachedActorPool: + """ + A pool of detached Ray actors that persist across partitions. + + Key features: + - Actors have 'detached' lifetime (not garbage collected) + - Models loaded once during actor creation + - Explicit cleanup required at job end + """ + + def __init__(self, config: ActorPoolConfig): + self.config = config + self.actors: List[ray.actor.ActorHandle] = [] + self.pool: Optional[ActorPool] = None + self._created = False + + def initialize(self): + """Create detached actors and load models.""" + if self._created: + return + + # Create actor class with resource requirements + actor_cls = ray.remote( + num_gpus=self.config.num_gpus, + num_cpus=self.config.num_cpus, + )(self.config.op_class) + + # Create detached actors + for i in range(self.config.num_actors): + actor_name = f"{self.config.pool_id}_actor_{i}" + + # Check if actor already exists (from previous run) + try: + actor = ray.get_actor(actor_name) + logger.info(f"Reusing existing actor: {actor_name}") + except ValueError: + # Create new detached actor + actor = actor_cls.options( + name=actor_name, + lifetime="detached", + max_restarts=3, + ).remote(*self.config.op_init_args, **self.config.op_init_kwargs) + logger.info(f"Created new actor: {actor_name}") + + self.actors.append(actor) + + self.pool = ActorPool(self.actors) + self._created = True + logger.info(f"Actor pool initialized: {self.config.pool_id} with {len(self.actors)} actors") + + def map_batches(self, batches: List[Any], method_name: str = "process") -> List[Any]: + """ + Process batches using the actor pool. + + Args: + batches: List of batches to process + method_name: Name of the actor method to call + + Returns: + List of processed results + """ + if not self._created: + raise RuntimeError("Actor pool not initialized. Call initialize() first.") + + results = [] + + # Submit all batches to the pool + for batch in batches: + self.pool.submit( + lambda actor, b: getattr(actor, method_name).remote(b), + batch + ) + + # Collect results in order + while self.pool.has_next(): + result = self.pool.get_next() + results.append(result) + + return results + + def process_dataset(self, dataset: ray.data.Dataset, batch_size: int = 1000) -> ray.data.Dataset: + """ + Process a Ray Dataset using the actor pool. + + This method iterates through the dataset in batches, + submits them to the actor pool, and collects results. + """ + if not self._created: + raise RuntimeError("Actor pool not initialized. Call initialize() first.") + + processed_batches = [] + + for batch in dataset.iter_batches(batch_size=batch_size, batch_format="pyarrow"): + # Submit to pool + self.pool.submit( + lambda actor, b: actor.process.remote(b), + batch + ) + + # Collect all results + while self.pool.has_next(): + result = self.pool.get_next() + processed_batches.append(result) + + # Convert back to Ray Dataset + return ray.data.from_arrow(processed_batches) + + def cleanup(self): + """Kill all actors in the pool.""" + for actor in self.actors: + try: + ray.kill(actor) + except Exception as e: + logger.warning(f"Failed to kill actor: {e}") + + self.actors = [] + self.pool = None + self._created = False + logger.info(f"Actor pool cleaned up: {self.config.pool_id}") +``` + +#### 3.4.2 SharedActorPoolManager + +```python +class SharedActorPoolManager: + """ + Manages shared actor pools for GPU operators across partitions. + + Responsibilities: + - Create actor pools for each unique GPU operator configuration + - Reuse pools across partitions + - Handle cleanup at job completion + """ + + def __init__(self, job_id: str = None): + self.job_id = job_id or str(uuid.uuid4())[:8] + self.pools: Dict[str, DetachedActorPool] = {} + self._initialized = False + + def initialize_pools(self, ops: List[OP], num_gpus_available: int = 8): + """ + Initialize actor pools for all GPU operators. + + Args: + ops: List of operators in the pipeline + num_gpus_available: Total GPUs available in cluster + """ + if self._initialized: + logger.info("Actor pools already initialized, skipping") + return + + for op in ops: + if not op.use_ray_actor(): + continue # Skip CPU operators + + pool_key = self._get_pool_key(op) + + if pool_key in self.pools: + continue # Pool already created for similar operator + + # Calculate number of actors based on GPU requirements + num_actors = self._calculate_num_actors(op, num_gpus_available) + + config = ActorPoolConfig( + op_class=op.__class__, + op_init_args=op._init_args, + op_init_kwargs=op._init_kwargs, + num_gpus=op.num_gpus or 1, + num_cpus=op.num_cpus or 1, + num_actors=num_actors, + pool_id=f"{self.job_id}_{op._name}", + ) + + pool = DetachedActorPool(config) + pool.initialize() + self.pools[pool_key] = pool + + logger.info(f"Created actor pool for {op._name}: {num_actors} actors, {op.num_gpus} GPUs each") + + self._initialized = True + + def get_pool(self, op: OP) -> Optional[DetachedActorPool]: + """Get the actor pool for an operator.""" + pool_key = self._get_pool_key(op) + return self.pools.get(pool_key) + + def _get_pool_key(self, op: OP) -> str: + """ + Generate a unique key for an operator's pool. + + Operators with same class and GPU requirements can share a pool. + """ + return f"{op.__class__.__name__}_{op.num_gpus}_{op.num_cpus}" + + def _calculate_num_actors(self, op: OP, num_gpus_available: int) -> int: + """Calculate optimal number of actors for an operator.""" + if op.num_gpus and op.num_gpus > 0: + # GPU operator: actors = available_gpus / gpus_per_actor + return max(1, int(num_gpus_available / op.num_gpus)) + else: + # CPU operator with actor mode: use num_proc + return op.num_proc if op.num_proc > 0 else 4 + + def cleanup_all(self): + """Cleanup all actor pools.""" + for pool_key, pool in self.pools.items(): + pool.cleanup() + + self.pools = {} + self._initialized = False + logger.info("All actor pools cleaned up") +``` + +#### 3.4.3 ActorReusePartitionExecutor + +```python +class ActorReusePartitionExecutor: + """ + Partition executor with actor reuse across partitions. + + Key features: + - Sequential partition processing (resume-friendly) + - Shared actor pools (models load once) + - Per-partition checkpointing + - Resume from last incomplete partition + """ + + def __init__(self, cfg): + self.cfg = cfg + self.num_partitions = cfg.partition.get('num_of_partitions', 10) + self.checkpoint_dir = cfg.get('checkpoint_dir', './checkpoints') + self.actor_manager = SharedActorPoolManager(job_id=cfg.get('job_id')) + + # Detect available GPUs + self.num_gpus = self._detect_gpus() + + def run(self, dataset: ray.data.Dataset, ops: List[OP]) -> ray.data.Dataset: + """ + Main execution method. + + Args: + dataset: Input Ray Dataset + ops: List of operators to apply + + Returns: + Processed Ray Dataset + """ + try: + # Phase 1: Initialize actor pools for GPU operators + logger.info("Phase 1: Initializing actor pools...") + self.actor_manager.initialize_pools(ops, self.num_gpus) + + # Phase 2: Split dataset into partitions + logger.info(f"Phase 2: Splitting dataset into {self.num_partitions} partitions...") + partitions = dataset.split(self.num_partitions) + + # Phase 3: Process partitions sequentially with actor reuse + logger.info("Phase 3: Processing partitions...") + processed_partitions = [] + + for partition_id, partition in enumerate(partitions): + # Check if partition already completed (resume support) + if self._is_partition_complete(partition_id): + logger.info(f"Partition {partition_id}: Loading from checkpoint (already complete)") + result = self._load_partition_checkpoint(partition_id) + else: + logger.info(f"Partition {partition_id}: Processing...") + result = self._process_partition(partition, ops, partition_id) + + # Checkpoint after successful processing + self._save_partition_checkpoint(partition_id, result) + logger.info(f"Partition {partition_id}: Checkpointed") + + processed_partitions.append(result) + + # Phase 4: Union all partitions + logger.info("Phase 4: Merging partitions...") + final_dataset = self._union_partitions(processed_partitions) + + return final_dataset + + finally: + # Phase 5: Cleanup actor pools + logger.info("Phase 5: Cleaning up actor pools...") + self.actor_manager.cleanup_all() + + def _process_partition( + self, + partition: ray.data.Dataset, + ops: List[OP], + partition_id: int + ) -> ray.data.Dataset: + """ + Process a single partition using shared actor pools. + + Args: + partition: The partition dataset to process + ops: List of operators to apply + partition_id: ID of this partition (for logging) + + Returns: + Processed partition dataset + """ + result = partition + + for op_idx, op in enumerate(ops): + logger.debug(f"Partition {partition_id}, Op {op_idx}: {op._name}") + + if op.use_ray_actor(): + # GPU operator: use shared actor pool + pool = self.actor_manager.get_pool(op) + if pool is None: + raise RuntimeError(f"No actor pool found for operator: {op._name}") + + result = pool.process_dataset(result, batch_size=op.batch_size or 1000) + else: + # CPU operator: use standard Ray Data processing + result = result.map_batches( + op.process, + batch_size=op.batch_size, + batch_format="pyarrow", + ) + + return result + + def _is_partition_complete(self, partition_id: int) -> bool: + """Check if a partition checkpoint exists and is complete.""" + checkpoint_path = self._get_checkpoint_path(partition_id) + success_marker = os.path.join(checkpoint_path, "_SUCCESS") + return os.path.exists(success_marker) + + def _save_partition_checkpoint(self, partition_id: int, dataset: ray.data.Dataset): + """Save partition checkpoint atomically.""" + checkpoint_path = self._get_checkpoint_path(partition_id) + temp_path = f"{checkpoint_path}.tmp" + + # Write to temp location + dataset.write_parquet(temp_path) + + # Atomic rename + if os.path.exists(checkpoint_path): + shutil.rmtree(checkpoint_path) + os.rename(temp_path, checkpoint_path) + + # Write success marker + with open(os.path.join(checkpoint_path, "_SUCCESS"), 'w') as f: + f.write(datetime.now().isoformat()) + + def _load_partition_checkpoint(self, partition_id: int) -> ray.data.Dataset: + """Load partition from checkpoint.""" + checkpoint_path = self._get_checkpoint_path(partition_id) + return ray.data.read_parquet(checkpoint_path) + + def _get_checkpoint_path(self, partition_id: int) -> str: + """Get checkpoint path for a partition.""" + return os.path.join(self.checkpoint_dir, f"partition_{partition_id:04d}") + + def _union_partitions(self, partitions: List[ray.data.Dataset]) -> ray.data.Dataset: + """Union all partitions into a single dataset.""" + if not partitions: + raise ValueError("No partitions to union") + + result = partitions[0] + for p in partitions[1:]: + result = result.union(p) + + return result + + def _detect_gpus(self) -> int: + """Detect number of available GPUs in the cluster.""" + try: + resources = ray.cluster_resources() + return int(resources.get("GPU", 0)) + except Exception: + return 8 # Default assumption +``` + +--- + +## 4. Integration with Existing PartitionedRayExecutor + +### 4.1 Minimal Changes Approach + +```python +# In ray_executor_partitioned.py + +class PartitionedRayExecutor: + def __init__(self, cfg): + self.cfg = cfg + # ... existing init ... + + # New: Actor pool manager for GPU operator reuse + self.actor_manager = None + self.actor_reuse_enabled = cfg.get('partition', {}).get('actor_reuse', True) + + def run(self): + """Modified run method with actor reuse support.""" + try: + # Initialize shared actor pools if enabled + if self.actor_reuse_enabled and self._has_gpu_ops(): + self._initialize_shared_actors() + + # ... existing partition processing logic ... + + finally: + # Cleanup actors + if self.actor_manager: + self.actor_manager.cleanup_all() + + def _initialize_shared_actors(self): + """Initialize shared actor pools for GPU operators.""" + self.actor_manager = SharedActorPoolManager( + job_id=self.cfg.get('job_id', str(uuid.uuid4())[:8]) + ) + self.actor_manager.initialize_pools(self.ops, self._detect_gpus()) + + def _process_partition_with_actor_reuse(self, partition, ops): + """Process partition using shared actors for GPU ops.""" + result = partition + + for op in ops: + if op.use_ray_actor() and self.actor_manager: + # Use shared actor pool + pool = self.actor_manager.get_pool(op) + result = pool.process_dataset(result) + else: + # Standard processing + result = self._apply_op_standard(result, op) + + return result +``` + +### 4.2 Configuration Changes + +```yaml +# New configuration options + +partition: + mode: 'auto' + num_of_partitions: 10 + + # Actor reuse settings (NEW) + actor_reuse: true # Enable actor reuse across partitions + actor_pool: + max_restarts: 3 # Actor restart limit on failure + reuse_across_similar_ops: true # Share pool for ops with same config +``` + +--- + +## 5. Comparison: Before vs After + +### Timeline Comparison + +**Before (No Actor Reuse):** +``` +Time ────────────────────────────────────────────────────────────────────▶ + +Partition 0: [Load Model 60s][Process 30s][GC] +Partition 1: [Load Model 60s][Process 30s][GC] +Partition 2: [Load 60s][Process 30s] + +Total: 3 × (60s + 30s) = 270s +Model loads: 3 +``` + +**After (With Actor Reuse):** +``` +Time ────────────────────────────────────────────────────────────────────▶ + +Actor Pool: [Load Model 60s]─────────────────────────────────────────[Cleanup] +Partition 0: [Process 30s] +Partition 1: [Process 30s] +Partition 2: [Process 30s] + +Total: 60s + 3 × 30s = 150s +Model loads: 1 +``` + +**Speedup: 1.8x** (more significant with more partitions) + +### Performance Analysis + +| Metric | Before | After | Improvement | +|--------|--------|-------|-------------| +| Model Loading | N × T | 1 × T | **Nx faster** | +| GPU Idle Time | High (between partitions) | Low (continuous) | **Significant** | +| Memory Efficiency | Good (per-partition) | Good (same) | No change | +| Resume Support | Yes | Yes | No change | +| Checkpoint Granularity | Per-partition | Per-partition | No change | + +Where N = number of partitions, T = model loading time + +### Benchmark Projection + +**Setup:** +- 8 GPUs (A100 80GB) +- 10,000 video samples +- 10 partitions +- 3 GPU operators (NSFW ~30s load, Aesthetics ~20s load, Captioning ~60s load) +- Processing: ~100s per partition + +**Before:** +``` +Model loading: 10 × (30 + 20 + 60) = 1100s (~18 min) +Processing: 10 × 100s = 1000s (~17 min) +Total: ~35 minutes +``` + +**After:** +``` +Model loading: 1 × (30 + 20 + 60) = 110s (~2 min) +Processing: 10 × 100s = 1000s (~17 min) +Total: ~19 minutes +``` + +**Speedup: ~1.8x** + +--- + +## 6. Checkpointing and Resume + +### Checkpoint Structure + +``` +checkpoints/ +├── job_metadata.json # Job-level metadata +├── partition_0000/ +│ ├── data.parquet +│ └── _SUCCESS # Completion marker +├── partition_0001/ +│ ├── data.parquet +│ └── _SUCCESS +├── partition_0002/ +│ └── data.parquet # No _SUCCESS = incomplete +└── partition_0003/ # Directory doesn't exist = not started +``` + +### Resume Logic + +```python +def find_resume_point(checkpoint_dir: str, num_partitions: int) -> List[int]: + """ + Find which partitions need processing. + + Returns: + List of partition IDs that need processing + """ + pending = [] + + for partition_id in range(num_partitions): + checkpoint_path = f"{checkpoint_dir}/partition_{partition_id:04d}" + success_marker = f"{checkpoint_path}/_SUCCESS" + + if not os.path.exists(success_marker): + pending.append(partition_id) + + return pending +``` + +### Atomic Checkpoint Write + +```python +def atomic_checkpoint_write(dataset, checkpoint_path): + """ + Write checkpoint atomically to prevent corruption. + + Steps: + 1. Write to temp location + 2. Verify write succeeded + 3. Atomic rename to final location + 4. Write success marker + """ + temp_path = f"{checkpoint_path}.tmp.{uuid.uuid4()}" + + try: + # Step 1: Write to temp + dataset.write_parquet(temp_path) + + # Step 2: Verify + test_read = ray.data.read_parquet(temp_path) + if test_read.count() != dataset.count(): + raise RuntimeError("Checkpoint verification failed") + + # Step 3: Atomic rename + if os.path.exists(checkpoint_path): + shutil.rmtree(checkpoint_path) + os.rename(temp_path, checkpoint_path) + + # Step 4: Success marker + with open(f"{checkpoint_path}/_SUCCESS", 'w') as f: + f.write(json.dumps({ + 'timestamp': datetime.now().isoformat(), + 'row_count': dataset.count(), + })) + + except Exception as e: + # Cleanup temp on failure + if os.path.exists(temp_path): + shutil.rmtree(temp_path) + raise +``` + +--- + +## 7. Error Handling + +### Actor Failure Recovery + +```python +class DetachedActorPool: + def _handle_actor_failure(self, actor_idx: int, error: Exception): + """ + Handle actor failure with restart. + + Ray's 'max_restarts' handles automatic restart, + but we may need to re-add to pool. + """ + logger.warning(f"Actor {actor_idx} failed: {error}") + + # Check if actor was restarted by Ray + actor = self.actors[actor_idx] + try: + # Ping actor to check if alive + ray.get(actor.ping.remote(), timeout=5) + logger.info(f"Actor {actor_idx} recovered") + except Exception: + # Actor dead, create replacement + logger.info(f"Creating replacement for actor {actor_idx}") + self.actors[actor_idx] = self._create_actor(actor_idx) + + # Rebuild pool with updated actor list + self.pool = ActorPool(self.actors) +``` + +### Partition Failure Recovery + +```python +def _process_partition_with_retry(self, partition, ops, partition_id, max_retries=3): + """Process partition with retry on failure.""" + last_error = None + + for attempt in range(max_retries): + try: + result = self._process_partition(partition, ops, partition_id) + return result + except Exception as e: + last_error = e + logger.warning(f"Partition {partition_id} attempt {attempt + 1} failed: {e}") + + # Check if actors need recovery + self._recover_actors_if_needed() + + if attempt < max_retries - 1: + time.sleep(2 ** attempt) # Exponential backoff + + raise RuntimeError(f"Partition {partition_id} failed after {max_retries} attempts: {last_error}") +``` + +--- + +## 8. Implementation Plan + +### Phase 1: Core Actor Pool Infrastructure (3-4 days) + +- [ ] Implement `DetachedActorPool` class + - [ ] Actor creation with detached lifetime + - [ ] Batch processing methods + - [ ] Cleanup methods + - [ ] Unit tests + +- [ ] Implement `SharedActorPoolManager` class + - [ ] Pool key generation + - [ ] Pool initialization logic + - [ ] Actor count calculation + - [ ] Unit tests + +### Phase 2: Integration with PartitionedRayExecutor (2-3 days) + +- [ ] Modify `PartitionedRayExecutor.__init__` for actor manager +- [ ] Modify `_process_partition` to use shared actors +- [ ] Add configuration options +- [ ] Integration tests + +### Phase 3: Checkpointing Enhancements (2 days) + +- [ ] Implement atomic checkpoint writes +- [ ] Implement resume logic +- [ ] Add checkpoint verification +- [ ] Recovery tests + +### Phase 4: Testing & Optimization (2-3 days) + +- [ ] End-to-end benchmarks +- [ ] Memory profiling +- [ ] GPU utilization monitoring +- [ ] Performance comparison tests + +### Phase 5: Documentation & Production Readiness (1-2 days) + +- [ ] Update user documentation +- [ ] Add logging and metrics +- [ ] Configuration validation +- [ ] Code review and merge + +**Total Estimated Time: 10-14 days** + +--- + +## 9. Testing Strategy + +### Unit Tests + +```python +class TestDetachedActorPool: + def test_initialize_creates_actors(self): + """Test that initialization creates correct number of actors.""" + pass + + def test_actors_are_detached(self): + """Test that actors have detached lifetime.""" + pass + + def test_process_batches(self): + """Test batch processing through actor pool.""" + pass + + def test_cleanup_kills_actors(self): + """Test that cleanup properly kills all actors.""" + pass + + def test_reuse_existing_actors(self): + """Test that existing detached actors are reused.""" + pass + + +class TestSharedActorPoolManager: + def test_initialize_pools_for_gpu_ops(self): + """Test pool creation for GPU operators.""" + pass + + def test_skip_cpu_ops(self): + """Test that CPU operators don't get pools.""" + pass + + def test_pool_reuse_for_similar_ops(self): + """Test that similar ops share pools.""" + pass + + +class TestActorReusePartitionExecutor: + def test_end_to_end_processing(self): + """Test complete pipeline execution.""" + pass + + def test_resume_from_checkpoint(self): + """Test resuming from partial completion.""" + pass + + def test_actor_reuse_across_partitions(self): + """Verify actors are reused (models not reloaded).""" + pass +``` + +### Integration Tests + +```python +class TestActorReuseIntegration: + def test_with_real_gpu_operator(self): + """Test with actual GPU operator (e.g., VideoNSFWFilter).""" + pass + + def test_mixed_cpu_gpu_pipeline(self): + """Test pipeline with both CPU and GPU operators.""" + pass + + def test_checkpoint_and_resume(self): + """Test full checkpoint/resume cycle.""" + pass + + def test_actor_failure_recovery(self): + """Test recovery from actor failure.""" + pass +``` + +### Benchmark Tests + +```python +class TestPerformance: + def test_model_loading_count(self): + """Verify model is loaded only once across partitions.""" + # Count model loading log messages + pass + + def test_speedup_vs_baseline(self): + """Compare performance against current implementation.""" + pass + + def test_gpu_utilization(self): + """Measure GPU utilization during processing.""" + pass +``` + +--- + +## 10. Risks and Mitigations + +| Risk | Likelihood | Impact | Mitigation | +|------|------------|--------|------------| +| Detached actors not cleaned up | Medium | Medium | Explicit cleanup in finally block, cleanup script for orphans | +| Actor pool exhaustion | Low | High | Queue-based submission, backpressure handling | +| Memory leak in long-running actors | Low | Medium | Periodic actor restart option, memory monitoring | +| Incompatibility with Ray upgrades | Low | High | Pin Ray version, abstract actor APIs | +| Checkpoint corruption on crash | Low | High | Atomic writes, verification, temp files | + +--- + +## 11. Future Enhancements + +1. **Dynamic Actor Scaling**: Scale actor pool based on queue depth +2. **Actor Health Monitoring**: Proactive detection of unhealthy actors +3. **Cross-Job Actor Reuse**: Reuse actors across multiple jobs (with same model) +4. **GPU Memory Optimization**: Pack multiple small models on single GPU +5. **Async Checkpointing**: Checkpoint in background without blocking processing + +--- + +## 12. Appendix + +### A. Glossary + +| Term | Definition | +|------|------------| +| **Detached Actor** | Ray actor with `lifetime="detached"` that persists until explicitly killed | +| **Actor Pool** | Collection of actors that process work items in parallel | +| **Partition** | A subset of the dataset processed as a unit | +| **Checkpoint** | Saved state of a partition for resume capability | + +### B. Configuration Reference + +```yaml +partition: + # Existing options + mode: 'auto' # 'auto' | 'manual' + num_of_partitions: 10 # Number of partitions + + # New options for actor reuse + actor_reuse: true # Enable actor reuse (default: true) + actor_pool: + max_restarts: 3 # Max actor restarts on failure + reuse_across_similar_ops: true # Share pool for same-config ops + cleanup_on_error: true # Cleanup actors on job failure + +checkpoint: + enabled: true + dir: './checkpoints' + atomic_write: true # Use atomic checkpoint writes + verify_on_write: true # Verify checkpoint after write +``` + +### C. Monitoring and Observability + +```python +# Metrics to expose +METRICS = { + 'actor_pool_size': Gauge, # Number of actors in pool + 'actor_restarts_total': Counter, # Total actor restarts + 'partition_processing_time': Histogram, # Time per partition + 'model_load_time': Histogram, # Model loading time + 'gpu_utilization': Gauge, # GPU utilization percentage + 'checkpoint_write_time': Histogram, # Checkpoint write time +} +``` + +--- + +## References + +- [Ray Actors Documentation](https://docs.ray.io/en/latest/ray-core/actors.html) +- [Ray Actor Lifetimes](https://docs.ray.io/en/latest/ray-core/actors/named-actors.html#actor-lifetimes) +- [Ray Data User Guide](https://docs.ray.io/en/latest/data/data.html) +- Data-Juicer Source: `data_juicer/core/executor/ray_executor_partitioned.py` diff --git a/perf-test.py b/perf-test.py new file mode 100644 index 0000000000..de6b95c723 --- /dev/null +++ b/perf-test.py @@ -0,0 +1,582 @@ +#!/usr/bin/env python3 +""" +Simple single-operator benchmark to test data loading and Ray Data parallelism. +Enhanced for debugging Ray/DataJuicer GPU actor initialization issues. +""" + +import argparse +import importlib +import json +import os +import subprocess +import sys +import time +from datetime import datetime + +from loguru import logger + +# Data-Juicer path +DJ_CODE_PATH = "/mnt/workspace/yileiz/data-juicer" +if os.path.exists(DJ_CODE_PATH): + sys.path.insert(0, DJ_CODE_PATH) + + +# Output directory +OUTPUT_DIR = "/mnt/workspace/yileiz/outputs/partitioned_ray/simple_workdir" + + +def setup_logging(log_dir=None): + """Setup logging to file and console.""" + if log_dir is None: + timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") + log_dir = os.path.join(OUTPUT_DIR, f"run_{timestamp}") + + os.makedirs(log_dir, exist_ok=True) + log_file = os.path.join(log_dir, "benchmark.log") + + logger.remove() + + logger.add( + sys.stdout, + level="INFO", + format="{time:HH:mm:ss} | {level: <8} | {message}", + colorize=True, + ) + + logger.add( + log_file, + level="DEBUG", + format="{time:YYYY-MM-DD HH:mm:ss} | {level: <8} | {name}:{function}:{line} - {message}", + rotation="100 MB", + ) + + logger.info(f"Log file: {log_file}") + return log_dir, log_file + + +def monitor_gpu(): + """Print GPU utilization.""" + try: + result = subprocess.run( + ["nvidia-smi", "--query-gpu=index,name,utilization.gpu,memory.used,memory.total", "--format=csv,noheader"], + capture_output=True, + text=True, + check=False, + ) + logger.info(f"GPU Status:\n{result.stdout}") + except Exception as e: + logger.warning(f"Failed to query GPU status: {e}") + + +def log_ray_paths(): + """Print likely Ray log locations for easier debugging.""" + ray_tmp = "/tmp/ray" + if os.path.exists(ray_tmp): + logger.info(f"Ray temp dir exists: {ray_tmp}") + logger.info("Check Ray logs under: /tmp/ray/session_latest/logs/") + logger.info("Ray Data logs often under: /tmp/ray/session_latest/logs/ray-data/") + else: + logger.warning("Ray temp dir /tmp/ray not found yet") + + +def prepare_jsonl_from_caption(jsonl_path, video_base_dir, num_samples=None, output_path=None): + """Prepare JSONL with absolute video paths.""" + if output_path is None: + output_path = jsonl_path.replace(".jsonl", "_abs.jsonl") + + if os.path.exists(output_path): + logger.info(f"Output already exists: {output_path}") + return output_path + + count = 0 + missing = 0 + with open(jsonl_path, "r") as f_in, open(output_path, "w") as f_out: + for line in f_in: + if num_samples and count >= num_samples: + break + sample = json.loads(line) + videos = sample.get("videos", []) + abs_videos = [os.path.join(video_base_dir, os.path.basename(v)) for v in videos] + if all(os.path.exists(v) for v in abs_videos): + out_sample = {"videos": abs_videos, "text": sample.get("caption", "")} + f_out.write(json.dumps(out_sample, ensure_ascii=False) + "\n") + count += 1 + else: + missing += 1 + + logger.info(f"Created {output_path} with {count} samples, skipped {missing} missing-video samples") + return output_path + + +def split_jsonl(jsonl_path, num_shards=96): + """Split JSONL into shards.""" + shard_dir = jsonl_path.replace(".jsonl", f"_sharded_{num_shards}") + marker = os.path.join(shard_dir, "_DONE") + + if os.path.exists(marker): + logger.info(f"Sharded data exists: {shard_dir}") + return shard_dir + + os.makedirs(shard_dir, exist_ok=True) + + writers = [open(os.path.join(shard_dir, f"shard_{i:04d}.jsonl"), "w") for i in range(num_shards)] + + count = 0 + try: + with open(jsonl_path, "r") as f_in: + for line in f_in: + writers[count % num_shards].write(line) + count += 1 + finally: + for w in writers: + w.close() + + with open(marker, "w") as f: + f.write(f"{count} samples\n") + + logger.info(f"Split {count} samples into {num_shards} shards") + return shard_dir + + +def require_module(module_name, pip_hint=None): + """Fail fast if module is missing.""" + try: + return importlib.import_module(module_name) + except Exception as e: + hint = f" Please install it first: {pip_hint}" if pip_hint else "" + raise RuntimeError(f"Missing required module [{module_name}].{hint}\nOriginal error: {e}") from e + + +def precheck_environment(model_path, fail_fast=True): + """ + Precheck environment in driver process to avoid hanging inside Ray actors. + """ + logger.info("=" * 80) + logger.info("Prechecking environment before starting Ray actors") + logger.info("=" * 80) + + # Basic env + logger.info(f"Python executable: {sys.executable}") + logger.info(f"Python version: {sys.version}") + logger.info(f'HF_ENDPOINT={os.environ.get("HF_ENDPOINT")}') + + # Model path + if not os.path.exists(model_path): + msg = f"Model path does not exist: {model_path}" + if fail_fast: + raise FileNotFoundError(msg) + logger.warning(msg) + else: + logger.info(f"Model path exists: {model_path}") + + # Required modules + require_module("torch", "pip install torch") + require_module("transformers", "pip install transformers") + require_module("ray", "pip install ray") + require_module("pyarrow", "pip install pyarrow") + + # This is the key one from your log + + # Torch / CUDA visibility + import torch + + logger.info(f"torch version: {torch.__version__}") + logger.info(f"torch.cuda.is_available(): {torch.cuda.is_available()}") + logger.info(f"torch.cuda.device_count(): {torch.cuda.device_count()}") + if torch.cuda.is_available(): + for i in range(torch.cuda.device_count()): + try: + logger.info(f"CUDA device {i}: {torch.cuda.get_device_name(i)}") + except Exception: + pass + + logger.info("Environment precheck passed.") + + +def init_ray(object_store_gb=300, num_gpus=8): + """Initialize Ray with better defaults.""" + # Pre-import to avoid circular import issues in Ray workers + logger.info("Pre-importing modules to avoid fsspec issues in Ray workers...") + import fsspec + import fsspec.spec + import fsspec.utils # noqa: F401 + + try: + from huggingface_hub import HfFileSystem # noqa: F401 + except ImportError: + pass # OK if not available + + import ray + + if ray.is_initialized(): + logger.info("Ray already initialized") + return + + # Check if there's a running Ray cluster + ray_address = os.environ.get("RAY_ADDRESS") + + if ray_address: + # Connect to specified cluster + logger.info(f"Connecting to Ray cluster at {ray_address}...") + ray.init(address=ray_address) + logger.info("Connected to existing Ray cluster") + else: + # Start a new local Ray instance + logger.info(f"Starting new Ray instance with {num_gpus} GPUs, {object_store_gb}GB object store...") + ray.init( + num_gpus=num_gpus, + object_store_memory=object_store_gb * 1024**3, + ) + logger.info(f"Ray initialized successfully") + + log_ray_paths() + + +def run_simple_benchmark( + data_path, + num_shards=96, + num_partitions=8, + fail_fast=True, + executor_type="ray", +): + """Run benchmark with DataJuicer + video_aesthetics_filter. + + Args: + executor_type: 'ray' (standard, uses all GPUs) or 'ray_partitioned' (partitioned). + ray_partitioned auto-detects GPU count and runs partitions concurrently. + """ + import ray # noqa: F401 + import yaml + + from data_juicer.config import init_configs + from data_juicer.core.executor.ray_executor import RayExecutor + from data_juicer.core.executor.ray_executor_partitioned import ( + PartitionedRayExecutor, + ) + + # Environment + os.environ["HF_ENDPOINT"] = "https://hf-mirror.com" + os.environ.setdefault("TOKENIZERS_PARALLELISM", "false") + os.environ.setdefault("HF_HUB_DISABLE_TELEMETRY", "1") + + model_path = "/mnt/workspace/miaoxiang.zfr/models/aesthetics-predictor-v2-sac-logos-ava1-l14-linearMSE" + + # Fail fast before actors + precheck_environment(model_path=model_path, fail_fast=fail_fast) + + # Initialize Ray + init_ray(object_store_gb=300) + + # Shard data + if os.path.isfile(data_path): + data_path = split_jsonl(data_path, num_shards) + + timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") + work_dir = os.path.join(OUTPUT_DIR, f"dj_run_{timestamp}") + os.makedirs(work_dir, exist_ok=True) + + logger.info(f"Using executor type: {executor_type}") + + # Base config + cfg_dict = { + "project_name": "simple-benchmark", + "executor_type": executor_type, + "dataset_path": data_path, + "export_path": os.path.join(work_dir, "result.jsonl"), + "work_dir": work_dir, + "video_key": "videos", + "skip_op_error": False, # fail loudly + "use_cache": False, + "open_monitor": True, + "debug": False, + "auto_op_parallelism": False, # Disable auto calculation to use explicit num_proc + "process": [ + { + "video_aesthetics_filter": { + "hf_scorer_model": model_path, + "trust_remote_code": True, + "min_score": 0.4, + "max_score": 1.0, + "frame_num": 9223372036854775807, # sys.maxsize - use all frames + "reduce_mode": "avg", + "skip_op_error": False, # fail loudly during debugging + "batch_mode": True, + "num_gpus": 1, + "num_proc": 8, + }, + }, + ], + } + + # Add partition config only for ray_partitioned executor + if executor_type == "ray_partitioned": + cfg_dict["partition"] = { + "mode": "manual", + "num_of_partitions": num_partitions, + } + cfg_dict["checkpoint"] = { + "enabled": False, + } + + config_path = os.path.join(work_dir, "config.yaml") + with open(config_path, "w") as f: + yaml.dump(cfg_dict, f, allow_unicode=True, sort_keys=False) + + logger.info(f"Config saved to {config_path}") + logger.info(f"Work dir: {work_dir}") + logger.info(f"Data path: {data_path}") + if executor_type == "ray_partitioned": + logger.info(f"Num partitions: {num_partitions}") + + monitor_gpu() + + cfg = init_configs(args=["--config", config_path]) + + t0 = time.time() + if executor_type == "ray": + executor = RayExecutor(cfg) + else: + executor = PartitionedRayExecutor(cfg) + logger.info(f"Executor init ({executor_type}): {time.time() - t0:.2f}s") + + t1 = time.time() + try: + executor.run() + except Exception: + logger.exception("DataJuicer execution failed") + logger.error(f"Please inspect Ray logs under /tmp/ray/session_latest/logs/") + raise + + logger.info(f"Processing: {time.time() - t1:.2f}s") + monitor_gpu() + logger.info(f"Total: {time.time() - t0:.2f}s") + logger.info(f"Output dir: {work_dir}") + + +def run_ray_data_test(data_path, num_shards=96): + """Test raw Ray Data parallelism without DataJuicer.""" + import ray + + if os.path.isfile(data_path): + data_path = split_jsonl(data_path, num_shards) + + init_ray(object_store_gb=100) + + logger.info(f"Reading data from {data_path}") + + t0 = time.time() + ds = ray.data.read_json(data_path) + count = ds.count() + try: + num_blocks = ds.num_blocks() + except Exception: + num_blocks = "unknown_before_materialize" + logger.info(f"Loaded dataset: {count} rows, {num_blocks} blocks") + + def count_videos(row): + return {"video_count": len(row.get("videos", [])), "text_len": len(row.get("text", ""))} + + t1 = time.time() + ds = ds.map(count_videos) + result = ds.take(5) + logger.info(f"Map result: {result}") + logger.info(f"Map time: {time.time() - t1:.2f}s") + + t2 = time.time() + total = ds.count() + logger.info(f"Total rows: {total}, count time: {time.time() - t2:.2f}s") + + logger.info(f"Total time: {time.time() - t0:.2f}s") + + +def run_direct_gpu_test( + data_path, + num_shards=96, + batch_size=8, + gpu_concurrency=8, + fail_fast=True, +): + """ + Direct GPU test bypassing PartitionedRayExecutor. + This tests if Ray Data GPU actors work correctly. + """ + import pyarrow + import ray + + os.environ["HF_ENDPOINT"] = "https://hf-mirror.com" + os.environ.setdefault("TOKENIZERS_PARALLELISM", "false") + os.environ.setdefault("HF_HUB_DISABLE_TELEMETRY", "1") + + model_path = "/mnt/workspace/miaoxiang.zfr/models/aesthetics-predictor-v2-sac-logos-ava1-l14-linearMSE" + + # Precheck before actor creation + precheck_environment(model_path=model_path, fail_fast=fail_fast) + + init_ray(object_store_gb=300) + + logger.info("Direct GPU Test - bypassing PartitionedRayExecutor") + monitor_gpu() + + t0 = time.time() + if os.path.isfile(data_path): + data_path = split_jsonl(data_path, num_shards) + + ds = ray.data.read_json(data_path) + row_count = ds.count() + logger.info(f"Loaded {row_count} rows in {time.time() - t0:.2f}s") + + def add_stats_column(table: pyarrow.Table): + new_column_data = [{} for _ in range(len(table))] + return table.append_column("__dj__stats__", [new_column_data]) + + ds = ds.map_batches(add_stats_column, batch_format="pyarrow") + logger.info("Added __dj__stats__ column") + + from data_juicer.ops.filter.video_aesthetics_filter import VideoAestheticsFilter + + # Create operator on driver for validation only + op_t0 = time.time() + op = VideoAestheticsFilter( + hf_scorer_model=model_path, + trust_remote_code=True, + min_score=0.4, + max_score=1.0, + frame_num=9223372036854775807, # sys.maxsize - use all frames + reduce_mode="avg", + num_gpus=1, + ) + logger.info(f"Operator init on driver: {time.time() - op_t0:.2f}s") + logger.info(f"Operator: {op._name}") + logger.info(f" use_cuda: {op.use_cuda()}") + logger.info(f" use_ray_actor: {op.use_ray_actor()}") + logger.info(f" num_gpus: {op.num_gpus}") + logger.info(f" num_proc: {op.num_proc}") + + # Restrict concurrency to available GPUs + import torch + + available_gpus = torch.cuda.device_count() if torch.cuda.is_available() else 0 + if available_gpus <= 0: + raise RuntimeError("No CUDA GPUs visible, cannot run direct GPU test") + + gpu_concurrency = min(gpu_concurrency, available_gpus) + logger.info(f"Using gpu_concurrency={gpu_concurrency}, batch_size={batch_size}") + + # Prefer new API style: concurrency= + t1 = time.time() + logger.info("Creating Ray Data GPU actor pipeline...") + + try: + ds = ds.map_batches( + VideoAestheticsFilter, + fn_constructor_args=op._init_args, + fn_constructor_kwargs=op._init_kwargs, + batch_size=batch_size, + num_cpus=1, + num_gpus=1, + concurrency=gpu_concurrency, + batch_format="pyarrow", + ) + logger.info("Using map_batches(..., concurrency=...)") + except TypeError: + # Fallback for older Ray versions + from ray.data import ActorPoolStrategy + + logger.warning("Ray version does not support concurrency= here, fallback to ActorPoolStrategy") + ds = ds.map_batches( + VideoAestheticsFilter, + fn_constructor_args=op._init_args, + fn_constructor_kwargs=op._init_kwargs, + batch_size=batch_size, + num_cpus=1, + num_gpus=1, + compute=ActorPoolStrategy(size=gpu_concurrency), + batch_format="pyarrow", + ) + + logger.info("Executing pipeline...") + t2 = time.time() + try: + result = ds.materialize() + except Exception: + logger.exception("Direct GPU pipeline execution failed") + logger.error("Please inspect /tmp/ray/session_latest/logs/") + raise + + logger.info(f"Pipeline execution: {time.time() - t2:.2f}s") + + count = result.count() + logger.info(f"Result: {count} rows") + + monitor_gpu() + logger.info(f"Total time: {time.time() - t0:.2f}s") + logger.info(f"Pipeline setup time: {time.time() - t1:.2f}s") + + +def main(): + parser = argparse.ArgumentParser(description="Simple benchmark") + parser.add_argument( + "--caption-jsonl", + type=str, + default="/mnt/workspace/miaoxiang.zfr/data/Youku-AliceMind/caption_val_abs_6k.jsonl", + ) + parser.add_argument( + "--video-dir", + type=str, + default="/mnt/workspace/shurui.ksr/Project/data/modelscope/Youku-AliceMind/videos/caption", + ) + parser.add_argument("--num-samples", type=int, default=6000) + parser.add_argument("--num-shards", type=int, default=96) + parser.add_argument("--partitions", type=int, default=8) + parser.add_argument("--batch-size", type=int, default=8) + parser.add_argument("--gpu-concurrency", type=int, default=8) + parser.add_argument("--fail-fast", action="store_true", default=True) + parser.add_argument("--no-fail-fast", dest="fail_fast", action="store_false") + parser.add_argument("--mode", type=str, choices=["ray", "dj", "gpu", "both"], default="gpu") + parser.add_argument( + "--executor", + type=str, + choices=["ray", "ray_partitioned"], + default="ray", + help='Executor type: "ray" (standard, parallel GPUs) or "ray_partitioned" (partitioned)', + ) + args = parser.parse_args() + + log_dir, log_file = setup_logging() + logger.info(f"Arguments: {args}") + + jsonl_path = prepare_jsonl_from_caption(args.caption_jsonl, args.video_dir, args.num_samples) + + if args.mode in ["ray", "both"]: + logger.info("\n" + "=" * 60) + logger.info("Testing Ray Data parallelism") + logger.info("=" * 60) + run_ray_data_test(jsonl_path, args.num_shards) + + if args.mode in ["dj", "both"]: + logger.info("\n" + "=" * 60) + logger.info(f"Testing DataJuicer with single operator (executor={args.executor})") + logger.info("=" * 60) + run_simple_benchmark( + jsonl_path, + num_shards=args.num_shards, + num_partitions=args.partitions, + fail_fast=args.fail_fast, + executor_type=args.executor, + ) + + if args.mode == "gpu": + logger.info("\n" + "=" * 60) + logger.info("Testing Direct GPU (bypass PartitionedRayExecutor)") + logger.info("=" * 60) + run_direct_gpu_test( + jsonl_path, + num_shards=args.num_shards, + batch_size=args.batch_size, + gpu_concurrency=args.gpu_concurrency, + fail_fast=args.fail_fast, + ) + + +if __name__ == "__main__": + main() diff --git a/tests/core/executor/test_ray_executor_partitioned.py b/tests/core/executor/test_ray_executor_partitioned.py index 9c6fec206b..2054f0c2a7 100644 --- a/tests/core/executor/test_ray_executor_partitioned.py +++ b/tests/core/executor/test_ray_executor_partitioned.py @@ -679,5 +679,221 @@ def test_dag_node_status_transitions(self): self.assertEqual(executor.pipeline_dag.nodes[node_id]["status"], "completed") +class ConcurrencyScopingTest(DataJuicerTestCaseBase): + """Unit tests for scope_op_concurrency utility.""" + + def test_gpu_op_scoping(self): + """GPU op concurrency is divided by max_concurrent_partitions.""" + from unittest.mock import MagicMock + from data_juicer.core.executor.concurrency_scoping import scope_op_concurrency + + op = MagicMock() + op.use_ray_actor.return_value = True + op.num_proc = 4 + self.assertEqual(scope_op_concurrency(op, 4), 1) + self.assertEqual(scope_op_concurrency(op, 2), 2) + self.assertEqual(scope_op_concurrency(op, 1), 4) + + def test_gpu_op_scoping_floor_min_one(self): + """Scoped concurrency never goes below 1.""" + from unittest.mock import MagicMock + from data_juicer.core.executor.concurrency_scoping import scope_op_concurrency + + op = MagicMock() + op.use_ray_actor.return_value = True + op.num_proc = 2 + self.assertEqual(scope_op_concurrency(op, 8), 1) + + def test_cpu_op_unchanged(self): + """CPU ops (use_ray_actor=False) are not scoped.""" + from unittest.mock import MagicMock + from data_juicer.core.executor.concurrency_scoping import scope_op_concurrency + + op = MagicMock() + op.use_ray_actor.return_value = False + op.num_proc = 4 + self.assertEqual(scope_op_concurrency(op, 4), 4) + + def test_auto_mode_unchanged(self): + """Auto-mode (num_proc <= 0) is not scoped.""" + from unittest.mock import MagicMock + from data_juicer.core.executor.concurrency_scoping import scope_op_concurrency + + op = MagicMock() + op.use_ray_actor.return_value = True + op.num_proc = -1 + self.assertEqual(scope_op_concurrency(op, 4), -1) + + def test_none_num_proc_unchanged(self): + """None num_proc is not scoped.""" + from unittest.mock import MagicMock + from data_juicer.core.executor.concurrency_scoping import scope_op_concurrency + + op = MagicMock() + op.use_ray_actor.return_value = True + op.num_proc = None + self.assertIsNone(scope_op_concurrency(op, 4)) + + def test_resolve_max_concurrent_explicit_int(self): + """Explicit int values are passed through.""" + from data_juicer.core.executor.ray_executor_partitioned import PartitionedRayExecutor + self.assertEqual(PartitionedRayExecutor._resolve_max_concurrent(4), 4) + self.assertEqual(PartitionedRayExecutor._resolve_max_concurrent(1), 1) + # Minimum clamp to 1 + self.assertEqual(PartitionedRayExecutor._resolve_max_concurrent(0), 1) + + def test_resolve_max_concurrent_auto(self): + """'auto' resolves to GPU count or 1.""" + from data_juicer.core.executor.ray_executor_partitioned import PartitionedRayExecutor + result = PartitionedRayExecutor._resolve_max_concurrent("auto") + self.assertIsInstance(result, int) + self.assertGreaterEqual(result, 1) + + +class ConcurrentPartitionConfigTest(DataJuicerTestCaseBase): + """Tests for max_concurrent_partitions config parsing.""" + + root_path = os.path.join(os.path.dirname(os.path.realpath(__file__)), '..', '..', '..') + + def setUp(self) -> None: + super().setUp() + unique_name = f'test_concurrent_cfg_{uuid.uuid4().hex[:8]}' + self.tmp_dir = os.path.join(self.root_path, 'tmp', unique_name) + os.makedirs(self.tmp_dir, exist_ok=True) + + def tearDown(self) -> None: + super().tearDown() + if os.path.exists(self.tmp_dir): + shutil.rmtree(self.tmp_dir) + + @TEST_TAG('ray') + def test_default_max_concurrent_partitions_auto(self): + """Default max_concurrent_partitions is 'auto', resolved from GPU count.""" + cfg = init_configs([ + '--config', os.path.join(self.root_path, 'demos/process_on_ray/configs/demo-new-config.yaml'), + '--partition.mode', 'manual', + '--partition.num_of_partitions', '4' + ]) + cfg.export_path = os.path.join(self.tmp_dir, 'test_default_conc', 'res.jsonl') + cfg.work_dir = os.path.join(self.tmp_dir, 'test_default_conc') + + executor = PartitionedRayExecutor(cfg) + # Auto-resolved: matches GPU count, or 1 if no GPUs + import ray as _ray + num_gpus = int(_ray.cluster_resources().get("GPU", 0)) + if num_gpus > 1: + self.assertEqual(executor.max_concurrent_partitions, num_gpus) + else: + self.assertEqual(executor.max_concurrent_partitions, 1) + + @TEST_TAG('ray') + def test_explicit_max_concurrent_partitions(self): + """Explicit max_concurrent_partitions is parsed correctly.""" + cfg = init_configs([ + '--config', os.path.join(self.root_path, 'demos/process_on_ray/configs/demo-new-config.yaml'), + '--partition.mode', 'manual', + '--partition.num_of_partitions', '8', + '--partition.max_concurrent_partitions', '4' + ]) + cfg.export_path = os.path.join(self.tmp_dir, 'test_explicit_conc', 'res.jsonl') + cfg.work_dir = os.path.join(self.tmp_dir, 'test_explicit_conc') + + executor = PartitionedRayExecutor(cfg) + self.assertEqual(executor.max_concurrent_partitions, 4) + + @TEST_TAG('ray') + def test_num_partitions_inferred_from_max_concurrent(self): + """num_of_partitions is raised to max_concurrent_partitions when too low.""" + cfg = init_configs([ + '--config', os.path.join(self.root_path, 'demos/process_on_ray/configs/demo-new-config.yaml'), + '--partition.mode', 'manual', + '--partition.num_of_partitions', '2', + '--partition.max_concurrent_partitions', '8' + ]) + cfg.export_path = os.path.join(self.tmp_dir, 'test_infer_partitions', 'res.jsonl') + cfg.work_dir = os.path.join(self.tmp_dir, 'test_infer_partitions') + + executor = PartitionedRayExecutor(cfg) + # num_partitions should be raised to 8 + self.assertEqual(executor.num_partitions, 8) + self.assertEqual(executor.max_concurrent_partitions, 8) + + @TEST_TAG('ray') + def test_num_partitions_not_lowered(self): + """num_of_partitions is NOT lowered when already >= max_concurrent.""" + cfg = init_configs([ + '--config', os.path.join(self.root_path, 'demos/process_on_ray/configs/demo-new-config.yaml'), + '--partition.mode', 'manual', + '--partition.num_of_partitions', '16', + '--partition.max_concurrent_partitions', '8' + ]) + cfg.export_path = os.path.join(self.tmp_dir, 'test_no_lower', 'res.jsonl') + cfg.work_dir = os.path.join(self.tmp_dir, 'test_no_lower') + + executor = PartitionedRayExecutor(cfg) + self.assertEqual(executor.num_partitions, 16) + self.assertEqual(executor.max_concurrent_partitions, 8) + + @TEST_TAG('ray') + def test_concurrent_execution_end2end(self): + """End-to-end test: concurrent partitions produce output.""" + cfg = init_configs([ + '--config', os.path.join(self.root_path, 'demos/process_on_ray/configs/demo-new-config.yaml'), + '--partition.mode', 'manual', + '--partition.num_of_partitions', '2', + '--partition.max_concurrent_partitions', '2' + ]) + cfg.export_path = os.path.join(self.tmp_dir, 'test_conc_e2e', 'res.jsonl') + cfg.work_dir = os.path.join(self.tmp_dir, 'test_conc_e2e') + + executor = PartitionedRayExecutor(cfg) + executor.run() + + self.assertTrue(os.path.exists(cfg.export_path)) + + @TEST_TAG('ray') + def test_concurrent_with_checkpointing(self): + """Concurrent execution with checkpointing enabled.""" + cfg = init_configs([ + '--config', os.path.join(self.root_path, 'demos/process_on_ray/configs/demo-new-config.yaml'), + '--partition.mode', 'manual', + '--partition.num_of_partitions', '2', + '--partition.max_concurrent_partitions', '2', + '--checkpoint.enabled', 'true', + '--checkpoint.strategy', 'every_op' + ]) + cfg.export_path = os.path.join(self.tmp_dir, 'test_conc_ckpt', 'res.jsonl') + cfg.work_dir = os.path.join(self.tmp_dir, 'test_conc_ckpt') + + executor = PartitionedRayExecutor(cfg) + executor.run() + + self.assertTrue(os.path.exists(cfg.export_path)) + + # Verify checkpoint files were created + checkpoint_dir = cfg.checkpoint_dir + if os.path.exists(checkpoint_dir): + checkpoint_files = [f for f in os.listdir(checkpoint_dir) if f.endswith('.parquet')] + self.assertGreater(len(checkpoint_files), 0, "No checkpoint files were created") + + @TEST_TAG('ray') + def test_backward_compat_sequential(self): + """max_concurrent_partitions=1 uses sequential path (same as before).""" + cfg = init_configs([ + '--config', os.path.join(self.root_path, 'demos/process_on_ray/configs/demo-new-config.yaml'), + '--partition.mode', 'manual', + '--partition.num_of_partitions', '2', + '--partition.max_concurrent_partitions', '1' + ]) + cfg.export_path = os.path.join(self.tmp_dir, 'test_seq_compat', 'res.jsonl') + cfg.work_dir = os.path.join(self.tmp_dir, 'test_seq_compat') + + executor = PartitionedRayExecutor(cfg) + self.assertEqual(executor.max_concurrent_partitions, 1) + executor.run() + + self.assertTrue(os.path.exists(cfg.export_path)) + + if __name__ == '__main__': unittest.main() From 84953b4065b018e9a66a926f9d064f26a2c9f843 Mon Sep 17 00:00:00 2001 From: cyruszhang Date: Tue, 17 Mar 2026 12:30:48 -0700 Subject: [PATCH 04/17] Fix GPU actor mode in concurrent partition tasks and auto-detect GPU count MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The @ray.remote partition tasks had no GPU resources assigned, causing torch.cuda.is_available() to return False inside the task. This made use_ray_actor() fall back to task mode, reloading the model on every batch instead of once per actor — explaining the 30+ minute stalls. Fix: force ray_execution_mode="actor" on ops with num_gpus > 0 after re-creating them in the remote task. Also: - max_concurrent_partitions defaults to "auto" (detect GPUs from Ray cluster), num_of_partitions auto-raised to match - perf-test.py auto-detects GPU count for num_proc instead of hardcoding 8, benefiting both ray and ray_partitioned modes --- data_juicer/core/executor/ray_executor_partitioned.py | 7 ++++++- perf-test.py | 10 +++++++++- 2 files changed, 15 insertions(+), 2 deletions(-) diff --git a/data_juicer/core/executor/ray_executor_partitioned.py b/data_juicer/core/executor/ray_executor_partitioned.py index 293f3ab077..a53eda67aa 100644 --- a/data_juicer/core/executor/ray_executor_partitioned.py +++ b/data_juicer/core/executor/ray_executor_partitioned.py @@ -639,9 +639,14 @@ def _process_single_partition_task( if op_fusion_enabled: task_ops = fuse_operators(task_ops) - # Scope concurrency for each op + # Scope concurrency and fix actor mode for each op. + # The remote task has no GPU, so use_cuda() returns False and + # ops default to task mode (model reloads per batch). Force + # actor mode for GPU ops so the model loads once per actor. for op in task_ops: op.num_proc = scope_op_concurrency(op, max_concurrent_partitions) + if getattr(op, "num_gpus", 0) and op.num_gpus > 0: + op.ray_execution_mode = "actor" # Create local checkpoint manager ckpt_manager = RayCheckpointManager( diff --git a/perf-test.py b/perf-test.py index de6b95c723..6e1a0c6032 100644 --- a/perf-test.py +++ b/perf-test.py @@ -277,6 +277,14 @@ def run_simple_benchmark( logger.info(f"Using executor type: {executor_type}") + # Detect available GPUs from Ray cluster + import ray as _ray + + num_gpus = int(_ray.cluster_resources().get("GPU", 0)) + if num_gpus <= 0: + raise RuntimeError("No GPUs available in Ray cluster") + logger.info(f"Detected {num_gpus} GPUs in Ray cluster") + # Base config cfg_dict = { "project_name": "simple-benchmark", @@ -302,7 +310,7 @@ def run_simple_benchmark( "skip_op_error": False, # fail loudly during debugging "batch_mode": True, "num_gpus": 1, - "num_proc": 8, + "num_proc": num_gpus, }, }, ], From 2e5f84f2ea48301d08569973e1d22b9ba09db7dd Mon Sep 17 00:00:00 2001 From: cyruszhang Date: Tue, 17 Mar 2026 13:39:19 -0700 Subject: [PATCH 05/17] Fix empty partitions causing idle GPUs during concurrent processing Repartition dataset to exactly num_partitions blocks before split() to ensure even row distribution. Without this, some input blocks can be empty, causing split() to produce 0-row partitions and leaving GPUs idle (observed 6/8 GPUs utilized with 2 empty partitions). - Repartition before split: redistributes rows evenly, no data loss - Skip empty partitions as safety net in concurrent task submission --- .../core/executor/ray_executor_partitioned.py | 25 +++++++++++++++++-- 1 file changed, 23 insertions(+), 2 deletions(-) diff --git a/data_juicer/core/executor/ray_executor_partitioned.py b/data_juicer/core/executor/ray_executor_partitioned.py index a53eda67aa..8e06199b73 100644 --- a/data_juicer/core/executor/ray_executor_partitioned.py +++ b/data_juicer/core/executor/ray_executor_partitioned.py @@ -708,9 +708,18 @@ def _process_single_partition_task( partition_dataset.data = partition_dataset.data.materialize() return partition_dataset.data - # Submit tasks + # Submit tasks (skip empty partitions) futures = {} for i, partition in enumerate(partitions): + # Skip empty partitions to avoid wasting GPU resources + try: + row_count = partition.count() + except Exception: + row_count = -1 # can't determine, submit anyway + if row_count == 0: + logger.info(f"Partition {i}: empty (0 rows), skipping") + continue + # Check if partition is fully checkpointed before submitting latest_ckpt = self.ckpt_manager.find_latest_checkpoint(i) if latest_ckpt and latest_ckpt[0] >= len(ops) - 1: @@ -1180,7 +1189,19 @@ def _split_dataset_deterministic(self, dataset: RayDataset) -> tuple: # Check for existing partitioning info (resumption case) saved_info = self._load_partitioning_info() - # Split the dataset + # Repartition to exactly num_partitions blocks before splitting. + # split() distributes by blocks — if some input blocks are empty + # or the block count doesn't divide evenly, some partitions get + # 0 rows. Repartitioning redistributes rows evenly without + # dropping any data (unlike split(equal=True) which truncates). + num_blocks = dataset.data.num_blocks() + if num_blocks != self.num_partitions: + logger.info( + f"Repartitioning dataset from {num_blocks} blocks to " + f"{self.num_partitions} blocks for even splitting" + ) + dataset.data = dataset.data.repartition(self.num_partitions) + logger.info(f"Splitting dataset into {self.num_partitions} partitions (deterministic mode)...") partitions = dataset.data.split(self.num_partitions) logger.info(f"Created {len(partitions)} partitions") From 128d64120ca03f589eaffee2213d024f0ebedd63 Mon Sep 17 00:00:00 2001 From: cyruszhang Date: Tue, 17 Mar 2026 14:31:58 -0700 Subject: [PATCH 06/17] Fix concurrency scoping bug causing deadlock in concurrent partition processing Set ray_execution_mode = "actor" BEFORE calling scope_op_concurrency() so that use_ray_actor() returns True and num_proc gets correctly divided by max_concurrent_partitions (e.g. 8 // 8 = 1 GPU per partition instead of 8). --- .../core/executor/ray_executor_partitioned.py | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/data_juicer/core/executor/ray_executor_partitioned.py b/data_juicer/core/executor/ray_executor_partitioned.py index 8e06199b73..3af32f71bb 100644 --- a/data_juicer/core/executor/ray_executor_partitioned.py +++ b/data_juicer/core/executor/ray_executor_partitioned.py @@ -608,7 +608,7 @@ def _process_partitions_concurrent(self, partitions, ops, partitioning_info): ckpt_op_names = getattr(self.ckpt_manager, "checkpoint_op_names", []) op_fusion_enabled = getattr(cfg, "op_fusion", False) - @ray.remote + @ray.remote(num_cpus=0) def _process_single_partition_task( partition_data, partition_id, @@ -644,9 +644,9 @@ def _process_single_partition_task( # ops default to task mode (model reloads per batch). Force # actor mode for GPU ops so the model loads once per actor. for op in task_ops: - op.num_proc = scope_op_concurrency(op, max_concurrent_partitions) if getattr(op, "num_gpus", 0) and op.num_gpus > 0: op.ray_execution_mode = "actor" + op.num_proc = scope_op_concurrency(op, max_concurrent_partitions) # Create local checkpoint manager ckpt_manager = RayCheckpointManager( @@ -1189,16 +1189,16 @@ def _split_dataset_deterministic(self, dataset: RayDataset) -> tuple: # Check for existing partitioning info (resumption case) saved_info = self._load_partitioning_info() - # Repartition to exactly num_partitions blocks before splitting. - # split() distributes by blocks — if some input blocks are empty - # or the block count doesn't divide evenly, some partitions get - # 0 rows. Repartitioning redistributes rows evenly without - # dropping any data (unlike split(equal=True) which truncates). + # Ensure enough blocks so split() doesn't produce empty partitions. + # split() distributes by blocks — if there are fewer non-empty + # blocks than partitions, some partitions get 0 rows. We only + # repartition UP (never down) to preserve block-level parallelism + # for GPU actors within each partition. num_blocks = dataset.data.num_blocks() - if num_blocks != self.num_partitions: + if num_blocks < self.num_partitions: logger.info( f"Repartitioning dataset from {num_blocks} blocks to " - f"{self.num_partitions} blocks for even splitting" + f"{self.num_partitions} blocks (too few blocks for {self.num_partitions} partitions)" ) dataset.data = dataset.data.repartition(self.num_partitions) From 4866ecd648ae5a2fdfc39014b1d130b89b946c51 Mon Sep 17 00:00:00 2001 From: cyruszhang Date: Tue, 17 Mar 2026 14:39:59 -0700 Subject: [PATCH 07/17] repartition then split for equal partitioning --- .../core/executor/ray_executor_partitioned.py | 14 ++++---------- 1 file changed, 4 insertions(+), 10 deletions(-) diff --git a/data_juicer/core/executor/ray_executor_partitioned.py b/data_juicer/core/executor/ray_executor_partitioned.py index 3af32f71bb..64eaa3e1e5 100644 --- a/data_juicer/core/executor/ray_executor_partitioned.py +++ b/data_juicer/core/executor/ray_executor_partitioned.py @@ -1191,16 +1191,10 @@ def _split_dataset_deterministic(self, dataset: RayDataset) -> tuple: # Ensure enough blocks so split() doesn't produce empty partitions. # split() distributes by blocks — if there are fewer non-empty - # blocks than partitions, some partitions get 0 rows. We only - # repartition UP (never down) to preserve block-level parallelism - # for GPU actors within each partition. - num_blocks = dataset.data.num_blocks() - if num_blocks < self.num_partitions: - logger.info( - f"Repartitioning dataset from {num_blocks} blocks to " - f"{self.num_partitions} blocks (too few blocks for {self.num_partitions} partitions)" - ) - dataset.data = dataset.data.repartition(self.num_partitions) + # blocks than partitions, some partitions get 0 rows. + # Always repartition to num_partitions to avoid materializing the + # dataset just to check num_blocks() (which kills lazy evaluation). + dataset.data = dataset.data.repartition(self.num_partitions) logger.info(f"Splitting dataset into {self.num_partitions} partitions (deterministic mode)...") partitions = dataset.data.split(self.num_partitions) From ec446524f80a59b731e02a848787004d13cd1131 Mon Sep 17 00:00:00 2001 From: cyruszhang Date: Tue, 17 Mar 2026 15:12:30 -0700 Subject: [PATCH 08/17] update design doc --- docs/design/parallel_partition_actor_reuse.md | 1189 +++-------------- 1 file changed, 218 insertions(+), 971 deletions(-) diff --git a/docs/design/parallel_partition_actor_reuse.md b/docs/design/parallel_partition_actor_reuse.md index f2d2617ab7..7284922467 100644 --- a/docs/design/parallel_partition_actor_reuse.md +++ b/docs/design/parallel_partition_actor_reuse.md @@ -1,16 +1,16 @@ -# Design Doc: Parallel Partition Processing with Actor Reuse +# Design Doc: Concurrent Partition Processing with GPU Scoping **Author:** Data-Juicer Team **Created:** 2026-03-09 -**Updated:** 2026-03-09 -**Status:** Draft -**Branch:** `feat/parallel-partition-actor-reuse` +**Updated:** 2026-03-17 +**Status:** Implemented +**Branch:** `feat/cyrusz/parallel-partition-actor-reuse` --- ## 1. Problem Statement -### Current Behavior +### Current Behavior (Before This Change) The `PartitionedRayExecutor` processes partitions **sequentially**, creating new GPU actors for each partition: @@ -32,49 +32,47 @@ For a typical video processing pipeline with 3 GPU operators and 10 partitions: - Model loading time: ~60s per operator × 3 operators × 10 partitions = **30 minutes of pure overhead** - This overhead can exceed actual processing time for smaller datasets -### Root Cause Analysis - -The issue is **not** the partitioning strategy itself. Partitioning provides: -- Memory control -- Checkpoint granularity -- Resume capability - -The issue is **actor lifecycle management**: -- Actors are created per `map_batches()` call -- Each partition triggers a new `map_batches()` call -- Ray garbage collects actors after each partition completes - --- -## 2. Proposed Solution +## 2. Implemented Solution: Concurrent Partition Processing ### Overview -Implement **shared actor pools with detached lifecycle** that persist across partitions: +Instead of sequential processing with shared actor pools (originally proposed), we implemented **concurrent partition processing** where all partitions run in parallel as independent Ray remote tasks, each with its own scoped GPU actors: ``` -┌─────────────────────────────────────────────────────────────┐ -│ Shared Actor Pool (Detached) │ -│ ┌──────┐ ┌──────┐ ┌──────┐ ┌──────┐ │ -│ │ A0 │ │ A1 │ │ A2 │ │ A3 │ ← Models loaded │ -│ │GPU0 │ │GPU1 │ │GPU2 │ │GPU3 │ ONCE at start │ -│ └──────┘ └──────┘ └──────┘ └──────┘ │ -└─────────────────────────────────────────────────────────────┘ - ↑ ↑ ↑ ↑ - │ │ │ │ - ┌──┴──┐ ┌──┴──┐ ┌──┴──┐ ┌──┴──┐ - │ P0 │ │ P1 │ │ P2 │ │ P3 │ - └─────┘ └─────┘ └─────┘ └─────┘ - -Partitions processed sequentially, actors reused across all +┌──────────────────────────────────────────────────────────────────┐ +│ Concurrent Partition Processing │ +│ │ +│ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ │ +│ │ Task P0 │ │ Task P1 │ │ Task P2 │ ... │ Task P7 │ │ +│ │ 1 GPU │ │ 1 GPU │ │ 1 GPU │ │ 1 GPU │ │ +│ │ Actor │ │ Actor │ │ Actor │ │ Actor │ │ +│ └──────────┘ └──────────┘ └──────────┘ └──────────┘ │ +│ ↕ ↕ ↕ ↕ │ +│ GPU 0 GPU 1 GPU 2 GPU 7 │ +└──────────────────────────────────────────────────────────────────┘ + +All partitions processed concurrently, each with its own scoped actor ``` +### Why Concurrent Instead of Sequential + Actor Reuse + +The original design proposed sequential processing with detached shared actor pools. During implementation, we chose concurrent processing because: + +1. **Simpler architecture**: No need for detached actor lifecycle management, pool coordination, or cross-partition actor sharing +2. **Better GPU utilization**: All GPUs are busy simultaneously instead of sequentially +3. **Natural Ray fit**: Each partition is a self-contained Ray remote task — no complex orchestration +4. **Same model loading cost**: Each GPU loads the model once per partition, but all load concurrently (~60s wall time vs. N × 60s sequential) +5. **Maintained benefits**: Checkpointing, resume, and memory control per partition are all preserved + ### Key Design Principles -1. **Keep Sequential Partition Processing**: Preserves resume capability and checkpoint granularity -2. **Detached Actor Lifecycle**: Actors persist across partitions, models load once -3. **Explicit Pool Management**: Create pools at job start, cleanup at job end -4. **Compatible with Resume**: Skip completed partitions, reuse actors for remaining +1. **Concurrent partition processing**: All partitions run in parallel (up to `max_concurrent_partitions`) +2. **Concurrency scoping**: Each partition's GPU ops get `num_proc = total_gpus // max_concurrent_partitions` actors +3. **Forced actor mode**: GPU ops are set to `ray_execution_mode = "actor"` inside the remote task (where CUDA is not visible) +4. **Per-partition checkpointing**: Each remote task manages its own checkpoint state +5. **Resume support**: Skip completed partitions on restart --- @@ -84,29 +82,29 @@ Partitions processed sequentially, actors reused across all ``` ┌─────────────────────────────────────────────────────────────────────────┐ -│ ActorReusePartitionExecutor │ +│ PartitionedRayExecutor │ ├─────────────────────────────────────────────────────────────────────────┤ │ │ │ ┌─────────────────────────────────────────────────────────────────┐ │ -│ │ SharedActorPoolManager │ │ -│ │ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │ │ -│ │ │ Pool: NSFW │ │Pool: Aesth. │ │Pool: Caption│ │ │ -│ │ │ 8 actors │ │ 16 actors │ │ 8 actors │ │ │ -│ │ │ num_gpus=1 │ │ num_gpus=0.5│ │ num_gpus=1 │ │ │ -│ │ └─────────────┘ └─────────────┘ └─────────────┘ │ │ -│ └─────────────────────────────────────────────────────────────────┘ │ -│ ↑ │ -│ │ (shared across partitions) │ -│ ↓ │ -│ ┌─────────────────────────────────────────────────────────────────┐ │ -│ │ Sequential Partition Processing │ │ -│ │ │ │ -│ │ for partition_id, partition in enumerate(partitions): │ │ -│ │ if is_complete(partition_id): continue # Resume support │ │ -│ │ result = process_with_shared_actors(partition, ops) │ │ -│ │ checkpoint(partition_id, result) │ │ -│ │ │ │ +│ │ _process_partitions_concurrent() │ │ +│ │ │ │ +│ │ 1. Extract serializable config values │ │ +│ │ 2. Submit Ray remote tasks (one per partition) │ │ +│ │ 3. Collect results, union partitions │ │ │ └─────────────────────────────────────────────────────────────────┘ │ +│ │ │ +│ ┌───────────────┼───────────────┐ │ +│ ▼ ▼ ▼ │ +│ ┌─────────────────┐ ┌─────────────┐ ┌─────────────┐ │ +│ │ Remote Task P0 │ │ Remote Task │ │ Remote Task │ ... │ +│ │ │ │ P1 │ │ P2 │ │ +│ │ - load_ops() │ │ │ │ │ │ +│ │ - force actor │ │ (same) │ │ (same) │ │ +│ │ mode for GPU │ │ │ │ │ │ +│ │ - scope conc. │ │ │ │ │ │ +│ │ - process data │ │ │ │ │ │ +│ │ - checkpoint │ │ │ │ │ │ +│ └─────────────────┘ └─────────────┘ └─────────────┘ │ │ │ └─────────────────────────────────────────────────────────────────────────┘ ``` @@ -114,641 +112,167 @@ Partitions processed sequentially, actors reused across all ### 3.2 Execution Flow ``` -Phase 1: Initialization -───────────────────────── +Phase 1: Dataset Splitting +────────────────────────── Job Start │ ▼ -┌─────────────────────┐ -│ Analyze Operators │ Identify GPU operators and their requirements -└──────────┬──────────┘ - │ - ▼ -┌─────────────────────┐ -│ Create Actor Pools │ Create detached actors, load models ONCE -└──────────┬──────────┘ - │ - ▼ -┌─────────────────────┐ -│ Split into Partitions│ -└──────────┬──────────┘ - - -Phase 2: Sequential Processing with Actor Reuse -─────────────────────────────────────────────── - │ - ┌─────┴─────┐ - │ │ - ▼ ▼ -┌─────────┐ ┌─────────┐ -│ P0 │ │ P1 │ ... -└────┬────┘ └────┬────┘ - │ │ - ▼ ▼ -┌─────────────────────────────────────────┐ -│ Shared Actor Pool │ -│ (models already loaded) │ -│ │ -│ Process P0 → Checkpoint P0 │ -│ Process P1 → Checkpoint P1 │ -│ Process P2 → Checkpoint P2 │ -│ ... │ -└─────────────────────────────────────────┘ - - -Phase 3: Cleanup -──────────────── - │ - ▼ -┌─────────────────────┐ -│ Cleanup Actor Pools │ Kill detached actors -└──────────┬──────────┘ - │ - ▼ - Job End -``` +┌──────────────────────────┐ +│ Repartition to N blocks │ Ensure enough blocks for N partitions +└───────────┬──────────────┘ + │ + ▼ +┌──────────────────────────┐ +│ Split into N partitions │ Each partition gets ~equal rows +└───────────┬──────────────┘ + + +Phase 2: Concurrent Processing +────────────────────────────── + │ + ┌───────┼───────┬───────┬───── ... ─────┐ + ▼ ▼ ▼ ▼ ▼ +┌──────┐┌──────┐┌──────┐┌──────┐ ┌──────┐ +│ P0 ││ P1 ││ P2 ││ P3 │ │ P7 │ +│1 GPU ││1 GPU ││1 GPU ││1 GPU │ │1 GPU │ +└──┬───┘└──┬───┘└──┬───┘└──┬───┘ └──┬───┘ + │ │ │ │ │ + ▼ ▼ ▼ ▼ ▼ + [Load] [Load] [Load] [Load] ... [Load] ← Models load concurrently + │ │ │ │ │ + ▼ ▼ ▼ ▼ ▼ +[Process][Process][Process][Process] [Process] ← All GPUs busy + │ │ │ │ │ + ▼ ▼ ▼ ▼ ▼ + [Ckpt] [Ckpt] [Ckpt] [Ckpt] ... [Ckpt] ← Per-partition checkpoint + + +Phase 3: Merge Results +────────────────────── + └───────┴───────┴───────┴───── ... ─────┘ + │ + ▼ + ┌──────────────────┐ + │ Union partitions │ + └──────────────────┘ + │ + ▼ + Job End +``` + +### 3.3 Concurrency Scoping + +The critical mechanism that prevents GPU over-allocation: -### 3.3 Resume Flow +```python +# Inside each remote task: +for op in task_ops: + # Step 1: Force actor mode (MUST be before scope_op_concurrency) + if getattr(op, "num_gpus", 0) and op.num_gpus > 0: + op.ray_execution_mode = "actor" + # Step 2: Scope concurrency — divides num_proc by max_concurrent_partitions + op.num_proc = scope_op_concurrency(op, max_concurrent_partitions) ``` -Resume from Crash (Partition 2 was in progress) -────────────────────────────────────────────── - -Checkpoint State: -├── partition_0/ ✓ (complete) -├── partition_1/ ✓ (complete) -├── partition_2/ ✗ (incomplete - no _SUCCESS marker) -└── partition_3/ ✗ (not started) -Resume: - │ - ▼ -┌─────────────────────┐ -│ Scan Checkpoints │ Find: completed=[0,1], pending=[2,3] -└──────────┬──────────┘ - │ - ▼ -┌─────────────────────┐ -│ Create Actor Pools │ Models load once (fresh start) -└──────────┬──────────┘ - │ - ▼ -┌─────────────────────────────────────────┐ -│ for partition_id in [0, 1, 2, 3]: │ -│ if partition_id in [0, 1]: │ -│ skip (already complete) │ -│ else: │ -│ process with shared actors │ -│ checkpoint │ -└─────────────────────────────────────────┘ - │ - ▼ - Only partitions 2, 3 processed - Models loaded only ONCE for both -``` +**Why order matters:** +- The remote task runs on a CPU-only node (no GPU assigned to the task itself) +- `torch.cuda.is_available()` returns `False` in the remote task +- Without explicitly setting `ray_execution_mode = "actor"`, `use_ray_actor()` returns `False` +- `scope_op_concurrency()` only divides `num_proc` for actor-mode ops +- If actor mode is not set first, `num_proc` stays at the full value (e.g., 8), causing each partition to request all 8 GPUs → deadlock -### 3.4 Core Components +**Example with 8 GPUs, 8 partitions:** +- `num_proc` original = 8 (wants 8 GPU actors) +- `scope_op_concurrency(op, 8)` → `8 // 8 = 1` (1 GPU actor per partition) +- 8 partitions × 1 GPU = 8 GPUs total → fits exactly -#### 3.4.1 DetachedActorPool +### 3.4 Remote Task Design -```python -import ray -from ray.util.actor_pool import ActorPool -from typing import Dict, List, Any, Optional -from dataclasses import dataclass -import uuid - - -@dataclass -class ActorPoolConfig: - """Configuration for an actor pool.""" - op_class: type - op_init_args: tuple - op_init_kwargs: dict - num_gpus: float - num_cpus: float - num_actors: int - pool_id: str - - -class DetachedActorPool: - """ - A pool of detached Ray actors that persist across partitions. - - Key features: - - Actors have 'detached' lifetime (not garbage collected) - - Models loaded once during actor creation - - Explicit cleanup required at job end - """ - - def __init__(self, config: ActorPoolConfig): - self.config = config - self.actors: List[ray.actor.ActorHandle] = [] - self.pool: Optional[ActorPool] = None - self._created = False - - def initialize(self): - """Create detached actors and load models.""" - if self._created: - return - - # Create actor class with resource requirements - actor_cls = ray.remote( - num_gpus=self.config.num_gpus, - num_cpus=self.config.num_cpus, - )(self.config.op_class) - - # Create detached actors - for i in range(self.config.num_actors): - actor_name = f"{self.config.pool_id}_actor_{i}" - - # Check if actor already exists (from previous run) - try: - actor = ray.get_actor(actor_name) - logger.info(f"Reusing existing actor: {actor_name}") - except ValueError: - # Create new detached actor - actor = actor_cls.options( - name=actor_name, - lifetime="detached", - max_restarts=3, - ).remote(*self.config.op_init_args, **self.config.op_init_kwargs) - logger.info(f"Created new actor: {actor_name}") - - self.actors.append(actor) - - self.pool = ActorPool(self.actors) - self._created = True - logger.info(f"Actor pool initialized: {self.config.pool_id} with {len(self.actors)} actors") - - def map_batches(self, batches: List[Any], method_name: str = "process") -> List[Any]: - """ - Process batches using the actor pool. - - Args: - batches: List of batches to process - method_name: Name of the actor method to call - - Returns: - List of processed results - """ - if not self._created: - raise RuntimeError("Actor pool not initialized. Call initialize() first.") - - results = [] - - # Submit all batches to the pool - for batch in batches: - self.pool.submit( - lambda actor, b: getattr(actor, method_name).remote(b), - batch - ) - - # Collect results in order - while self.pool.has_next(): - result = self.pool.get_next() - results.append(result) - - return results - - def process_dataset(self, dataset: ray.data.Dataset, batch_size: int = 1000) -> ray.data.Dataset: - """ - Process a Ray Dataset using the actor pool. - - This method iterates through the dataset in batches, - submits them to the actor pool, and collects results. - """ - if not self._created: - raise RuntimeError("Actor pool not initialized. Call initialize() first.") - - processed_batches = [] - - for batch in dataset.iter_batches(batch_size=batch_size, batch_format="pyarrow"): - # Submit to pool - self.pool.submit( - lambda actor, b: actor.process.remote(b), - batch - ) - - # Collect all results - while self.pool.has_next(): - result = self.pool.get_next() - processed_batches.append(result) - - # Convert back to Ray Dataset - return ray.data.from_arrow(processed_batches) - - def cleanup(self): - """Kill all actors in the pool.""" - for actor in self.actors: - try: - ray.kill(actor) - except Exception as e: - logger.warning(f"Failed to kill actor: {e}") - - self.actors = [] - self.pool = None - self._created = False - logger.info(f"Actor pool cleaned up: {self.config.pool_id}") -``` +Each partition is processed by an independent `@ray.remote(num_cpus=0)` task that: -#### 3.4.2 SharedActorPoolManager +1. **Re-creates ops from config** — avoids serialization issues with GPU operator state +2. **Forces actor mode** — sets `ray_execution_mode = "actor"` for GPU ops +3. **Scopes concurrency** — divides `num_proc` by `max_concurrent_partitions` +4. **Manages its own checkpoints** — creates a local `RayCheckpointManager` +5. **Handles resume** — checks for existing checkpoints before processing -```python -class SharedActorPoolManager: - """ - Manages shared actor pools for GPU operators across partitions. - - Responsibilities: - - Create actor pools for each unique GPU operator configuration - - Reuse pools across partitions - - Handle cleanup at job completion - """ - - def __init__(self, job_id: str = None): - self.job_id = job_id or str(uuid.uuid4())[:8] - self.pools: Dict[str, DetachedActorPool] = {} - self._initialized = False - - def initialize_pools(self, ops: List[OP], num_gpus_available: int = 8): - """ - Initialize actor pools for all GPU operators. - - Args: - ops: List of operators in the pipeline - num_gpus_available: Total GPUs available in cluster - """ - if self._initialized: - logger.info("Actor pools already initialized, skipping") - return - - for op in ops: - if not op.use_ray_actor(): - continue # Skip CPU operators - - pool_key = self._get_pool_key(op) - - if pool_key in self.pools: - continue # Pool already created for similar operator - - # Calculate number of actors based on GPU requirements - num_actors = self._calculate_num_actors(op, num_gpus_available) - - config = ActorPoolConfig( - op_class=op.__class__, - op_init_args=op._init_args, - op_init_kwargs=op._init_kwargs, - num_gpus=op.num_gpus or 1, - num_cpus=op.num_cpus or 1, - num_actors=num_actors, - pool_id=f"{self.job_id}_{op._name}", - ) - - pool = DetachedActorPool(config) - pool.initialize() - self.pools[pool_key] = pool - - logger.info(f"Created actor pool for {op._name}: {num_actors} actors, {op.num_gpus} GPUs each") - - self._initialized = True - - def get_pool(self, op: OP) -> Optional[DetachedActorPool]: - """Get the actor pool for an operator.""" - pool_key = self._get_pool_key(op) - return self.pools.get(pool_key) - - def _get_pool_key(self, op: OP) -> str: - """ - Generate a unique key for an operator's pool. - - Operators with same class and GPU requirements can share a pool. - """ - return f"{op.__class__.__name__}_{op.num_gpus}_{op.num_cpus}" - - def _calculate_num_actors(self, op: OP, num_gpus_available: int) -> int: - """Calculate optimal number of actors for an operator.""" - if op.num_gpus and op.num_gpus > 0: - # GPU operator: actors = available_gpus / gpus_per_actor - return max(1, int(num_gpus_available / op.num_gpus)) - else: - # CPU operator with actor mode: use num_proc - return op.num_proc if op.num_proc > 0 else 4 - - def cleanup_all(self): - """Cleanup all actor pools.""" - for pool_key, pool in self.pools.items(): - pool.cleanup() - - self.pools = {} - self._initialized = False - logger.info("All actor pools cleaned up") -``` +The task requests `num_cpus=0` because the actual compute is done by Ray Data actors/tasks spawned within. -#### 3.4.3 ActorReusePartitionExecutor +### 3.5 Dataset Splitting ```python -class ActorReusePartitionExecutor: - """ - Partition executor with actor reuse across partitions. - - Key features: - - Sequential partition processing (resume-friendly) - - Shared actor pools (models load once) - - Per-partition checkpointing - - Resume from last incomplete partition - """ - - def __init__(self, cfg): - self.cfg = cfg - self.num_partitions = cfg.partition.get('num_of_partitions', 10) - self.checkpoint_dir = cfg.get('checkpoint_dir', './checkpoints') - self.actor_manager = SharedActorPoolManager(job_id=cfg.get('job_id')) - - # Detect available GPUs - self.num_gpus = self._detect_gpus() - - def run(self, dataset: ray.data.Dataset, ops: List[OP]) -> ray.data.Dataset: - """ - Main execution method. - - Args: - dataset: Input Ray Dataset - ops: List of operators to apply - - Returns: - Processed Ray Dataset - """ - try: - # Phase 1: Initialize actor pools for GPU operators - logger.info("Phase 1: Initializing actor pools...") - self.actor_manager.initialize_pools(ops, self.num_gpus) - - # Phase 2: Split dataset into partitions - logger.info(f"Phase 2: Splitting dataset into {self.num_partitions} partitions...") - partitions = dataset.split(self.num_partitions) - - # Phase 3: Process partitions sequentially with actor reuse - logger.info("Phase 3: Processing partitions...") - processed_partitions = [] - - for partition_id, partition in enumerate(partitions): - # Check if partition already completed (resume support) - if self._is_partition_complete(partition_id): - logger.info(f"Partition {partition_id}: Loading from checkpoint (already complete)") - result = self._load_partition_checkpoint(partition_id) - else: - logger.info(f"Partition {partition_id}: Processing...") - result = self._process_partition(partition, ops, partition_id) - - # Checkpoint after successful processing - self._save_partition_checkpoint(partition_id, result) - logger.info(f"Partition {partition_id}: Checkpointed") - - processed_partitions.append(result) - - # Phase 4: Union all partitions - logger.info("Phase 4: Merging partitions...") - final_dataset = self._union_partitions(processed_partitions) - - return final_dataset - - finally: - # Phase 5: Cleanup actor pools - logger.info("Phase 5: Cleaning up actor pools...") - self.actor_manager.cleanup_all() - - def _process_partition( - self, - partition: ray.data.Dataset, - ops: List[OP], - partition_id: int - ) -> ray.data.Dataset: - """ - Process a single partition using shared actor pools. - - Args: - partition: The partition dataset to process - ops: List of operators to apply - partition_id: ID of this partition (for logging) - - Returns: - Processed partition dataset - """ - result = partition - - for op_idx, op in enumerate(ops): - logger.debug(f"Partition {partition_id}, Op {op_idx}: {op._name}") - - if op.use_ray_actor(): - # GPU operator: use shared actor pool - pool = self.actor_manager.get_pool(op) - if pool is None: - raise RuntimeError(f"No actor pool found for operator: {op._name}") - - result = pool.process_dataset(result, batch_size=op.batch_size or 1000) - else: - # CPU operator: use standard Ray Data processing - result = result.map_batches( - op.process, - batch_size=op.batch_size, - batch_format="pyarrow", - ) - - return result - - def _is_partition_complete(self, partition_id: int) -> bool: - """Check if a partition checkpoint exists and is complete.""" - checkpoint_path = self._get_checkpoint_path(partition_id) - success_marker = os.path.join(checkpoint_path, "_SUCCESS") - return os.path.exists(success_marker) - - def _save_partition_checkpoint(self, partition_id: int, dataset: ray.data.Dataset): - """Save partition checkpoint atomically.""" - checkpoint_path = self._get_checkpoint_path(partition_id) - temp_path = f"{checkpoint_path}.tmp" - - # Write to temp location - dataset.write_parquet(temp_path) - - # Atomic rename - if os.path.exists(checkpoint_path): - shutil.rmtree(checkpoint_path) - os.rename(temp_path, checkpoint_path) - - # Write success marker - with open(os.path.join(checkpoint_path, "_SUCCESS"), 'w') as f: - f.write(datetime.now().isoformat()) - - def _load_partition_checkpoint(self, partition_id: int) -> ray.data.Dataset: - """Load partition from checkpoint.""" - checkpoint_path = self._get_checkpoint_path(partition_id) - return ray.data.read_parquet(checkpoint_path) - - def _get_checkpoint_path(self, partition_id: int) -> str: - """Get checkpoint path for a partition.""" - return os.path.join(self.checkpoint_dir, f"partition_{partition_id:04d}") - - def _union_partitions(self, partitions: List[ray.data.Dataset]) -> ray.data.Dataset: - """Union all partitions into a single dataset.""" - if not partitions: - raise ValueError("No partitions to union") - - result = partitions[0] - for p in partitions[1:]: - result = result.union(p) - - return result - - def _detect_gpus(self) -> int: - """Detect number of available GPUs in the cluster.""" - try: - resources = ray.cluster_resources() - return int(resources.get("GPU", 0)) - except Exception: - return 8 # Default assumption +# Repartition to ensure enough blocks, then split +dataset.data = dataset.data.repartition(self.num_partitions) +partitions = dataset.data.split(self.num_partitions) ``` ---- - -## 4. Integration with Existing PartitionedRayExecutor +- `repartition(N)` ensures at least N blocks exist (lazy, adds a shuffle stage) +- `split(N)` distributes blocks across N independent `Dataset` objects +- Without repartition, split may produce empty partitions if there are fewer blocks than partitions -### 4.1 Minimal Changes Approach - -```python -# In ray_executor_partitioned.py - -class PartitionedRayExecutor: - def __init__(self, cfg): - self.cfg = cfg - # ... existing init ... - - # New: Actor pool manager for GPU operator reuse - self.actor_manager = None - self.actor_reuse_enabled = cfg.get('partition', {}).get('actor_reuse', True) - - def run(self): - """Modified run method with actor reuse support.""" - try: - # Initialize shared actor pools if enabled - if self.actor_reuse_enabled and self._has_gpu_ops(): - self._initialize_shared_actors() - - # ... existing partition processing logic ... - - finally: - # Cleanup actors - if self.actor_manager: - self.actor_manager.cleanup_all() - - def _initialize_shared_actors(self): - """Initialize shared actor pools for GPU operators.""" - self.actor_manager = SharedActorPoolManager( - job_id=self.cfg.get('job_id', str(uuid.uuid4())[:8]) - ) - self.actor_manager.initialize_pools(self.ops, self._detect_gpus()) - - def _process_partition_with_actor_reuse(self, partition, ops): - """Process partition using shared actors for GPU ops.""" - result = partition - - for op in ops: - if op.use_ray_actor() and self.actor_manager: - # Use shared actor pool - pool = self.actor_manager.get_pool(op) - result = pool.process_dataset(result) - else: - # Standard processing - result = self._apply_op_standard(result, op) - - return result -``` +--- -### 4.2 Configuration Changes +## 4. Configuration ```yaml -# New configuration options - partition: - mode: 'auto' - num_of_partitions: 10 - - # Actor reuse settings (NEW) - actor_reuse: true # Enable actor reuse across partitions - actor_pool: - max_restarts: 3 # Actor restart limit on failure - reuse_across_similar_ops: true # Share pool for ops with same config + mode: 'auto' # 'auto' | 'manual' + num_of_partitions: 8 # Number of partitions + max_concurrent_partitions: 8 # Max partitions running in parallel + +checkpoint: + enabled: true + dir: './checkpoints' + strategy: 'per_op' # Checkpoint after each operator ``` +The `max_concurrent_partitions` parameter controls how many partitions run simultaneously and how GPU resources are divided. Setting it equal to the number of GPUs (one partition per GPU) is typical for GPU-bound workloads. + --- -## 5. Comparison: Before vs After +## 5. Performance Comparison -### Timeline Comparison +### Timeline: Sequential vs Concurrent -**Before (No Actor Reuse):** +**Before (Sequential, no actor reuse):** ``` Time ────────────────────────────────────────────────────────────────────▶ -Partition 0: [Load Model 60s][Process 30s][GC] -Partition 1: [Load Model 60s][Process 30s][GC] -Partition 2: [Load 60s][Process 30s] +P0: [Load 60s][Process 120s][GC] +P1: [Load 60s][Process 120s][GC] +P2: [Load 60s][Process 120s] -Total: 3 × (60s + 30s) = 270s -Model loads: 3 +Total: 3 × (60 + 120) = 540s +GPU idle: ~67% of total time ``` -**After (With Actor Reuse):** +**After (Concurrent, 8 partitions on 8 GPUs):** ``` Time ────────────────────────────────────────────────────────────────────▶ -Actor Pool: [Load Model 60s]─────────────────────────────────────────[Cleanup] -Partition 0: [Process 30s] -Partition 1: [Process 30s] -Partition 2: [Process 30s] +P0: [Load 60s][Process 120s] +P1: [Load 60s][Process 120s] ← All load concurrently +P2: [Load 60s][Process 120s] +... +P7: [Load 60s][Process 120s] -Total: 60s + 3 × 30s = 150s -Model loads: 1 +Total: 60 + 120 = 180s (wall time) +GPU idle: ~0% during processing ``` -**Speedup: 1.8x** (more significant with more partitions) - -### Performance Analysis +### Observed Results -| Metric | Before | After | Improvement | -|--------|--------|-------|-------------| -| Model Loading | N × T | 1 × T | **Nx faster** | -| GPU Idle Time | High (between partitions) | Low (continuous) | **Significant** | -| Memory Efficiency | Good (per-partition) | Good (same) | No change | -| Resume Support | Yes | Yes | No change | -| Checkpoint Granularity | Per-partition | Per-partition | No change | +**Setup:** 8× A100 80GB, 6000 video samples, VideoAestheticsFilter -Where N = number of partitions, T = model loading time +| Mode | Time | GPU Utilization | +|------|------|-----------------| +| Pure GPU (no partitioning) | ~1100s | 100% on all 8 GPUs | +| Concurrent partitions (8) | ~1100-1300s | 100% on all 8 GPUs | +| Sequential (old, deadlocked) | ∞ (deadlock) | 8/8 GPU allocated, 14+ pending | -### Benchmark Projection - -**Setup:** -- 8 GPUs (A100 80GB) -- 10,000 video samples -- 10 partitions -- 3 GPU operators (NSFW ~30s load, Aesthetics ~20s load, Captioning ~60s load) -- Processing: ~100s per partition - -**Before:** -``` -Model loading: 10 × (30 + 20 + 60) = 1100s (~18 min) -Processing: 10 × 100s = 1000s (~17 min) -Total: ~35 minutes -``` - -**After:** -``` -Model loading: 1 × (30 + 20 + 60) = 110s (~2 min) -Processing: 10 × 100s = 1000s (~17 min) -Total: ~19 minutes -``` - -**Speedup: ~1.8x** +The concurrent approach matches pure GPU mode performance while adding partition-level checkpointing and resume capability. --- @@ -756,361 +280,84 @@ Total: ~19 minutes ### Checkpoint Structure +Each remote task manages its own checkpoints: + ``` checkpoints/ -├── job_metadata.json # Job-level metadata -├── partition_0000/ -│ ├── data.parquet -│ └── _SUCCESS # Completion marker -├── partition_0001/ -│ ├── data.parquet -│ └── _SUCCESS -├── partition_0002/ -│ └── data.parquet # No _SUCCESS = incomplete -└── partition_0003/ # Directory doesn't exist = not started +├── partitioning_info.json # Partition metadata for validation +├── partition_0/ +│ ├── op_0_video_aesthetics_filter/ +│ │ ├── data.parquet +│ │ └── _SUCCESS +│ └── ... +├── partition_1/ +│ └── ... +└── ... ``` -### Resume Logic - -```python -def find_resume_point(checkpoint_dir: str, num_partitions: int) -> List[int]: - """ - Find which partitions need processing. +### Resume Flow - Returns: - List of partition IDs that need processing - """ - pending = [] - - for partition_id in range(num_partitions): - checkpoint_path = f"{checkpoint_dir}/partition_{partition_id:04d}" - success_marker = f"{checkpoint_path}/_SUCCESS" - - if not os.path.exists(success_marker): - pending.append(partition_id) - - return pending ``` +Resume from Crash (Partition 2 was in progress) +────────────────────────────────────────────── -### Atomic Checkpoint Write - -```python -def atomic_checkpoint_write(dataset, checkpoint_path): - """ - Write checkpoint atomically to prevent corruption. - - Steps: - 1. Write to temp location - 2. Verify write succeeded - 3. Atomic rename to final location - 4. Write success marker - """ - temp_path = f"{checkpoint_path}.tmp.{uuid.uuid4()}" - - try: - # Step 1: Write to temp - dataset.write_parquet(temp_path) - - # Step 2: Verify - test_read = ray.data.read_parquet(temp_path) - if test_read.count() != dataset.count(): - raise RuntimeError("Checkpoint verification failed") - - # Step 3: Atomic rename - if os.path.exists(checkpoint_path): - shutil.rmtree(checkpoint_path) - os.rename(temp_path, checkpoint_path) - - # Step 4: Success marker - with open(f"{checkpoint_path}/_SUCCESS", 'w') as f: - f.write(json.dumps({ - 'timestamp': datetime.now().isoformat(), - 'row_count': dataset.count(), - })) - - except Exception as e: - # Cleanup temp on failure - if os.path.exists(temp_path): - shutil.rmtree(temp_path) - raise +1. Load partitioning_info.json +2. Validate current partitions match saved metadata +3. Submit all partition tasks concurrently +4. Each task independently: + - Checks its own checkpoint state + - Skips completed ops (loads from checkpoint) + - Resumes from last incomplete op +5. Collect results and union ``` --- ## 7. Error Handling -### Actor Failure Recovery - -```python -class DetachedActorPool: - def _handle_actor_failure(self, actor_idx: int, error: Exception): - """ - Handle actor failure with restart. - - Ray's 'max_restarts' handles automatic restart, - but we may need to re-add to pool. - """ - logger.warning(f"Actor {actor_idx} failed: {error}") - - # Check if actor was restarted by Ray - actor = self.actors[actor_idx] - try: - # Ping actor to check if alive - ray.get(actor.ping.remote(), timeout=5) - logger.info(f"Actor {actor_idx} recovered") - except Exception: - # Actor dead, create replacement - logger.info(f"Creating replacement for actor {actor_idx}") - self.actors[actor_idx] = self._create_actor(actor_idx) - - # Rebuild pool with updated actor list - self.pool = ActorPool(self.actors) -``` - -### Partition Failure Recovery - -```python -def _process_partition_with_retry(self, partition, ops, partition_id, max_retries=3): - """Process partition with retry on failure.""" - last_error = None - - for attempt in range(max_retries): - try: - result = self._process_partition(partition, ops, partition_id) - return result - except Exception as e: - last_error = e - logger.warning(f"Partition {partition_id} attempt {attempt + 1} failed: {e}") +### Partition Task Failure - # Check if actors need recovery - self._recover_actors_if_needed() +If a remote task fails: +- Other partitions continue processing independently +- Failed partition's actors are cleaned up by Ray +- On retry/resume, the failed partition restarts from its last checkpoint - if attempt < max_retries - 1: - time.sleep(2 ** attempt) # Exponential backoff +### GPU Resource Deadlock Prevention - raise RuntimeError(f"Partition {partition_id} failed after {max_retries} attempts: {last_error}") -``` +The concurrency scoping mechanism prevents deadlock by ensuring: +- Total GPU requests across all concurrent partitions ≤ available GPUs +- `num_proc` is divided by `max_concurrent_partitions` for actor-mode ops +- Actor mode is set before scoping (critical ordering requirement) --- -## 8. Implementation Plan - -### Phase 1: Core Actor Pool Infrastructure (3-4 days) - -- [ ] Implement `DetachedActorPool` class - - [ ] Actor creation with detached lifetime - - [ ] Batch processing methods - - [ ] Cleanup methods - - [ ] Unit tests - -- [ ] Implement `SharedActorPoolManager` class - - [ ] Pool key generation - - [ ] Pool initialization logic - - [ ] Actor count calculation - - [ ] Unit tests - -### Phase 2: Integration with PartitionedRayExecutor (2-3 days) - -- [ ] Modify `PartitionedRayExecutor.__init__` for actor manager -- [ ] Modify `_process_partition` to use shared actors -- [ ] Add configuration options -- [ ] Integration tests - -### Phase 3: Checkpointing Enhancements (2 days) - -- [ ] Implement atomic checkpoint writes -- [ ] Implement resume logic -- [ ] Add checkpoint verification -- [ ] Recovery tests +## 8. Known Limitations and Future Work -### Phase 4: Testing & Optimization (2-3 days) +1. **No actor reuse across partitions**: Each partition loads models independently. For workloads dominated by model loading time, a shared actor pool approach (the original design) could reduce overhead. -- [ ] End-to-end benchmarks -- [ ] Memory profiling -- [ ] GPU utilization monitoring -- [ ] Performance comparison tests +2. **Repartition cost**: `repartition()` adds a shuffle stage. For large datasets this is cheap relative to processing, but for small datasets it adds overhead. -### Phase 5: Documentation & Production Readiness (1-2 days) +3. **Single block per partition**: After split, each partition typically has one block, which means the entire partition is processed as a single batch by the actor. This prevents streaming output — no progress is visible until the whole partition completes. -- [ ] Update user documentation -- [ ] Add logging and metrics -- [ ] Configuration validation -- [ ] Code review and merge - -**Total Estimated Time: 10-14 days** +4. **`max_concurrent_partitions` tuning**: Must be ≤ available GPUs for GPU-bound workloads. Auto-detection sets it to the GPU count, but mixed CPU/GPU pipelines may benefit from different values. --- -## 9. Testing Strategy - -### Unit Tests - -```python -class TestDetachedActorPool: - def test_initialize_creates_actors(self): - """Test that initialization creates correct number of actors.""" - pass - - def test_actors_are_detached(self): - """Test that actors have detached lifetime.""" - pass - - def test_process_batches(self): - """Test batch processing through actor pool.""" - pass - - def test_cleanup_kills_actors(self): - """Test that cleanup properly kills all actors.""" - pass - - def test_reuse_existing_actors(self): - """Test that existing detached actors are reused.""" - pass - - -class TestSharedActorPoolManager: - def test_initialize_pools_for_gpu_ops(self): - """Test pool creation for GPU operators.""" - pass - - def test_skip_cpu_ops(self): - """Test that CPU operators don't get pools.""" - pass - - def test_pool_reuse_for_similar_ops(self): - """Test that similar ops share pools.""" - pass - - -class TestActorReusePartitionExecutor: - def test_end_to_end_processing(self): - """Test complete pipeline execution.""" - pass - - def test_resume_from_checkpoint(self): - """Test resuming from partial completion.""" - pass - - def test_actor_reuse_across_partitions(self): - """Verify actors are reused (models not reloaded).""" - pass -``` - -### Integration Tests - -```python -class TestActorReuseIntegration: - def test_with_real_gpu_operator(self): - """Test with actual GPU operator (e.g., VideoNSFWFilter).""" - pass - - def test_mixed_cpu_gpu_pipeline(self): - """Test pipeline with both CPU and GPU operators.""" - pass - - def test_checkpoint_and_resume(self): - """Test full checkpoint/resume cycle.""" - pass - - def test_actor_failure_recovery(self): - """Test recovery from actor failure.""" - pass -``` - -### Benchmark Tests +## 9. Design Decision Log -```python -class TestPerformance: - def test_model_loading_count(self): - """Verify model is loaded only once across partitions.""" - # Count model loading log messages - pass - - def test_speedup_vs_baseline(self): - """Compare performance against current implementation.""" - pass - - def test_gpu_utilization(self): - """Measure GPU utilization during processing.""" - pass -``` - ---- - -## 10. Risks and Mitigations - -| Risk | Likelihood | Impact | Mitigation | -|------|------------|--------|------------| -| Detached actors not cleaned up | Medium | Medium | Explicit cleanup in finally block, cleanup script for orphans | -| Actor pool exhaustion | Low | High | Queue-based submission, backpressure handling | -| Memory leak in long-running actors | Low | Medium | Periodic actor restart option, memory monitoring | -| Incompatibility with Ray upgrades | Low | High | Pin Ray version, abstract actor APIs | -| Checkpoint corruption on crash | Low | High | Atomic writes, verification, temp files | - ---- - -## 11. Future Enhancements - -1. **Dynamic Actor Scaling**: Scale actor pool based on queue depth -2. **Actor Health Monitoring**: Proactive detection of unhealthy actors -3. **Cross-Job Actor Reuse**: Reuse actors across multiple jobs (with same model) -4. **GPU Memory Optimization**: Pack multiple small models on single GPU -5. **Async Checkpointing**: Checkpoint in background without blocking processing - ---- - -## 12. Appendix - -### A. Glossary - -| Term | Definition | -|------|------------| -| **Detached Actor** | Ray actor with `lifetime="detached"` that persists until explicitly killed | -| **Actor Pool** | Collection of actors that process work items in parallel | -| **Partition** | A subset of the dataset processed as a unit | -| **Checkpoint** | Saved state of a partition for resume capability | - -### B. Configuration Reference - -```yaml -partition: - # Existing options - mode: 'auto' # 'auto' | 'manual' - num_of_partitions: 10 # Number of partitions - - # New options for actor reuse - actor_reuse: true # Enable actor reuse (default: true) - actor_pool: - max_restarts: 3 # Max actor restarts on failure - reuse_across_similar_ops: true # Share pool for same-config ops - cleanup_on_error: true # Cleanup actors on job failure - -checkpoint: - enabled: true - dir: './checkpoints' - atomic_write: true # Use atomic checkpoint writes - verify_on_write: true # Verify checkpoint after write -``` - -### C. Monitoring and Observability - -```python -# Metrics to expose -METRICS = { - 'actor_pool_size': Gauge, # Number of actors in pool - 'actor_restarts_total': Counter, # Total actor restarts - 'partition_processing_time': Histogram, # Time per partition - 'model_load_time': Histogram, # Model loading time - 'gpu_utilization': Gauge, # GPU utilization percentage - 'checkpoint_write_time': Histogram, # Checkpoint write time -} -``` +| Decision | Choice | Rationale | +|----------|--------|-----------| +| Sequential vs concurrent | Concurrent | Better GPU utilization, simpler architecture | +| Shared actors vs per-partition | Per-partition | Avoids detached actor lifecycle complexity | +| Repartition before split | Always repartition | Avoids materializing dataset to check num_blocks | +| Actor mode + scoping order | Actor mode first | Required for scope_op_concurrency to work correctly | +| Remote task num_cpus | 0 | Task is just an orchestrator; actual compute uses Ray Data actors | --- ## References - [Ray Actors Documentation](https://docs.ray.io/en/latest/ray-core/actors.html) -- [Ray Actor Lifetimes](https://docs.ray.io/en/latest/ray-core/actors/named-actors.html#actor-lifetimes) - [Ray Data User Guide](https://docs.ray.io/en/latest/data/data.html) -- Data-Juicer Source: `data_juicer/core/executor/ray_executor_partitioned.py` +- Source: `data_juicer/core/executor/ray_executor_partitioned.py` +- Source: `data_juicer/core/executor/concurrency_scoping.py` From ebd09449cea54119c1ef84f1d43440d5844c29fa Mon Sep 17 00:00:00 2001 From: Cyrus Zhang Date: Tue, 17 Mar 2026 15:12:57 -0700 Subject: [PATCH 09/17] Update data_juicer/core/executor/ray_executor_partitioned.py Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com> --- data_juicer/core/executor/ray_executor_partitioned.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/data_juicer/core/executor/ray_executor_partitioned.py b/data_juicer/core/executor/ray_executor_partitioned.py index 64eaa3e1e5..7cb6fb6b39 100644 --- a/data_juicer/core/executor/ray_executor_partitioned.py +++ b/data_juicer/core/executor/ray_executor_partitioned.py @@ -749,7 +749,7 @@ def _process_single_partition_task( # Collect results processed_partitions = [] - for i in range(len(partitions)): + for i in sorted(futures.keys()): result = futures[i] if isinstance(result, ray.ObjectRef): try: From 1eca46a6619042a0524145926c144dbb8d68cadb Mon Sep 17 00:00:00 2001 From: Cyrus Zhang Date: Tue, 17 Mar 2026 15:13:13 -0700 Subject: [PATCH 10/17] Update data_juicer/core/executor/ray_executor_partitioned.py Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com> --- data_juicer/core/executor/ray_executor_partitioned.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/data_juicer/core/executor/ray_executor_partitioned.py b/data_juicer/core/executor/ray_executor_partitioned.py index 7cb6fb6b39..b42512ff93 100644 --- a/data_juicer/core/executor/ray_executor_partitioned.py +++ b/data_juicer/core/executor/ray_executor_partitioned.py @@ -767,6 +767,10 @@ def _process_single_partition_task( # Union results logger.info("Merging concurrently processed partitions...") + if not processed_partitions: + logger.warning("All partitions were empty or skipped. Returning an empty dataset.") + return RayDataset(ray.data.from_items([]), cfg=self.cfg) + if len(processed_partitions) == 1: merged_dataset = processed_partitions[0] else: From acd5bfc92a4ce9d305927367b7e9d550ef2c9f14 Mon Sep 17 00:00:00 2001 From: Cyrus Zhang Date: Tue, 17 Mar 2026 15:13:37 -0700 Subject: [PATCH 11/17] Update data_juicer/core/executor/ray_executor_partitioned.py Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com> --- data_juicer/core/executor/ray_executor_partitioned.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/data_juicer/core/executor/ray_executor_partitioned.py b/data_juicer/core/executor/ray_executor_partitioned.py index b42512ff93..6c49d5880f 100644 --- a/data_juicer/core/executor/ray_executor_partitioned.py +++ b/data_juicer/core/executor/ray_executor_partitioned.py @@ -316,7 +316,8 @@ def _resolve_max_concurrent(raw_value) -> int: if isinstance(raw_value, str) and raw_value.lower() == "auto": try: num_gpus = int(ray.cluster_resources().get("GPU", 0)) - except Exception: + except Exception as e: + logger.warning(f"Could not get GPU resources from Ray cluster, defaulting to 0. Error: {e}") num_gpus = 0 if num_gpus > 1: logger.info( From 807e1fa7c5bc475da1315c302e7ba0f5c73f55e9 Mon Sep 17 00:00:00 2001 From: cyruszhang Date: Tue, 17 Mar 2026 17:29:17 -0700 Subject: [PATCH 12/17] preserve natural blocks for streaming pipelining --- .../core/executor/ray_executor_partitioned.py | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/data_juicer/core/executor/ray_executor_partitioned.py b/data_juicer/core/executor/ray_executor_partitioned.py index 6c49d5880f..b0fb3ddd6f 100644 --- a/data_juicer/core/executor/ray_executor_partitioned.py +++ b/data_juicer/core/executor/ray_executor_partitioned.py @@ -1194,13 +1194,12 @@ def _split_dataset_deterministic(self, dataset: RayDataset) -> tuple: # Check for existing partitioning info (resumption case) saved_info = self._load_partitioning_info() - # Ensure enough blocks so split() doesn't produce empty partitions. - # split() distributes by blocks — if there are fewer non-empty - # blocks than partitions, some partitions get 0 rows. - # Always repartition to num_partitions to avoid materializing the - # dataset just to check num_blocks() (which kills lazy evaluation). - dataset.data = dataset.data.repartition(self.num_partitions) - + # Split using the dataset's natural block structure. split() + # distributes existing blocks round-robin, so partitions inherit + # multiple blocks and Ray Data's streaming executor can pipeline + # stages within each partition. If there are fewer blocks than + # partitions, some partitions will be empty — that's handled + # downstream (empty partitions are skipped). logger.info(f"Splitting dataset into {self.num_partitions} partitions (deterministic mode)...") partitions = dataset.data.split(self.num_partitions) logger.info(f"Created {len(partitions)} partitions") From 338a1ec05b93ace9f04377af9782ae1ca5b91b91 Mon Sep 17 00:00:00 2001 From: cyruszhang Date: Tue, 17 Mar 2026 17:41:13 -0700 Subject: [PATCH 13/17] skip meta collection --- .../core/executor/ray_executor_partitioned.py | 38 +++++++++---------- 1 file changed, 18 insertions(+), 20 deletions(-) diff --git a/data_juicer/core/executor/ray_executor_partitioned.py b/data_juicer/core/executor/ray_executor_partitioned.py index b0fb3ddd6f..e1ff510f68 100644 --- a/data_juicer/core/executor/ray_executor_partitioned.py +++ b/data_juicer/core/executor/ray_executor_partitioned.py @@ -1194,12 +1194,18 @@ def _split_dataset_deterministic(self, dataset: RayDataset) -> tuple: # Check for existing partitioning info (resumption case) saved_info = self._load_partitioning_info() - # Split using the dataset's natural block structure. split() - # distributes existing blocks round-robin, so partitions inherit - # multiple blocks and Ray Data's streaming executor can pipeline - # stages within each partition. If there are fewer blocks than - # partitions, some partitions will be empty — that's handled - # downstream (empty partitions are skipped). + # Repartition to num_partitions * blocks_per_partition so that + # after split() each partition has multiple blocks. This enables + # Ray Data's streaming executor to pipeline stages (e.g. overlap + # process_batch_arrow with VideoAestheticsFilter across blocks). + # A single block per partition forces strictly sequential stages. + blocks_per_partition = 4 + total_blocks = self.num_partitions * blocks_per_partition + logger.info( + f"Repartitioning to {total_blocks} blocks " f"({blocks_per_partition} blocks/partition) for streaming..." + ) + dataset.data = dataset.data.repartition(total_blocks) + logger.info(f"Splitting dataset into {self.num_partitions} partitions (deterministic mode)...") partitions = dataset.data.split(self.num_partitions) logger.info(f"Created {len(partitions)} partitions") @@ -1219,24 +1225,16 @@ def _split_dataset_deterministic(self, dataset: RayDataset) -> tuple: self._clear_invalid_checkpoints() saved_info = None - # Collect metadata for new partitions - logger.info("Collecting partition metadata for checkpoint validation...") - total_rows = sum(p.count() for p in partitions) - partition_metadata = [] - - for i, partition in enumerate(partitions): - meta = self._collect_partition_metadata(partition, i) - partition_metadata.append(meta) - logger.debug(f"Partition {i}: {meta.row_count} rows, hash={meta.first_row_hash[:8]}...") - + # On first run, skip expensive metadata collection (count(), take()) + # which triggers redundant pipeline executions on lazy datasets. + # Save only the partition count; full metadata is not needed until + # resume validation. partitioning_info = PartitioningInfo( num_partitions=self.num_partitions, - total_rows=total_rows, - partitions=partition_metadata, + total_rows=-1, # unknown until processing completes + partitions=[], deterministic=True, ) - - # Save partitioning info self._save_partitioning_info(partitioning_info) return partitions, partitioning_info From f4e42c538e1739add0ee56c1cc56a5e9bc2688d3 Mon Sep 17 00:00:00 2001 From: cyruszhang Date: Tue, 17 Mar 2026 20:02:38 -0700 Subject: [PATCH 14/17] Remove repartition before split to preserve natural source blocks Repartition(32) was reducing the 96 natural source blocks (from JSONL shards) down to 32, losing parallelism. Let split() distribute the original blocks round-robin so each partition inherits ~12 blocks, enabling better streaming pipelining within each partition. --- .../core/executor/ray_executor_partitioned.py | 20 ++++++++----------- 1 file changed, 8 insertions(+), 12 deletions(-) diff --git a/data_juicer/core/executor/ray_executor_partitioned.py b/data_juicer/core/executor/ray_executor_partitioned.py index e1ff510f68..ff63f01e00 100644 --- a/data_juicer/core/executor/ray_executor_partitioned.py +++ b/data_juicer/core/executor/ray_executor_partitioned.py @@ -1194,18 +1194,14 @@ def _split_dataset_deterministic(self, dataset: RayDataset) -> tuple: # Check for existing partitioning info (resumption case) saved_info = self._load_partitioning_info() - # Repartition to num_partitions * blocks_per_partition so that - # after split() each partition has multiple blocks. This enables - # Ray Data's streaming executor to pipeline stages (e.g. overlap - # process_batch_arrow with VideoAestheticsFilter across blocks). - # A single block per partition forces strictly sequential stages. - blocks_per_partition = 4 - total_blocks = self.num_partitions * blocks_per_partition - logger.info( - f"Repartitioning to {total_blocks} blocks " f"({blocks_per_partition} blocks/partition) for streaming..." - ) - dataset.data = dataset.data.repartition(total_blocks) - + # Split using the dataset's natural block structure. split() + # distributes existing blocks round-robin, so partitions inherit + # multiple blocks and Ray Data's streaming executor can pipeline + # stages within each partition. Avoid repartition() here — it + # adds a costly shuffle and may reduce block count (e.g. 96 source + # blocks repartitioned to 32 loses parallelism). If there are + # fewer blocks than partitions, some partitions will be empty — + # that's handled downstream (empty partitions are skipped). logger.info(f"Splitting dataset into {self.num_partitions} partitions (deterministic mode)...") partitions = dataset.data.split(self.num_partitions) logger.info(f"Created {len(partitions)} partitions") From 080968e2880ae8ecd999049225f1298924e11a0e Mon Sep 17 00:00:00 2001 From: cyruszhang Date: Wed, 18 Mar 2026 09:04:28 -0700 Subject: [PATCH 15/17] add gpu-dj mode --- perf-test.py | 160 ++++++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 159 insertions(+), 1 deletion(-) diff --git a/perf-test.py b/perf-test.py index 6e1a0c6032..80de36657f 100644 --- a/perf-test.py +++ b/perf-test.py @@ -521,6 +521,152 @@ def add_stats_column(table: pyarrow.Table): logger.info(f"Pipeline setup time: {time.time() - t1:.2f}s") +def run_direct_gpu_test_dj_match( + data_path, + num_shards=96, + batch_size=10, # DJ CUDA default + gpu_concurrency=8, + fail_fast=True, +): + """ + Direct GPU test that matches the DJ pipeline as closely as possible. + Adds: convert_to_absolute_paths, count(), columns(), filter step. + """ + from functools import partial + + import pyarrow + import ray + + os.environ["HF_ENDPOINT"] = "https://hf-mirror.com" + os.environ.setdefault("TOKENIZERS_PARALLELISM", "false") + os.environ.setdefault("HF_HUB_DISABLE_TELEMETRY", "1") + + model_path = "/mnt/workspace/miaoxiang.zfr/models/aesthetics-predictor-v2-sac-logos-ava1-l14-linearMSE" + + precheck_environment(model_path=model_path, fail_fast=fail_fast) + init_ray(object_store_gb=300) + + logger.info("Direct GPU Test (DJ-matched pipeline)") + monitor_gpu() + + t0 = time.time() + if os.path.isfile(data_path): + data_path = split_jsonl(data_path, num_shards) + + ds = ray.data.read_json(data_path) + + # --- Match DJ: count() before processing --- + t_count = time.time() + row_count = ds.count() + logger.info(f"count(): {row_count} rows in {time.time() - t_count:.2f}s") + + # --- Match DJ: columns() --- + t_cols = time.time() + cols = ds.columns() + logger.info(f"columns(): {cols} in {time.time() - t_cols:.2f}s") + + # --- Match DJ: convert_to_absolute_paths --- + dataset_dir = os.path.dirname(data_path) + + def convert_to_absolute_paths(batch, dataset_dir, path_keys): + for key in path_keys: + if key in batch.column_names: + col = batch.column(key) + new_col = [] + for val in col.to_pylist(): + if isinstance(val, list): + new_col.append([os.path.join(dataset_dir, p) if not os.path.isabs(p) else p for p in val]) + elif isinstance(val, str): + new_col.append(os.path.join(dataset_dir, val) if not os.path.isabs(val) else val) + else: + new_col.append(val) + idx = batch.column_names.index(key) + batch = batch.set_column(idx, key, [new_col]) + return batch + + path_keys = [k for k in ["videos", "images", "audios"] if k in cols] + if path_keys: + ds = ds.map_batches( + partial(convert_to_absolute_paths, dataset_dir=dataset_dir, path_keys=path_keys), + batch_format="pyarrow", + zero_copy_batch=True, + batch_size=1000, + ) + logger.info(f"Added convert_to_absolute_paths for keys: {path_keys}") + + # --- Match DJ: add __dj__stats__ column --- + def add_stats_column(table: pyarrow.Table): + new_column_data = [{} for _ in range(len(table))] + return table.append_column("__dj__stats__", [new_column_data]) + + ds = ds.map_batches(add_stats_column, batch_format="pyarrow", batch_size=1000) + logger.info("Added __dj__stats__ column") + + # --- Match DJ: compute_stats via actor --- + from data_juicer.ops.filter.video_aesthetics_filter import VideoAestheticsFilter + + op = VideoAestheticsFilter( + hf_scorer_model=model_path, + trust_remote_code=True, + min_score=0.4, + max_score=1.0, + frame_num=9223372036854775807, + reduce_mode="avg", + num_gpus=1, + batch_mode=True, + ) + logger.info(f"Op: {op._name}, batch_size={batch_size}, is_batched={op.is_batched_op()}") + + import torch + + available_gpus = torch.cuda.device_count() if torch.cuda.is_available() else 0 + if available_gpus <= 0: + raise RuntimeError("No CUDA GPUs visible") + gpu_concurrency = min(gpu_concurrency, available_gpus) + logger.info(f"gpu_concurrency={gpu_concurrency}, batch_size={batch_size}") + + t1 = time.time() + ds = ds.map_batches( + VideoAestheticsFilter, + fn_constructor_args=op._init_args, + fn_constructor_kwargs=op._init_kwargs, + batch_size=batch_size, + num_gpus=1, + concurrency=gpu_concurrency, + batch_format="pyarrow", + ) + logger.info("Added compute_stats map_batches (actor mode)") + + # --- Match DJ: filter step --- + def filter_batch(batch, filter_func): + mask = pyarrow.array(filter_func(batch.to_pydict())) + return batch.filter(mask) + + ds = ds.map_batches( + partial(filter_batch, filter_func=op.process), + batch_format="pyarrow", + zero_copy_batch=True, + batch_size=1000, + ) + logger.info("Added filter_batch step") + + # --- Execute --- + logger.info("Executing full DJ-matched pipeline...") + t2 = time.time() + try: + result = ds.materialize() + except Exception: + logger.exception("Pipeline execution failed") + raise + + logger.info(f"Pipeline execution: {time.time() - t2:.2f}s") + count = result.count() + logger.info(f"Result: {count} rows (filtered from {row_count})") + monitor_gpu() + logger.info(f"Total time: {time.time() - t0:.2f}s") + logger.info(f"Pipeline time (from first map_batches): {time.time() - t1:.2f}s") + + def main(): parser = argparse.ArgumentParser(description="Simple benchmark") parser.add_argument( @@ -540,7 +686,7 @@ def main(): parser.add_argument("--gpu-concurrency", type=int, default=8) parser.add_argument("--fail-fast", action="store_true", default=True) parser.add_argument("--no-fail-fast", dest="fail_fast", action="store_false") - parser.add_argument("--mode", type=str, choices=["ray", "dj", "gpu", "both"], default="gpu") + parser.add_argument("--mode", type=str, choices=["ray", "dj", "gpu", "gpu-dj", "both"], default="gpu") parser.add_argument( "--executor", type=str, @@ -585,6 +731,18 @@ def main(): fail_fast=args.fail_fast, ) + if args.mode == "gpu-dj": + logger.info("\n" + "=" * 60) + logger.info("Testing Direct GPU (DJ-matched pipeline)") + logger.info("=" * 60) + run_direct_gpu_test_dj_match( + jsonl_path, + num_shards=args.num_shards, + batch_size=10, # DJ CUDA default + gpu_concurrency=args.gpu_concurrency, + fail_fast=args.fail_fast, + ) + if __name__ == "__main__": main() From b925795fdae8e028bbc23b7e9e89b1742cd5e442 Mon Sep 17 00:00:00 2001 From: cyruszhang Date: Wed, 18 Mar 2026 12:07:32 -0700 Subject: [PATCH 16/17] refactor: constant paths in front --- perf-test.py | 44 +++++++++++++++++++------------------------- 1 file changed, 19 insertions(+), 25 deletions(-) diff --git a/perf-test.py b/perf-test.py index 80de36657f..81a81fb93c 100644 --- a/perf-test.py +++ b/perf-test.py @@ -15,16 +15,18 @@ from loguru import logger -# Data-Juicer path +# ── Paths ───────────────────────────────────────────────────────────────────── DJ_CODE_PATH = "/mnt/workspace/yileiz/data-juicer" +OUTPUT_DIR = "/mnt/workspace/yileiz/outputs/partitioned_ray/simple_workdir" +MODEL_PATH = "/mnt/workspace/miaoxiang.zfr/models/aesthetics-predictor-v2-sac-logos-ava1-l14-linearMSE" +DEFAULT_CAPTION_JSONL = "/mnt/workspace/miaoxiang.zfr/data/Youku-AliceMind/caption_val_abs_6k.jsonl" +DEFAULT_VIDEO_DIR = "/mnt/workspace/shurui.ksr/Project/data/modelscope/Youku-AliceMind/videos/caption" +# ────────────────────────────────────────────────────────────────────────────── + if os.path.exists(DJ_CODE_PATH): sys.path.insert(0, DJ_CODE_PATH) -# Output directory -OUTPUT_DIR = "/mnt/workspace/yileiz/outputs/partitioned_ray/simple_workdir" - - def setup_logging(log_dir=None): """Setup logging to file and console.""" if log_dir is None: @@ -147,7 +149,7 @@ def require_module(module_name, pip_hint=None): raise RuntimeError(f"Missing required module [{module_name}].{hint}\nOriginal error: {e}") from e -def precheck_environment(model_path, fail_fast=True): +def precheck_environment(fail_fast=True): """ Precheck environment in driver process to avoid hanging inside Ray actors. """ @@ -161,13 +163,13 @@ def precheck_environment(model_path, fail_fast=True): logger.info(f'HF_ENDPOINT={os.environ.get("HF_ENDPOINT")}') # Model path - if not os.path.exists(model_path): - msg = f"Model path does not exist: {model_path}" + if not os.path.exists(MODEL_PATH): + msg = f"Model path does not exist: {MODEL_PATH}" if fail_fast: raise FileNotFoundError(msg) logger.warning(msg) else: - logger.info(f"Model path exists: {model_path}") + logger.info(f"Model path exists: {MODEL_PATH}") # Required modules require_module("torch", "pip install torch") @@ -175,8 +177,6 @@ def precheck_environment(model_path, fail_fast=True): require_module("ray", "pip install ray") require_module("pyarrow", "pip install pyarrow") - # This is the key one from your log - # Torch / CUDA visibility import torch @@ -259,10 +259,8 @@ def run_simple_benchmark( os.environ.setdefault("TOKENIZERS_PARALLELISM", "false") os.environ.setdefault("HF_HUB_DISABLE_TELEMETRY", "1") - model_path = "/mnt/workspace/miaoxiang.zfr/models/aesthetics-predictor-v2-sac-logos-ava1-l14-linearMSE" - # Fail fast before actors - precheck_environment(model_path=model_path, fail_fast=fail_fast) + precheck_environment(fail_fast=fail_fast) # Initialize Ray init_ray(object_store_gb=300) @@ -301,7 +299,7 @@ def run_simple_benchmark( "process": [ { "video_aesthetics_filter": { - "hf_scorer_model": model_path, + "hf_scorer_model": MODEL_PATH, "trust_remote_code": True, "min_score": 0.4, "max_score": 1.0, @@ -415,10 +413,8 @@ def run_direct_gpu_test( os.environ.setdefault("TOKENIZERS_PARALLELISM", "false") os.environ.setdefault("HF_HUB_DISABLE_TELEMETRY", "1") - model_path = "/mnt/workspace/miaoxiang.zfr/models/aesthetics-predictor-v2-sac-logos-ava1-l14-linearMSE" - # Precheck before actor creation - precheck_environment(model_path=model_path, fail_fast=fail_fast) + precheck_environment(fail_fast=fail_fast) init_ray(object_store_gb=300) @@ -445,7 +441,7 @@ def add_stats_column(table: pyarrow.Table): # Create operator on driver for validation only op_t0 = time.time() op = VideoAestheticsFilter( - hf_scorer_model=model_path, + hf_scorer_model=MODEL_PATH, trust_remote_code=True, min_score=0.4, max_score=1.0, @@ -541,9 +537,7 @@ def run_direct_gpu_test_dj_match( os.environ.setdefault("TOKENIZERS_PARALLELISM", "false") os.environ.setdefault("HF_HUB_DISABLE_TELEMETRY", "1") - model_path = "/mnt/workspace/miaoxiang.zfr/models/aesthetics-predictor-v2-sac-logos-ava1-l14-linearMSE" - - precheck_environment(model_path=model_path, fail_fast=fail_fast) + precheck_environment(fail_fast=fail_fast) init_ray(object_store_gb=300) logger.info("Direct GPU Test (DJ-matched pipeline)") @@ -606,7 +600,7 @@ def add_stats_column(table: pyarrow.Table): from data_juicer.ops.filter.video_aesthetics_filter import VideoAestheticsFilter op = VideoAestheticsFilter( - hf_scorer_model=model_path, + hf_scorer_model=MODEL_PATH, trust_remote_code=True, min_score=0.4, max_score=1.0, @@ -672,12 +666,12 @@ def main(): parser.add_argument( "--caption-jsonl", type=str, - default="/mnt/workspace/miaoxiang.zfr/data/Youku-AliceMind/caption_val_abs_6k.jsonl", + default=DEFAULT_CAPTION_JSONL, ) parser.add_argument( "--video-dir", type=str, - default="/mnt/workspace/shurui.ksr/Project/data/modelscope/Youku-AliceMind/videos/caption", + default=DEFAULT_VIDEO_DIR, ) parser.add_argument("--num-samples", type=int, default=6000) parser.add_argument("--num-shards", type=int, default=96) From 81b3e3f605eaf809c3366d3db68fb187b9bccb70 Mon Sep 17 00:00:00 2001 From: cyruszhang Date: Wed, 18 Mar 2026 12:43:29 -0700 Subject: [PATCH 17/17] fix uv issue --- pyproject.toml | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 186cd6b031..c88badaab3 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -186,9 +186,6 @@ requires = [ ] build-backend = "hatchling.build" -[tool.uv] -constraint-dependencies = ["kaleido==0.2.1"] - [tool.hatch.version] path = "data_juicer/__init__.py" @@ -237,6 +234,7 @@ filterwarnings = [ ] [tool.uv] +constraint-dependencies = ["kaleido==0.2.1"] override-dependencies = [ "opencv-python; sys_platform == 'never'", "opencv-python-headless; sys_platform == 'never'",