-
Notifications
You must be signed in to change notification settings - Fork 147
Ilongin/12872 dataset soft delete #1770
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from 45 commits
73e3c77
dc34aa7
b0dfe90
d88917b
a7f0b15
236af57
7737683
425ac01
e71555e
19c45b1
9a57d47
5bd2d5c
4caecd8
ed6c6b2
280ab68
c1945ab
9271ce3
5f92614
c478b11
090b06e
ad7092a
c0e51a4
445b4dd
c908d4b
9b3c9c0
f00a5ad
7100869
0bad54e
92e92ea
651ccb5
6393f99
4683de8
0c21ad1
0ada7dd
08cb6de
ce3e272
85bf48f
96a8779
da3b1ca
80fcd45
27455ea
06db87c
bccfeed
301810a
d19f0fd
845cdb6
822328f
52d557e
5c9461d
cdace00
c991878
4874900
2d4b9e6
99833bd
40e6152
664d9ab
fac7aee
f22bfc3
352822e
5cbe55b
fe1a214
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -48,7 +48,7 @@ | |
| NamespaceNotFoundError, | ||
| ProjectNotFoundError, | ||
| ) | ||
| from datachain.lib.listing import get_listing | ||
| from datachain.lib.listing import get_listing, is_listing_dataset | ||
| from datachain.node import DirType, Node, NodeWithPath | ||
| from datachain.nodes_thread_pool import NodesThreadPool | ||
| from datachain.progress import tqdm | ||
|
|
@@ -1079,14 +1079,25 @@ def complete_dataset_version( | |
| as COMPLETE. | ||
| """ | ||
| self.update_dataset_version_with_warehouse_info(dataset, version, **kwargs) | ||
| self.metastore.update_dataset_status( | ||
| dataset, | ||
| DatasetStatus.COMPLETE, | ||
| version=version, | ||
| error_message=error_message, | ||
| error_stack=error_stack, | ||
| script_output=script_output, | ||
| ) | ||
| # Guard the version-level write: only flip to COMPLETE if the | ||
| # version is still in a saveable state. Prevents a late-arriving | ||
| # completion from stomping a concurrent removal. | ||
| try: | ||
| self.metastore.update_dataset_status( | ||
| dataset, | ||
| DatasetStatus.COMPLETE, | ||
| version=version, | ||
| error_message=error_message, | ||
| error_stack=error_stack, | ||
| script_output=script_output, | ||
| expected_status=DatasetStatus.CREATED, | ||
| ) | ||
| except DataChainError as e: | ||
| raise DataChainError( | ||
| f"Could not finalize {dataset.name}@{version}: " | ||
| "the version was removed or modified before save completed. " | ||
| "This usually means it was deleted concurrently - please retry." | ||
| ) from e | ||
|
|
||
| def update_dataset(self, dataset: DatasetRecord, **kwargs) -> DatasetRecord: | ||
| """Updates dataset fields.""" | ||
|
|
@@ -1095,26 +1106,117 @@ def update_dataset(self, dataset: DatasetRecord, **kwargs) -> DatasetRecord: | |
| return dataset_updated | ||
|
|
||
| def remove_dataset_version( | ||
| self, dataset: DatasetRecord, version: str, drop_rows: bool | None = True | ||
| self, | ||
| dataset: DatasetRecord, | ||
| version: str, | ||
| keep_metadata: bool, | ||
| ) -> None: | ||
|
shcheklein marked this conversation as resolved.
|
||
| """ | ||
| Deletes one single dataset version. | ||
| If it was last version, it removes dataset completely. | ||
| """Remove a single dataset version. | ||
|
|
||
| ``keep_metadata=True``: drop rows table, mark REMOVED (semver + | ||
| lineage kept). Requires user-facing dataset (not ``lst__*`` / | ||
| ``session_*``) and status in COMPLETE/REMOVING/REMOVED. | ||
|
|
||
| ``keep_metadata=False``: drop rows table, delete the version row. | ||
| Allowed from any status except REMOVING. | ||
| """ | ||
| if not dataset.has_version(version): | ||
| return | ||
| v = dataset.get_version(version) | ||
|
|
||
| if keep_metadata and dataset.is_internal: | ||
| raise DataChainError( | ||
| f"Internal dataset {dataset.name} cannot be removed " | ||
| "while keeping metadata" | ||
| ) | ||
| if keep_metadata and not v.is_soft_deletable: | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. can we call it is_internal? or even is_system? is soft deletable again is not reusable - we are just leaking removal business logic outside
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. was it addressed?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I completely removed this method as it's not really needed so no need for figuring out naming. I agree that soft delete should not be used anywhere but not sure what is substitute for that to be honest .. |
||
| raise DataChainError( | ||
| f"Cannot remove {dataset.name}@{version} while keeping " | ||
| f"metadata: current status is {v.status}, expected " | ||
| "COMPLETE or REMOVING" | ||
| ) | ||
| if ( | ||
| not keep_metadata | ||
| and v.status == DatasetStatus.REMOVING | ||
|
shcheklein marked this conversation as resolved.
|
||
| and not dataset.is_internal | ||
| ): | ||
| raise DataChainError( | ||
| f"Cannot remove {dataset.name}@{version} entirely: " | ||
| "a removal that keeps metadata is already in progress" | ||
| ) | ||
|
|
||
| if keep_metadata: | ||
| self._remove_version_keep_metadata(dataset, version, v.status) | ||
| else: | ||
| self._remove_version_wipe_metadata(dataset, version, v.status) | ||
|
|
||
| def _remove_version_keep_metadata( | ||
| self, dataset: DatasetRecord, version: str, expected_status: int | ||
| ) -> None: | ||
| """Drop rows table, mark version REMOVED (keeps semver + lineage).""" | ||
| if expected_status == DatasetStatus.REMOVED: | ||
| return | ||
| claimed = self.metastore.update_dataset_version( | ||
| dataset, | ||
| version, | ||
| expected_status=expected_status, | ||
| status=DatasetStatus.REMOVING, | ||
| ) | ||
| if claimed is None: | ||
| logger.debug( | ||
| "Skipped remove of %s@%s: another caller is already handling it", | ||
| dataset.name, | ||
| version, | ||
| ) | ||
| return | ||
|
|
||
| self.warehouse.drop_dataset_rows_table(dataset, version) | ||
| self.metastore.update_dataset_version( | ||
| dataset, version, status=DatasetStatus.REMOVING | ||
| dataset, | ||
| version, | ||
| status=DatasetStatus.REMOVED, | ||
| removed_at=datetime.now(timezone.utc), | ||
| ) | ||
| if drop_rows: | ||
| self.warehouse.drop_dataset_rows_table(dataset, version) | ||
| dataset = self.metastore.remove_dataset_version(dataset, version) | ||
|
|
||
| def _remove_version_wipe_metadata( | ||
| self, dataset: DatasetRecord, version: str, expected_status: int | ||
| ) -> None: | ||
| """Drop rows table and delete the version row entirely.""" | ||
| claimed = self.metastore.update_dataset_version( | ||
| dataset, | ||
| version, | ||
| expected_status=expected_status, | ||
| status=DatasetStatus.REMOVING_TOTAL, | ||
| ) | ||
| if claimed is None: | ||
| logger.debug( | ||
| "Skipped remove of %s@%s: another caller is already handling it", | ||
| dataset.name, | ||
| version, | ||
| ) | ||
| return | ||
|
|
||
| self.warehouse.drop_dataset_rows_table(dataset, version) | ||
| self.metastore.remove_dataset_version(dataset, version) | ||
|
|
||
| def _remove_versions(self, pairs: Iterable[tuple[DatasetRecord, str]]) -> int: | ||
|
shcheklein marked this conversation as resolved.
Outdated
|
||
| """Bulk remove orphaned versions (GC, session cleanup, CLI cleanup, | ||
| job cleanup). Infers ``keep_metadata`` per version from status: | ||
| resume soft delete if REMOVING, else wipe. Explicit single-version | ||
| removes go through ``remove_dataset_version`` directly. | ||
| """ | ||
| num_removed = 0 | ||
| for dataset, version in pairs: | ||
| try: | ||
| self.remove_dataset_version(dataset, version) | ||
| v = dataset.get_version(version) | ||
| if v.status == DatasetStatus.REMOVED: | ||
| continue | ||
| keep_metadata = ( | ||
| not dataset.is_internal and v.status == DatasetStatus.REMOVING | ||
| ) | ||
| self.remove_dataset_version( | ||
| dataset, version, keep_metadata=keep_metadata | ||
| ) | ||
| num_removed += 1 | ||
| except Exception as e: # noqa: BLE001 | ||
| logger.warning( | ||
|
|
@@ -1151,7 +1253,7 @@ def cleanup_dataset_versions(self, job_id: str | None = None) -> int: | |
| Clean up dataset versions that are no longer needed. | ||
|
|
||
| Removes dataset versions that: | ||
| - Have status CREATED, FAILED, STALE, or REMOVING | ||
| - Have status CREATED, FAILED, STALE, REMOVING, or REMOVING_TOTAL | ||
| - Belong to completed/failed/canceled jobs (not running) | ||
| - Are session_* datasets from finished jobs (orphaned intermediates) | ||
|
|
||
|
|
@@ -1263,7 +1365,6 @@ def get_dataset( | |
| include_incomplete: bool = True, | ||
| include_preview: bool = False, | ||
| ) -> DatasetRecord: | ||
| from datachain.lib.listing import is_listing_dataset | ||
|
|
||
| namespace_name = namespace_name or self.metastore.default_namespace_name | ||
| project_name = project_name or self.metastore.default_project_name | ||
|
|
@@ -1406,6 +1507,7 @@ def get_dataset_dependencies( | |
| namespace_name: str | None = None, | ||
| project_name: str | None = None, | ||
| indirect=False, | ||
| include_removed: bool = True, | ||
| ) -> list[DatasetDependency | None]: | ||
| dataset = self.get_dataset( | ||
| name, | ||
|
|
@@ -1422,6 +1524,7 @@ def get_dataset_dependencies( | |
| return self.metastore.get_direct_dataset_dependencies( | ||
| dataset, | ||
| version, | ||
| include_removed=include_removed, | ||
| ) | ||
|
|
||
| return self.get_dataset_dependencies_by_ids( | ||
|
|
@@ -1508,7 +1611,7 @@ def listings(self, prefix: str | None = None) -> list["ListingInfo"]: | |
| Returns list of ListingInfo objects which are representing specific | ||
| storage listing datasets | ||
| """ | ||
| from datachain.lib.listing import LISTING_PREFIX, is_listing_dataset | ||
| from datachain.lib.listing import LISTING_PREFIX | ||
| from datachain.lib.listing_info import ListingInfo | ||
|
|
||
| if prefix and not prefix.startswith(LISTING_PREFIX): | ||
|
|
@@ -1603,6 +1706,7 @@ def remove_dataset( | |
| project: Project | None = None, | ||
| version: str | None = None, | ||
| force: bool | None = False, | ||
| keep_metadata: bool = True, | ||
| ): | ||
| dataset = self.get_dataset( | ||
| name, | ||
|
|
@@ -1618,14 +1722,28 @@ def remove_dataset( | |
| ) | ||
|
|
||
| if version: | ||
| self.remove_dataset_version(dataset, version) | ||
| self.remove_dataset_version(dataset, version, keep_metadata=keep_metadata) | ||
| return | ||
|
|
||
| if keep_metadata: | ||
| bad = [v.version for v in dataset.versions if not v.is_soft_deletable] | ||
| if dataset.is_internal or bad: | ||
| reason = ( | ||
| "internal datasets must be fully removed" | ||
| if dataset.is_internal | ||
| else f"versions {bad} are not in a soft-deletable state" | ||
| ) | ||
| raise DataChainError( | ||
| f"Cannot remove dataset {name} while keeping metadata: " | ||
| f"{reason}. Use keep_metadata=False, or remove eligible " | ||
| "versions individually." | ||
| ) | ||
|
|
||
| for v in dataset.versions: | ||
|
ilongin marked this conversation as resolved.
Outdated
|
||
| version = v.version | ||
| self.remove_dataset_version( | ||
| dataset, | ||
| version, | ||
| v.version, | ||
| keep_metadata=keep_metadata, | ||
| ) | ||
|
|
||
| def edit_dataset( | ||
|
|
@@ -1797,7 +1915,7 @@ def pull_dataset( # noqa: C901, PLR0915, PLR0912 | |
| return | ||
|
|
||
| print("Cleaning up stale existing dataset version") | ||
| self.remove_dataset_version(ds, ver.version) | ||
| self.remove_dataset_version(ds, ver.version, keep_metadata=False) | ||
| except DatasetNotFoundError: | ||
| pass | ||
|
|
||
|
|
@@ -1839,7 +1957,9 @@ def pull_dataset( # noqa: C901, PLR0915, PLR0912 | |
| "Cleaning up stale incomplete version " | ||
| f"(uuid={local_ver.uuid})" | ||
| ) | ||
| self.remove_dataset_version(local_dataset, local_ds_version) | ||
| self.remove_dataset_version( | ||
| local_dataset, local_ds_version, keep_metadata=False | ||
| ) | ||
| else: | ||
| raise DataChainError( | ||
| f"Local dataset {local_ds_uri} already exists with" | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.