Skip to content
Open
Show file tree
Hide file tree
Changes from 6 commits
Commits
Show all changes
61 commits
Select commit Hold shift + click to select a range
73e3c77
working on dataset soft delete
ilongin May 13, 2026
dc34aa7
adding logic and fixing tests
ilongin May 14, 2026
b0dfe90
Merge branch 'main' into ilongin/12872-dataset-soft-delete
ilongin May 15, 2026
d88917b
Merge branch 'main' into ilongin/12872-dataset-soft-delete
ilongin May 15, 2026
a7f0b15
Merge branch 'ilongin/12872-dataset-soft-delete' of github.com:datach…
ilongin May 15, 2026
236af57
fixing test
ilongin May 15, 2026
7737683
Merge branch 'main' into ilongin/12872-dataset-soft-delete
ilongin May 18, 2026
425ac01
changing logic of solf delete to reserve removed datasets
ilongin May 18, 2026
e71555e
adding purge option
ilongin May 18, 2026
19c45b1
refactoring naming
ilongin May 20, 2026
9a57d47
refactoring
ilongin May 20, 2026
5bd2d5c
refactoring
ilongin May 21, 2026
4caecd8
Merge branch 'main' into ilongin/12872-dataset-soft-delete
ilongin May 22, 2026
ed6c6b2
fixing issue
ilongin May 22, 2026
280ab68
refactoring
ilongin May 22, 2026
c1945ab
Merge branch 'main' into ilongin/12872-dataset-soft-delete
ilongin May 25, 2026
9271ce3
refactor
ilongin May 25, 2026
5f92614
adding new flag
ilongin May 25, 2026
c478b11
Merge branch 'main' into ilongin/12872-dataset-soft-delete
ilongin May 25, 2026
090b06e
refactoring
ilongin May 25, 2026
ad7092a
Merge branch 'main' into ilongin/12872-dataset-soft-delete
ilongin May 26, 2026
c0e51a4
refactoring
ilongin May 26, 2026
445b4dd
Merge branch 'ilongin/12872-dataset-soft-delete' of github.com:datach…
ilongin May 26, 2026
c908d4b
Merge branch 'main' into ilongin/12872-dataset-soft-delete
ilongin May 27, 2026
9b3c9c0
introducing op_uuid
ilongin May 28, 2026
f00a5ad
added a fix
ilongin May 28, 2026
7100869
merging two metastore methods into one
ilongin May 29, 2026
0bad54e
Merge branch 'main' into ilongin/12872-dataset-soft-delete
Jun 1, 2026
92e92ea
added missing tests
ilongin Jun 1, 2026
651ccb5
Merge branch 'main' into ilongin/12872-dataset-soft-delete
ilongin Jun 3, 2026
6393f99
added more tests
ilongin Jun 3, 2026
4683de8
removing list of statuses
ilongin Jun 4, 2026
0c21ad1
Merge branch 'main' into ilongin/12872-dataset-soft-delete
ilongin Jun 5, 2026
0ada7dd
fixing complete_dataset_version
ilongin Jun 5, 2026
08cb6de
refactoring
ilongin Jun 5, 2026
ce3e272
refactoring
ilongin Jun 5, 2026
85bf48f
Merge branch 'main' into ilongin/12872-dataset-soft-delete
ilongin Jun 8, 2026
96a8779
refactoring
ilongin Jun 8, 2026
da3b1ca
updating logs
ilongin Jun 8, 2026
80fcd45
Merge branch 'main' into ilongin/12872-dataset-soft-delete
ilongin Jun 9, 2026
27455ea
refactoring
ilongin Jun 9, 2026
06db87c
removed op_uuid
ilongin Jun 9, 2026
bccfeed
refactoring
ilongin Jun 10, 2026
301810a
refactoring
ilongin Jun 10, 2026
d19f0fd
refactoring
ilongin Jun 10, 2026
845cdb6
adde missing flag
ilongin Jun 12, 2026
822328f
refactoring
ilongin Jun 12, 2026
52d557e
refactoring
ilongin Jun 12, 2026
5c9461d
fix race condition
ilongin Jun 12, 2026
cdace00
refactoring
ilongin Jun 13, 2026
c991878
added transaction
ilongin Jun 13, 2026
4874900
fixing export
ilongin Jun 15, 2026
2d4b9e6
refactoring
ilongin Jun 15, 2026
99833bd
refactoring dependencies
ilongin Jun 15, 2026
40e6152
removed purge option on interface
ilongin Jun 15, 2026
664d9ab
added 2 more tests
ilongin Jun 15, 2026
fac7aee
Merge branch 'main' into ilongin/12872-dataset-soft-delete
ilongin Jun 15, 2026
f22bfc3
Merge branch 'main' into ilongin/12872-dataset-soft-delete
ilongin Jun 15, 2026
352822e
simplifying constants
ilongin Jun 15, 2026
5cbe55b
Merge branch 'main' into ilongin/12872-dataset-soft-delete
ilongin Jun 18, 2026
fe1a214
fixing PR comments
ilongin Jun 19, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 41 additions & 9 deletions src/datachain/catalog/catalog.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
DATASET_PREFIX,
DEFAULT_DATASET_VERSION,
QUERY_DATASET_PREFIX,
REMOVED_VERSION_SUFFIX,
DatasetDependency,
DatasetListRecord,
DatasetRecord,
Expand All @@ -48,7 +49,7 @@
NamespaceNotFoundError,
ProjectNotFoundError,
)
from datachain.lib.listing import get_listing
from datachain.lib.listing import get_listing, is_listing_dataset
from datachain.node import DirType, Node, NodeWithPath
from datachain.nodes_thread_pool import NodesThreadPool
from datachain.progress import tqdm
Expand Down Expand Up @@ -1094,21 +1095,52 @@ def update_dataset(self, dataset: DatasetRecord, **kwargs) -> DatasetRecord:
self.warehouse.rename_dataset_tables(dataset, dataset_updated)
return dataset_updated

