Skip to content
Open
Show file tree
Hide file tree
Changes from 9 commits
Commits
Show all changes
61 commits
Select commit Hold shift + click to select a range
73e3c77
working on dataset soft delete
ilongin May 13, 2026
dc34aa7
adding logic and fixing tests
ilongin May 14, 2026
b0dfe90
Merge branch 'main' into ilongin/12872-dataset-soft-delete
ilongin May 15, 2026
d88917b
Merge branch 'main' into ilongin/12872-dataset-soft-delete
ilongin May 15, 2026
a7f0b15
Merge branch 'ilongin/12872-dataset-soft-delete' of github.com:datach…
ilongin May 15, 2026
236af57
fixing test
ilongin May 15, 2026
7737683
Merge branch 'main' into ilongin/12872-dataset-soft-delete
ilongin May 18, 2026
425ac01
changing logic of solf delete to reserve removed datasets
ilongin May 18, 2026
e71555e
adding purge option
ilongin May 18, 2026
19c45b1
refactoring naming
ilongin May 20, 2026
9a57d47
refactoring
ilongin May 20, 2026
5bd2d5c
refactoring
ilongin May 21, 2026
4caecd8
Merge branch 'main' into ilongin/12872-dataset-soft-delete
ilongin May 22, 2026
ed6c6b2
fixing issue
ilongin May 22, 2026
280ab68
refactoring
ilongin May 22, 2026
c1945ab
Merge branch 'main' into ilongin/12872-dataset-soft-delete
ilongin May 25, 2026
9271ce3
refactor
ilongin May 25, 2026
5f92614
adding new flag
ilongin May 25, 2026
c478b11
Merge branch 'main' into ilongin/12872-dataset-soft-delete
ilongin May 25, 2026
090b06e
refactoring
ilongin May 25, 2026
ad7092a
Merge branch 'main' into ilongin/12872-dataset-soft-delete
ilongin May 26, 2026
c0e51a4
refactoring
ilongin May 26, 2026
445b4dd
Merge branch 'ilongin/12872-dataset-soft-delete' of github.com:datach…
ilongin May 26, 2026
c908d4b
Merge branch 'main' into ilongin/12872-dataset-soft-delete
ilongin May 27, 2026
9b3c9c0
introducing op_uuid
ilongin May 28, 2026
f00a5ad
added a fix
ilongin May 28, 2026
7100869
merging two metastore methods into one
ilongin May 29, 2026
0bad54e
Merge branch 'main' into ilongin/12872-dataset-soft-delete
Jun 1, 2026
92e92ea
added missing tests
ilongin Jun 1, 2026
651ccb5
Merge branch 'main' into ilongin/12872-dataset-soft-delete
ilongin Jun 3, 2026
6393f99
added more tests
ilongin Jun 3, 2026
4683de8
removing list of statuses
ilongin Jun 4, 2026
0c21ad1
Merge branch 'main' into ilongin/12872-dataset-soft-delete
ilongin Jun 5, 2026
0ada7dd
fixing complete_dataset_version
ilongin Jun 5, 2026
08cb6de
refactoring
ilongin Jun 5, 2026
ce3e272
refactoring
ilongin Jun 5, 2026
85bf48f
Merge branch 'main' into ilongin/12872-dataset-soft-delete
ilongin Jun 8, 2026
96a8779
refactoring
ilongin Jun 8, 2026
da3b1ca
updating logs
ilongin Jun 8, 2026
80fcd45
Merge branch 'main' into ilongin/12872-dataset-soft-delete
ilongin Jun 9, 2026
27455ea
refactoring
ilongin Jun 9, 2026
06db87c
removed op_uuid
ilongin Jun 9, 2026
bccfeed
refactoring
ilongin Jun 10, 2026
301810a
refactoring
ilongin Jun 10, 2026
d19f0fd
refactoring
ilongin Jun 10, 2026
845cdb6
adde missing flag
ilongin Jun 12, 2026
822328f
refactoring
ilongin Jun 12, 2026
52d557e
refactoring
ilongin Jun 12, 2026
5c9461d
fix race condition
ilongin Jun 12, 2026
cdace00
refactoring
ilongin Jun 13, 2026
c991878
added transaction
ilongin Jun 13, 2026
4874900
fixing export
ilongin Jun 15, 2026
2d4b9e6
refactoring
ilongin Jun 15, 2026
99833bd
refactoring dependencies
ilongin Jun 15, 2026
40e6152
removed purge option on interface
ilongin Jun 15, 2026
664d9ab
added 2 more tests
ilongin Jun 15, 2026
fac7aee
Merge branch 'main' into ilongin/12872-dataset-soft-delete
ilongin Jun 15, 2026
f22bfc3
Merge branch 'main' into ilongin/12872-dataset-soft-delete
ilongin Jun 15, 2026
352822e
simplifying constants
ilongin Jun 15, 2026
5cbe55b
Merge branch 'main' into ilongin/12872-dataset-soft-delete
ilongin Jun 18, 2026
fe1a214
fixing PR comments
ilongin Jun 19, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 44 additions & 8 deletions src/datachain/catalog/catalog.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@
NamespaceNotFoundError,
ProjectNotFoundError,
)
from datachain.lib.listing import get_listing
from datachain.lib.listing import get_listing, is_listing_dataset
from datachain.node import DirType, Node, NodeWithPath
from datachain.nodes_thread_pool import NodesThreadPool
from datachain.progress import tqdm
Expand Down Expand Up @@ -1095,20 +1095,56 @@ def update_dataset(self, dataset: DatasetRecord, **kwargs) -> DatasetRecord:
return dataset_updated

