Skip to content
Open
Show file tree
Hide file tree
Changes from 39 commits
Commits
Show all changes
61 commits
Select commit Hold shift + click to select a range
73e3c77
working on dataset soft delete
ilongin May 13, 2026
dc34aa7
adding logic and fixing tests
ilongin May 14, 2026
b0dfe90
Merge branch 'main' into ilongin/12872-dataset-soft-delete
ilongin May 15, 2026
d88917b
Merge branch 'main' into ilongin/12872-dataset-soft-delete
ilongin May 15, 2026
a7f0b15
Merge branch 'ilongin/12872-dataset-soft-delete' of github.com:datach…
ilongin May 15, 2026
236af57
fixing test
ilongin May 15, 2026
7737683
Merge branch 'main' into ilongin/12872-dataset-soft-delete
ilongin May 18, 2026
425ac01
changing logic of solf delete to reserve removed datasets
ilongin May 18, 2026
e71555e
adding purge option
ilongin May 18, 2026
19c45b1
refactoring naming
ilongin May 20, 2026
9a57d47
refactoring
ilongin May 20, 2026
5bd2d5c
refactoring
ilongin May 21, 2026
4caecd8
Merge branch 'main' into ilongin/12872-dataset-soft-delete
ilongin May 22, 2026
ed6c6b2
fixing issue
ilongin May 22, 2026
280ab68
refactoring
ilongin May 22, 2026
c1945ab
Merge branch 'main' into ilongin/12872-dataset-soft-delete
ilongin May 25, 2026
9271ce3
refactor
ilongin May 25, 2026
5f92614
adding new flag
ilongin May 25, 2026
c478b11
Merge branch 'main' into ilongin/12872-dataset-soft-delete
ilongin May 25, 2026
090b06e
refactoring
ilongin May 25, 2026
ad7092a
Merge branch 'main' into ilongin/12872-dataset-soft-delete
ilongin May 26, 2026
c0e51a4
refactoring
ilongin May 26, 2026
445b4dd
Merge branch 'ilongin/12872-dataset-soft-delete' of github.com:datach…
ilongin May 26, 2026
c908d4b
Merge branch 'main' into ilongin/12872-dataset-soft-delete
ilongin May 27, 2026
9b3c9c0
introducing op_uuid
ilongin May 28, 2026
f00a5ad
added a fix
ilongin May 28, 2026
7100869
merging two metastore methods into one
ilongin May 29, 2026
0bad54e
Merge branch 'main' into ilongin/12872-dataset-soft-delete
Jun 1, 2026
92e92ea
added missing tests
ilongin Jun 1, 2026
651ccb5
Merge branch 'main' into ilongin/12872-dataset-soft-delete
ilongin Jun 3, 2026
6393f99
added more tests
ilongin Jun 3, 2026
4683de8
removing list of statuses
ilongin Jun 4, 2026
0c21ad1
Merge branch 'main' into ilongin/12872-dataset-soft-delete
ilongin Jun 5, 2026
0ada7dd
fixing complete_dataset_version
ilongin Jun 5, 2026
08cb6de
refactoring
ilongin Jun 5, 2026
ce3e272
refactoring
ilongin Jun 5, 2026
85bf48f
Merge branch 'main' into ilongin/12872-dataset-soft-delete
ilongin Jun 8, 2026
96a8779
refactoring
ilongin Jun 8, 2026
da3b1ca
updating logs
ilongin Jun 8, 2026
80fcd45
Merge branch 'main' into ilongin/12872-dataset-soft-delete
ilongin Jun 9, 2026
27455ea
refactoring
ilongin Jun 9, 2026
06db87c
removed op_uuid
ilongin Jun 9, 2026
bccfeed
refactoring
ilongin Jun 10, 2026
301810a
refactoring
ilongin Jun 10, 2026
d19f0fd
refactoring
ilongin Jun 10, 2026
845cdb6
adde missing flag
ilongin Jun 12, 2026
822328f
refactoring
ilongin Jun 12, 2026
52d557e
refactoring
ilongin Jun 12, 2026
5c9461d
fix race condition
ilongin Jun 12, 2026
cdace00
refactoring
ilongin Jun 13, 2026
c991878
added transaction
ilongin Jun 13, 2026
4874900
fixing export
ilongin Jun 15, 2026
2d4b9e6
refactoring
ilongin Jun 15, 2026
99833bd
refactoring dependencies
ilongin Jun 15, 2026
40e6152
removed purge option on interface
ilongin Jun 15, 2026
664d9ab
added 2 more tests
ilongin Jun 15, 2026
fac7aee
Merge branch 'main' into ilongin/12872-dataset-soft-delete
ilongin Jun 15, 2026
f22bfc3
Merge branch 'main' into ilongin/12872-dataset-soft-delete
ilongin Jun 15, 2026
352822e
simplifying constants
ilongin Jun 15, 2026
5cbe55b
Merge branch 'main' into ilongin/12872-dataset-soft-delete
ilongin Jun 18, 2026
fe1a214
fixing PR comments
ilongin Jun 19, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
177 changes: 158 additions & 19 deletions src/datachain/catalog/catalog.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@
NamespaceNotFoundError,
ProjectNotFoundError,
)
from datachain.lib.listing import get_listing
from datachain.lib.listing import get_listing, is_listing_dataset
from datachain.node import DirType, Node, NodeWithPath
from datachain.nodes_thread_pool import NodesThreadPool
from datachain.progress import tqdm
Expand Down Expand Up @@ -1079,14 +1079,25 @@ def complete_dataset_version(
as COMPLETE.
"""
self.update_dataset_version_with_warehouse_info(dataset, version, **kwargs)
self.metastore.update_dataset_status(
dataset,
DatasetStatus.COMPLETE,
version=version,
error_message=error_message,
error_stack=error_stack,
script_output=script_output,
)
# Guard the version-level write: only flip to COMPLETE if the
Comment thread
shcheklein marked this conversation as resolved.
# version is still in a saveable state. Prevents a late-arriving
# completion from stomping a concurrent removal.
try:
self.metastore.update_dataset_status(
dataset,
DatasetStatus.COMPLETE,
version=version,
error_message=error_message,
error_stack=error_stack,
script_output=script_output,
expected_status=DatasetStatus.CREATED,
)
except DataChainError as e:
raise DataChainError(
f"Could not finalize {dataset.name}@{version}: "
"the version was removed or modified before save completed. "
"This usually means it was deleted concurrently - please retry."
) from e

def update_dataset(self, dataset: DatasetRecord, **kwargs) -> DatasetRecord:
"""Updates dataset fields."""
Expand All @@ -1095,20 +1106,144 @@ def update_dataset(self, dataset: DatasetRecord, **kwargs) -> DatasetRecord:
return dataset_updated

def remove_dataset_version(
self, dataset: DatasetRecord, version: str, drop_rows: bool | None = True
self,
dataset: DatasetRecord,
version: str,
keep_metadata: bool | None = None,
) -> None:
Comment thread
shcheklein marked this conversation as resolved.
"""Remove a single dataset version.

``keep_metadata=True`` keeps a REMOVED tombstone (the rows table is
dropped). ``keep_metadata=False`` fully wipes the version row. When
explicit, the call raises if the version's current status is not
compatible with the requested mode.

``keep_metadata=None`` (default) infers the mode from the version's
current status: in-progress removals resume in their existing mode,
incomplete or internal-dataset rows are wiped.
"""
Deletes one single dataset version.
If it was last version, it removes dataset completely.
"""
from datachain.query.session import Session

if not dataset.has_version(version):
return
v = dataset.get_version(version)

is_internal = is_listing_dataset(dataset.name) or dataset.name.startswith(
Comment thread
shcheklein marked this conversation as resolved.
Outdated
Comment thread
shcheklein marked this conversation as resolved.
Outdated
Session.DATASET_PREFIX
Comment thread
shcheklein marked this conversation as resolved.
Outdated
)

if keep_metadata is None:
# Inferred mode (used by GC/cleanup paths): REMOVING resumes soft,
Comment thread
shcheklein marked this conversation as resolved.
Outdated
# everything else gets wiped.
keep_metadata = v.status == DatasetStatus.REMOVING
else:
Comment thread
ilongin marked this conversation as resolved.
Outdated
# Explicit mode: validate against current status / dataset kind.
if keep_metadata and is_internal:
raise DataChainError(
f"Internal dataset {dataset.name} cannot be removed "
"while keeping metadata"
)
if keep_metadata and v.status not in (
DatasetStatus.COMPLETE,
DatasetStatus.REMOVING,
DatasetStatus.REMOVED,
):
raise DataChainError(
f"Cannot remove {dataset.name}@{version} while keeping "
f"metadata: current status is {v.status}, expected "
"COMPLETE or REMOVING"
)
if not keep_metadata and v.status == DatasetStatus.REMOVING:
raise DataChainError(
f"Cannot remove {dataset.name}@{version} entirely: "
"a removal that keeps metadata is already in progress"
)

if keep_metadata:
self._remove_version_keep_metadata(dataset, version, v.status)
else:
self._remove_version_wipe_metadata(dataset, version, v.status)

def _try_claim_transition(
self,
dataset: DatasetRecord,
version: str,
*,
target_status: int,
expected_status: int,
) -> bool:
"""Atomically transition a version's status; signal win/lost via UUID.

Two callers matching the same source can both UPDATE; the second one's
``op_uuid`` overwrites the first's. Re-reading after the write reveals
which UUID landed - matching ours means we won.
"""
my_uuid = str(uuid4())
self.metastore.update_dataset_version(
dataset,
version,
expected_status=expected_status,
status=target_status,
op_uuid=my_uuid,
)
try:
fresh = self.metastore.get_dataset(
Comment thread
shcheklein marked this conversation as resolved.
Outdated
dataset.name,
namespace_name=dataset.project.namespace.name,
project_name=dataset.project.name,
versions=[version],
include_incomplete=True,
)
return fresh.get_version(version).op_uuid == my_uuid
except (DatasetNotFoundError, DatasetVersionNotFoundError):
return False

def _remove_version_keep_metadata(
self, dataset: DatasetRecord, version: str, expected_status: int
) -> None:
"""Drop rows table, mark version REMOVED (keeps semver + lineage)."""
if expected_status == DatasetStatus.REMOVED:
return
if not self._try_claim_transition(
dataset,
version,
target_status=DatasetStatus.REMOVING,
expected_status=expected_status,
):
logger.debug(
"Skipped remove of %s@%s: another caller is already handling it",
dataset.name,
version,
)
return

self.warehouse.drop_dataset_rows_table(dataset, version)
self.metastore.update_dataset_version(
dataset, version, status=DatasetStatus.REMOVING
dataset,
version,
status=DatasetStatus.REMOVED,
removed_at=datetime.now(timezone.utc),
)
if drop_rows:
self.warehouse.drop_dataset_rows_table(dataset, version)
dataset = self.metastore.remove_dataset_version(dataset, version)

def _remove_version_wipe_metadata(
self, dataset: DatasetRecord, version: str, expected_status: int
) -> None:
"""Drop rows table and delete the version row entirely."""
if not self._try_claim_transition(
dataset,
version,
target_status=DatasetStatus.REMOVING_TOTAL,
expected_status=expected_status,
):
logger.debug(
"Skipped remove of %s@%s: another caller is already handling it",
dataset.name,
version,
)
return

self.warehouse.drop_dataset_rows_table(dataset, version)
self.metastore.remove_dataset_version(dataset, version)

def _remove_versions(self, pairs: Iterable[tuple[DatasetRecord, str]]) -> int:
Comment thread
shcheklein marked this conversation as resolved.
Outdated
num_removed = 0
Expand Down Expand Up @@ -1151,7 +1286,7 @@ def cleanup_dataset_versions(self, job_id: str | None = None) -> int:
Clean up dataset versions that are no longer needed.

Removes dataset versions that:
- Have status CREATED, FAILED, STALE, or REMOVING
- Have status CREATED, FAILED, STALE, REMOVING, or REMOVING_TOTAL
- Belong to completed/failed/canceled jobs (not running)
- Are session_* datasets from finished jobs (orphaned intermediates)

Expand Down Expand Up @@ -1406,6 +1541,7 @@ def get_dataset_dependencies(
namespace_name: str | None = None,
project_name: str | None = None,
indirect=False,
include_removed: bool = True,
) -> list[DatasetDependency | None]:
dataset = self.get_dataset(
name,
Expand All @@ -1422,6 +1558,7 @@ def get_dataset_dependencies(
return self.metastore.get_direct_dataset_dependencies(
dataset,
version,
include_removed=include_removed,
)

return self.get_dataset_dependencies_by_ids(
Expand Down Expand Up @@ -1603,6 +1740,7 @@ def remove_dataset(
project: Project | None = None,
version: str | None = None,
force: bool | None = False,
keep_metadata: bool = True,
):
dataset = self.get_dataset(
name,
Expand All @@ -1618,14 +1756,15 @@ def remove_dataset(
)

if version:
self.remove_dataset_version(dataset, version)
self.remove_dataset_version(dataset, version, keep_metadata=keep_metadata)
return

for v in dataset.versions:
Comment thread
ilongin marked this conversation as resolved.
Outdated
version = v.version
self.remove_dataset_version(
dataset,
version,
keep_metadata=keep_metadata,
)

def edit_dataset(
Expand Down
2 changes: 2 additions & 0 deletions src/datachain/cli/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,7 @@ def handle_dataset_command(args, catalog):
args.name,
version=args.version,
force=args.force,
keep_metadata=args.keep_metadata,
studio=args.studio,
team=args.team,
),
Expand All @@ -204,6 +205,7 @@ def handle_dataset_command(args, catalog):
args.name,
version=args.version,
force=args.force,
keep_metadata=args.keep_metadata,
studio=args.studio,
team=args.team,
),
Expand Down
9 changes: 8 additions & 1 deletion src/datachain/cli/commands/datasets.py
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,7 @@ def rm_dataset(
name: str,
version: str | None = None,
force: bool | None = False,
keep_metadata: bool = True,
studio: bool | None = False,
team: str | None = None,
) -> None:
Expand All @@ -178,7 +179,13 @@ def rm_dataset(
else:
try:
project = catalog.metastore.get_project(project_name, namespace_name)
catalog.remove_dataset(name, project, version=version, force=force)
catalog.remove_dataset(
name,
project,
version=version,
force=force,
keep_metadata=keep_metadata,
)
except DatasetNotFoundError:
print("Dataset not found in local", file=sys.stderr)

Expand Down
11 changes: 11 additions & 0 deletions src/datachain/cli/parser/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,17 @@ def get_parser() -> ArgumentParser: # noqa: PLR0915
action=BooleanOptionalAction,
help="Force delete registered dataset with all of its versions",
)
rm_dataset_parser.add_argument(
"--keep-metadata",
default=True,
action=BooleanOptionalAction,
help=(
"Keep version metadata after removal so the semver stays "
"reserved and dependents can still resolve lineage. "
"Pass --no-keep-metadata to fully wipe the version "
"(rows, dependencies, and version row)."
),
)
rm_dataset_parser.add_argument(
"--studio",
action="store_true",
Expand Down
Loading
Loading