Skip to content
1 change: 1 addition & 0 deletions src/datachain/client/azure.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ class AzureClient(Client):
"tenant_id",
}
)
_ANON_FALLBACK = True

@classmethod
def bucket_status(cls, name: str, **kwargs) -> BucketStatus:
Expand Down
160 changes: 159 additions & 1 deletion src/datachain/client/fsspec.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,13 +63,160 @@ class BucketStatus(NamedTuple):
error: str | None = None


class AnonFallbackFS:

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah, way to complicated for the task we are trying to solve :( really don't like the complexity and amount of code just to deal with anon that is needed only for demos / examples :(

(don't have a better solution in mind though)

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm also not sure what is the less complex approach. I started adding decorator on Client methods, honestly that maybe seem less complex than this "generic" approach where more code is needed....

"""Proxy for an fsspec ``AbstractFileSystem`` that retries read methods
on ``PermissionError`` with ``anon=True``, and caches the decision per
``(protocol, bucket)`` so future calls go straight to the right mode.

Only read methods are wrapped - anon is read-only on every cloud
backend, so retrying writes would always fail and would mis-mark the
bucket as anon-impossible, defeating the feature for later reads.
"""

# fsspec read-side methods we wrap. ``open`` / ``_open`` are also here
# but the wrapper checks ``mode`` and skips retry for write modes.
READ_METHODS: ClassVar[frozenset[str]] = frozenset(
{
"_info",
"_open",
"open",
"_cat_file",
"_ls",
"_get_file",
"_glob",
"_walk",
"_find",
"_du",
"_size",
"_exists",
"_isdir",
"_isfile",
"_modified",
"sign",
}
)

def __init__(self, client: "Client") -> None:
self._client = client
kwargs = dict(client.fs_kwargs)
if (
self._should_use_anon_cache()
and client._bucket_needs_anon(client.name) is True
):
kwargs["anon"] = True
self._inner_fs = type(client).create_fs(**kwargs)

@property # type: ignore[misc]
def __class__(self):
# Pretend to be the underlying fsspec class so isinstance checks
# (e.g. pyarrow's ``isinstance(fs, AbstractFileSystem)``) pass.
return type(self._inner_fs)

Comment on lines +110 to +114

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we need this isinstance trick? feels hacky to pretend we're the underlying fs class

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, can we just return class name instead of the type itself?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The trick is needed because external code (pyarrow, fsspec internals) does isinstance(fs, AbstractFileSystem) and we can't patch those checks. Subclassing isn't clean either - the concrete class is dynamic per-client (S3FileSystem / GCSFileSystem / ...).

On returning the class name instead of the type - __class__ has to be a class object; returning a string would break isinstance entirely.

def __getattr__(self, name: str):
attr = getattr(self._inner_fs, name)
if not callable(attr) or name not in self.READ_METHODS:
return attr
if asyncio.iscoroutinefunction(attr):
return self._wrap_async(name, attr)
return self._wrap_sync(name, attr)
Comment thread
ilongin marked this conversation as resolved.

@staticmethod
def _is_write_open(args, kwargs) -> bool:
"""``open`` / ``_open`` mode arg indicates write/append/exclusive."""
mode = args[1] if len(args) >= 2 else kwargs.get("mode", "rb")
return any(c in mode for c in "wax+")

def _should_use_anon_cache(self) -> bool:
# Clients with explicit creds skip the shared anon cache - reading
# could lock them out of paths their creds cover, writing could
# mis-flag a bucket as anon-needed for future no-creds callers.
return not type(self._client).has_explicit_credentials(self._client.fs_kwargs)

def _can_retry(self) -> bool:
if self._client.fs_kwargs.get("anon"):
return False
if not self._should_use_anon_cache():
return True
return self._client._bucket_needs_anon(self._client.name) is None

def _swap_to_anon(self) -> None:
self._inner_fs = type(self._client).create_fs(
**{**self._client.fs_kwargs, "anon": True}
)

def _wrap_sync(self, name, attr):
def call(*args, **kwargs):
if name in ("open", "_open") and self._is_write_open(args, kwargs):
return attr(*args, **kwargs)
try:
return attr(*args, **kwargs)
except PermissionError:
if not self._can_retry():
raise
saved = self._inner_fs
self._swap_to_anon()
keep_anon = False
try:
result = getattr(self._inner_fs, name)(*args, **kwargs)
keep_anon = True
if self._should_use_anon_cache():
self._client._mark_bucket_anon(self._client.name, True)
return result
except PermissionError:
if self._should_use_anon_cache():
self._client._mark_bucket_anon(self._client.name, False)
raise
finally:
if not keep_anon:
self._inner_fs = saved

return call

def _wrap_async(self, name, attr):
async def call(*args, **kwargs):
if name in ("open", "_open") and self._is_write_open(args, kwargs):
return await attr(*args, **kwargs)
try:
return await attr(*args, **kwargs)
except PermissionError:
if not self._can_retry():
raise
saved = self._inner_fs
self._swap_to_anon()
keep_anon = False
try:
result = await getattr(self._inner_fs, name)(*args, **kwargs)
keep_anon = True
if self._should_use_anon_cache():
self._client._mark_bucket_anon(self._client.name, True)
return result
except PermissionError:
if self._should_use_anon_cache():
self._client._mark_bucket_anon(self._client.name, False)
raise
finally:
if not keep_anon:
self._inner_fs = saved

return call
Comment thread
ilongin marked this conversation as resolved.
Comment on lines +147 to +201

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we abstract some logic? Most part looks similar in both sync and asynchronous one?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The duplication is structural (Python won't let await be conditional, so sync and async need separate function bodies). Extracting helpers just moves the duplication into helpers without removing it, and the wrappers become indirection that hurts readability. I'd rather keep both inline and add a comment noting the deliberate parallel.



class Client(ABC):
MAX_THREADS = multiprocessing.cpu_count()
FS_CLASS: ClassVar[type["AbstractFileSystem"]]
PREFIX: ClassVar[str]
protocol: ClassVar[str]
# client_config keys this backend treats as credentials.
CREDENTIAL_KEYS: ClassVar[frozenset[str]] = frozenset()
# Whether ``anon=True`` is a meaningful fsspec kwarg for this backend.
# Subclasses for S3/GCS/Azure flip this to True so ``Client.fs`` returns
# an ``AnonFallbackFS`` proxy.
_ANON_FALLBACK: ClassVar[bool] = False
# Process-local cache of (protocol, bucket) anon decisions.
# True = anon retry succeeded, use anon from the start.
# False = anon retry also failed, don't bother retrying again.
# Missing key = no decision yet.
_anon_buckets: ClassVar[dict[tuple[str, str], bool]] = {}

@classmethod
def has_explicit_credentials(cls, client_config: dict | None) -> bool:
Expand All @@ -78,6 +225,14 @@ def has_explicit_credentials(cls, client_config: dict | None) -> bool:
return False
return any(k in client_config for k in cls.CREDENTIAL_KEYS)

@classmethod
def _bucket_needs_anon(cls, name: str) -> bool | None:
return cls._anon_buckets.get((cls.protocol, name))

@classmethod
def _mark_bucket_anon(cls, name: str, anon: bool) -> None:
cls._anon_buckets[(cls.protocol, name)] = anon

def __init__(self, name: str, fs_kwargs: dict[str, Any], cache: Cache) -> None:
self.name = name
self.fs_kwargs = fs_kwargs
Expand Down Expand Up @@ -232,7 +387,10 @@ def split_url(cls, url: str) -> tuple[str, str]:
@property
def fs(self) -> "AbstractFileSystem":
if not self._fs:
self._fs = self.create_fs(**self.fs_kwargs)
if type(self)._ANON_FALLBACK:
self._fs = AnonFallbackFS(self) # type: ignore[assignment]
else:
self._fs = self.create_fs(**self.fs_kwargs)
return self._fs

def url(
Expand Down
1 change: 1 addition & 0 deletions src/datachain/client/gcs.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ class GCSClient(Client):
PREFIX = "gs://"
protocol = "gs"
CREDENTIAL_KEYS = frozenset({"token"})
_ANON_FALLBACK = True

@classmethod
def create_fs(cls, **kwargs) -> GCSFileSystem:
Expand Down
1 change: 1 addition & 0 deletions src/datachain/client/s3.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ class ClientS3(Client):
CREDENTIAL_KEYS = frozenset(
{"key", "secret", "token", "aws_key", "aws_secret", "aws_token"}
)
_ANON_FALLBACK = True

@staticmethod
def _normalize_s3_kwargs(kwargs: dict) -> dict:
Expand Down
22 changes: 22 additions & 0 deletions tests/func/test_storage_auto_anon.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,10 @@
import pytest

import datachain as dc
from datachain.cache import Cache
from datachain.client import Client
from datachain.client.azure import AzureClient
from datachain.client.fsspec import AnonFallbackFS
from datachain.client.gcs import GCSClient
from datachain.client.s3 import ClientS3

Expand Down Expand Up @@ -71,3 +74,22 @@ def test_explicit_anon_skips_auto_detect(probe, explicit, tmp_dir, catalog):
chain = dc.read_storage(tmp_dir.as_uri(), anon=explicit)
probe.assert_not_called()
assert chain.session.catalog.client_config.get("anon") is explicit


@pytest.mark.parametrize("cloud_type", ["s3", "gs"], indirect=True)
def test_anon_fallback_proxy_integration(
cloud_server, cloud_server_credentials, tmp_path
):
cache = Cache(str(tmp_path / "cache"), str(tmp_path / "tmp"))
client = Client.get_client(
cloud_server.src_uri, cache, **cloud_server.client_config
)

# Building fs returns the proxy for cloud clients.
_ = client.fs
assert isinstance(client._fs, AnonFallbackFS)

# End-to-end op through proxy → real fsspec → emulator.
info = client.get_file_info("description")
assert info.path == "description"
assert info.size and info.size > 0
Loading
Loading