def remove_dataset_version(
self, dataset: DatasetRecord, version: str, drop_rows: bool | None = True
self, dataset: DatasetRecord, version: str, purge: bool = False
Comment thread
shcheklein marked this conversation as resolved.
Outdated
) -> None:
Comment thread
shcheklein marked this conversation as resolved.
"""
Deletes one single dataset version.
If it was last version, it removes dataset completely.
Remove a single dataset version.

For COMPLETE user-named versions this is a soft delete: rows table is
Comment thread
shcheklein marked this conversation as resolved.
Outdated
dropped, dependencies are preserved, and the version row stays with
status REMOVED so dependents can still render lineage. The semver is
permanently reserved — saving the same name again auto-bumps past it.

Comment thread
ilongin marked this conversation as resolved.
Outdated
For non-COMPLETE versions (CREATED/FAILED/STALE/REMOVING leftovers) and
Comment thread
shcheklein marked this conversation as resolved.
Outdated
for internal datasets (listing `lst__*` / `session_*` intermediates),
this is a hard delete: rows table dropped, dependencies removed,
version row deleted, and dataset row removed if it was the last
version.

When ``purge=True`` the soft branch is skipped entirely — everything
Comment thread
shcheklein marked this conversation as resolved.
Outdated
is hard-deleted including any already-REMOVED record. Reserved for
admin tools (e.g. Django admin); not exposed via the CLI or public
Comment thread
shcheklein marked this conversation as resolved.
Outdated
Python API.
"""
from datachain.query.session import Session

