diff --git a/src/datachain/client/azure.py b/src/datachain/client/azure.py
index 5f106d359..50f380597 100644
--- a/src/datachain/client/azure.py
+++ b/src/datachain/client/azure.py
@@ -30,6 +30,7 @@ class AzureClient(Client):
             "tenant_id",
         }
     )
+    _ANON_FALLBACK = True
 
     @classmethod
     def bucket_status(cls, name: str, **kwargs) -> BucketStatus:
diff --git a/src/datachain/client/fsspec.py b/src/datachain/client/fsspec.py
index e79e90907..44c359b79 100644
--- a/src/datachain/client/fsspec.py
+++ b/src/datachain/client/fsspec.py
@@ -63,6 +63,149 @@ class BucketStatus(NamedTuple):
     error: str | None = None
 
 
+class AnonFallbackFS:
+    """Proxy for an fsspec ``AbstractFileSystem`` that retries read methods
+    on ``PermissionError`` with ``anon=True``, and caches the decision per
+    ``(protocol, bucket)`` so future calls go straight to the right mode.
+
+    Only read methods are wrapped - anon is read-only on every cloud
+    backend, so retrying writes would always fail and would mis-mark the
+    bucket as anon-impossible, defeating the feature for later reads.
+
+    A retry builds its anonymous filesystem locally and installs it as the
+    proxied filesystem only after the retried call succeeded, so overlapping
+    retries (async fan-out, threads) can never leave the proxy stuck on an
+    anonymous filesystem that has just been refused.
+    """
+
+    # fsspec read-side methods we wrap. ``open`` / ``_open`` are also here
+    # but the wrapper checks ``mode`` and skips retry for write modes.
+    READ_METHODS: ClassVar[frozenset[str]] = frozenset(
+        {
+            "_info",
+            "_open",
+            "open",
+            "_cat_file",
+            "_ls",
+            "_get_file",
+            "_glob",
+            "_walk",
+            "_find",
+            "_du",
+            "_size",
+            "_exists",
+            "_isdir",
+            "_isfile",
+            "_modified",
+            "sign",
+        }
+    )
+
+    def __init__(self, client: "Client") -> None:
+        self._client = client
+        kwargs = dict(client.fs_kwargs)
+        if (
+            self._should_use_anon_cache()
+            and client._bucket_needs_anon(client.name) is True
+        ):
+            kwargs["anon"] = True
+        self._inner_fs = type(client).create_fs(**kwargs)
+
+    @property  # type: ignore[misc]
+    def __class__(self):
+        # Pretend to be the underlying fsspec class so isinstance checks
+        # (e.g. pyarrow's ``isinstance(fs, AbstractFileSystem)``) pass.
+        return type(self._inner_fs)
+
+    def __getattr__(self, name: str):
+        # Only reached when normal lookup fails. For our own two fields that
+        # means ``__init__`` has not populated them (``create_fs`` raised, or
+        # ``copy``/``pickle`` is probing a bare instance); delegating would
+        # re-enter this method through ``self._inner_fs`` and recurse forever.
+        if name in ("_client", "_inner_fs"):
+            raise AttributeError(name)
+        attr = getattr(self._inner_fs, name)
+        if not callable(attr) or name not in self.READ_METHODS:
+            return attr
+        if asyncio.iscoroutinefunction(attr):
+            return self._wrap_async(name, attr)
+        return self._wrap_sync(name, attr)
+
+    @staticmethod
+    def _is_write_open(args, kwargs) -> bool:
+        """``open`` / ``_open`` mode arg indicates write/append/exclusive."""
+        mode = args[1] if len(args) >= 2 else kwargs.get("mode", "rb")
+        return any(c in mode for c in "wax+")
+
+    def _should_use_anon_cache(self) -> bool:
+        # Clients with explicit creds skip the shared anon cache - reading
+        # could lock them out of paths their creds cover, writing could
+        # mis-flag a bucket as anon-needed for future no-creds callers.
+        return not type(self._client).has_explicit_credentials(self._client.fs_kwargs)
+
+    def _can_retry(self) -> bool:
+        if self._client.fs_kwargs.get("anon"):
+            return False
+        if not self._should_use_anon_cache():
+            return True
+        return self._client._bucket_needs_anon(self._client.name) is None
+
+    def _create_anon_fs(self) -> "AbstractFileSystem":
+        """Build an anonymous filesystem without touching the proxied one."""
+        return type(self._client).create_fs(
+            **{**self._client.fs_kwargs, "anon": True}
+        )
+
+    def _record_anon_result(self, works: bool) -> None:
+        """Cache the retry outcome; explicit-creds clients never write it."""
+        if self._should_use_anon_cache():
+            self._client._mark_bucket_anon(self._client.name, works)
+
+    def _wrap_sync(self, name, attr):
+        def call(*args, **kwargs):
+            if name in ("open", "_open") and self._is_write_open(args, kwargs):
+                return attr(*args, **kwargs)
+            try:
+                return attr(*args, **kwargs)
+            except PermissionError:
+                if not self._can_retry():
+                    raise
+                anon_fs = self._create_anon_fs()
+                try:
+                    result = getattr(anon_fs, name)(*args, **kwargs)
+                except PermissionError:
+                    self._record_anon_result(False)
+                    raise
+                # Anon worked: adopt it for every later call on this proxy.
+                self._inner_fs = anon_fs
+                self._record_anon_result(True)
+                return result
+
+        return call
+
+    def _wrap_async(self, name, attr):
+        async def call(*args, **kwargs):
+            if name in ("open", "_open") and self._is_write_open(args, kwargs):
+                return await attr(*args, **kwargs)
+            try:
+                return await attr(*args, **kwargs)
+            except PermissionError:
+                if not self._can_retry():
+                    raise
+                anon_fs = self._create_anon_fs()
+                try:
+                    result = await getattr(anon_fs, name)(*args, **kwargs)
+                except PermissionError:
+                    self._record_anon_result(False)
+                    raise
+                # Anon worked: adopt it for every later call on this proxy.
+                self._inner_fs = anon_fs
+                self._record_anon_result(True)
+                return result
+
+        return call
+
+
 class Client(ABC):
     MAX_THREADS = multiprocessing.cpu_count()
     FS_CLASS: ClassVar[type["AbstractFileSystem"]]