def remove_dataset_version(
self, dataset: DatasetRecord, version: str, drop_rows: bool | None = True
) -> None:
def remove_dataset_version(self, dataset: DatasetRecord, version: str) -> None:
"""
Deletes one single dataset version.
If it was last version, it removes dataset completely.
Remove a single dataset version.

For COMPLETE user-named versions this is a soft delete: rows table is
Comment thread
shcheklein marked this conversation as resolved.
Outdated
dropped, dependencies are preserved, and the version row stays with
status REMOVED so dependents can still render lineage.

For non-COMPLETE versions (CREATED/FAILED/STALE/REMOVING leftovers) and
Comment thread
shcheklein marked this conversation as resolved.
Outdated
for internal datasets (listing `lst__*` / `session_*` intermediates),
this is a hard delete: rows table dropped, dependencies removed,
version row deleted, and dataset row removed if it was the last
version.
"""
from datachain.query.session import Session

if not dataset.has_version(version):
return
v = dataset.get_version(version)
if v.status == DatasetStatus.REMOVED:
return

is_internal = is_listing_dataset(dataset.name) or dataset.name.startswith(
Comment thread
shcheklein marked this conversation as resolved.
Outdated
Comment thread
shcheklein marked this conversation as resolved.
Outdated
Session.DATASET_PREFIX
Comment thread
shcheklein marked this conversation as resolved.
Outdated
)
soft = v.status == DatasetStatus.COMPLETE and not is_internal

self.metastore.update_dataset_version(
dataset, version, status=DatasetStatus.REMOVING
)
if drop_rows:
self.warehouse.drop_dataset_rows_table(dataset, version)
dataset = self.metastore.remove_dataset_version(dataset, version)
self.warehouse.drop_dataset_rows_table(dataset, version)

if soft:
Comment thread
shcheklein marked this conversation as resolved.
Outdated
# Rename the `version` column to free the (dataset_id, version)
Comment thread
shcheklein marked this conversation as resolved.
Outdated
# uniqueness slot so a future save can reclaim it. The original
# semver is still recoverable via DatasetVersion.display_version.
mangled_version = f"{version}{REMOVED_VERSION_SUFFIX}{v.id}"
Comment thread
shcheklein marked this conversation as resolved.
Outdated
self.metastore.update_dataset_version(
dataset,
version,
version=mangled_version,
status=DatasetStatus.REMOVED,
removed_at=datetime.now(timezone.utc),
)
else:
self.metastore.remove_dataset_version(dataset, version)