if not dataset.has_version(version):
return
self.metastore.update_dataset_version(
dataset, version, status=DatasetStatus.REMOVING
v = dataset.get_version(version)
Comment thread
ilongin marked this conversation as resolved.
Outdated
if v.status == DatasetStatus.REMOVED and not purge:
return

is_internal = is_listing_dataset(dataset.name) or dataset.name.startswith(
Comment thread
shcheklein marked this conversation as resolved.
Outdated
Comment thread
shcheklein marked this conversation as resolved.
Outdated
Session.DATASET_PREFIX
Comment thread
shcheklein marked this conversation as resolved.
Outdated
)
if drop_rows:
soft = v.status == DatasetStatus.COMPLETE and not is_internal and not purge

# Rows table is already gone for REMOVED records.
if v.status != DatasetStatus.REMOVED:
self.metastore.update_dataset_version(
dataset, version, status=DatasetStatus.REMOVING
Comment thread
shcheklein marked this conversation as resolved.
Outdated
)
self.warehouse.drop_dataset_rows_table(dataset, version)
dataset = self.metastore.remove_dataset_version(dataset, version)

if soft:
Comment thread
shcheklein marked this conversation as resolved.
Outdated
self.metastore.update_dataset_version(
dataset,
version,
status=DatasetStatus.REMOVED,
removed_at=datetime.now(timezone.utc),
)
else:
self.metastore.remove_dataset_version(dataset, version)

def _remove_versions(self, pairs: Iterable[tuple[DatasetRecord, str]]) -> int:
Comment thread
shcheklein marked this conversation as resolved.
Outdated
num_removed = 0
Expand Down
40 changes: 25 additions & 15 deletions src/datachain/data_storage/metastore.py
Original file line number Diff line number Diff line change
Expand Up @@ -318,9 +318,13 @@ def update_dataset(self, dataset: DatasetRecord, **kwargs) -> DatasetRecord:

@abstractmethod
def update_dataset_version(
self, dataset: DatasetRecord, version: str, **kwargs
self, dataset: DatasetRecord, dataset_version: str, **kwargs
Comment thread
shcheklein marked this conversation as resolved.
Outdated
) -> DatasetVersion:
"""Updates dataset version fields."""
"""Updates dataset version fields.

``dataset_version`` is the lookup key (the current value of the row's
``version`` column).
"""

@abstractmethod
def remove_dataset_version(
Expand Down Expand Up @@ -846,6 +850,7 @@ def _datasets_versions_columns(cls) -> list["SchemaItem"]:
Column("schema", JSON, nullable=True),
Column("job_id", Text, nullable=True),
Column("content_hash", Text, nullable=True),
Column("removed_at", DateTime(timezone=True), nullable=True),
UniqueConstraint("dataset_id", "version"),
]

Expand Down Expand Up @@ -1381,14 +1386,18 @@ def update_dataset(self, dataset: DatasetRecord, **kwargs) -> DatasetRecord:
return result_ds

def update_dataset_version(
self, dataset: DatasetRecord, version: str, **kwargs
self, dataset: DatasetRecord, dataset_version: str, **kwargs
) -> DatasetVersion:
"""Updates dataset fields."""
"""Updates dataset version fields.

``dataset_version`` is the lookup key (the current value of the row's
``version`` column).
"""
logger.debug(
"Metastore.update_dataset_version called for %s@%s: "
"num_objects=%s, size=%s, preview_len=%s, all_fields=%s",
dataset.name,
version,
dataset_version,
kwargs.get("num_objects"),
kwargs.get("size"),
len(kwargs["preview"])
Expand Down Expand Up @@ -1440,16 +1449,16 @@ def update_dataset_version(
values[field] = value
version_values[field] = value

dataset_version = dataset.get_version(version)
version_obj = dataset.get_version(dataset_version)

if not values:
return dataset_version
return version_obj

logger.debug(
"Writing to database for %s@%s: num_objects=%s, size=%s, "
"preview_serialized=%s, fields_to_update=%s",
dataset.name,
version,
dataset_version,
values.get("num_objects"),
values.get("size"),
bool(values.get("preview")),
Expand All @@ -1459,21 +1468,21 @@ def update_dataset_version(
dv = self._datasets_versions
self.db.execute(
self._datasets_versions_update()
.where(dv.c.dataset_id == dataset.id, dv.c.version == version)
.where(dv.c.dataset_id == dataset.id, dv.c.version == dataset_version)
.values(values),
) # type: ignore [attr-defined]

dataset_version.update(**version_values)
version_obj.update(**version_values)
logger.debug(
"Dataset version updated successfully: %s@%s, "
"final_num_objects=%s, final_size=%s, has_preview=%s",
dataset.name,
version,
dataset_version.num_objects,
dataset_version.size,
bool(getattr(dataset_version, "_preview_data", None)),
dataset_version,
version_obj.num_objects,
version_obj.size,
bool(getattr(version_obj, "_preview_data", None)),
)
return dataset_version
return version_obj

def _parse_dataset(
self,
Expand Down Expand Up @@ -2830,6 +2839,7 @@ def _get_dataset_version_for_job_ancestry_query(
self._projects.c.name == project_name,
self._dataset_version_jobs.c.job_id.in_(job_ancestry),
self._dataset_version_jobs.c.is_creator.is_(True),
self._datasets_versions.c.status != DatasetStatus.REMOVED,
Comment thread
shcheklein marked this conversation as resolved.
Outdated
)
.order_by(desc(self._dataset_version_jobs.c.created_at))
.limit(1)
Expand Down
57 changes: 42 additions & 15 deletions src/datachain/dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ def parse(
namespace_name,
project_name,
dataset_name,
dataset_version or None, # type: ignore[arg-type]
dataset_version, # type: ignore[arg-type]
dataset_version_created_at, # type: ignore[arg-type]
[],
)
Expand Down Expand Up @@ -264,6 +264,7 @@ class DatasetStatus:
COMPLETE = 4
STALE = 6
REMOVING = 7
REMOVED = 8


@dataclass
Expand Down Expand Up @@ -295,6 +296,7 @@ class DatasetVersion:
query_script: str = ""
job_id: str | None = None
content_hash: str | None = None
removed_at: datetime | None = None

@classmethod
def parse( # noqa: PLR0913
Expand All @@ -318,6 +320,7 @@ def parse( # noqa: PLR0913
query_script: str = "",
job_id: str | None = None,
content_hash: str | None = None,
removed_at: datetime | None = None,
*,
preview_loaded: bool = True,
):
Expand Down Expand Up @@ -346,6 +349,7 @@ def parse( # noqa: PLR0913
query_script=query_script,
job_id=job_id,
content_hash=content_hash,
removed_at=removed_at,
_preview_loaded=preview_loaded,
)

Expand All @@ -372,6 +376,7 @@ def is_final_status(self) -> bool:
DatasetStatus.COMPLETE,
DatasetStatus.STALE,
DatasetStatus.REMOVING,
DatasetStatus.REMOVED,
]

def update(self, **kwargs):
Expand Down Expand Up @@ -574,6 +579,7 @@ def parse( # noqa: PLR0913
version_schema: str | None = None,
version_job_id: str | None = None,
version_content_hash: str | None = None,
version_removed_at: datetime | None = None,
*,
versions_loaded: bool = True,
preview_loaded: bool = True,
Expand Down Expand Up @@ -632,6 +638,7 @@ def parse( # noqa: PLR0913
version_query_script or "",
version_job_id,
version_content_hash,
version_removed_at,
preview_loaded=preview_loaded,
)
versions_list = [dataset_version]
Expand Down Expand Up @@ -674,15 +681,13 @@ def has_version(self, version: str) -> bool:
def is_valid_next_version(self, version: str) -> bool:
"""
Checks if a version can be a valid next latest version for the dataset.
The only rule is that it cannot be lower than current latest version
Compares against the highest version ever used, including REMOVED
ones — once a semver has been claimed it is permanently reserved.
"""
if not self.versions:
return True

return not (
self.latest_version
and semver.value(self.latest_version) >= semver.value(version)
)
return self._max_version_value < semver.value(version)

def get_version(self, version: str) -> DatasetVersion:
if not self.has_version(version):
Expand Down Expand Up @@ -729,41 +734,60 @@ def uri(self, version: str) -> str:
def next_version_major(self) -> str:
"""
Returns the next auto-incremented version if the major part is being bumped.
Skips past REMOVED versions so deleted semvers are never reclaimed.
"""
if not self.versions:
return "1.0.0"

major, _, _ = semver.parse(self.latest_version)
major, _, _ = semver.parse(self._max_version)
return semver.create(major + 1, 0, 0)

@property
def next_version_minor(self) -> str:
"""
Returns the next auto-incremented version if the minor part is being bumped.
Skips past REMOVED versions so deleted semvers are never reclaimed.
"""
if not self.versions:
return "1.0.0"

major, minor, _ = semver.parse(self.latest_version)
major, minor, _ = semver.parse(self._max_version)
return semver.create(major, minor + 1, 0)

@property
def next_version_patch(self) -> str:
"""
Returns the next auto-incremented version if the patch part is being bumped.
Skips past REMOVED versions so deleted semvers are never reclaimed.
"""
if not self.versions:
return "1.0.0"

major, minor, patch = semver.parse(self.latest_version)
major, minor, patch = semver.parse(self._max_version)
return semver.create(major, minor, patch + 1)

@property
def live_versions(self) -> list[DatasetVersion]:
Comment thread
shcheklein marked this conversation as resolved.
Outdated
"""Versions excluding REMOVED ones."""
return [v for v in self.versions if v.status != DatasetStatus.REMOVED]

@property
def _max_version(self) -> str:
"""Highest semver across all versions including REMOVED ones. Used for
collision avoidance — once a semver is claimed it's reserved forever,
even after soft-delete."""
return max(self.versions).version

@property
def _max_version_value(self) -> int:
return semver.value(self._max_version)

@property
def latest_version(self) -> str:
"""Returns latest version of a dataset"""
if not self.versions:
"""Latest user-visible version (skips REMOVED ones)."""
Comment thread
shcheklein marked this conversation as resolved.
Outdated
if not self.live_versions:
raise DatasetVersionNotFoundError("Dataset has no versions")
Comment thread
ilongin marked this conversation as resolved.
Outdated
return max(self.versions).version
return max(self.live_versions).version

@property
def latest_complete_version(self) -> str | None:
Expand All @@ -783,7 +807,9 @@ def latest_major_version(self, major: int) -> str | None:
and we call `.latest_major_version(2)` it will return: "2.4.0".
If no major version is found for the input value, None will be returned
"""
versions = [v for v in self.versions if semver.parse(v.version)[0] == major]
versions = [
v for v in self.live_versions if semver.parse(v.version)[0] == major
]
if not versions:
return None
return max(versions).version
Expand Down Expand Up @@ -811,7 +837,7 @@ def latest_compatible_version(self, version_spec: str) -> str | None:
# Convert dataset versions to packaging.Version objects
# and filter compatible ones
compatible_versions = []
for v in self.versions:
for v in self.live_versions:
pkg_version = Version(v.version)
if spec_set.contains(pkg_version):
compatible_versions.append(v)
Expand Down Expand Up @@ -943,7 +969,8 @@ def full_name(self) -> str:
return f"{self.project.namespace.name}.{self.project.name}.{self.name}"

def latest_version(self) -> DatasetListVersion:
Comment thread
shcheklein marked this conversation as resolved.
return max(self.versions, key=lambda v: v.version_value)
live = [v for v in self.versions if v.status != DatasetStatus.REMOVED]
return max(live or self.versions, key=lambda v: v.version_value)
Comment thread
ilongin marked this conversation as resolved.
Outdated

@property
def is_bucket_listing(self) -> bool:
Expand Down
10 changes: 9 additions & 1 deletion src/datachain/delta.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from typing import TYPE_CHECKING, TypeVar

import datachain
from datachain.dataset import DatasetDependency, DatasetRecord
from datachain.dataset import DatasetDependency, DatasetRecord, DatasetStatus
from datachain.error import DatasetNotFoundError, SchemaDriftError
from datachain.project import Project
from datachain.query.dataset import UnionSchemaMismatchError
Expand Down Expand Up @@ -250,6 +250,14 @@ def _get_source_info(
include_incomplete=False,
)

# The version the dep points at may have been soft-deleted. Without a
Comment thread
shcheklein marked this conversation as resolved.
Outdated
# readable previous version we can't compute a diff; fall back to
# normal dataset creation, same as a missing dep.
if not source_ds.has_version(source_ds_dep.version) or (
source_ds.get_version(source_ds_dep.version).status == DatasetStatus.REMOVED
):
return None, None, None, None
Comment thread
ilongin marked this conversation as resolved.
Outdated

return (
source_ds.name,
source_ds.project,
Expand Down
Loading
Loading