@@ -70,6 +213,15 @@ class Client(ABC):
     protocol: ClassVar[str]
     # client_config keys this backend treats as credentials.
     CREDENTIAL_KEYS: ClassVar[frozenset[str]] = frozenset()
+    # Whether ``anon=True`` is a meaningful fsspec kwarg for this backend.
+    # Subclasses for S3/GCS/Azure flip this to True so ``Client.fs`` returns
+    # an ``AnonFallbackFS`` proxy.
+    _ANON_FALLBACK: ClassVar[bool] = False
+    # Process-local cache of (protocol, bucket) anon decisions.
+    # True = anon retry succeeded, use anon from the start.
+    # False = anon retry also failed, don't bother retrying again.
+    # Missing key = no decision yet.
+    _anon_buckets: ClassVar[dict[tuple[str, str], bool]] = {}
 
     @classmethod
     def has_explicit_credentials(cls, client_config: dict | None) -> bool:
@@ -78,6 +230,14 @@ def has_explicit_credentials(cls, client_config: dict | None) -> bool:
             return False
         return any(k in client_config for k in cls.CREDENTIAL_KEYS)
 
+    @classmethod
+    def _bucket_needs_anon(cls, name: str) -> bool | None:
+        return cls._anon_buckets.get((cls.protocol, name))
+
+    @classmethod
+    def _mark_bucket_anon(cls, name: str, anon: bool) -> None:
+        cls._anon_buckets[(cls.protocol, name)] = anon
+
     def __init__(self, name: str, fs_kwargs: dict[str, Any], cache: Cache) -> None:
         self.name = name
         self.fs_kwargs = fs_kwargs
@@ -232,7 +392,10 @@ def split_url(cls, url: str) -> tuple[str, str]:
     @property
     def fs(self) -> "AbstractFileSystem":
         if not self._fs:
-            self._fs = self.create_fs(**self.fs_kwargs)
+            if type(self)._ANON_FALLBACK:
+                self._fs = AnonFallbackFS(self)  # type: ignore[assignment]
+            else:
+                self._fs = self.create_fs(**self.fs_kwargs)
         return self._fs
 
     def url(
diff --git a/src/datachain/client/gcs.py b/src/datachain/client/gcs.py
index f870b3ff8..7d854f877 100644
--- a/src/datachain/client/gcs.py
+++ b/src/datachain/client/gcs.py
@@ -30,6 +30,7 @@ class GCSClient(Client):
     PREFIX = "gs://"
     protocol = "gs"
     CREDENTIAL_KEYS = frozenset({"token"})
+    _ANON_FALLBACK = True
 
     @classmethod
     def create_fs(cls, **kwargs) -> GCSFileSystem:
diff --git a/src/datachain/client/s3.py b/src/datachain/client/s3.py
index 5dc0a6ade..557f7efb9 100644
--- a/src/datachain/client/s3.py
+++ b/src/datachain/client/s3.py
@@ -21,6 +21,7 @@ class ClientS3(Client):
     CREDENTIAL_KEYS = frozenset(
         {"key", "secret", "token", "aws_key", "aws_secret", "aws_token"}
     )
+    _ANON_FALLBACK = True
 
     @staticmethod
     def _normalize_s3_kwargs(kwargs: dict) -> dict:
diff --git a/tests/func/test_storage_auto_anon.py b/tests/func/test_storage_auto_anon.py
index 8c770ad27..91cb05373 100644
--- a/tests/func/test_storage_auto_anon.py
+++ b/tests/func/test_storage_auto_anon.py
@@ -3,7 +3,10 @@
 import pytest
 
 import datachain as dc
+from datachain.cache import Cache
+from datachain.client import Client
 from datachain.client.azure import AzureClient
+from datachain.client.fsspec import AnonFallbackFS
 from datachain.client.gcs import GCSClient
 from datachain.client.s3 import ClientS3
 
@@ -71,3 +74,22 @@ def test_explicit_anon_skips_auto_detect(probe, explicit, tmp_dir, catalog):
     chain = dc.read_storage(tmp_dir.as_uri(), anon=explicit)
     probe.assert_not_called()
     assert chain.session.catalog.client_config.get("anon") is explicit
+
+
+@pytest.mark.parametrize("cloud_type", ["s3", "gs"], indirect=True)
+def test_anon_fallback_proxy_integration(
+    cloud_server, cloud_server_credentials, tmp_path
+):
+    cache = Cache(str(tmp_path / "cache"), str(tmp_path / "tmp"))
+    client = Client.get_client(
+        cloud_server.src_uri, cache, **cloud_server.client_config
+    )
+
+    # Building fs returns the proxy for cloud clients.
+    _ = client.fs
+    assert isinstance(client._fs, AnonFallbackFS)
+
+    # End-to-end op through proxy → real fsspec → emulator.
+    info = client.get_file_info("description")
+    assert info.path == "description"
+    assert info.size and info.size > 0
diff --git a/tests/unit/test_client.py b/tests/unit/test_client.py
index 21a1bc2c9..3335d7e5f 100644
--- a/tests/unit/test_client.py
+++ b/tests/unit/test_client.py
@@ -1,11 +1,16 @@
+import asyncio
 import os
 import sys
 from pathlib import Path
+from unittest.mock import AsyncMock, MagicMock
 
 import pytest
 
 from datachain.client import Client
+from datachain.client.fsspec import AnonFallbackFS
+from datachain.client.gcs import GCSClient
 from datachain.client.local import FileClient
+from datachain.lib.file import File
 
 
 def test_bad_protocol():
@@ -31,3 +36,230 @@ def test_parse_file_path_ends_with_slash(cloud_type):
     uri, rel_part = Client.parse_url("./animals/".replace("/", os.sep))
     assert uri == (Path().absolute() / Path("animals")).as_uri()
     assert rel_part == ""
+
+
+@pytest.fixture
+def _clear_anon_cache():
+    Client._anon_buckets.clear()
+    yield
+    Client._anon_buckets.clear()
+
+
+def _gcs_client(bucket: str = "foo", **fs_kwargs) -> GCSClient:
+    return GCSClient(bucket, fs_kwargs, MagicMock())
+
+
+def _info_ok():
+    return AsyncMock(
+        return_value={
+            "name": "gs://foo/x.txt",
+            "size": 1,
+            "etag": "e",
+            "updated": "2024-01-01T00:00:00Z",
+        }
+    )
+
+
+def test_anon_fallback_no_error_no_retry(monkeypatch, _clear_anon_cache):
+    auth_fs = MagicMock()
+    auth_fs._info = _info_ok()
+    create_fs = MagicMock(return_value=auth_fs)
+    monkeypatch.setattr(GCSClient, "create_fs", create_fs)
+
+    client = _gcs_client()
+    client.get_file_info("x.txt")
+
+    create_fs.assert_called_once()
+    assert create_fs.call_args.kwargs.get("anon") is None
+    assert GCSClient._bucket_needs_anon("foo") is None
+
+
+def test_anon_fallback_retry_succeeds_marks_bucket(monkeypatch, _clear_anon_cache):
+    auth_fs = MagicMock()
+    auth_fs._info = AsyncMock(side_effect=PermissionError)
+    anon_fs = MagicMock()
+    anon_fs._info = _info_ok()
+    create_fs = MagicMock(side_effect=[auth_fs, anon_fs])
+    monkeypatch.setattr(GCSClient, "create_fs", create_fs)
+
+    client = _gcs_client()
+    client.get_file_info("x.txt")
+
+    assert GCSClient._bucket_needs_anon("foo") is True
+    assert create_fs.call_count == 2
+    assert create_fs.call_args_list[1].kwargs.get("anon") is True
+
+
+def test_anon_fallback_retry_also_fails_marks_as_failed(monkeypatch, _clear_anon_cache):
+    auth_fs = MagicMock()
+    auth_fs._info = AsyncMock(side_effect=PermissionError)
+    anon_fs = MagicMock()
+    anon_fs._info = AsyncMock(side_effect=PermissionError)
+    create_fs = MagicMock(side_effect=[auth_fs, anon_fs])
+    monkeypatch.setattr(GCSClient, "create_fs", create_fs)
+
+    client = _gcs_client()
+    with pytest.raises(PermissionError):
+        client.get_file_info("x.txt")
+
+    assert GCSClient._bucket_needs_anon("foo") is False
+    assert create_fs.call_count == 2
+
+
+def test_anon_fallback_cached_as_failed_skips_retry(monkeypatch, _clear_anon_cache):
+    GCSClient._mark_bucket_anon("foo", False)
+    auth_fs = MagicMock()
+    auth_fs._info = AsyncMock(side_effect=PermissionError)
+    create_fs = MagicMock(return_value=auth_fs)
+    monkeypatch.setattr(GCSClient, "create_fs", create_fs)
+
+    client = _gcs_client()
+    with pytest.raises(PermissionError):
+        client.get_file_info("x.txt")
+
+    create_fs.assert_called_once()
+    assert create_fs.call_args.kwargs.get("anon") is None
+
+
+def test_anon_fallback_cached_bucket_uses_anon_directly(monkeypatch, _clear_anon_cache):
+    GCSClient._mark_bucket_anon("foo", True)
+    create_fs = MagicMock(return_value=MagicMock())
+    monkeypatch.setattr(GCSClient, "create_fs", create_fs)
+
+    _ = _gcs_client().fs._info
+
+    create_fs.assert_called_once()
+    assert create_fs.call_args.kwargs.get("anon") is True
+
+
+def test_anon_fallback_explicit_anon_no_retry(monkeypatch, _clear_anon_cache):
+    auth_fs = MagicMock()
+    auth_fs._info = AsyncMock(side_effect=PermissionError)
+    create_fs = MagicMock(return_value=auth_fs)
+    monkeypatch.setattr(GCSClient, "create_fs", create_fs)
+
+    client = _gcs_client(anon=True)
+    with pytest.raises(PermissionError):
+        client.get_file_info("x.txt")
+
+    create_fs.assert_called_once()
+    assert GCSClient._bucket_needs_anon("foo") is None
+
+
+def test_anon_fallback_open_object_retry_succeeds(monkeypatch, _clear_anon_cache):
+    auth_fs = MagicMock()
+    auth_fs.open = MagicMock(side_effect=PermissionError)
+    anon_fs = MagicMock()
+    anon_fs.open = MagicMock(return_value=MagicMock())
+    create_fs = MagicMock(side_effect=[auth_fs, anon_fs])
+    monkeypatch.setattr(GCSClient, "create_fs", create_fs)
+
+    client = _gcs_client()
+    client.cache.get_path = MagicMock(return_value=None)
+    client.open_object(File(source="gs://foo", path="x.txt"))
+
+    assert GCSClient._bucket_needs_anon("foo") is True
+
+
+def test_anon_fallback_explicit_creds_ignore_cache(monkeypatch, _clear_anon_cache):
+    # A previous no-creds caller cached the bucket as anon-needed.
+    GCSClient._mark_bucket_anon("foo", True)
+
+    # New client with explicit creds: should NOT use anon directly, should
+    # try with creds first, and should NOT overwrite the cache.
+    auth_fs = MagicMock()
+    auth_fs._info = _info_ok()
+    anon_fs = MagicMock()
+    create_fs = MagicMock(side_effect=[auth_fs, anon_fs])
+    monkeypatch.setattr(GCSClient, "create_fs", create_fs)
+
+    client = _gcs_client(token="explicit-token")  # noqa: S106
+    client.get_file_info("x.txt")
+
+    # Only the auth fs was built, anon was never instantiated.
+    create_fs.assert_called_once()
+    assert create_fs.call_args.kwargs.get("anon") is None
+    # Cache was not touched - still True from the earlier no-creds caller.
+    assert GCSClient._bucket_needs_anon("foo") is True
+
+
+def test_anon_fallback_explicit_creds_anon_retry_does_not_cache(
+    monkeypatch, _clear_anon_cache
+):
+    # Client with explicit creds whose creds fail - anon retry succeeds,
+    # but the success must NOT pollute the shared cache.
+    auth_fs = MagicMock()
+    auth_fs._info = AsyncMock(side_effect=PermissionError)
+    anon_fs = MagicMock()
+    anon_fs._info = _info_ok()
+    create_fs = MagicMock(side_effect=[auth_fs, anon_fs])
+    monkeypatch.setattr(GCSClient, "create_fs", create_fs)
+
+    client = _gcs_client(token="explicit-token")  # noqa: S106
+    client.get_file_info("x.txt")
+
+    # Cache untouched.
+    assert GCSClient._bucket_needs_anon("foo") is None
+
+
+def test_anon_fallback_write_open_no_retry_no_cache_poisoning(
+    monkeypatch, _clear_anon_cache
+):
+    # A write-mode open that fails with PermissionError must NOT trigger
+    # an anon retry (anon can never write) and must NOT poison the cache
+    # with False - otherwise future legitimate reads would lose fallback.
+    auth_fs = MagicMock()
+    auth_fs.open = MagicMock(side_effect=PermissionError)
+    create_fs = MagicMock(return_value=auth_fs)
+    monkeypatch.setattr(GCSClient, "create_fs", create_fs)
+
+    client = _gcs_client()
+    with pytest.raises(PermissionError):
+        client.fs.open("gs://foo/x.txt", "wb")
+
+    # No anon fs was built (no retry attempted).
+    create_fs.assert_called_once()
+    assert create_fs.call_args.kwargs.get("anon") is None
+    # Cache is clean - subsequent reads can still fall back.
+    assert GCSClient._bucket_needs_anon("foo") is None
+
+
+def test_anon_fallback_bare_proxy_raises_attribute_error():
+    # ``copy``/``pickle`` probe attributes on an instance whose ``__init__``
+    # never ran; the proxy must answer with AttributeError instead of
+    # recursing through ``__getattr__`` until RecursionError.
+    proxy = object.__new__(AnonFallbackFS)
+
+    assert not hasattr(proxy, "__setstate__")
+    with pytest.raises(AttributeError):
+        _ = proxy._inner_fs
+
+
+def test_anon_fallback_overlapping_failed_retries_keep_auth_fs(
+    monkeypatch, _clear_anon_cache
+):
+    # Two reads are denied while in flight together, and the anon retry is
+    # denied for both as well. The proxy must end up on the credentialed fs
+    # again, not on the anon fs one retry saw while the other was running.
+    async def denied(*args, **kwargs):
+        await asyncio.sleep(0)
+        raise PermissionError
+
+    auth_fs = MagicMock()
+    auth_fs._info = denied
+    anon_fs = MagicMock()
+    anon_fs._info = denied
+    create_fs = MagicMock(side_effect=[auth_fs, anon_fs, anon_fs])
+    monkeypatch.setattr(GCSClient, "create_fs", create_fs)
+
+    fs = _gcs_client().fs
+
+    async def read_both():
+        return await asyncio.gather(
+            fs._info("gs://foo/a"), fs._info("gs://foo/b"), return_exceptions=True
+        )
+
+    results = asyncio.run(read_both())
+
+    assert all(isinstance(r, PermissionError) for r in results)
+    assert fs._inner_fs is auth_fs