def _remove_versions(self, pairs: Iterable[tuple[DatasetRecord, str]]) -> int:
Comment thread
shcheklein marked this conversation as resolved.
Outdated
num_removed = 0
Expand Down
42 changes: 27 additions & 15 deletions src/datachain/data_storage/metastore.py
Original file line number Diff line number Diff line change
Expand Up @@ -318,9 +318,14 @@ def update_dataset(self, dataset: DatasetRecord, **kwargs) -> DatasetRecord:

@abstractmethod
def update_dataset_version(
self, dataset: DatasetRecord, version: str, **kwargs
self, dataset: DatasetRecord, dataset_version: str, **kwargs
Comment thread
shcheklein marked this conversation as resolved.
Outdated
) -> DatasetVersion:
"""Updates dataset version fields."""
"""Updates dataset version fields.

``dataset_version`` is the lookup key (the current value of the row's
``version`` column). Pass ``version=`` in ``kwargs`` to rename the
column itself.
"""

@abstractmethod
def remove_dataset_version(
Expand Down Expand Up @@ -846,6 +851,7 @@ def _datasets_versions_columns(cls) -> list["SchemaItem"]:
Column("schema", JSON, nullable=True),
Column("job_id", Text, nullable=True),
Column("content_hash", Text, nullable=True),
Column("removed_at", DateTime(timezone=True), nullable=True),
UniqueConstraint("dataset_id", "version"),
]

Expand Down Expand Up @@ -1381,14 +1387,19 @@ def update_dataset(self, dataset: DatasetRecord, **kwargs) -> DatasetRecord:
return result_ds

