Skip to content
Open
Show file tree
Hide file tree
Changes from 60 commits
Commits
Show all changes
61 commits
Select commit Hold shift + click to select a range
73e3c77
working on dataset soft delete
ilongin May 13, 2026
dc34aa7
adding logic and fixing tests
ilongin May 14, 2026
b0dfe90
Merge branch 'main' into ilongin/12872-dataset-soft-delete
ilongin May 15, 2026
d88917b
Merge branch 'main' into ilongin/12872-dataset-soft-delete
ilongin May 15, 2026
a7f0b15
Merge branch 'ilongin/12872-dataset-soft-delete' of github.com:datach…
ilongin May 15, 2026
236af57
fixing test
ilongin May 15, 2026
7737683
Merge branch 'main' into ilongin/12872-dataset-soft-delete
ilongin May 18, 2026
425ac01
changing logic of soft delete to reserve removed datasets
ilongin May 18, 2026
e71555e
adding purge option
ilongin May 18, 2026
19c45b1
refactoring naming
ilongin May 20, 2026
9a57d47
refactoring
ilongin May 20, 2026
5bd2d5c
refactoring
ilongin May 21, 2026
4caecd8
Merge branch 'main' into ilongin/12872-dataset-soft-delete
ilongin May 22, 2026
ed6c6b2
fixing issue
ilongin May 22, 2026
280ab68
refactoring
ilongin May 22, 2026
c1945ab
Merge branch 'main' into ilongin/12872-dataset-soft-delete
ilongin May 25, 2026
9271ce3
refactor
ilongin May 25, 2026
5f92614
adding new flag
ilongin May 25, 2026
c478b11
Merge branch 'main' into ilongin/12872-dataset-soft-delete
ilongin May 25, 2026
090b06e
refactoring
ilongin May 25, 2026
ad7092a
Merge branch 'main' into ilongin/12872-dataset-soft-delete
ilongin May 26, 2026
c0e51a4
refactoring
ilongin May 26, 2026
445b4dd
Merge branch 'ilongin/12872-dataset-soft-delete' of github.com:datach…
ilongin May 26, 2026
c908d4b
Merge branch 'main' into ilongin/12872-dataset-soft-delete
ilongin May 27, 2026
9b3c9c0
introducing op_uuid
ilongin May 28, 2026
f00a5ad
added a fix
ilongin May 28, 2026
7100869
merging two metastore methods into one
ilongin May 29, 2026
0bad54e
Merge branch 'main' into ilongin/12872-dataset-soft-delete
Jun 1, 2026
92e92ea
added missing tests
ilongin Jun 1, 2026
651ccb5
Merge branch 'main' into ilongin/12872-dataset-soft-delete
ilongin Jun 3, 2026
6393f99
added more tests
ilongin Jun 3, 2026
4683de8
removing list of statuses
ilongin Jun 4, 2026
0c21ad1
Merge branch 'main' into ilongin/12872-dataset-soft-delete
ilongin Jun 5, 2026
0ada7dd
fixing complete_dataset_version
ilongin Jun 5, 2026
08cb6de
refactoring
ilongin Jun 5, 2026
ce3e272
refactoring
ilongin Jun 5, 2026
85bf48f
Merge branch 'main' into ilongin/12872-dataset-soft-delete
ilongin Jun 8, 2026
96a8779
refactoring
ilongin Jun 8, 2026
da3b1ca
updating logs
ilongin Jun 8, 2026
80fcd45
Merge branch 'main' into ilongin/12872-dataset-soft-delete
ilongin Jun 9, 2026
27455ea
refactoring
ilongin Jun 9, 2026
06db87c
removed op_uuid
ilongin Jun 9, 2026
bccfeed
refactoring
ilongin Jun 10, 2026
301810a
refactoring
ilongin Jun 10, 2026
d19f0fd
refactoring
ilongin Jun 10, 2026
845cdb6
added missing flag
ilongin Jun 12, 2026
822328f
refactoring
ilongin Jun 12, 2026
52d557e
refactoring
ilongin Jun 12, 2026
5c9461d
fix race condition
ilongin Jun 12, 2026
cdace00
refactoring
ilongin Jun 13, 2026
c991878
added transaction
ilongin Jun 13, 2026
4874900
fixing export
ilongin Jun 15, 2026
2d4b9e6
refactoring
ilongin Jun 15, 2026
99833bd
refactoring dependencies
ilongin Jun 15, 2026
40e6152
removed purge option on interface
ilongin Jun 15, 2026
664d9ab
added 2 more tests
ilongin Jun 15, 2026
fac7aee
Merge branch 'main' into ilongin/12872-dataset-soft-delete
ilongin Jun 15, 2026
f22bfc3
Merge branch 'main' into ilongin/12872-dataset-soft-delete
ilongin Jun 15, 2026
352822e
simplifying constants
ilongin Jun 15, 2026
5cbe55b
Merge branch 'main' into ilongin/12872-dataset-soft-delete
ilongin Jun 18, 2026
fe1a214
fixing PR comments
ilongin Jun 19, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
195 changes: 157 additions & 38 deletions src/datachain/catalog/catalog.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@
NamespaceNotFoundError,
ProjectNotFoundError,
)
from datachain.lib.listing import get_listing
from datachain.lib.listing import get_listing, is_listing_dataset
from datachain.node import DirType, Node, NodeWithPath
from datachain.nodes_thread_pool import NodesThreadPool
from datachain.progress import tqdm
Expand Down Expand Up @@ -1083,14 +1083,25 @@ def complete_dataset_version(
as COMPLETE.
"""
self.update_dataset_version_with_warehouse_info(dataset, version, **kwargs)
self.metastore.update_dataset_status(
dataset,
DatasetStatus.COMPLETE,
version=version,
error_message=error_message,
error_stack=error_stack,
script_output=script_output,
)
# Guard the version-level write: only flip to COMPLETE if the
Comment thread
shcheklein marked this conversation as resolved.
# version is still in a saveable state. Prevents a late-arriving
# completion from stomping a concurrent removal.
try:
self.metastore.update_dataset_status(
dataset,
DatasetStatus.COMPLETE,
version=version,
error_message=error_message,
error_stack=error_stack,
script_output=script_output,
expected_status=DatasetStatus.CREATED,
)
except DataChainError as e:
raise DataChainError(
f"Could not finalize {dataset.name}@{version}: "
"the version was removed or modified before save completed. "
"This usually means it was deleted concurrently - please retry."
) from e

def update_dataset(self, dataset: DatasetRecord, **kwargs) -> DatasetRecord:
"""Updates dataset fields."""
Expand All @@ -1099,26 +1110,119 @@ def update_dataset(self, dataset: DatasetRecord, **kwargs) -> DatasetRecord:
return dataset_updated

def remove_dataset_version(
self, dataset: DatasetRecord, version: str, drop_rows: bool | None = True
self,
dataset: DatasetRecord,
version: str,
keep_metadata: bool,
) -> None:
Comment thread
shcheklein marked this conversation as resolved.
"""
Deletes one single dataset version.
If it was last version, it removes dataset completely.
"""Remove a single dataset version.

``keep_metadata=True``: drop rows table, mark REMOVED (semver +
lineage kept). Requires user-facing dataset (not ``lst__*`` /
``session_*``) and status in COMPLETE/REMOVING/REMOVED.

``keep_metadata=False``: drop rows table, delete the version row.
Allowed from any status except REMOVING.
"""
if not dataset.has_version(version):
return
self.metastore.update_dataset_version(
dataset, version, status=DatasetStatus.REMOVING
v = dataset.get_version(version)
Comment thread
ilongin marked this conversation as resolved.
Outdated

if keep_metadata and dataset.is_internal:
raise DataChainError(
f"Internal dataset {dataset.name} cannot be removed "
"while keeping metadata"
)
if keep_metadata and not v.is_soft_deletable:

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we call it is_internal? or even is_system?

is soft deletable again is not reusable - we are just leaking removal business logic outside

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

was it addressed?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I completely removed this method as it's not really needed so no need for figuring out naming. I agree that soft delete should not be used anywhere but not sure what is substitute for that to be honest ..

raise DataChainError(
f"Cannot remove {dataset.name}@{version} while keeping "
f"metadata: current status is {v.status}, expected "
"COMPLETE or REMOVING"
)
if (
not keep_metadata
and v.status == DatasetStatus.REMOVING

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What about other statuses? REMOVING_TOTAL, REMOVED?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This check covers the case where someone wants to fully remove (wipe) a dataset version while a default removal that keeps metadata is already in progress for it; in that case we should raise.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

so REMOVING is ongoing removal where we keep metadata. REMOVING_TOTAL is ongoing removal status which ends up with removing metadata and actual data table

and not dataset.is_internal
):
raise DataChainError(
f"Cannot remove {dataset.name}@{version} entirely: "
"a removal that keeps metadata is already in progress"
)

self._claim_and_remove(
dataset, version, expected_status=v.status, keep_metadata=keep_metadata
)
if drop_rows:
self.warehouse.drop_dataset_rows_table(dataset, version)
dataset = self.metastore.remove_dataset_version(dataset, version)

def _remove_versions(self, pairs: Iterable[tuple[DatasetRecord, str]]) -> int:
def _claim_and_remove(
    self,
    dataset: DatasetRecord,
    version: str,
    *,
    expected_status: int,
    keep_metadata: bool,
) -> None:
    """Claim a version for removal, drop its rows table, then finalize.

    The version is first moved from ``expected_status`` to a transient
    status (``REMOVING`` when ``keep_metadata`` is true, otherwise
    ``REMOVING_TOTAL``). If that claim returns ``None`` the removal is
    skipped, on the assumption that another caller is already handling it.

    After the rows table is dropped, ``keep_metadata=True`` marks the
    version ``REMOVED`` and stamps ``removed_at`` with the current UTC
    time; ``keep_metadata=False`` deletes the version from the metastore.

    Asking to keep metadata for a version whose expected status is already
    ``REMOVED`` does nothing: that is already the final state.
    """
    if keep_metadata:
        if expected_status == DatasetStatus.REMOVED:
            # Already in the final keep-metadata state; nothing to do.
            return
        in_progress_status = DatasetStatus.REMOVING
    else:
        in_progress_status = DatasetStatus.REMOVING_TOTAL

    # Conditional write: only succeeds while the version still has the
    # status the caller observed.
    claim_result = self.metastore.update_dataset_version(
        dataset,
        version,
        expected_status=expected_status,
        status=in_progress_status,
    )
    if claim_result is None:
        logger.debug(
            "Skipped remove of %s@%s: another caller is already handling it",
            dataset.name,
            version,
        )
        return

    self.warehouse.drop_dataset_rows_table(dataset, version)

    if not keep_metadata:
        self.metastore.remove_dataset_version(dataset, version)
        return

    self.metastore.update_dataset_version(
        dataset,
        version,
        status=DatasetStatus.REMOVED,
        removed_at=datetime.now(timezone.utc),
    )

def _remove_versions(
self,
pairs: Iterable[tuple[DatasetRecord, str]],
*,
keep_metadata: bool | None = None,
) -> int:
"""Bulk remove versions (GC, session cleanup, CLI cleanup, job cleanup,
user-facing bulk delete). When ``keep_metadata`` is None, infers per
version: resume soft delete if REMOVING, else wipe. When given
explicitly, honors the caller's intent for every version.
"""
num_removed = 0
for dataset, version in pairs:
try:
self.remove_dataset_version(dataset, version)
v = dataset.get_version(version)
if v.status == DatasetStatus.REMOVED:
continue
if keep_metadata is None:
keep = (
not dataset.is_internal and v.status == DatasetStatus.REMOVING
)
else:
keep = keep_metadata
self.remove_dataset_version(dataset, version, keep_metadata=keep)
num_removed += 1
except Exception as e: # noqa: BLE001
logger.warning(
Expand All @@ -1130,13 +1234,17 @@ def _remove_versions(self, pairs: Iterable[tuple[DatasetRecord, str]]) -> int:
return num_removed

def remove_dataset_versions(
self, job_id: str | None = None, version_ids: list[int] | None = None
self,
job_id: str | None = None,
version_ids: list[int] | None = None,
*,
keep_metadata: bool | None = None,
) -> int:
versions_to_remove = self.metastore.get_dataset_versions(
job_id=job_id,
version_ids=version_ids,
)
return self._remove_versions(versions_to_remove)
return self._remove_versions(versions_to_remove, keep_metadata=keep_metadata)

def get_temp_table_names(self) -> list[str]:
return self.warehouse.get_temp_table_names()
Expand All @@ -1155,7 +1263,7 @@ def cleanup_dataset_versions(self, job_id: str | None = None) -> int:
Clean up dataset versions that are no longer needed.

Removes dataset versions that:
- Have status CREATED, FAILED, STALE, or REMOVING
- Have status CREATED, FAILED, STALE, REMOVING, or REMOVING_TOTAL
- Belong to completed/failed/canceled jobs (not running)
- Are session_* datasets from finished jobs (orphaned intermediates)

Expand Down Expand Up @@ -1267,7 +1375,6 @@ def get_dataset(
include_incomplete: bool = True,
include_preview: bool = False,
) -> DatasetRecord:
from datachain.lib.listing import is_listing_dataset

namespace_name = namespace_name or self.metastore.default_namespace_name
project_name = project_name or self.metastore.default_project_name
Expand Down Expand Up @@ -1423,10 +1530,7 @@ def get_dataset_dependencies(
dataset_version_id = dataset_version.id

if not indirect:
return self.metastore.get_direct_dataset_dependencies(
dataset,
version,
)
return self.metastore.get_direct_dataset_dependencies(dataset, version)

return self.get_dataset_dependencies_by_ids(
dataset_id,
Expand Down Expand Up @@ -1512,7 +1616,7 @@ def listings(self, prefix: str | None = None) -> list["ListingInfo"]:
Returns list of ListingInfo objects which are representing specific
storage listing datasets
"""
from datachain.lib.listing import LISTING_PREFIX, is_listing_dataset
from datachain.lib.listing import LISTING_PREFIX
from datachain.lib.listing_info import ListingInfo

if prefix and not prefix.startswith(LISTING_PREFIX):
Expand Down Expand Up @@ -1590,6 +1694,7 @@ def export_dataset_table(
namespace_name=project.namespace.name if project else None,
project_name=project.name if project else None,
versions=[version],
include_incomplete=False,
)

self.warehouse.export_dataset_table(
Expand All @@ -1607,6 +1712,7 @@ def remove_dataset(
project: Project | None = None,
version: str | None = None,
force: bool | None = False,
keep_metadata: bool = True,
):
dataset = self.get_dataset(
name,
Expand All @@ -1621,15 +1727,19 @@ def remove_dataset(
f"Dataset {name} doesn't have version {version}"
)

if version:
self.remove_dataset_version(dataset, version)
return

for v in dataset.versions:
version = v.version
versions = [version] if version else [v.version for v in dataset.versions]
for ver in versions:
v = dataset.get_version(ver)
# keep_metadata only has meaning for user-facing datasets with
# soft-deletable versions; elsewhere there's no semver/lineage to
# preserve, so downgrade to wipe transparently.
effective_keep = (
keep_metadata and not dataset.is_internal and v.is_soft_deletable
)
self.remove_dataset_version(
dataset,
version,
ver,
keep_metadata=effective_keep,
)

def edit_dataset(
Expand Down Expand Up @@ -1801,7 +1911,7 @@ def pull_dataset( # noqa: C901, PLR0915, PLR0912
return

print("Cleaning up stale existing dataset version")
self.remove_dataset_version(ds, ver.version)
self.remove_dataset_version(ds, ver.version, keep_metadata=False)
except DatasetNotFoundError:
pass

Expand Down Expand Up @@ -1836,14 +1946,23 @@ def pull_dataset( # noqa: C901, PLR0915, PLR0912
)
if local_dataset.has_version(local_ds_version):
local_ver = local_dataset.get_version(local_ds_version)
if local_ver.is_removed:
raise DataChainError(
f"Local dataset {local_ds_uri} was removed; "
"the version number is reserved. Pull into a "
"different version or remove the tombstone "
"explicitly first."
)
if local_ver.status != DatasetStatus.COMPLETE:
# Stale incomplete version from a different UUID —
# clean it up so this pull can proceed.
print(
"Cleaning up stale incomplete version "
f"(uuid={local_ver.uuid})"
)
self.remove_dataset_version(local_dataset, local_ds_version)
self.remove_dataset_version(
local_dataset, local_ds_version, keep_metadata=False
)
else:
raise DataChainError(
f"Local dataset {local_ds_uri} already exists with"
Expand Down
4 changes: 4 additions & 0 deletions src/datachain/catalog/dependency.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ class DatasetDependencyNode:
source_dataset_id: int
source_dataset_version_id: int | None
depth: int
dataset_version_status: int | None = None

@classmethod
def parse(
Expand All @@ -36,6 +37,7 @@ def parse(
source_dataset_id: int,
source_dataset_version_id: int | None,
depth: int,
dataset_version_status: int | None = None,
) -> "DatasetDependencyNode | None":
return cls(
namespace,
Expand All @@ -49,6 +51,7 @@ def parse(
source_dataset_id,
source_dataset_version_id,
depth,
dataset_version_status,
)

def to_dependency(self) -> "DatasetDependency | None":
Expand All @@ -61,6 +64,7 @@ def to_dependency(self) -> "DatasetDependency | None":
dataset_name=self.dataset_name,
dataset_version=self.dataset_version,
dataset_version_created_at=self.created_at,
dataset_version_status=self.dataset_version_status,
)


Expand Down
Loading
Loading