From ba5ebf488f710f76be7a1c7216062e8513553b45 Mon Sep 17 00:00:00 2001 From: Daniel Shin Date: Wed, 17 Jun 2026 15:59:13 -0700 Subject: [PATCH 01/19] initial implementation Signed-off-by: Daniel Shin --- python/ray/data/__init__.py | 10 + .../_internal/datasource/uc_datasource.py | 231 ------------- python/ray/data/catalog.py | 315 ++++++++++++++++++ python/ray/data/read_api.py | 120 +++++-- .../tests/datasource/test_uc_datasource.py | 297 ----------------- python/ray/data/tests/test_catalog.py | 257 ++++++++++++++ 6 files changed, 679 insertions(+), 551 deletions(-) delete mode 100644 python/ray/data/_internal/datasource/uc_datasource.py create mode 100644 python/ray/data/catalog.py delete mode 100644 python/ray/data/tests/datasource/test_uc_datasource.py create mode 100644 python/ray/data/tests/test_catalog.py diff --git a/python/ray/data/__init__.py b/python/ray/data/__init__.py index cc95da357aa0..bb1c8e2d3f79 100644 --- a/python/ray/data/__init__.py +++ b/python/ray/data/__init__.py @@ -83,6 +83,12 @@ read_videos, read_webdataset, ) +from ray.data.catalog import ( + Catalog, + ReaderFormat, + ResolvedSource, + UnityCatalog, +) # Module-level cached global functions for callable classes. It needs to be defined here # since it has to be process-global across cloudpickled funcs. @@ -192,6 +198,10 @@ "read_unity_catalog", "read_videos", "read_webdataset", + "Catalog", + "ReaderFormat", + "ResolvedSource", + "UnityCatalog", "KafkaAuthConfig", "Preprocessor", ] diff --git a/python/ray/data/_internal/datasource/uc_datasource.py b/python/ray/data/_internal/datasource/uc_datasource.py deleted file mode 100644 index a87b2b2f9910..000000000000 --- a/python/ray/data/_internal/datasource/uc_datasource.py +++ /dev/null @@ -1,231 +0,0 @@ -import atexit -import logging -import os -import tempfile -from typing import Any, Callable, Dict, Optional - -import requests - -import ray -from ray.data._internal.datasource.databricks_credentials import ( - DatabricksCredentialProvider, - request_with_401_retry, -) - -logger = logging.getLogger(__name__) - -_FILE_FORMAT_TO_RAY_READER = { - "delta": "read_delta", - "parquet": "read_parquet", -} - - -class UnityCatalogConnector: - """ - Load a Unity Catalog table or files into a Ray Dataset, handling cloud credentials automatically. - - Currently only supports Databricks-managed Unity Catalog - - Supported formats: delta, parquet. - Supports AWS, Azure, and GCP with automatic credential handoff. - """ - - def __init__( - self, - *, - table_full_name: str, - credential_provider: DatabricksCredentialProvider, - region: Optional[str] = None, - data_format: Optional[str] = "delta", - operation: str = "READ", - ray_init_kwargs: Optional[Dict] = None, - reader_kwargs: Optional[Dict] = None, - ): - self._credential_provider = credential_provider - self.base_url = self._credential_provider.get_host().rstrip("/") - if not self.base_url.startswith(("http://", "https://")): - self.base_url = f"https://{self.base_url}" - self.table_full_name = table_full_name - self.data_format = data_format.lower() if data_format else None - self.region = region - self.operation = operation - self.ray_init_kwargs = ray_init_kwargs or {} - self.reader_kwargs = reader_kwargs or {} - self._gcp_temp_file = None - - def _get_table_info(self) -> dict: - url = f"{self.base_url}/api/2.1/unity-catalog/tables/{self.table_full_name}" - resp = request_with_401_retry( - requests.get, - url, - self._credential_provider, - ) - data = resp.json() - self._table_info = data - self._table_id = data["table_id"] - return data - - def _get_creds(self): - url = f"{self.base_url}/api/2.1/unity-catalog/temporary-table-credentials" - payload = {"table_id": self._table_id, "operation": self.operation} - resp = request_with_401_retry( - requests.post, - url, - self._credential_provider, - json=payload, - ) - self._creds_response = resp.json() - self._table_url = self._creds_response["url"] - - def _set_env(self): - env_vars = {} - creds = self._creds_response - - if "aws_temp_credentials" in creds: - aws = creds["aws_temp_credentials"] - env_vars["AWS_ACCESS_KEY_ID"] = aws["access_key_id"] - env_vars["AWS_SECRET_ACCESS_KEY"] = aws["secret_access_key"] - env_vars["AWS_SESSION_TOKEN"] = aws["session_token"] - if self.region: - env_vars["AWS_REGION"] = self.region - env_vars["AWS_DEFAULT_REGION"] = self.region - elif "azuresasuri" in creds: - env_vars["AZURE_STORAGE_SAS_TOKEN"] = creds["azuresasuri"] - # Azure UC returns a user delegation SAS; see - # https://docs.databricks.com/en/data-governance/unity-catalog/credential-vending.html - elif "azure_user_delegation_sas" in creds: - azure = creds["azure_user_delegation_sas"] or {} - sas_token = ( - azure.get("sas_token") - or azure.get("sas") - or azure.get("token") - or azure.get("sasToken") - ) - if sas_token and sas_token.startswith("?"): - sas_token = sas_token[1:] - if sas_token: - env_vars["AZURE_STORAGE_SAS_TOKEN"] = sas_token - else: - known_keys = ", ".join(azure.keys()) - raise ValueError( - "Azure UC credentials missing SAS token in azure_user_delegation_sas. " - f"Available keys: {known_keys}" - ) - storage_account = azure.get("storage_account") - if storage_account: - env_vars["AZURE_STORAGE_ACCOUNT"] = storage_account - env_vars["AZURE_STORAGE_ACCOUNT_NAME"] = storage_account - elif "gcp_service_account" in creds: - gcp_json = creds["gcp_service_account"] - temp_file = tempfile.NamedTemporaryFile( - mode="w", - prefix="gcp_sa_", - suffix=".json", - delete=False, - ) - temp_file.write(gcp_json) - temp_file.close() - env_vars["GOOGLE_APPLICATION_CREDENTIALS"] = temp_file.name - self._gcp_temp_file = temp_file.name - atexit.register(self._cleanup_gcp_temp_file, temp_file.name) - else: - known_keys = ", ".join(creds.keys()) - raise ValueError( - "No known credential type found in Databricks UC response. " - f"Available keys: {known_keys}" - ) - - for k, v in env_vars.items(): - os.environ[k] = v - self._runtime_env = {"env_vars": env_vars} - - @staticmethod - def _cleanup_gcp_temp_file(temp_file_path: str): - """Clean up temporary GCP service account file.""" - if temp_file_path and os.path.exists(temp_file_path): - try: - os.unlink(temp_file_path) - except OSError: - pass - - def _infer_data_format(self) -> str: - if self.data_format: - return self.data_format - - info = self._table_info or self._get_table_info() - if "data_source_format" in info and info["data_source_format"]: - fmt = info["data_source_format"].lower() - return fmt - - storage_loc = info.get("storage_location") or getattr(self, "_table_url", None) - if storage_loc: - ext = os.path.splitext(storage_loc)[-1].replace(".", "").lower() - if ext in _FILE_FORMAT_TO_RAY_READER: - return ext - - raise ValueError("Could not infer data format from table metadata.") - - def _get_ray_reader(self, data_format: str) -> Callable[..., Any]: - fmt = data_format.lower() - if fmt in _FILE_FORMAT_TO_RAY_READER: - reader_func = getattr(ray.data, _FILE_FORMAT_TO_RAY_READER[fmt], None) - if reader_func: - return reader_func - raise ValueError(f"Unsupported data format: {fmt}") - - def _read_delta_with_credentials(self): - """Read Delta table with proper PyArrow filesystem for session tokens.""" - import pyarrow.fs as pafs - - creds = self._creds_response - reader_kwargs = self.reader_kwargs.copy() - - # For AWS, create PyArrow S3FileSystem with session tokens - if "aws_temp_credentials" in creds: - if not self.region: - raise ValueError( - "The 'region' parameter is required for AWS S3 access. " - "Please specify the AWS region (e.g., region='us-west-2')." - ) - aws = creds["aws_temp_credentials"] - filesystem = pafs.S3FileSystem( - access_key=aws["access_key_id"], - secret_key=aws["secret_access_key"], - session_token=aws["session_token"], - region=self.region, - ) - reader_kwargs["filesystem"] = filesystem - - # Call ray.data.read_delta with proper error handling - try: - return ray.data.read_delta(self._table_url, **reader_kwargs) - except Exception as e: - error_msg = str(e) - if ( - "DeletionVectors" in error_msg - or "Unsupported reader features" in error_msg - ): - raise RuntimeError( - f"Delta table uses Deletion Vectors, which requires deltalake>=0.10.0. " - f"Error: {error_msg}\n" - f"Solution: pip install --upgrade 'deltalake>=0.10.0'" - ) from e - raise - - def read(self): - self._get_table_info() - self._get_creds() - self._set_env() - - data_format = self._infer_data_format() - - if not ray.is_initialized(): - ray.init(runtime_env=self._runtime_env, **self.ray_init_kwargs) - - # Use special Delta reader for proper filesystem handling - if data_format == "delta": - return self._read_delta_with_credentials() - - # Use standard reader for other formats - reader = self._get_ray_reader(data_format) - return reader(self._table_url, **self.reader_kwargs) diff --git a/python/ray/data/catalog.py b/python/ray/data/catalog.py new file mode 100644 index 000000000000..702bebacbeed --- /dev/null +++ b/python/ray/data/catalog.py @@ -0,0 +1,315 @@ +"""Catalog connectors for Ray Data readers. + +A :class:`Catalog` resolves a table name into a readable source (location + +credentials) for a reader such as :func:`ray.data.read_delta`, +:func:`ray.data.read_parquet`, or :func:`ray.data.read_iceberg`. + +This inverts the previous design (``read_unity_catalog``), where the +authentication layer owned the readers. Now the reader is primary and a catalog +is passed in only when authentication is required. +""" + +import atexit +import logging +import os +import tempfile +from abc import ABC, abstractmethod +from dataclasses import dataclass +from enum import Enum +from typing import TYPE_CHECKING, Any, Dict, Optional, Tuple + +import requests + +from ray.util.annotations import PublicAPI + +if TYPE_CHECKING: + import pyarrow.fs + + from ray.data._internal.datasource.databricks_credentials import ( + DatabricksCredentialProvider, + ) + +logger = logging.getLogger(__name__) + + +@PublicAPI(stability="alpha") +class ReaderFormat(str, Enum): + """Which reader is asking the catalog to resolve a table. + + ``str``-based so it stays ergonomic (``ReaderFormat.DELTA == "delta"``) and + serializes cleanly. + """ + + DELTA = "delta" + PARQUET = "parquet" + ICEBERG = "iceberg" + + +@PublicAPI(stability="alpha") +@dataclass +class ResolvedSource: + """The output of :meth:`Catalog.resolve` — location/credentials for a reader. + + A reader consumes only the fields it understands: + + * ``read_delta``: ``path`` + (``storage_options`` and/or ``filesystem``) + * ``read_parquet``: ``path`` + ``filesystem`` + * ``read_iceberg``: ``catalog_kwargs`` + + Unused fields are ``None``. + """ + + path: Optional[str] = None + filesystem: Optional["pyarrow.fs.FileSystem"] = None + storage_options: Optional[Dict[str, Any]] = None + catalog_kwargs: Optional[Dict[str, Any]] = None + data_format: Optional[ReaderFormat] = None # hint, e.g. ReaderFormat.DELTA + + +@PublicAPI(stability="alpha") +class Catalog(ABC): + """A directory service that resolves a table name to a readable source. + + Readers depend on this small interface; the catalog knows nothing about + readers (the dependency is inverted relative to the old + ``read_unity_catalog``). + """ + + @abstractmethod + def resolve(self, table: str, *, reader: ReaderFormat) -> ResolvedSource: + """Resolve ``table`` for the given ``reader``.""" + ... + + +@PublicAPI(stability="alpha") +class UnityCatalog(Catalog): + """Databricks Unity Catalog connector. + + For Delta and Parquet tables this performs Unity Catalog credential vending + (temporary, least-privilege cloud credentials). For Iceberg tables it + returns configuration pointing PyIceberg at Unity Catalog's Iceberg REST + catalog endpoint. + + Args: + url: Databricks workspace URL (e.g. + ``"https://dbc-XXXX.cloud.databricks.com"``). Required unless + ``credential_provider`` is given. + token: Databricks Personal Access Token with ``EXTERNAL USE SCHEMA`` + permission. Required unless ``credential_provider`` is given. + credential_provider: A custom + :class:`~ray.data._internal.datasource.databricks_credentials.DatabricksCredentialProvider`. + If provided, ``url``/``token`` are ignored. + region: AWS region for S3 access (e.g. ``"us-west-2"``). Required for + AWS-backed tables; not needed for Azure/GCP. + + Example: + >>> import ray + >>> catalog = ray.data.UnityCatalog( # doctest: +SKIP + ... url="https://dbc-XXXX.cloud.databricks.com", + ... token="dapi...", + ... region="us-west-2", + ... ) + >>> ds = ray.data.read_delta( # doctest: +SKIP + ... "main.sales.transactions", catalog=catalog + ... ) + """ + + def __init__( + self, + url: Optional[str] = None, + token: Optional[str] = None, + *, + credential_provider: Optional["DatabricksCredentialProvider"] = None, + region: Optional[str] = None, + ): + from ray.data._internal.datasource.databricks_credentials import ( + UnityCatalogCredentialConfig, + resolve_credential_provider, + ) + + self._provider = resolve_credential_provider( + UnityCatalogCredentialConfig( + credential_provider=credential_provider, url=url, token=token + ) + ) + self._region = region + self._base_url = self._normalize_host(self._provider.get_host()) + + @staticmethod + def _normalize_host(host: str) -> str: + host = host.rstrip("/") + if not host.startswith(("http://", "https://")): + host = f"https://{host}" + return host + + # ---- Catalog interface ------------------------------------------------- + def resolve(self, table: str, *, reader: ReaderFormat) -> ResolvedSource: + if reader is ReaderFormat.ICEBERG: + return self._resolve_iceberg(table) + if reader in (ReaderFormat.DELTA, ReaderFormat.PARQUET): + return self._resolve_storage(table, reader) + raise ValueError(f"UnityCatalog does not support reader={reader!r}") + + # ---- storage-credential vending (delta / parquet) ---------------------- + def _resolve_storage(self, table: str, reader: ReaderFormat) -> ResolvedSource: + table_info = self._get_table_info(table) + creds, table_url = self._get_creds(table_info["table_id"]) + filesystem, storage_options = self._creds_to_reader_args(creds, reader) + return ResolvedSource( + path=table_url, + filesystem=filesystem, + storage_options=storage_options, + data_format=self._infer_format(table_info, table_url), + ) + + # ---- iceberg REST catalog --------------------------------------------- + def _resolve_iceberg(self, table: str) -> ResolvedSource: + # PyIceberg speaks the Iceberg REST protocol; Unity Catalog implements + # it and vends data-file credentials via the access-delegation header. + # No manual S3/ADLS/GCS keys are needed here. + return ResolvedSource( + catalog_kwargs={ + "type": "rest", + "uri": f"{self._base_url}/api/2.1/unity-catalog/iceberg", + "token": self._provider.get_token(), + "header.X-Iceberg-Access-Delegation": "vended-credentials", + }, + data_format=ReaderFormat.ICEBERG, + ) + + # ---- Unity Catalog REST helpers --------------------------------------- + def _get_table_info(self, table: str) -> dict: + from ray.data._internal.datasource.databricks_credentials import ( + request_with_401_retry, + ) + + url = f"{self._base_url}/api/2.1/unity-catalog/tables/{table}" + resp = request_with_401_retry(requests.get, url, self._provider) + return resp.json() + + def _get_creds(self, table_id: str) -> Tuple[dict, str]: + from ray.data._internal.datasource.databricks_credentials import ( + request_with_401_retry, + ) + + url = f"{self._base_url}/api/2.1/unity-catalog/temporary-table-credentials" + payload = {"table_id": table_id, "operation": "READ"} + resp = request_with_401_retry(requests.post, url, self._provider, json=payload) + creds = resp.json() + return creds, creds["url"] + + @staticmethod + def _infer_format(table_info: dict, table_url: str) -> Optional[ReaderFormat]: + """Best-effort format hint from table metadata or file extension.""" + fmt = (table_info.get("data_source_format") or "").lower() + if fmt in (ReaderFormat.DELTA.value, ReaderFormat.PARQUET.value): + return ReaderFormat(fmt) + + storage_loc = table_info.get("storage_location") or table_url + if storage_loc: + ext = os.path.splitext(storage_loc)[-1].replace(".", "").lower() + if ext in (ReaderFormat.DELTA.value, ReaderFormat.PARQUET.value): + return ReaderFormat(ext) + return None + + def _creds_to_reader_args( + self, creds: dict, reader: ReaderFormat + ) -> Tuple[Optional["pyarrow.fs.FileSystem"], Optional[Dict[str, Any]]]: + """Translate vended credentials into reader-shaped arguments. + + AWS is delivered as a serializable ``S3FileSystem`` (worker-safe, no + global env mutation). Azure/GCP retain the env-var / ``storage_options`` + delivery used historically; eliminating that residual env mutation and + making it worker-safe is tracked as follow-up work alongside credential + refresh. + """ + import pyarrow.fs as pafs + + if "aws_temp_credentials" in creds: + if not self._region: + raise ValueError( + "The 'region' parameter is required for AWS S3 access. " + "Please specify the AWS region (e.g., region='us-west-2')." + ) + aws = creds["aws_temp_credentials"] + filesystem = pafs.S3FileSystem( + access_key=aws["access_key_id"], + secret_key=aws["secret_access_key"], + session_token=aws["session_token"], + region=self._region, + ) + return filesystem, None + + if "azuresasuri" in creds or "azure_user_delegation_sas" in creds: + storage_options = self._parse_azure_creds(creds) + if reader is ReaderFormat.PARQUET: + # read_parquet has no storage_options; pyarrow's default Azure + # filesystem reads these from the process environment. + for k, v in storage_options.items(): + os.environ[k] = v + logger.warning( + "UnityCatalog: Azure credentials for read_parquet are " + "delivered via process environment variables, which is " + "driver-local. Worker propagation is a known limitation." + ) + return None, storage_options + + if "gcp_service_account" in creds: + self._write_gcp_creds(creds["gcp_service_account"]) + logger.warning( + "UnityCatalog: GCP credentials are delivered via the " + "GOOGLE_APPLICATION_CREDENTIALS environment variable, which is " + "driver-local. Worker propagation is a known limitation." + ) + return None, None + + raise ValueError( + "No known credential type found in Databricks UC response. " + f"Available keys: {', '.join(creds.keys())}" + ) + + @staticmethod + def _parse_azure_creds(creds: dict) -> Dict[str, Any]: + if "azuresasuri" in creds: + return {"AZURE_STORAGE_SAS_TOKEN": creds["azuresasuri"]} + + # Azure UC returns a user delegation SAS; see + # https://docs.databricks.com/en/data-governance/unity-catalog/credential-vending.html + azure = creds["azure_user_delegation_sas"] or {} + sas_token = ( + azure.get("sas_token") + or azure.get("sas") + or azure.get("token") + or azure.get("sasToken") + ) + if sas_token and sas_token.startswith("?"): + sas_token = sas_token[1:] + if not sas_token: + raise ValueError( + "Azure UC credentials missing SAS token in " + "azure_user_delegation_sas. " + f"Available keys: {', '.join(azure.keys())}" + ) + storage_options: Dict[str, Any] = {"AZURE_STORAGE_SAS_TOKEN": sas_token} + storage_account = azure.get("storage_account") + if storage_account: + storage_options["AZURE_STORAGE_ACCOUNT"] = storage_account + storage_options["AZURE_STORAGE_ACCOUNT_NAME"] = storage_account + return storage_options + + def _write_gcp_creds(self, gcp_json: str) -> None: + temp_file = tempfile.NamedTemporaryFile( + mode="w", prefix="gcp_sa_", suffix=".json", delete=False + ) + temp_file.write(gcp_json) + temp_file.close() + os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = temp_file.name + atexit.register(self._cleanup_gcp_temp_file, temp_file.name) + + @staticmethod + def _cleanup_gcp_temp_file(temp_file_path: str) -> None: + if temp_file_path and os.path.exists(temp_file_path): + try: + os.unlink(temp_file_path) + except OSError: + pass diff --git a/python/ray/data/read_api.py b/python/ray/data/read_api.py index e885578da0c4..e4a00cb7bebd 100644 --- a/python/ray/data/read_api.py +++ b/python/ray/data/read_api.py @@ -63,7 +63,6 @@ from ray.data._internal.datasource.text_datasource import TextDatasource from ray.data._internal.datasource.tfrecords_datasource import TFRecordDatasource from ray.data._internal.datasource.torch_datasource import TorchDatasource -from ray.data._internal.datasource.uc_datasource import UnityCatalogConnector from ray.data._internal.datasource.video_datasource import VideoDatasource from ray.data._internal.datasource.webdataset_datasource import WebDatasetDatasource from ray.data._internal.delegating_block_builder import DelegatingBlockBuilder @@ -115,7 +114,12 @@ _validate_head_node_resources_for_local_scheduling, ) from ray.types import ObjectRef -from ray.util.annotations import DeveloperAPI, PublicAPI, RayDeprecationWarning +from ray.util.annotations import ( + Deprecated, + DeveloperAPI, + PublicAPI, + RayDeprecationWarning, +) if TYPE_CHECKING: import daft @@ -132,6 +136,8 @@ from pyiceberg.expressions import BooleanExpression from tensorflow_metadata.proto.v0 import schema_pb2 + from ray.data.catalog import Catalog + T = TypeVar("T") logger = logging.getLogger(__name__) @@ -1129,6 +1135,7 @@ def read_parquet( paths: Union[str, List[str]], *, filesystem: Optional["pyarrow.fs.FileSystem"] = None, + catalog: Optional["Catalog"] = None, columns: Optional[List[str]] = None, parallelism: int = -1, num_cpus: Optional[float] = None, @@ -1230,6 +1237,11 @@ def read_parquet( the filesystem is automatically selected based on the scheme of the paths. For example, if the path begins with ``s3://``, the ``S3FileSystem`` is used. If ``None``, this function uses a system-chosen implementation. + catalog: An optional :class:`~ray.data.Catalog` (e.g. + :class:`~ray.data.UnityCatalog`) used to authenticate access. When + provided, ``paths`` is interpreted as a catalog table identifier + (e.g. ``"catalog.schema.table"``) rather than a filesystem path, and + the catalog resolves the physical location and credentials. columns: A list of column names to read. Only the specified columns are read during the file scan. Deprecated — use :meth:`~ray.data.Dataset.select_columns` on the returned dataset @@ -1306,6 +1318,20 @@ def read_parquet( """ _validate_shuffle_arg(shuffle) + if catalog is not None: + from ray.data.catalog import ReaderFormat + + resolved = catalog.resolve(paths, reader=ReaderFormat.PARQUET) + paths = resolved.path + if resolved.filesystem is not None: + if filesystem is not None: + logger.warning( + "Both `filesystem` and `catalog` were specified. Overriding " + "the provided `filesystem` with the catalog-resolved " + "credentials." + ) + filesystem = resolved.filesystem + # Check for deprecated filter parameter if "filter" in arrow_parquet_args: warnings.warn( @@ -4159,6 +4185,7 @@ def read_iceberg( snapshot_id: Optional[int] = None, scan_kwargs: Optional[Dict[str, str]] = None, catalog_kwargs: Optional[Dict[str, str]] = None, + catalog: Optional["Catalog"] = None, ray_remote_args: Optional[Dict[str, Any]] = None, num_cpus: Optional[float] = None, num_gpus: Optional[float] = None, @@ -4209,6 +4236,11 @@ def read_iceberg( `pyiceberg catalog `_. + catalog: An optional :class:`~ray.data.Catalog` (e.g. + :class:`~ray.data.UnityCatalog`) used to authenticate access. When + provided, the catalog supplies ``catalog_kwargs`` pointing at its + Iceberg REST endpoint. ``catalog`` will be ignored if ``catalog_kwargs`` + is specified. ray_remote_args: Optional arguments to pass to :func:`ray.remote` in the read tasks. num_cpus: The number of CPUs to reserve for each parallel read worker. @@ -4247,6 +4279,18 @@ def read_iceberg( stacklevel=2, ) + if catalog is not None: + if catalog_kwargs: + logger.warning( + "`catalog` and `catalog_kwargs` are both specified. " + "Ignoring `catalog` and using `catalog_kwargs` instead." + ) + else: + from ray.data.catalog import ReaderFormat + + resolved = catalog.resolve(table_identifier, reader=ReaderFormat.ICEBERG) + catalog_kwargs = resolved.catalog_kwargs or {} + # Setup the Datasource datasource = IcebergDatasource( table_identifier=table_identifier, @@ -4457,7 +4501,16 @@ def read_clickhouse( ) -@PublicAPI(stability="alpha") +@Deprecated( + message=( + "``read_unity_catalog`` is deprecated. Use ``read_delta``, " + "``read_parquet``, or ``read_iceberg`` with a " + "``catalog=ray.data.UnityCatalog(...)`` instead. For example::\n\n" + " catalog = ray.data.UnityCatalog(url=..., token=..., region=...)\n" + " ds = ray.data.read_delta('main.sales.transactions', catalog=catalog)" + ), + warning=True, +) def read_unity_catalog( table: str, url: Optional[str] = None, @@ -4477,10 +4530,6 @@ def read_unity_catalog( ensuring that permissions are enforced at the Databricks principal (user, group, or service principal) making the request. The function supports reading data directly from AWS S3, Azure Data Lake, or GCP GCS in standard formats including Delta and Parquet. - .. note:: - - This function is experimental and under active development. - Examples: Read a Unity Catalog Delta table: @@ -4531,25 +4580,28 @@ def read_unity_catalog( Returns: A :class:`~ray.data.Dataset` containing the data from Unity Catalog. """ # noqa: E501 - from ray.data._internal.datasource.databricks_credentials import ( - UnityCatalogCredentialConfig, - resolve_credential_provider, - ) - - resolved_provider = resolve_credential_provider( - UnityCatalogCredentialConfig( - credential_provider=credential_provider, url=url, token=token - ) - ) + from ray.data.catalog import ReaderFormat, UnityCatalog - connector = UnityCatalogConnector( - table_full_name=table, - credential_provider=resolved_provider, - data_format=data_format, + catalog = UnityCatalog( + url=url, + token=token, + credential_provider=credential_provider, region=region, - reader_kwargs=reader_kwargs, ) - return connector.read() + reader_kwargs = reader_kwargs or {} + + fmt = ReaderFormat(data_format.lower()) if data_format else None + if fmt is None: + # One extra REST call to infer the format; acceptable on the + # deprecated path. + info = catalog._get_table_info(table) + fmt = catalog._infer_format(info, info.get("storage_location")) + + if fmt is ReaderFormat.DELTA: + return read_delta(table, catalog=catalog, **reader_kwargs) + if fmt is ReaderFormat.PARQUET: + return read_parquet(table, catalog=catalog, **reader_kwargs) + raise ValueError(f"Unsupported data_format for read_unity_catalog: {fmt!r}") @PublicAPI(stability="alpha") @@ -4559,6 +4611,7 @@ def read_delta( *, storage_options: Optional[Dict[str, Any]] = None, filesystem: Optional["pyarrow.fs.FileSystem"] = None, + catalog: Optional["Catalog"] = None, columns: Optional[List[str]] = None, parallelism: int = -1, num_cpus: Optional[float] = None, @@ -4628,6 +4681,11 @@ def read_delta( the filesystem is automatically selected based on the scheme of the paths. For example, if the path begins with ``s3://``, the ``S3FileSystem`` is used. If ``None``, this function uses a system-chosen implementation. + catalog: An optional :class:`~ray.data.Catalog` (e.g. + :class:`~ray.data.UnityCatalog`) used to authenticate access. When + provided, ``path`` is interpreted as a catalog table identifier + (e.g. ``"catalog.schema.table"``) rather than a filesystem path, and + the catalog resolves the physical location and credentials. columns: A list of column names to read. Only the specified columns are read during the file scan. parallelism: This argument is deprecated. Use ``override_num_blocks`` argument. @@ -4675,6 +4733,22 @@ def read_delta( from deltalake import DeltaTable + if catalog is not None: + from ray.data.catalog import ReaderFormat + + resolved = catalog.resolve(path, reader=ReaderFormat.DELTA) + path = resolved.path + if resolved.storage_options: + storage_options = {**resolved.storage_options, **(storage_options or {})} + if resolved.filesystem is not None: + if filesystem is not None: + logger.warning( + "Both `filesystem` and `catalog` were specified. Overriding " + "the provided `filesystem` with the catalog-resolved " + "credentials." + ) + filesystem = resolved.filesystem + # This seems reasonable to keep it at one table, even Spark doesn't really support # multi-table reads, it's usually up to the developer to keep it in one table. if not isinstance(path, str): diff --git a/python/ray/data/tests/datasource/test_uc_datasource.py b/python/ray/data/tests/datasource/test_uc_datasource.py deleted file mode 100644 index 3df80b8dce5f..000000000000 --- a/python/ray/data/tests/datasource/test_uc_datasource.py +++ /dev/null @@ -1,297 +0,0 @@ -"""Tests for Unity Catalog datasource (uc_datasource.py).""" - -from unittest import mock - -import pytest - -from ray.data._internal.datasource.databricks_credentials import ( - StaticCredentialProvider, - build_headers, - request_with_401_retry, -) -from ray.data._internal.datasource.uc_datasource import ( - UnityCatalogConnector, -) -from ray.data.tests.datasource.databricks_test_utils import ( - MockResponse, - RefreshableCredentialProvider, -) - -# ============================================================================= -# Pytest fixtures -# ============================================================================= - - -@pytest.fixture -def static_credential_provider(): - """Fixture that provides a static credential provider.""" - return StaticCredentialProvider( - token="test_token", host="https://test-host.databricks.com" - ) - - -@pytest.fixture -def refreshable_credential_provider(): - """Fixture that provides a refreshable credential provider.""" - return RefreshableCredentialProvider() - - -@pytest.fixture -def requests_mocker(): - """Fixture that mocks requests.get and requests.post.""" - with mock.patch("requests.get") as mock_get: - with mock.patch("requests.post") as mock_post: - yield {"get": mock_get, "post": mock_post} - - -# ============================================================================= -# Test classes -# ============================================================================= - - -class TestBuildHeaders: - """Tests for build_headers function.""" - - def test_builds_correct_headers(self, static_credential_provider): - """Test that headers contain correct token and content type.""" - headers = build_headers(static_credential_provider) - - assert headers["Content-Type"] == "application/json" - assert headers["Authorization"] == "Bearer test_token" - - def test_fetches_fresh_token(self, refreshable_credential_provider): - """Test that token is fetched fresh each time.""" - headers1 = build_headers(refreshable_credential_provider) - assert "expired_token" in headers1["Authorization"] - - refreshable_credential_provider.invalidate() - - headers2 = build_headers(refreshable_credential_provider) - assert "refreshed_token" in headers2["Authorization"] - - -class TestRequestWith401Retry: - """Tests for request_with_401_retry function.""" - - def test_successful_request_no_retry(self, static_credential_provider): - """Test that successful request doesn't trigger retry.""" - mock_request = mock.Mock(return_value=MockResponse(status_code=200)) - - response = request_with_401_retry( - mock_request, - "https://test-url.com", - static_credential_provider, - ) - - assert response.status_code == 200 - assert mock_request.call_count == 1 - - def test_401_triggers_invalidate_and_retry(self, refreshable_credential_provider): - """Test that 401 response triggers credential invalidation and retry.""" - call_count = [0] - headers_captured = [] - - def mock_request(url, headers=None, **kwargs): - call_count[0] += 1 - headers_captured.append(headers.get("Authorization", "")) - if call_count[0] == 1: - return MockResponse(status_code=401) - return MockResponse(status_code=200) - - response = request_with_401_retry( - mock_request, - "https://test-url.com", - refreshable_credential_provider, - ) - - assert response.status_code == 200 - assert call_count[0] == 2 - assert refreshable_credential_provider.invalidate_count == 1 - assert "expired_token" in headers_captured[0] - assert "refreshed_token" in headers_captured[1] - - def test_non_401_error_raises(self, static_credential_provider): - """Test that non-401 errors are raised without retry.""" - mock_request = mock.Mock(return_value=MockResponse(status_code=500)) - - with pytest.raises(Exception, match="HTTP Error 500"): - request_with_401_retry( - mock_request, - "https://test-url.com", - static_credential_provider, - ) - - assert mock_request.call_count == 1 - - -class TestUnityCatalogConnectorInit: - """Tests for UnityCatalogConnector initialization.""" - - def test_init_with_credential_provider(self, static_credential_provider): - """Test initialization with credential provider.""" - connector = UnityCatalogConnector( - table_full_name="catalog.schema.table", - credential_provider=static_credential_provider, - ) - - assert connector.base_url == "https://test-host.databricks.com" - assert connector.table_full_name == "catalog.schema.table" - - @pytest.mark.parametrize( - "input_host,expected_url", - [ - ("test-host.databricks.com", "https://test-host.databricks.com"), - ("https://test-host.databricks.com/", "https://test-host.databricks.com"), - ("http://test-host.databricks.com", "http://test-host.databricks.com"), - ], - ids=["adds_https", "strips_trailing_slash", "preserves_http"], - ) - def test_init_normalizes_host_url(self, input_host, expected_url): - """Test that host URL is normalized correctly.""" - provider = StaticCredentialProvider(token="token", host=input_host) - - connector = UnityCatalogConnector( - table_full_name="catalog.schema.table", - credential_provider=provider, - ) - - assert connector.base_url == expected_url - - -class TestUnityCatalogConnector401Retry: - """Tests for 401 retry behavior in UnityCatalogConnector.""" - - def test_401_during_get_table_info( - self, requests_mocker, refreshable_credential_provider - ): - """Test that 401 during _get_table_info triggers retry.""" - call_count = [0] - headers_captured = [] - - def get_side_effect(url, headers=None, **kwargs): - call_count[0] += 1 - headers_captured.append(headers.get("Authorization", "")) - if call_count[0] == 1: - return MockResponse(status_code=401) - return MockResponse( - status_code=200, - _json_data={"table_id": "test_table_id", "name": "table"}, - ) - - requests_mocker["get"].side_effect = get_side_effect - - connector = UnityCatalogConnector( - table_full_name="catalog.schema.table", - credential_provider=refreshable_credential_provider, - ) - result = connector._get_table_info() - - assert result["table_id"] == "test_table_id" - assert call_count[0] == 2 - assert refreshable_credential_provider.invalidate_count == 1 - assert "expired_token" in headers_captured[0] - assert "refreshed_token" in headers_captured[1] - - def test_401_during_get_creds( - self, requests_mocker, refreshable_credential_provider - ): - """Test that 401 during _get_creds triggers retry.""" - # First set up table info - requests_mocker["get"].return_value = MockResponse( - status_code=200, - _json_data={"table_id": "test_table_id", "name": "table"}, - ) - - connector = UnityCatalogConnector( - table_full_name="catalog.schema.table", - credential_provider=refreshable_credential_provider, - ) - connector._get_table_info() - - # Reset for _get_creds test - refreshable_credential_provider.invalidate_count = 0 - refreshable_credential_provider.current_token = "expired_token" - - post_call_count = [0] - post_headers_captured = [] - - def post_side_effect(url, headers=None, **kwargs): - post_call_count[0] += 1 - post_headers_captured.append(headers.get("Authorization", "")) - if post_call_count[0] == 1: - return MockResponse(status_code=401) - return MockResponse( - status_code=200, - _json_data={"url": "s3://bucket/path"}, - ) - - requests_mocker["post"].side_effect = post_side_effect - - connector._get_creds() - - assert connector._table_url == "s3://bucket/path" - assert post_call_count[0] == 2 - assert refreshable_credential_provider.invalidate_count == 1 - assert "expired_token" in post_headers_captured[0] - assert "refreshed_token" in post_headers_captured[1] - - -class TestReadUnityCatalogAPI: - """Tests for read_unity_catalog API function.""" - - @pytest.mark.parametrize( - "credential_provider, url, token", - [ - ( - StaticCredentialProvider( - token="my_token", host="https://my-host.databricks.com" - ), - None, - None, - ), - (None, "https://my-host.databricks.com", "my_token"), - ], - ids=["with_credential_provider", "with_url_and_token"], - ) - def test_successful_read_with_valid_credentials( - self, requests_mocker, credential_provider, url, token - ): - """Test read_unity_catalog succeeds with valid credentials.""" - import ray.data - - with mock.patch.object( - UnityCatalogConnector, "read", return_value=mock.Mock() - ) as mock_read: - ray.data.read_unity_catalog( - table="catalog.schema.table", - credential_provider=credential_provider, - url=url, - token=token, - ) - mock_read.assert_called_once() - - @pytest.mark.parametrize( - "url,token", - [ - (None, None), - ("https://my-host.databricks.com", None), - (None, "my_token"), - ], - ids=["no_credentials", "only_url", "only_token"], - ) - def test_raises_with_incomplete_credentials(self, url, token): - """Test that read_unity_catalog raises when credentials are incomplete.""" - import ray.data - - with pytest.raises(ValueError, match="Either 'credential_provider' or both"): - ray.data.read_unity_catalog( - table="catalog.schema.table", - url=url, - token=token, - ) - - -if __name__ == "__main__": - import sys - - sys.exit(pytest.main(["-v", __file__])) diff --git a/python/ray/data/tests/test_catalog.py b/python/ray/data/tests/test_catalog.py new file mode 100644 index 000000000000..69d2689973b1 --- /dev/null +++ b/python/ray/data/tests/test_catalog.py @@ -0,0 +1,257 @@ +"""Tests for the Catalog connector API (ray.data.catalog).""" + +import contextlib +import pickle +from unittest import mock + +import pyarrow as pa +import pyarrow.fs as pafs +import pyarrow.parquet as pq +import pytest + +import ray +from ray.data.catalog import Catalog, ReaderFormat, ResolvedSource, UnityCatalog + +# conftest provides ray_start_regular_shared +from ray.data.tests.conftest import * # noqa: F401,F403 +from ray.data.tests.datasource.databricks_test_utils import MockResponse + +AWS_CREDS = { + "url": "s3://bucket/path", + "aws_temp_credentials": { + "access_key_id": "AKIA", + "secret_access_key": "secret", + "session_token": "token", + }, +} + + +def _mock_uc_rest(data_source_format="DELTA", creds=None): + """Patch ray.data.catalog.requests so UC REST calls return canned data.""" + creds = creds if creds is not None else AWS_CREDS + table_info = { + "table_id": "tid-123", + "data_source_format": data_source_format, + "storage_location": creds["url"], + } + patcher = mock.patch("ray.data.catalog.requests") + m = patcher.start() + m.get.return_value = MockResponse(_json_data=table_info) + m.post.return_value = MockResponse(_json_data=creds) + return patcher + + +@pytest.fixture +def uc_catalog(): + return UnityCatalog( + url="https://dbc-test.cloud.databricks.com", + token="dapi-test", + region="us-west-2", + ) + + +# --------------------------------------------------------------------------- +# UnityCatalog.resolve +# --------------------------------------------------------------------------- + + +@pytest.mark.parametrize("reader", [ReaderFormat.DELTA, ReaderFormat.PARQUET]) +def test_resolve_storage_aws(uc_catalog, reader): + patcher = _mock_uc_rest() + try: + resolved = uc_catalog.resolve("main.sales.txns", reader=reader) + finally: + patcher.stop() + + assert resolved.path == "s3://bucket/path" + assert isinstance(resolved.filesystem, pafs.S3FileSystem) + assert resolved.storage_options is None + assert resolved.data_format is ReaderFormat.DELTA + + +def test_resolve_aws_requires_region(): + catalog = UnityCatalog(url="https://h.databricks.com", token="t") # no region + patcher = _mock_uc_rest() + try: + with pytest.raises(ValueError, match="region"): + catalog.resolve("main.sales.txns", reader=ReaderFormat.DELTA) + finally: + patcher.stop() + + +def test_resolve_iceberg(uc_catalog): + # Iceberg resolution does not hit the credential-vending REST endpoints. + resolved = uc_catalog.resolve("main.sales.txns", reader=ReaderFormat.ICEBERG) + + assert resolved.path is None + assert resolved.filesystem is None + assert resolved.data_format is ReaderFormat.ICEBERG + ckw = resolved.catalog_kwargs + assert ckw["type"] == "rest" + assert ckw["uri"] == ( + "https://dbc-test.cloud.databricks.com/api/2.1/unity-catalog/iceberg" + ) + assert ckw["token"] == "dapi-test" + assert ckw["header.X-Iceberg-Access-Delegation"] == "vended-credentials" + + +def test_resolve_unsupported_reader(uc_catalog): + with pytest.raises(ValueError, match="does not support"): + uc_catalog.resolve("main.sales.txns", reader="bogus") + + +def test_resolve_azure_returns_storage_options(): + catalog = UnityCatalog(url="https://h.databricks.com", token="t") + azure_creds = { + "url": "abfss://c@acct.dfs.core.windows.net/path", + "azuresasuri": "sv=2021&sig=abc", + } + patcher = _mock_uc_rest(data_source_format="DELTA", creds=azure_creds) + try: + resolved = catalog.resolve("main.sales.txns", reader=ReaderFormat.DELTA) + finally: + patcher.stop() + + assert resolved.filesystem is None + assert resolved.storage_options == {"AZURE_STORAGE_SAS_TOKEN": "sv=2021&sig=abc"} + + +# --------------------------------------------------------------------------- +# Reader integration via a fake catalog (no network) +# --------------------------------------------------------------------------- + + +class _FakeCatalog(Catalog): + """Returns a pre-baked ResolvedSource; records the reader it was asked for.""" + + def __init__(self, resolved): + self._resolved = resolved + self.calls = [] + + def resolve(self, table, *, reader): + self.calls.append((table, reader)) + return self._resolved + + +def test_read_parquet_with_catalog(ray_start_regular_shared, tmp_path): + path = str(tmp_path / "data.parquet") + pq.write_table(pa.table({"id": [1, 2, 3]}), path) + + catalog = _FakeCatalog(ResolvedSource(path=path)) + ds = ray.data.read_parquet("main.db.tbl", catalog=catalog) + + assert sorted(r["id"] for r in ds.take_all()) == [1, 2, 3] + assert catalog.calls == [("main.db.tbl", ReaderFormat.PARQUET)] + + +@pytest.mark.parametrize("reader", ["parquet", "delta"]) +def test_catalog_filesystem_overrides_with_warning(reader): + # The catalog-resolved filesystem overrides a user-supplied one, but warns. + # The warning fires at the top of the reader body; suppress any downstream + # failure from the (intentionally unreachable) s3 path. + if reader == "delta": + pytest.importorskip("deltalake") + fs = pafs.S3FileSystem( + access_key="AKIA", secret_key="secret", session_token="t", region="us-west-2" + ) + catalog = _FakeCatalog(ResolvedSource(path="s3://b/p", filesystem=fs)) + read_fn = ray.data.read_parquet if reader == "parquet" else ray.data.read_delta + + with mock.patch.object(ray.data.read_api.logger, "warning") as warn: + with contextlib.suppress(Exception): + read_fn("main.db.tbl", catalog=catalog, filesystem=pafs.LocalFileSystem()) + + assert any( + "Overriding the provided `filesystem`" in str(c) for c in warn.call_args_list + ) + + +def test_read_delta_with_catalog(ray_start_regular_shared, tmp_path): + deltalake = pytest.importorskip("deltalake") + path = str(tmp_path / "delta-table") + deltalake.write_deltalake(path, pa.table({"id": [1, 2, 3]})) + + catalog = _FakeCatalog(ResolvedSource(path=path)) + ds = ray.data.read_delta("main.db.tbl", catalog=catalog) + + assert sorted(r["id"] for r in ds.take_all()) == [1, 2, 3] + assert catalog.calls == [("main.db.tbl", ReaderFormat.DELTA)] + + +def test_read_iceberg_uses_catalog_resolved_kwargs(): + catalog = _FakeCatalog( + ResolvedSource(catalog_kwargs={"type": "rest", "uri": "u", "token": "tk"}) + ) + with mock.patch( + "ray.data._internal.datasource.iceberg_datasource.IcebergDatasource" + ) as ds_cls, mock.patch("ray.data.read_api.read_datasource"): + ray.data.read_iceberg(table_identifier="main.db.tbl", catalog=catalog) + + _, kwargs = ds_cls.call_args + assert kwargs["catalog_kwargs"] == {"type": "rest", "uri": "u", "token": "tk"} + assert catalog.calls == [("main.db.tbl", ReaderFormat.ICEBERG)] + + +def test_read_iceberg_explicit_catalog_kwargs_take_precedence(): + # When both catalog and catalog_kwargs are given, catalog is ignored. + catalog = _FakeCatalog(ResolvedSource(catalog_kwargs={"type": "rest", "uri": "u"})) + with mock.patch( + "ray.data._internal.datasource.iceberg_datasource.IcebergDatasource" + ) as ds_cls, mock.patch("ray.data.read_api.read_datasource"): + ray.data.read_iceberg( + table_identifier="main.db.tbl", + catalog=catalog, + catalog_kwargs={"type": "sql", "uri": "explicit"}, + ) + + _, kwargs = ds_cls.call_args + assert kwargs["catalog_kwargs"] == {"type": "sql", "uri": "explicit"} + assert catalog.calls == [] # catalog was not consulted + + +# --------------------------------------------------------------------------- +# Serialization +# --------------------------------------------------------------------------- + + +def test_unity_catalog_is_picklable(uc_catalog): + restored = pickle.loads(pickle.dumps(uc_catalog)) + assert isinstance(restored, UnityCatalog) + assert restored._region == "us-west-2" + + +def test_resolved_source_with_filesystem_is_picklable(): + fs = pafs.S3FileSystem( + access_key="AKIA", secret_key="secret", session_token="t", region="us-west-2" + ) + src = ResolvedSource(path="s3://b/p", filesystem=fs, data_format=ReaderFormat.DELTA) + restored = pickle.loads(pickle.dumps(src)) + assert restored.path == "s3://b/p" + assert isinstance(restored.filesystem, pafs.S3FileSystem) + assert restored.data_format is ReaderFormat.DELTA + + +# --------------------------------------------------------------------------- +# Deprecated read_unity_catalog shim +# --------------------------------------------------------------------------- + + +def test_read_unity_catalog_deprecation_delegates(): + with mock.patch("ray.data.read_api.read_delta") as read_delta: + with pytest.warns(DeprecationWarning, match="read_unity_catalog"): + ray.data.read_unity_catalog( + table="main.db.tbl", + url="https://h.databricks.com", + token="t", + data_format="delta", + ) + + read_delta.assert_called_once() + _, kwargs = read_delta.call_args + assert isinstance(kwargs["catalog"], UnityCatalog) + + +if __name__ == "__main__": + import sys + + sys.exit(pytest.main(["-v", __file__])) From a71a9fb1f4347c78b71a6e846f176d3b9ef23383 Mon Sep 17 00:00:00 2001 From: Daniel Shin Date: Wed, 17 Jun 2026 16:04:08 -0700 Subject: [PATCH 02/19] update comment Signed-off-by: Daniel Shin --- python/ray/data/catalog.py | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/python/ray/data/catalog.py b/python/ray/data/catalog.py index 702bebacbeed..94d337e7fc28 100644 --- a/python/ray/data/catalog.py +++ b/python/ray/data/catalog.py @@ -68,12 +68,7 @@ class ResolvedSource: @PublicAPI(stability="alpha") class Catalog(ABC): - """A directory service that resolves a table name to a readable source. - - Readers depend on this small interface; the catalog knows nothing about - readers (the dependency is inverted relative to the old - ``read_unity_catalog``). - """ + """A directory service that resolves a table name to a readable source.""" @abstractmethod def resolve(self, table: str, *, reader: ReaderFormat) -> ResolvedSource: From ee8476f6e759d1fff8c1f96ee2ee8d9cf0ad4239 Mon Sep 17 00:00:00 2001 From: Daniel Shin Date: Wed, 17 Jun 2026 16:13:19 -0700 Subject: [PATCH 03/19] no exit handler Signed-off-by: Daniel Shin --- python/ray/data/catalog.py | 18 +++++++----------- python/ray/data/tests/test_catalog.py | 14 ++++++++++++++ 2 files changed, 21 insertions(+), 11 deletions(-) diff --git a/python/ray/data/catalog.py b/python/ray/data/catalog.py index 94d337e7fc28..9a55b21d3472 100644 --- a/python/ray/data/catalog.py +++ b/python/ray/data/catalog.py @@ -9,7 +9,6 @@ is passed in only when authentication is required. """ -import atexit import logging import os import tempfile @@ -292,19 +291,16 @@ def _parse_azure_creds(creds: dict) -> Dict[str, Any]: storage_options["AZURE_STORAGE_ACCOUNT_NAME"] = storage_account return storage_options - def _write_gcp_creds(self, gcp_json: str) -> None: + @staticmethod + def _write_gcp_creds(gcp_json: str) -> None: + # The file is intentionally not deleted. The Ray driver may be + # long-lived there may still be references to the old file via + # GOOGLE_APPLICATION_CREDENTIALS, so removing it could break in-flight + # reads. The OS reclaims the temp directory in due course and file + # size is expected to be trivial. temp_file = tempfile.NamedTemporaryFile( mode="w", prefix="gcp_sa_", suffix=".json", delete=False ) temp_file.write(gcp_json) temp_file.close() os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = temp_file.name - atexit.register(self._cleanup_gcp_temp_file, temp_file.name) - - @staticmethod - def _cleanup_gcp_temp_file(temp_file_path: str) -> None: - if temp_file_path and os.path.exists(temp_file_path): - try: - os.unlink(temp_file_path) - except OSError: - pass diff --git a/python/ray/data/tests/test_catalog.py b/python/ray/data/tests/test_catalog.py index 69d2689973b1..a61029a99d4b 100644 --- a/python/ray/data/tests/test_catalog.py +++ b/python/ray/data/tests/test_catalog.py @@ -1,6 +1,7 @@ """Tests for the Catalog connector API (ray.data.catalog).""" import contextlib +import os import pickle from unittest import mock @@ -116,6 +117,19 @@ def test_resolve_azure_returns_storage_options(): assert resolved.storage_options == {"AZURE_STORAGE_SAS_TOKEN": "sv=2021&sig=abc"} +def test_gcp_creds_written_and_env_set(monkeypatch): + monkeypatch.setenv("GOOGLE_APPLICATION_CREDENTIALS", "") + catalog = UnityCatalog(url="https://h.databricks.com", token="t") + + catalog._write_gcp_creds('{"sa": 1}') + + path = os.environ["GOOGLE_APPLICATION_CREDENTIALS"] + assert os.path.exists(path) + with open(path) as f: + assert f.read() == '{"sa": 1}' + os.unlink(path) # test housekeeping; production intentionally leaves it + + # --------------------------------------------------------------------------- # Reader integration via a fake catalog (no network) # --------------------------------------------------------------------------- From 4b93c6978f668ec684aba03048197dae56fac1f6 Mon Sep 17 00:00:00 2001 From: Daniel Shin Date: Wed, 17 Jun 2026 16:16:26 -0700 Subject: [PATCH 04/19] prevent users specifying multiple table names in read_parquet Signed-off-by: Daniel Shin --- python/ray/data/read_api.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/python/ray/data/read_api.py b/python/ray/data/read_api.py index e4a00cb7bebd..6a10402bd36e 100644 --- a/python/ray/data/read_api.py +++ b/python/ray/data/read_api.py @@ -1227,8 +1227,8 @@ def read_parquet( pyarrow.dataset.Scanner.html#pyarrow.dataset.Scanner.from_fragment>`_. Args: - paths: A single file path or directory, or a list of file paths. Multiple - directories are not supported. + paths: A single file path, directory, a list of file paths, or a table name + (when used with ``catalog``). Multiple directories/tables are not supported. filesystem: The PyArrow filesystem implementation to read from. These filesystems are specified in the `pyarrow docs Date: Wed, 17 Jun 2026 16:17:53 -0700 Subject: [PATCH 05/19] add test_catalog to BUILD.bazel Signed-off-by: Daniel Shin --- python/ray/data/BUILD.bazel | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/python/ray/data/BUILD.bazel b/python/ray/data/BUILD.bazel index 4b532e5a68b5..c409f7aeb27e 100644 --- a/python/ray/data/BUILD.bazel +++ b/python/ray/data/BUILD.bazel @@ -2186,6 +2186,20 @@ py_test( ], ) +py_test( + name = "test_catalog", + size = "small", + srcs = ["tests/test_catalog.py"], + tags = [ + "exclusive", + "team:data", + ], + deps = [ + ":conftest", + "//:ray_lib", + ], +) + py_test( name = "test_iter_jax", size = "small", From f56a25c4ef9061650f53b7947d1f8caadcf39952 Mon Sep 17 00:00:00 2001 From: Daniel Shin Date: Wed, 17 Jun 2026 16:20:40 -0700 Subject: [PATCH 06/19] make gemini happy for delta Signed-off-by: Daniel Shin --- python/ray/data/read_api.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/python/ray/data/read_api.py b/python/ray/data/read_api.py index 6a10402bd36e..c0320ff05b3b 100644 --- a/python/ray/data/read_api.py +++ b/python/ray/data/read_api.py @@ -4736,6 +4736,11 @@ def read_delta( from deltalake import DeltaTable + # This seems reasonable to keep it at one table, even Spark doesn't really support + # multi-table reads, it's usually up to the developer to keep it in one table. + if not isinstance(path, str): + raise ValueError("Only a single Delta Lake table path is supported.") + if catalog is not None: from ray.data.catalog import ReaderFormat @@ -4752,11 +4757,6 @@ def read_delta( ) filesystem = resolved.filesystem - # This seems reasonable to keep it at one table, even Spark doesn't really support - # multi-table reads, it's usually up to the developer to keep it in one table. - if not isinstance(path, str): - raise ValueError("Only a single Delta Lake table path is supported.") - dt = DeltaTable(path, version=version, storage_options=storage_options) pa_dataset = dt.to_pyarrow_dataset(filesystem=filesystem) From d51ded6e8176762c27669b1bb5dcc3e1cd8da336 Mon Sep 17 00:00:00 2001 From: Daniel Shin Date: Wed, 17 Jun 2026 16:25:58 -0700 Subject: [PATCH 07/19] better format inference Signed-off-by: Daniel Shin --- python/ray/data/read_api.py | 7 ++++--- python/ray/data/tests/test_catalog.py | 20 ++++++++++++++++++++ 2 files changed, 24 insertions(+), 3 deletions(-) diff --git a/python/ray/data/read_api.py b/python/ray/data/read_api.py index c0320ff05b3b..51b775b67589 100644 --- a/python/ray/data/read_api.py +++ b/python/ray/data/read_api.py @@ -4595,10 +4595,11 @@ def read_unity_catalog( fmt = ReaderFormat(data_format.lower()) if data_format else None if fmt is None: - # One extra REST call to infer the format; acceptable on the - # deprecated path. + # Infer the format from table metadata, falling back to the vended + # credential URL's extension. info = catalog._get_table_info(table) - fmt = catalog._infer_format(info, info.get("storage_location")) + _, table_url = catalog._get_creds(info["table_id"]) + fmt = catalog._infer_format(info, table_url) if fmt is ReaderFormat.DELTA: return read_delta(table, catalog=catalog, **reader_kwargs) diff --git a/python/ray/data/tests/test_catalog.py b/python/ray/data/tests/test_catalog.py index a61029a99d4b..07125af92035 100644 --- a/python/ray/data/tests/test_catalog.py +++ b/python/ray/data/tests/test_catalog.py @@ -265,6 +265,26 @@ def test_read_unity_catalog_deprecation_delegates(): assert isinstance(kwargs["catalog"], UnityCatalog) +def test_read_unity_catalog_infers_format_from_cred_url(): + # Metadata omits both data_source_format and storage_location; the vended + # credential URL extension must still identify the format. + patcher = mock.patch("ray.data.catalog.requests") + m = patcher.start() + m.get.return_value = MockResponse(_json_data={"table_id": "tid"}) + m.post.return_value = MockResponse(_json_data={"url": "s3://bucket/data.parquet"}) + try: + with mock.patch("ray.data.read_api.read_parquet") as read_parquet, pytest.warns( + DeprecationWarning + ): + ray.data.read_unity_catalog( + table="main.db.tbl", url="https://h.databricks.com", token="t" + ) + finally: + patcher.stop() + + read_parquet.assert_called_once() + + if __name__ == "__main__": import sys From 6d4902bca8f31cf294445bf3ab263fc61dc2d6ec Mon Sep 17 00:00:00 2001 From: Daniel Shin Date: Wed, 17 Jun 2026 16:40:26 -0700 Subject: [PATCH 08/19] nit Signed-off-by: Daniel Shin --- python/ray/data/catalog.py | 142 +++++++++++++++----------- python/ray/data/tests/test_catalog.py | 56 ++++++++-- 2 files changed, 130 insertions(+), 68 deletions(-) diff --git a/python/ray/data/catalog.py b/python/ray/data/catalog.py index 9a55b21d3472..e61045b2352d 100644 --- a/python/ray/data/catalog.py +++ b/python/ray/data/catalog.py @@ -148,11 +148,28 @@ def resolve(self, table: str, *, reader: ReaderFormat) -> ResolvedSource: def _resolve_storage(self, table: str, reader: ReaderFormat) -> ResolvedSource: table_info = self._get_table_info(table) creds, table_url = self._get_creds(table_info["table_id"]) - filesystem, storage_options = self._creds_to_reader_args(creds, reader) + + # Deliver vended credentials via environment variables. This is the + # mechanism the underlying libraries read uniformly: pyarrow (Parquet + # data, and S3/Azure/GCS auto-filesystems) and deltalake's object_store + # (the Delta transaction *log* read in `DeltaTable(...)`, which neither + # a pyarrow `filesystem` nor `storage_options` keyed for pyarrow would + # satisfy). See `_apply_env` for the worker-propagation note. + # + # TODO: remove the env-var + ray.init mechanism once credential vending + # is performed inside the read tasks themselves (worker-side). + self._apply_env(self._creds_to_env(creds)) + + # For AWS Delta, also hand the data scan an explicit S3FileSystem: the + # vended session token isn't reliably propagated through + # `DeltaTable.to_pyarrow_dataset`'s auto-built filesystem. + filesystem = None + if "aws_temp_credentials" in creds and reader is ReaderFormat.DELTA: + filesystem = self._build_s3_filesystem(creds["aws_temp_credentials"]) + return ResolvedSource( path=table_url, filesystem=filesystem, - storage_options=storage_options, data_format=self._infer_format(table_info, table_url), ) @@ -206,56 +223,29 @@ def _infer_format(table_info: dict, table_url: str) -> Optional[ReaderFormat]: return ReaderFormat(ext) return None - def _creds_to_reader_args( - self, creds: dict, reader: ReaderFormat - ) -> Tuple[Optional["pyarrow.fs.FileSystem"], Optional[Dict[str, Any]]]: - """Translate vended credentials into reader-shaped arguments. - - AWS is delivered as a serializable ``S3FileSystem`` (worker-safe, no - global env mutation). Azure/GCP retain the env-var / ``storage_options`` - delivery used historically; eliminating that residual env mutation and - making it worker-safe is tracked as follow-up work alongside credential - refresh. - """ - import pyarrow.fs as pafs - + def _creds_to_env(self, creds: dict) -> Dict[str, str]: + """Translate vended credentials into environment variables.""" if "aws_temp_credentials" in creds: - if not self._region: - raise ValueError( - "The 'region' parameter is required for AWS S3 access. " - "Please specify the AWS region (e.g., region='us-west-2')." - ) aws = creds["aws_temp_credentials"] - filesystem = pafs.S3FileSystem( - access_key=aws["access_key_id"], - secret_key=aws["secret_access_key"], - session_token=aws["session_token"], - region=self._region, - ) - return filesystem, None + env = { + "AWS_ACCESS_KEY_ID": aws["access_key_id"], + "AWS_SECRET_ACCESS_KEY": aws["secret_access_key"], + "AWS_SESSION_TOKEN": aws["session_token"], + } + if self._region: + env["AWS_REGION"] = self._region + env["AWS_DEFAULT_REGION"] = self._region + return env if "azuresasuri" in creds or "azure_user_delegation_sas" in creds: - storage_options = self._parse_azure_creds(creds) - if reader is ReaderFormat.PARQUET: - # read_parquet has no storage_options; pyarrow's default Azure - # filesystem reads these from the process environment. - for k, v in storage_options.items(): - os.environ[k] = v - logger.warning( - "UnityCatalog: Azure credentials for read_parquet are " - "delivered via process environment variables, which is " - "driver-local. Worker propagation is a known limitation." - ) - return None, storage_options + return self._parse_azure_creds(creds) if "gcp_service_account" in creds: - self._write_gcp_creds(creds["gcp_service_account"]) - logger.warning( - "UnityCatalog: GCP credentials are delivered via the " - "GOOGLE_APPLICATION_CREDENTIALS environment variable, which is " - "driver-local. Worker propagation is a known limitation." - ) - return None, None + return { + "GOOGLE_APPLICATION_CREDENTIALS": self._write_gcp_creds( + creds["gcp_service_account"] + ) + } raise ValueError( "No known credential type found in Databricks UC response. " @@ -263,7 +253,41 @@ def _creds_to_reader_args( ) @staticmethod - def _parse_azure_creds(creds: dict) -> Dict[str, Any]: + def _apply_env(env_vars: Dict[str, str]) -> None: + """Set vended credentials in the environment and propagate to workers. + + Credentials are set on the driver's ``os.environ`` and, if Ray has not + been initialized yet, into the cluster ``runtime_env`` so read tasks on + workers inherit them. If Ray is already running we cannot retroactively + amend its ``runtime_env``; driver-side env still covers driver reads + (e.g. the Delta log) and single-node execution. + + TODO: remove once credential vending happens inside the read tasks. + """ + import ray + + for k, v in env_vars.items(): + os.environ[k] = v + if not ray.is_initialized(): + ray.init(runtime_env={"env_vars": dict(env_vars)}) + + def _build_s3_filesystem(self, aws: dict) -> "pyarrow.fs.FileSystem": + if not self._region: + raise ValueError( + "The 'region' parameter is required for AWS S3 access. " + "Please specify the AWS region (e.g., region='us-west-2')." + ) + import pyarrow.fs as pafs + + return pafs.S3FileSystem( + access_key=aws["access_key_id"], + secret_key=aws["secret_access_key"], + session_token=aws["session_token"], + region=self._region, + ) + + @staticmethod + def _parse_azure_creds(creds: dict) -> Dict[str, str]: if "azuresasuri" in creds: return {"AZURE_STORAGE_SAS_TOKEN": creds["azuresasuri"]} @@ -284,23 +308,25 @@ def _parse_azure_creds(creds: dict) -> Dict[str, Any]: "azure_user_delegation_sas. " f"Available keys: {', '.join(azure.keys())}" ) - storage_options: Dict[str, Any] = {"AZURE_STORAGE_SAS_TOKEN": sas_token} + env: Dict[str, str] = {"AZURE_STORAGE_SAS_TOKEN": sas_token} storage_account = azure.get("storage_account") if storage_account: - storage_options["AZURE_STORAGE_ACCOUNT"] = storage_account - storage_options["AZURE_STORAGE_ACCOUNT_NAME"] = storage_account - return storage_options + env["AZURE_STORAGE_ACCOUNT"] = storage_account + env["AZURE_STORAGE_ACCOUNT_NAME"] = storage_account + return env @staticmethod - def _write_gcp_creds(gcp_json: str) -> None: - # The file is intentionally not deleted. The Ray driver may be - # long-lived there may still be references to the old file via - # GOOGLE_APPLICATION_CREDENTIALS, so removing it could break in-flight - # reads. The OS reclaims the temp directory in due course and file - # size is expected to be trivial. + def _write_gcp_creds(gcp_json: str) -> str: + """Write the GCP service-account JSON to a temp file; return its path. + + The file is intentionally not deleted. The Ray driver may be long-lived + and workers may still reference it via GOOGLE_APPLICATION_CREDENTIALS, + so removing it could break in-flight reads. The OS reclaims the temp + directory in due course and the file is trivially small. + """ temp_file = tempfile.NamedTemporaryFile( mode="w", prefix="gcp_sa_", suffix=".json", delete=False ) temp_file.write(gcp_json) temp_file.close() - os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = temp_file.name + return temp_file.name diff --git a/python/ray/data/tests/test_catalog.py b/python/ray/data/tests/test_catalog.py index 07125af92035..de86cd83a849 100644 --- a/python/ray/data/tests/test_catalog.py +++ b/python/ray/data/tests/test_catalog.py @@ -51,13 +51,22 @@ def uc_catalog(): ) +@pytest.fixture +def isolated_env(monkeypatch): + # resolve() exports vended creds to os.environ and may call ray.init. + # Isolate the env mutation and skip the real ray.init for unit tests. + monkeypatch.setattr("ray.is_initialized", lambda: True) + monkeypatch.setattr(os, "environ", dict(os.environ)) + return os.environ + + # --------------------------------------------------------------------------- # UnityCatalog.resolve # --------------------------------------------------------------------------- @pytest.mark.parametrize("reader", [ReaderFormat.DELTA, ReaderFormat.PARQUET]) -def test_resolve_storage_aws(uc_catalog, reader): +def test_resolve_storage_aws(uc_catalog, isolated_env, reader): patcher = _mock_uc_rest() try: resolved = uc_catalog.resolve("main.sales.txns", reader=reader) @@ -65,12 +74,19 @@ def test_resolve_storage_aws(uc_catalog, reader): patcher.stop() assert resolved.path == "s3://bucket/path" - assert isinstance(resolved.filesystem, pafs.S3FileSystem) - assert resolved.storage_options is None + # AWS creds are exported to the environment for both readers. + assert isolated_env["AWS_ACCESS_KEY_ID"] == "AKIA" + assert isolated_env["AWS_SESSION_TOKEN"] == "token" + # Delta additionally gets an explicit S3FileSystem for the data scan; + # Parquet reads via the environment-configured filesystem. + if reader is ReaderFormat.DELTA: + assert isinstance(resolved.filesystem, pafs.S3FileSystem) + else: + assert resolved.filesystem is None assert resolved.data_format is ReaderFormat.DELTA -def test_resolve_aws_requires_region(): +def test_resolve_aws_requires_region(isolated_env): catalog = UnityCatalog(url="https://h.databricks.com", token="t") # no region patcher = _mock_uc_rest() try: @@ -80,6 +96,24 @@ def test_resolve_aws_requires_region(): patcher.stop() +def test_resolve_initializes_ray_with_runtime_env(uc_catalog, monkeypatch): + # When Ray isn't running, vended creds are propagated via runtime_env. + monkeypatch.setattr(os, "environ", dict(os.environ)) + monkeypatch.setattr("ray.is_initialized", lambda: False) + init_kwargs = {} + monkeypatch.setattr("ray.init", lambda **kw: init_kwargs.update(kw)) + + patcher = _mock_uc_rest() + try: + uc_catalog.resolve("main.sales.txns", reader=ReaderFormat.PARQUET) + finally: + patcher.stop() + + env_vars = init_kwargs["runtime_env"]["env_vars"] + assert env_vars["AWS_ACCESS_KEY_ID"] == "AKIA" + assert env_vars["AWS_SESSION_TOKEN"] == "token" + + def test_resolve_iceberg(uc_catalog): # Iceberg resolution does not hit the credential-vending REST endpoints. resolved = uc_catalog.resolve("main.sales.txns", reader=ReaderFormat.ICEBERG) @@ -101,7 +135,7 @@ def test_resolve_unsupported_reader(uc_catalog): uc_catalog.resolve("main.sales.txns", reader="bogus") -def test_resolve_azure_returns_storage_options(): +def test_resolve_azure_sets_env(isolated_env): catalog = UnityCatalog(url="https://h.databricks.com", token="t") azure_creds = { "url": "abfss://c@acct.dfs.core.windows.net/path", @@ -113,17 +147,19 @@ def test_resolve_azure_returns_storage_options(): finally: patcher.stop() + # Azure creds flow via the environment (read by both pyarrow and the + # deltalake object_store log reader); no filesystem/storage_options. assert resolved.filesystem is None - assert resolved.storage_options == {"AZURE_STORAGE_SAS_TOKEN": "sv=2021&sig=abc"} + assert resolved.storage_options is None + assert isolated_env["AZURE_STORAGE_SAS_TOKEN"] == "sv=2021&sig=abc" -def test_gcp_creds_written_and_env_set(monkeypatch): - monkeypatch.setenv("GOOGLE_APPLICATION_CREDENTIALS", "") +def test_gcp_creds_to_env_writes_file(monkeypatch): catalog = UnityCatalog(url="https://h.databricks.com", token="t") - catalog._write_gcp_creds('{"sa": 1}') + env = catalog._creds_to_env({"gcp_service_account": '{"sa": 1}'}) - path = os.environ["GOOGLE_APPLICATION_CREDENTIALS"] + path = env["GOOGLE_APPLICATION_CREDENTIALS"] assert os.path.exists(path) with open(path) as f: assert f.read() == '{"sa": 1}' From 0902846b2abe89a14fd7686acf4a43e1d92db1a4 Mon Sep 17 00:00:00 2001 From: Daniel Shin Date: Wed, 17 Jun 2026 16:48:49 -0700 Subject: [PATCH 09/19] nit Signed-off-by: Daniel Shin --- python/ray/data/catalog.py | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/python/ray/data/catalog.py b/python/ray/data/catalog.py index e61045b2352d..cc4801e7aded 100644 --- a/python/ray/data/catalog.py +++ b/python/ray/data/catalog.py @@ -31,6 +31,13 @@ logger = logging.getLogger(__name__) +def _normalize_host(host: str) -> str: + host = host.rstrip("/") + if not host.startswith(("http://", "https://")): + host = f"https://{host}" + return host + + @PublicAPI(stability="alpha") class ReaderFormat(str, Enum): """Which reader is asking the catalog to resolve a table. @@ -127,14 +134,7 @@ def __init__( ) ) self._region = region - self._base_url = self._normalize_host(self._provider.get_host()) - - @staticmethod - def _normalize_host(host: str) -> str: - host = host.rstrip("/") - if not host.startswith(("http://", "https://")): - host = f"https://{host}" - return host + self._base_url = _normalize_host(self._provider.get_host()) # ---- Catalog interface ------------------------------------------------- def resolve(self, table: str, *, reader: ReaderFormat) -> ResolvedSource: From b2ffc19a294296c20e7a182de07996084445ec84 Mon Sep 17 00:00:00 2001 From: Daniel Shin Date: Wed, 17 Jun 2026 16:51:38 -0700 Subject: [PATCH 10/19] restore cleanup code Signed-off-by: Daniel Shin --- python/ray/data/catalog.py | 16 ++++++++++++---- python/ray/data/tests/test_catalog.py | 11 ++++++++--- 2 files changed, 20 insertions(+), 7 deletions(-) diff --git a/python/ray/data/catalog.py b/python/ray/data/catalog.py index cc4801e7aded..d4c8eb53d847 100644 --- a/python/ray/data/catalog.py +++ b/python/ray/data/catalog.py @@ -9,6 +9,7 @@ is passed in only when authentication is required. """ +import atexit import logging import os import tempfile @@ -319,14 +320,21 @@ def _parse_azure_creds(creds: dict) -> Dict[str, str]: def _write_gcp_creds(gcp_json: str) -> str: """Write the GCP service-account JSON to a temp file; return its path. - The file is intentionally not deleted. The Ray driver may be long-lived - and workers may still reference it via GOOGLE_APPLICATION_CREDENTIALS, - so removing it could break in-flight reads. The OS reclaims the temp - directory in due course and the file is trivially small. + Registers an ``atexit`` handler to remove the file on interpreter exit. """ temp_file = tempfile.NamedTemporaryFile( mode="w", prefix="gcp_sa_", suffix=".json", delete=False ) temp_file.write(gcp_json) temp_file.close() + atexit.register(UnityCatalog._cleanup_gcp_temp_file, temp_file.name) return temp_file.name + + @staticmethod + def _cleanup_gcp_temp_file(temp_file_path: str) -> None: + """Clean up temporary GCP service account file.""" + if temp_file_path and os.path.exists(temp_file_path): + try: + os.unlink(temp_file_path) + except OSError: + pass diff --git a/python/ray/data/tests/test_catalog.py b/python/ray/data/tests/test_catalog.py index de86cd83a849..1240d48a3698 100644 --- a/python/ray/data/tests/test_catalog.py +++ b/python/ray/data/tests/test_catalog.py @@ -154,16 +154,21 @@ def test_resolve_azure_sets_env(isolated_env): assert isolated_env["AZURE_STORAGE_SAS_TOKEN"] == "sv=2021&sig=abc" -def test_gcp_creds_to_env_writes_file(monkeypatch): +def test_gcp_creds_to_env_writes_file_and_registers_cleanup(): catalog = UnityCatalog(url="https://h.databricks.com", token="t") - env = catalog._creds_to_env({"gcp_service_account": '{"sa": 1}'}) + with mock.patch("ray.data.catalog.atexit.register") as register: + env = catalog._creds_to_env({"gcp_service_account": '{"sa": 1}'}) path = env["GOOGLE_APPLICATION_CREDENTIALS"] assert os.path.exists(path) with open(path) as f: assert f.read() == '{"sa": 1}' - os.unlink(path) # test housekeeping; production intentionally leaves it + # An atexit handler is registered to remove the file on exit. + register.assert_called_once_with(UnityCatalog._cleanup_gcp_temp_file, path) + # The cleanup handler removes the file. + UnityCatalog._cleanup_gcp_temp_file(path) + assert not os.path.exists(path) # --------------------------------------------------------------------------- From c4be39e782056ed59e300eb41bf1838dc814f7f0 Mon Sep 17 00:00:00 2001 From: Daniel Shin Date: Wed, 17 Jun 2026 16:54:02 -0700 Subject: [PATCH 11/19] better infer_format in unity_catalog implementation Signed-off-by: Daniel Shin --- python/ray/data/catalog.py | 9 +++++++++ python/ray/data/read_api.py | 6 +----- 2 files changed, 10 insertions(+), 5 deletions(-) diff --git a/python/ray/data/catalog.py b/python/ray/data/catalog.py index d4c8eb53d847..b3b9862d78e0 100644 --- a/python/ray/data/catalog.py +++ b/python/ray/data/catalog.py @@ -224,6 +224,15 @@ def _infer_format(table_info: dict, table_url: str) -> Optional[ReaderFormat]: return ReaderFormat(ext) return None + def infer_format(self, table: str) -> Optional[ReaderFormat]: + """Best-effort format hint from table metadata or file extension. + + Calling this function will query UnityCatalog to get the relevant + information.""" + info = self._get_table_info(table) + _, table_url = self._get_creds(info["table_id"]) + return self._infer_format(info, table_url) + def _creds_to_env(self, creds: dict) -> Dict[str, str]: """Translate vended credentials into environment variables.""" if "aws_temp_credentials" in creds: diff --git a/python/ray/data/read_api.py b/python/ray/data/read_api.py index 51b775b67589..24fcad6cf8f3 100644 --- a/python/ray/data/read_api.py +++ b/python/ray/data/read_api.py @@ -4595,11 +4595,7 @@ def read_unity_catalog( fmt = ReaderFormat(data_format.lower()) if data_format else None if fmt is None: - # Infer the format from table metadata, falling back to the vended - # credential URL's extension. - info = catalog._get_table_info(table) - _, table_url = catalog._get_creds(info["table_id"]) - fmt = catalog._infer_format(info, table_url) + fmt = catalog.infer_format(table) if fmt is ReaderFormat.DELTA: return read_delta(table, catalog=catalog, **reader_kwargs) From e7102b2ba0a3133617fc349ef61c50dae0995af8 Mon Sep 17 00:00:00 2001 From: Daniel Shin Date: Wed, 17 Jun 2026 17:15:49 -0700 Subject: [PATCH 12/19] better ReaderFormat handling Signed-off-by: Daniel Shin --- python/ray/data/catalog.py | 8 +++----- python/ray/data/tests/test_catalog.py | 7 ++++--- 2 files changed, 7 insertions(+), 8 deletions(-) diff --git a/python/ray/data/catalog.py b/python/ray/data/catalog.py index b3b9862d78e0..639d4eeca39e 100644 --- a/python/ray/data/catalog.py +++ b/python/ray/data/catalog.py @@ -41,11 +41,7 @@ def _normalize_host(host: str) -> str: @PublicAPI(stability="alpha") class ReaderFormat(str, Enum): - """Which reader is asking the catalog to resolve a table. - - ``str``-based so it stays ergonomic (``ReaderFormat.DELTA == "delta"``) and - serializes cleanly. - """ + """Which reader is asking the catalog to resolve a table.""" DELTA = "delta" PARQUET = "parquet" @@ -139,10 +135,12 @@ def __init__( # ---- Catalog interface ------------------------------------------------- def resolve(self, table: str, *, reader: ReaderFormat) -> ResolvedSource: + assert isinstance(reader, ReaderFormat) if reader is ReaderFormat.ICEBERG: return self._resolve_iceberg(table) if reader in (ReaderFormat.DELTA, ReaderFormat.PARQUET): return self._resolve_storage(table, reader) + # Reached only if a new ReaderFormat is added without handling here. raise ValueError(f"UnityCatalog does not support reader={reader!r}") # ---- storage-credential vending (delta / parquet) ---------------------- diff --git a/python/ray/data/tests/test_catalog.py b/python/ray/data/tests/test_catalog.py index 1240d48a3698..a593d44fcafe 100644 --- a/python/ray/data/tests/test_catalog.py +++ b/python/ray/data/tests/test_catalog.py @@ -130,9 +130,10 @@ def test_resolve_iceberg(uc_catalog): assert ckw["header.X-Iceberg-Access-Delegation"] == "vended-credentials" -def test_resolve_unsupported_reader(uc_catalog): - with pytest.raises(ValueError, match="does not support"): - uc_catalog.resolve("main.sales.txns", reader="bogus") +def test_resolve_requires_readerformat(uc_catalog): + # reader must be a ReaderFormat enum, not its raw string value. + with pytest.raises(AssertionError, match="must be a ReaderFormat"): + uc_catalog.resolve("main.sales.txns", reader="delta") def test_resolve_azure_sets_env(isolated_env): From 8c1cabe3a6ff232032ee822d9a79a010ed8dd6a6 Mon Sep 17 00:00:00 2001 From: Daniel Shin Date: Wed, 17 Jun 2026 17:20:18 -0700 Subject: [PATCH 13/19] . Signed-off-by: Daniel Shin --- python/ray/data/catalog.py | 4 ++-- python/ray/data/tests/test_catalog.py | 6 ------ 2 files changed, 2 insertions(+), 8 deletions(-) diff --git a/python/ray/data/catalog.py b/python/ray/data/catalog.py index 639d4eeca39e..1510c9f6f72c 100644 --- a/python/ray/data/catalog.py +++ b/python/ray/data/catalog.py @@ -211,8 +211,8 @@ def _get_creds(self, table_id: str) -> Tuple[dict, str]: @staticmethod def _infer_format(table_info: dict, table_url: str) -> Optional[ReaderFormat]: """Best-effort format hint from table metadata or file extension.""" - fmt = (table_info.get("data_source_format") or "").lower() - if fmt in (ReaderFormat.DELTA.value, ReaderFormat.PARQUET.value): + fmt = table_info.get("data_source_format", "").lower() + if fmt in {f.value for f in ReaderFormat}: return ReaderFormat(fmt) storage_loc = table_info.get("storage_location") or table_url diff --git a/python/ray/data/tests/test_catalog.py b/python/ray/data/tests/test_catalog.py index a593d44fcafe..146c573d92f5 100644 --- a/python/ray/data/tests/test_catalog.py +++ b/python/ray/data/tests/test_catalog.py @@ -130,12 +130,6 @@ def test_resolve_iceberg(uc_catalog): assert ckw["header.X-Iceberg-Access-Delegation"] == "vended-credentials" -def test_resolve_requires_readerformat(uc_catalog): - # reader must be a ReaderFormat enum, not its raw string value. - with pytest.raises(AssertionError, match="must be a ReaderFormat"): - uc_catalog.resolve("main.sales.txns", reader="delta") - - def test_resolve_azure_sets_env(isolated_env): catalog = UnityCatalog(url="https://h.databricks.com", token="t") azure_creds = { From 34307e663a5ebfbe883a1199f00eb405d1e6a2db Mon Sep 17 00:00:00 2001 From: Daniel Shin Date: Wed, 17 Jun 2026 17:23:44 -0700 Subject: [PATCH 14/19] revert simplification Signed-off-by: Daniel Shin --- python/ray/data/catalog.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/ray/data/catalog.py b/python/ray/data/catalog.py index 1510c9f6f72c..1fdfe5cc4bd6 100644 --- a/python/ray/data/catalog.py +++ b/python/ray/data/catalog.py @@ -211,7 +211,7 @@ def _get_creds(self, table_id: str) -> Tuple[dict, str]: @staticmethod def _infer_format(table_info: dict, table_url: str) -> Optional[ReaderFormat]: """Best-effort format hint from table metadata or file extension.""" - fmt = table_info.get("data_source_format", "").lower() + fmt = (table_info.get("data_source_format") or "").lower() if fmt in {f.value for f in ReaderFormat}: return ReaderFormat(fmt) From 330f513313b1991fa140d29424feb68741c63db9 Mon Sep 17 00:00:00 2001 From: Daniel Shin Date: Thu, 18 Jun 2026 17:15:44 -0700 Subject: [PATCH 15/19] some fix for delta Signed-off-by: Daniel Shin --- python/ray/data/read_api.py | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/python/ray/data/read_api.py b/python/ray/data/read_api.py index 24fcad6cf8f3..0c0b9d1981b6 100644 --- a/python/ray/data/read_api.py +++ b/python/ray/data/read_api.py @@ -4752,7 +4752,14 @@ def read_delta( "the provided `filesystem` with the catalog-resolved " "credentials." ) - filesystem = resolved.filesystem + # `to_pyarrow_dataset` emits table-relative file paths and requires + # the filesystem to be rooted at the table directory. The catalog + # vends a bucket-rooted filesystem, so wrap it in a SubTreeFileSystem + # rooted at the table path (see `DeltaTable.to_pyarrow_dataset`). + import pyarrow.fs as pafs + + _, normalized_path = pafs.FileSystem.from_uri(path) + filesystem = pafs.SubTreeFileSystem(normalized_path, resolved.filesystem) dt = DeltaTable(path, version=version, storage_options=storage_options) pa_dataset = dt.to_pyarrow_dataset(filesystem=filesystem) From 6e8b7f966e4cf4c8660969e77f8521d537e4b922 Mon Sep 17 00:00:00 2001 From: Daniel Shin Date: Thu, 18 Jun 2026 17:39:40 -0700 Subject: [PATCH 16/19] fix iceberg Signed-off-by: Daniel Shin --- python/ray/data/catalog.py | 16 +++++++++++++--- python/ray/data/read_api.py | 4 ++++ 2 files changed, 17 insertions(+), 3 deletions(-) diff --git a/python/ray/data/catalog.py b/python/ray/data/catalog.py index 1fdfe5cc4bd6..974baed5006a 100644 --- a/python/ray/data/catalog.py +++ b/python/ray/data/catalog.py @@ -48,7 +48,6 @@ class ReaderFormat(str, Enum): ICEBERG = "iceberg" -@PublicAPI(stability="alpha") @dataclass class ResolvedSource: """The output of :meth:`Catalog.resolve` — location/credentials for a reader. @@ -57,7 +56,7 @@ class ResolvedSource: * ``read_delta``: ``path`` + (``storage_options`` and/or ``filesystem``) * ``read_parquet``: ``path`` + ``filesystem`` - * ``read_iceberg``: ``catalog_kwargs`` + * ``read_iceberg``: ``catalog_kwargs`` + ``table_identifier`` Unused fields are ``None``. """ @@ -66,6 +65,10 @@ class ResolvedSource: filesystem: Optional["pyarrow.fs.FileSystem"] = None storage_options: Optional[Dict[str, Any]] = None catalog_kwargs: Optional[Dict[str, Any]] = None + # Identifier the reader should address the table by, if the catalog rewrites + # it (e.g. Iceberg REST scopes the warehouse to the catalog, so the table is + # addressed as ``schema.table`` rather than ``catalog.schema.table``). + table_identifier: Optional[str] = None data_format: Optional[ReaderFormat] = None # hint, e.g. ReaderFormat.DELTA @@ -177,10 +180,17 @@ def _resolve_iceberg(self, table: str) -> ResolvedSource: # PyIceberg speaks the Iceberg REST protocol; Unity Catalog implements # it and vends data-file credentials via the access-delegation header. # No manual S3/ADLS/GCS keys are needed here. + # + # The REST catalog is scoped to a single UC catalog via `warehouse`, so + # the table is addressed by `schema.table` (the catalog prefix would + # otherwise be double-applied, e.g. `tmp.tmp.schema.table`). + catalog_name, _, namespace_table = table.partition(".") return ResolvedSource( + table_identifier=namespace_table, catalog_kwargs={ "type": "rest", - "uri": f"{self._base_url}/api/2.1/unity-catalog/iceberg", + "uri": f"{self._base_url}/api/2.1/unity-catalog/iceberg-rest", + "warehouse": catalog_name, "token": self._provider.get_token(), "header.X-Iceberg-Access-Delegation": "vended-credentials", }, diff --git a/python/ray/data/read_api.py b/python/ray/data/read_api.py index 0c0b9d1981b6..4074a80db66c 100644 --- a/python/ray/data/read_api.py +++ b/python/ray/data/read_api.py @@ -4293,6 +4293,8 @@ def read_iceberg( resolved = catalog.resolve(table_identifier, reader=ReaderFormat.ICEBERG) catalog_kwargs = resolved.catalog_kwargs or {} + if resolved.table_identifier is not None: + table_identifier = resolved.table_identifier # Setup the Datasource datasource = IcebergDatasource( @@ -4601,6 +4603,8 @@ def read_unity_catalog( return read_delta(table, catalog=catalog, **reader_kwargs) if fmt is ReaderFormat.PARQUET: return read_parquet(table, catalog=catalog, **reader_kwargs) + if fmt is ReaderFormat.ICEBERG: + return read_iceberg(table_identifier=table, catalog=catalog, **reader_kwargs) raise ValueError(f"Unsupported data_format for read_unity_catalog: {fmt!r}") From ef239dc2c4b83a1199db4167c22c43419c78cc9d Mon Sep 17 00:00:00 2001 From: Daniel Shin Date: Thu, 18 Jun 2026 17:51:30 -0700 Subject: [PATCH 17/19] add iceberg infer Signed-off-by: Daniel Shin --- python/ray/data/catalog.py | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/python/ray/data/catalog.py b/python/ray/data/catalog.py index 974baed5006a..00cc3f82a92f 100644 --- a/python/ray/data/catalog.py +++ b/python/ray/data/catalog.py @@ -223,6 +223,15 @@ def _infer_format(table_info: dict, table_url: str) -> Optional[ReaderFormat]: """Best-effort format hint from table metadata or file extension.""" fmt = (table_info.get("data_source_format") or "").lower() if fmt in {f.value for f in ReaderFormat}: + # A UniForm Delta table reports DELTA but also exposes Iceberg + # metadata. Prefer Iceberg: these tables require columnMapping, which + # deltalake's pyarrow reader can't read, so the Delta path would fail. + if fmt == ReaderFormat.DELTA.value: + uniform = (table_info.get("properties") or {}).get( + "delta.universalFormat.enabledFormats", "" + ) + if "iceberg" in uniform.lower(): + return ReaderFormat.ICEBERG return ReaderFormat(fmt) storage_loc = table_info.get("storage_location") or table_url From d58f1a97fbb21af180198357a0b327c8de7105d3 Mon Sep 17 00:00:00 2001 From: Daniel Shin Date: Thu, 18 Jun 2026 18:02:08 -0700 Subject: [PATCH 18/19] add DeletionVector support Signed-off-by: Daniel Shin --- python/ray/data/read_api.py | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/python/ray/data/read_api.py b/python/ray/data/read_api.py index 4074a80db66c..08a5031630c4 100644 --- a/python/ray/data/read_api.py +++ b/python/ray/data/read_api.py @@ -4766,7 +4766,17 @@ def read_delta( filesystem = pafs.SubTreeFileSystem(normalized_path, resolved.filesystem) dt = DeltaTable(path, version=version, storage_options=storage_options) - pa_dataset = dt.to_pyarrow_dataset(filesystem=filesystem) + try: + pa_dataset = dt.to_pyarrow_dataset(filesystem=filesystem) + except Exception as e: + error_msg = str(e) + if "DeletionVectors" in error_msg or "Unsupported reader features" in error_msg: + raise RuntimeError( + f"Delta table uses Deletion Vectors, which requires deltalake>=0.10.0. " + f"Error: {error_msg}\n" + f"Solution: pip install --upgrade 'deltalake>=0.10.0'" + ) from e + raise datasource = ParquetDatasource.from_pyarrow_dataset( pa_dataset, From 125060885f75aef8d9fe8886331e6721e028e0c8 Mon Sep 17 00:00:00 2001 From: Daniel Shin Date: Thu, 18 Jun 2026 18:38:21 -0700 Subject: [PATCH 19/19] fix test Signed-off-by: Daniel Shin --- python/ray/data/tests/test_catalog.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/ray/data/tests/test_catalog.py b/python/ray/data/tests/test_catalog.py index 146c573d92f5..ed5fcec18159 100644 --- a/python/ray/data/tests/test_catalog.py +++ b/python/ray/data/tests/test_catalog.py @@ -124,7 +124,7 @@ def test_resolve_iceberg(uc_catalog): ckw = resolved.catalog_kwargs assert ckw["type"] == "rest" assert ckw["uri"] == ( - "https://dbc-test.cloud.databricks.com/api/2.1/unity-catalog/iceberg" + "https://dbc-test.cloud.databricks.com/api/2.1/unity-catalog/iceberg-rest" ) assert ckw["token"] == "dapi-test" assert ckw["header.X-Iceberg-Access-Delegation"] == "vended-credentials"