Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
478 changes: 476 additions & 2 deletions data_juicer/core/data/load_strategy.py

Large diffs are not rendered by default.

73 changes: 71 additions & 2 deletions data_juicer/core/exporter.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ def __init__(
self.keep_stats_in_res_ds = keep_stats_in_res_ds
self.keep_hashes_in_res_ds = keep_hashes_in_res_ds
self.export_stats = export_stats
self.export_extra_args = kwargs
self.export_type = export_type
self.suffix = self._get_suffix(export_path) if export_type is None else export_type
support_dict = self._router()
if self.suffix not in support_dict:
Expand Down Expand Up @@ -159,12 +161,14 @@ def _get_suffix(self, export_path):
"""
Get the suffix of export path and check if it's supported.

We only support ["jsonl", "json", "parquet"] for now.
We only support ["jsonl", "json", "parquet", "iceberg"] for now.

:param export_path: the path to export datasets.
:return: the suffix of export_path.
"""
suffix = export_path.split(".")[-1].lower()
if self.export_type == "iceberg":
suffix = "iceberg"
return suffix

@staticmethod
Expand Down Expand Up @@ -250,14 +254,16 @@ def _export_impl(self, dataset, export_path, suffix, export_stats=True):
feature_fields = set(dataset.features.keys())
removed_fields = extra_fields.intersection(feature_fields)
dataset = dataset.remove_columns(removed_fields)
export_method = Exporter._router()[suffix]
export_method = Exporter._router().get(suffix, Exporter.to_parquet)
Comment thread
Dludora marked this conversation as resolved.
if self.export_shard_size <= 0:
# export the whole dataset into one single file.
logger.info("Export dataset into a single file...")
export_kwargs = {"num_proc": self.num_proc if self.export_in_parallel else 1}
# Add storage_options if available (for S3 export)
if self.storage_options is not None:
export_kwargs["storage_options"] = self.storage_options
if suffix == "iceberg":
export_kwargs["export_extra_args"] = self.export_extra_args
export_method(dataset, export_path, **export_kwargs)
self._encrypt_local_file(export_path)
else:
Expand Down Expand Up @@ -317,6 +323,8 @@ def _export_impl(self, dataset, export_path, suffix, export_stats=True):
# Add storage_options if available (for S3 export)
if self.storage_options is not None:
export_kwargs["storage_options"] = self.storage_options
if suffix == "iceberg":
export_kwargs["export_extra_args"] = self.export_extra_args
pool.apply_async(
export_method,
args=(
Expand Down Expand Up @@ -449,6 +457,66 @@ def to_parquet(dataset, export_path, **kwargs):
else:
dataset.to_parquet(export_path)

@staticmethod
def to_iceberg(dataset, export_path, **kwargs):
"""
Export method for iceberg target tables.
Checks for table existence/connectivity. If check fails, safe fall-back to JSON.
"""
from pyiceberg.catalog import load_catalog
from pyiceberg.exceptions import NoSuchTableError

export_extra_args = kwargs.get("export_extra_args", {})
catalog_kwargs = export_extra_args.get("catalog_kwargs", {})
table_identifier = export_extra_args.get("table_identifier", export_path)

use_iceberg = False

try:
catalog = load_catalog(**catalog_kwargs)
catalog.load_table(table_identifier)
logger.info(f"Iceberg table {table_identifier} exists. Writing to Iceberg.")
use_iceberg = True

except NoSuchTableError as e:
logger.warning(
f"Iceberg target unavailable ({e.__class__.__name__}). Fallback to exporting to {export_path}..."
)
# Get pyarrow schema from HF Dataset
schema = dataset.features.arrow_schema
Comment thread
Dludora marked this conversation as resolved.
logger.info(f"Creating new Iceberg table {table_identifier} with schema: {schema}")
try:
catalog.create_table(table_identifier, schema)
use_iceberg = True
except Exception as e:
logger.error(f"Failed to create Iceberg table: {e}. Fallback to exporting to {export_path}...")
except Exception as e:
logger.error(f"Unexpected error checking Iceberg: {e}. Fallback to exporting to {export_path}...")

if use_iceberg:
try:
import daft
Comment thread
Dludora marked this conversation as resolved.

# convert huggingface dataset to daft dataframe
df = daft.from_arrow(dataset.data.table)
table = catalog.load_table(table_identifier)
df.write_iceberg(table, mode="append")
return
except Exception as e:
logger.error(f"Write to Iceberg failed during execution: {e}. Fallback to json...")

suffix = os.path.splitext(export_path)[-1].strip(".").lower()
if not suffix:
suffix = "jsonl"
logger.warning(f"No suffix found in {export_path}, using default fallback: {suffix}")

logger.info(f"Falling back to file export. Format: [{suffix}], Path: [{export_path}]")

if suffix in ["json", "jsonl"]:
return Exporter.to_jsonl(dataset, export_path, **kwargs)
else:
return Exporter.to_parquet(dataset, export_path, **kwargs)

# suffix to export method
@staticmethod
def _router():
Expand All @@ -461,4 +529,5 @@ def _router():
"jsonl": Exporter.to_jsonl,
"json": Exporter.to_json,
"parquet": Exporter.to_parquet,
"iceberg": Exporter.to_iceberg,
}
Loading
Loading