def update_dataset_version(
self, dataset: DatasetRecord, version: str, **kwargs
self, dataset: DatasetRecord, dataset_version: str, **kwargs
) -> DatasetVersion:
"""Updates dataset fields."""
"""Updates dataset version fields.

``dataset_version`` is the lookup key (the current value of the row's
``version`` column). Pass ``version=`` in ``kwargs`` to rename the
column itself.
"""
logger.debug(
"Metastore.update_dataset_version called for %s@%s: "
"num_objects=%s, size=%s, preview_len=%s, all_fields=%s",
dataset.name,
version,
dataset_version,
kwargs.get("num_objects"),
kwargs.get("size"),
len(kwargs["preview"])
Expand Down Expand Up @@ -1440,16 +1451,16 @@ def update_dataset_version(
values[field] = value
version_values[field] = value

dataset_version = dataset.get_version(version)
version_obj = dataset.get_version(dataset_version)

if not values:
return dataset_version
return version_obj

logger.debug(
"Writing to database for %s@%s: num_objects=%s, size=%s, "
"preview_serialized=%s, fields_to_update=%s",
dataset.name,
version,
dataset_version,
values.get("num_objects"),
values.get("size"),
bool(values.get("preview")),
Expand All @@ -1459,21 +1470,21 @@ def update_dataset_version(
dv = self._datasets_versions
self.db.execute(
self._datasets_versions_update()
.where(dv.c.dataset_id == dataset.id, dv.c.version == version)
.where(dv.c.dataset_id == dataset.id, dv.c.version == dataset_version)
.values(values),
) # type: ignore [attr-defined]

dataset_version.update(**version_values)
version_obj.update(**version_values)
logger.debug(
"Dataset version updated successfully: %s@%s, "
"final_num_objects=%s, final_size=%s, has_preview=%s",
dataset.name,
version,
dataset_version.num_objects,
dataset_version.size,
bool(getattr(dataset_version, "_preview_data", None)),
dataset_version,
version_obj.num_objects,
version_obj.size,
bool(getattr(version_obj, "_preview_data", None)),
)
return dataset_version
return version_obj

def _parse_dataset(
self,
Expand Down Expand Up @@ -2830,6 +2841,7 @@ def _get_dataset_version_for_job_ancestry_query(
self._projects.c.name == project_name,
self._dataset_version_jobs.c.job_id.in_(job_ancestry),
self._dataset_version_jobs.c.is_creator.is_(True),
self._datasets_versions.c.status != DatasetStatus.REMOVED,
Comment thread
shcheklein marked this conversation as resolved.
Outdated
)
.order_by(desc(self._dataset_version_jobs.c.created_at))
.limit(1)
Expand Down
67 changes: 50 additions & 17 deletions src/datachain/dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@
DEFAULT_DATASET_VERSION = "1.0.0"
DATASET_NAME_RESERVED_CHARS = [".", "@"]
DATASET_NAME_REPLACEMENT_CHAR = "_"
# Suffix appended to the `version` column on soft-delete so the
# (dataset_id, version) uniqueness slot is freed for reuse. The semver
# part before the suffix is the version originally claimed by the user.
REMOVED_VERSION_SUFFIX = "~removed-"


# StorageURI represents a normalized URI to a valid storage location
Expand Down Expand Up @@ -224,6 +228,13 @@ def parse(

assert dataset_name is not None

# The `version` column on REMOVED tombstones carries a mangle suffix
Comment thread
shcheklein marked this conversation as resolved.
Outdated
# so the (dataset_id, version) slot is reusable. For lineage display
# we surface the original semver.
display_version = dataset_version
if display_version and REMOVED_VERSION_SUFFIX in display_version:
display_version = display_version.split(REMOVED_VERSION_SUFFIX, 1)[0]

return cls(
id,
(
Expand All @@ -234,7 +245,7 @@ def parse(
namespace_name,
project_name,
dataset_name,
dataset_version or None, # type: ignore[arg-type]
display_version or None, # type: ignore[arg-type]
dataset_version_created_at, # type: ignore[arg-type]
[],
)
Expand Down Expand Up @@ -264,6 +275,7 @@ class DatasetStatus:
COMPLETE = 4
STALE = 6
REMOVING = 7
REMOVED = 8


@dataclass
Expand Down Expand Up @@ -295,6 +307,7 @@ class DatasetVersion:
query_script: str = ""
job_id: str | None = None
content_hash: str | None = None
removed_at: datetime | None = None

@classmethod
def parse( # noqa: PLR0913
Expand All @@ -318,6 +331,7 @@ def parse( # noqa: PLR0913
query_script: str = "",
job_id: str | None = None,
content_hash: str | None = None,
removed_at: datetime | None = None,
*,
preview_loaded: bool = True,
):
Expand Down Expand Up @@ -346,12 +360,21 @@ def parse( # noqa: PLR0913
query_script=query_script,
job_id=job_id,
content_hash=content_hash,
removed_at=removed_at,
_preview_loaded=preview_loaded,
)

@property
def version_value(self) -> int:
return semver.value(self.version)
return semver.value(self.display_version)

@property
def display_version(self) -> str:
"""Real semver version. For REMOVED tombstones, strips the internal
mangle suffix that frees the (dataset_id, version) slot."""
if REMOVED_VERSION_SUFFIX in self.version:
return self.version.split(REMOVED_VERSION_SUFFIX, 1)[0]
return self.version

def __eq__(self, other):
if not isinstance(other, DatasetVersion):
Expand All @@ -372,6 +395,7 @@ def is_final_status(self) -> bool:
DatasetStatus.COMPLETE,
DatasetStatus.STALE,
DatasetStatus.REMOVING,
DatasetStatus.REMOVED,
]

def update(self, **kwargs):
Expand Down Expand Up @@ -574,6 +598,7 @@ def parse( # noqa: PLR0913
version_schema: str | None = None,
version_job_id: str | None = None,
version_content_hash: str | None = None,
version_removed_at: datetime | None = None,
*,
versions_loaded: bool = True,
preview_loaded: bool = True,
Expand Down Expand Up @@ -632,6 +657,7 @@ def parse( # noqa: PLR0913
version_query_script or "",
version_job_id,
version_content_hash,
version_removed_at,
preview_loaded=preview_loaded,
)
versions_list = [dataset_version]
Expand Down Expand Up @@ -674,15 +700,14 @@ def has_version(self, version: str) -> bool:
def is_valid_next_version(self, version: str) -> bool:
"""
Checks if a number can be a valid next latest version for dataset.
The only rule is that it cannot be lower than current latest version
The only rule is that it cannot be lower than current latest live
version. REMOVED tombstones free their slot (see
REMOVED_VERSION_SUFFIX) so they don't participate in comparison.
"""
if not self.versions:
if not self.live_versions:
return True

return not (
self.latest_version
and semver.value(self.latest_version) >= semver.value(version)
)
return semver.value(self.latest_version) < semver.value(version)

def get_version(self, version: str) -> DatasetVersion:
if not self.has_version(version):
Expand Down Expand Up @@ -730,7 +755,7 @@ def next_version_major(self) -> str:
"""
Returns the next auto-incremented version if the major part is being bumped.
"""
if not self.versions:
if not self.live_versions:
return "1.0.0"

major, _, _ = semver.parse(self.latest_version)
Expand All @@ -741,7 +766,7 @@ def next_version_minor(self) -> str:
"""
Returns the next auto-incremented version if the minor part is being bumped.
"""
if not self.versions:
if not self.live_versions:
return "1.0.0"

major, minor, _ = semver.parse(self.latest_version)
Expand All @@ -752,18 +777,23 @@ def next_version_patch(self) -> str:
"""
Returns the next auto-incremented version if the patch part is being bumped.
"""
if not self.versions:
if not self.live_versions:
return "1.0.0"

major, minor, patch = semver.parse(self.latest_version)
return semver.create(major, minor, patch + 1)

@property
def live_versions(self) -> list[DatasetVersion]:
Comment thread
shcheklein marked this conversation as resolved.
Outdated
"""Versions excluding REMOVED tombstones."""
return [v for v in self.versions if v.status != DatasetStatus.REMOVED]

@property
def latest_version(self) -> str:
"""Returns latest version of a dataset"""
if not self.versions:
"""Latest user-visible version (skips REMOVED tombstones)."""
if not self.live_versions:
raise DatasetVersionNotFoundError("Dataset has no versions")
Comment thread
ilongin marked this conversation as resolved.
Outdated
return max(self.versions).version
return max(self.live_versions).version

@property
def latest_complete_version(self) -> str | None:
Expand All @@ -783,7 +813,9 @@ def latest_major_version(self, major: int) -> str | None:
and we call `.latest_major_version(2)` it will return: "2.4.0".
If no major version is find with input value, None will be returned
"""
versions = [v for v in self.versions if semver.parse(v.version)[0] == major]
versions = [
v for v in self.live_versions if semver.parse(v.version)[0] == major
]
if not versions:
return None
return max(versions).version
Expand Down Expand Up @@ -811,7 +843,7 @@ def latest_compatible_version(self, version_spec: str) -> str | None:
# Convert dataset versions to packaging.Version objects
# and filter compatible ones
compatible_versions = []
for v in self.versions:
for v in self.live_versions:
pkg_version = Version(v.version)
if spec_set.contains(pkg_version):
compatible_versions.append(v)
Expand Down Expand Up @@ -943,7 +975,8 @@ def full_name(self) -> str:
return f"{self.project.namespace.name}.{self.project.name}.{self.name}"

def latest_version(self) -> DatasetListVersion:
Comment thread
shcheklein marked this conversation as resolved.
return max(self.versions, key=lambda v: v.version_value)
live = [v for v in self.versions if v.status != DatasetStatus.REMOVED]
return max(live or self.versions, key=lambda v: v.version_value)
Comment thread
ilongin marked this conversation as resolved.
Outdated

@property
def is_bucket_listing(self) -> bool:
Expand Down
6 changes: 6 additions & 0 deletions src/datachain/delta.py
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,12 @@ def _get_source_info(
include_incomplete=False,
)

# The version the dep points at may have been soft-deleted (REMOVED
# tombstone). Without a readable previous version we can't compute a
# diff; fall back to normal dataset creation, same as a missing dep.
if not source_ds.has_version(source_ds_dep.version):
Comment thread
shcheklein marked this conversation as resolved.
Outdated
Comment thread
shcheklein marked this conversation as resolved.
Outdated
return None, None, None, None

return (
source_ds.name,
source_ds.project,
Expand Down
Loading
Loading