diff --git a/src/datachain/catalog/catalog.py b/src/datachain/catalog/catalog.py index f8708ae78..cc1e73cda 100644 --- a/src/datachain/catalog/catalog.py +++ b/src/datachain/catalog/catalog.py @@ -48,7 +48,7 @@ NamespaceNotFoundError, ProjectNotFoundError, ) -from datachain.lib.listing import get_listing +from datachain.lib.listing import get_listing, is_listing_dataset from datachain.node import DirType, Node, NodeWithPath from datachain.nodes_thread_pool import NodesThreadPool from datachain.progress import tqdm @@ -1083,14 +1083,25 @@ def complete_dataset_version( as COMPLETE. """ self.update_dataset_version_with_warehouse_info(dataset, version, **kwargs) - self.metastore.update_dataset_status( - dataset, - DatasetStatus.COMPLETE, - version=version, - error_message=error_message, - error_stack=error_stack, - script_output=script_output, - ) + # Guard the version-level write: only flip to COMPLETE if the + # version is still in a saveable state. Prevents a late-arriving + # completion from stomping a concurrent removal. + try: + self.metastore.update_dataset_status( + dataset, + DatasetStatus.COMPLETE, + version=version, + error_message=error_message, + error_stack=error_stack, + script_output=script_output, + expected_status=DatasetStatus.CREATED, + ) + except DataChainError as e: + raise DataChainError( + f"Could not finalize {dataset.name}@{version}: " + "the version was removed or modified before save completed. " + "This usually means it was deleted concurrently - please retry." + ) from e def update_dataset(self, dataset: DatasetRecord, **kwargs) -> DatasetRecord: """Updates dataset fields.""" @@ -1099,26 +1110,124 @@ def update_dataset(self, dataset: DatasetRecord, **kwargs) -> DatasetRecord: return dataset_updated def remove_dataset_version( - self, dataset: DatasetRecord, version: str, drop_rows: bool | None = True + self, + dataset: DatasetRecord, + version: str, + keep_metadata: bool, ) -> None: + """Remove a single dataset version. + + ``keep_metadata=True``: drop rows table, mark REMOVED (semver + + lineage kept). Requires user-facing dataset (not ``lst__*`` / + ``session_*``) and status in COMPLETE/REMOVING/REMOVED. + + ``keep_metadata=False``: drop rows table, delete the version row. + Allowed from any status except REMOVING. """ - Deletes one single dataset version. - If it was last version, it removes dataset completely. + try: + v = dataset.get_version(version) + except DatasetVersionNotFoundError: + return + + if keep_metadata and dataset.is_internal: + raise DataChainError( + f"Internal dataset {dataset.name} cannot be removed " + "while keeping metadata" + ) + if keep_metadata and v.status not in ( + DatasetStatus.COMPLETE, + DatasetStatus.REMOVING, + DatasetStatus.REMOVED, + ): + raise DataChainError( + f"Cannot remove {dataset.name}@{version} while keeping " + f"metadata: current status is {v.status}, expected " + "COMPLETE or REMOVING" + ) + if ( + not keep_metadata + and v.status == DatasetStatus.REMOVING + and not dataset.is_internal + ): + raise DataChainError( + f"Cannot remove {dataset.name}@{version} entirely: " + "a removal that keeps metadata is already in progress" + ) + + self._claim_and_remove( + dataset, version, expected_status=v.status, keep_metadata=keep_metadata + ) + + def _claim_and_remove( + self, + dataset: DatasetRecord, + version: str, + *, + expected_status: int, + keep_metadata: bool, + ) -> None: + """Claim the version with a transient status, drop its rows table, + then finalize. ``keep_metadata=True`` finalizes as a REMOVED tombstone + (semver + lineage preserved); ``False`` deletes the version row. + A REMOVED-to-REMOVED keep is a no-op since the tombstone is already + the final state. """ - if not dataset.has_version(version): + if keep_metadata and expected_status == DatasetStatus.REMOVED: return - self.metastore.update_dataset_version( - dataset, version, status=DatasetStatus.REMOVING + + transient = ( + DatasetStatus.REMOVING if keep_metadata else DatasetStatus.REMOVING_TOTAL ) - if drop_rows: - self.warehouse.drop_dataset_rows_table(dataset, version) - dataset = self.metastore.remove_dataset_version(dataset, version) + claimed = self.metastore.update_dataset_version( + dataset, + version, + expected_status=expected_status, + status=transient, + ) + if claimed is None: + logger.debug( + "Skipped remove of %s@%s: another caller is already handling it", + dataset.name, + version, + ) + return + + self.warehouse.drop_dataset_rows_table(dataset, version) - def _remove_versions(self, pairs: Iterable[tuple[DatasetRecord, str]]) -> int: + if keep_metadata: + self.metastore.update_dataset_version( + dataset, + version, + status=DatasetStatus.REMOVED, + removed_at=datetime.now(timezone.utc), + ) + else: + self.metastore.remove_dataset_version(dataset, version) + + def _remove_versions( + self, + pairs: Iterable[tuple[DatasetRecord, str]], + *, + keep_metadata: bool | None = None, + ) -> int: + """Bulk remove versions (GC, session cleanup, CLI cleanup, job cleanup, + user-facing bulk delete). When ``keep_metadata`` is None, infers per + version: resume soft delete if REMOVING, else wipe. When given + explicitly, honors the caller's intent for every version. + """ num_removed = 0 for dataset, version in pairs: try: - self.remove_dataset_version(dataset, version) + v = dataset.get_version(version) + if v.status == DatasetStatus.REMOVED: + continue + if keep_metadata is None: + keep = ( + not dataset.is_internal and v.status == DatasetStatus.REMOVING + ) + else: + keep = keep_metadata + self.remove_dataset_version(dataset, version, keep_metadata=keep) num_removed += 1 except Exception as e: # noqa: BLE001 logger.warning( @@ -1130,13 +1239,17 @@ def _remove_versions(self, pairs: Iterable[tuple[DatasetRecord, str]]) -> int: return num_removed def remove_dataset_versions( - self, job_id: str | None = None, version_ids: list[int] | None = None + self, + job_id: str | None = None, + version_ids: list[int] | None = None, + *, + keep_metadata: bool | None = None, ) -> int: versions_to_remove = self.metastore.get_dataset_versions( job_id=job_id, version_ids=version_ids, ) - return self._remove_versions(versions_to_remove) + return self._remove_versions(versions_to_remove, keep_metadata=keep_metadata) def get_temp_table_names(self) -> list[str]: return self.warehouse.get_temp_table_names() @@ -1155,7 +1268,7 @@ def cleanup_dataset_versions(self, job_id: str | None = None) -> int: Clean up dataset versions that are no longer needed. Removes dataset versions that: - - Have status CREATED, FAILED, STALE, or REMOVING + - Have status CREATED, FAILED, STALE, REMOVING, or REMOVING_TOTAL - Belong to completed/failed/canceled jobs (not running) - Are session_* datasets from finished jobs (orphaned intermediates) @@ -1267,7 +1380,6 @@ def get_dataset( include_incomplete: bool = True, include_preview: bool = False, ) -> DatasetRecord: - from datachain.lib.listing import is_listing_dataset namespace_name = namespace_name or self.metastore.default_namespace_name project_name = project_name or self.metastore.default_project_name @@ -1423,10 +1535,7 @@ def get_dataset_dependencies( dataset_version_id = dataset_version.id if not indirect: - return self.metastore.get_direct_dataset_dependencies( - dataset, - version, - ) + return self.metastore.get_direct_dataset_dependencies(dataset, version) return self.get_dataset_dependencies_by_ids( dataset_id, @@ -1512,7 +1621,7 @@ def listings(self, prefix: str | None = None) -> list["ListingInfo"]: Returns list of ListingInfo objects which are representing specific storage listing datasets """ - from datachain.lib.listing import LISTING_PREFIX, is_listing_dataset + from datachain.lib.listing import LISTING_PREFIX from datachain.lib.listing_info import ListingInfo if prefix and not prefix.startswith(LISTING_PREFIX): @@ -1590,6 +1699,7 @@ def export_dataset_table( namespace_name=project.namespace.name if project else None, project_name=project.name if project else None, versions=[version], + include_incomplete=False, ) self.warehouse.export_dataset_table( @@ -1607,6 +1717,7 @@ def remove_dataset( project: Project | None = None, version: str | None = None, force: bool | None = False, + keep_metadata: bool = True, ): dataset = self.get_dataset( name, @@ -1621,15 +1732,26 @@ def remove_dataset( f"Dataset {name} doesn't have version {version}" ) - if version: - self.remove_dataset_version(dataset, version) - return - - for v in dataset.versions: - version = v.version + versions = [version] if version else [v.version for v in dataset.versions] + for ver in versions: + v = dataset.get_version(ver) + # keep_metadata only has meaning for user-facing datasets whose + # version reached COMPLETE; elsewhere there's no semver/lineage + # to preserve, so downgrade to wipe transparently. + effective_keep = ( + keep_metadata + and not dataset.is_internal + and v.status + in ( + DatasetStatus.COMPLETE, + DatasetStatus.REMOVING, + DatasetStatus.REMOVED, + ) + ) self.remove_dataset_version( dataset, - version, + ver, + keep_metadata=effective_keep, ) def edit_dataset( @@ -1801,7 +1923,7 @@ def pull_dataset( # noqa: C901, PLR0915, PLR0912 return print("Cleaning up stale existing dataset version") - self.remove_dataset_version(ds, ver.version) + self.remove_dataset_version(ds, ver.version, keep_metadata=False) except DatasetNotFoundError: pass @@ -1836,6 +1958,13 @@ def pull_dataset( # noqa: C901, PLR0915, PLR0912 ) if local_dataset.has_version(local_ds_version): local_ver = local_dataset.get_version(local_ds_version) + if local_ver.is_removed: + raise DataChainError( + f"Local dataset {local_ds_uri} was removed; " + "the version number is reserved. Pull into a " + "different version or remove the tombstone " + "explicitly first." + ) if local_ver.status != DatasetStatus.COMPLETE: # Stale incomplete version from a different UUID — # clean it up so this pull can proceed. @@ -1843,7 +1972,9 @@ def pull_dataset( # noqa: C901, PLR0915, PLR0912 "Cleaning up stale incomplete version " f"(uuid={local_ver.uuid})" ) - self.remove_dataset_version(local_dataset, local_ds_version) + self.remove_dataset_version( + local_dataset, local_ds_version, keep_metadata=False + ) else: raise DataChainError( f"Local dataset {local_ds_uri} already exists with" diff --git a/src/datachain/catalog/dependency.py b/src/datachain/catalog/dependency.py index 39c5c7375..985a0e0c1 100644 --- a/src/datachain/catalog/dependency.py +++ b/src/datachain/catalog/dependency.py @@ -21,6 +21,7 @@ class DatasetDependencyNode: source_dataset_id: int source_dataset_version_id: int | None depth: int + dataset_version_status: int | None = None @classmethod def parse( @@ -36,6 +37,7 @@ def parse( source_dataset_id: int, source_dataset_version_id: int | None, depth: int, + dataset_version_status: int | None = None, ) -> "DatasetDependencyNode | None": return cls( namespace, @@ -49,6 +51,7 @@ def parse( source_dataset_id, source_dataset_version_id, depth, + dataset_version_status, ) def to_dependency(self) -> "DatasetDependency | None": @@ -61,6 +64,7 @@ def to_dependency(self) -> "DatasetDependency | None": dataset_name=self.dataset_name, dataset_version=self.dataset_version, dataset_version_created_at=self.created_at, + dataset_version_status=self.dataset_version_status, ) diff --git a/src/datachain/data_storage/metastore.py b/src/datachain/data_storage/metastore.py index 1abd99007..255c4c190 100644 --- a/src/datachain/data_storage/metastore.py +++ b/src/datachain/data_storage/metastore.py @@ -319,9 +319,16 @@ def update_dataset(self, dataset: DatasetRecord, **kwargs) -> DatasetRecord: @abstractmethod def update_dataset_version( - self, dataset: DatasetRecord, version: str, **kwargs - ) -> DatasetVersion: - """Updates dataset version fields.""" + self, + dataset: DatasetRecord, + version: str, + *, + expected_status: int | None = None, + **kwargs, + ) -> DatasetVersion | None: + """Updates dataset version fields. When ``expected_status`` is given + the UPDATE only applies if the row's current status equals that value; + returns None if no row matched.""" @abstractmethod def remove_dataset_version( @@ -347,7 +354,8 @@ def get_dataset_versions_to_clean( - the associated job has finished, or - there is no associated job (job_id is NULL) and the version is older than STALE_CREATED_THRESHOLD_HOURS - - Status REMOVING: marked for deletion + - Status REMOVING / REMOVING_TOTAL: removal in progress + (GC resumes to REMOVED or to full row deletion respectively) Returns: List of (DatasetRecord, version_string) tuples. Each DatasetRecord @@ -433,8 +441,12 @@ def update_dataset_status( error_message="", error_stack="", script_output="", + expected_status: int | None = None, ) -> DatasetRecord: - """Updates dataset status and appropriate fields related to status.""" + """Updates dataset status and appropriate fields related to status. + When ``expected_status`` is given the version-level UPDATE is + guarded by ``status = :expected``; if no version row matches, raises + :class:`DataChainError` before touching the dataset-level row.""" @abstractmethod def mark_job_dataset_versions_as_failed(self, job_id: str) -> None: @@ -474,9 +486,13 @@ def update_dataset_dependency_source( @abstractmethod def get_direct_dataset_dependencies( - self, dataset: DatasetRecord, version: str + self, + dataset: DatasetRecord, + version: str, ) -> list[DatasetDependency | None]: - """Gets direct dataset dependencies.""" + """Gets direct dataset dependencies. Each returned ``DatasetDependency`` + carries a ``removed`` flag so callers can filter or badge tombstoned + deps without a separate query.""" @abstractmethod def get_dataset_dependency_nodes( @@ -848,6 +864,7 @@ def _datasets_versions_columns(cls) -> list["SchemaItem"]: Column("schema", JSON, nullable=True), Column("job_id", Text, nullable=True), Column("content_hash", Text, nullable=True), + Column("removed_at", DateTime(timezone=True), nullable=True), UniqueConstraint("dataset_id", "version"), ] @@ -1394,9 +1411,14 @@ def update_dataset(self, dataset: DatasetRecord, **kwargs) -> DatasetRecord: return result_ds def update_dataset_version( - self, dataset: DatasetRecord, version: str, **kwargs - ) -> DatasetVersion: - """Updates dataset fields.""" + self, + dataset: DatasetRecord, + version: str, + *, + expected_status: int | None = None, + **kwargs, + ) -> DatasetVersion | None: + """Updates dataset version fields.""" logger.debug( "Metastore.update_dataset_version called for %s@%s: " "num_objects=%s, size=%s, preview_len=%s, all_fields=%s", @@ -1453,10 +1475,10 @@ def update_dataset_version( values[field] = value version_values[field] = value - dataset_version = dataset.get_version(version) + version_obj = dataset.get_version(version) if not values: - return dataset_version + return version_obj logger.debug( "Writing to database for %s@%s: num_objects=%s, size=%s, " @@ -1470,23 +1492,26 @@ def update_dataset_version( ) dv = self._datasets_versions - self.db.execute( - self._datasets_versions_update() - .where(dv.c.dataset_id == dataset.id, dv.c.version == version) - .values(values), - ) # type: ignore [attr-defined] + update_stmt = self._datasets_versions_update().where( + dv.c.dataset_id == dataset.id, dv.c.version == version + ) + if expected_status is not None: + update_stmt = update_stmt.where(dv.c.status == expected_status) + result = self.db.execute(update_stmt.values(values)) # type: ignore [attr-defined] + if expected_status is not None and result.rowcount == 0: # type: ignore [attr-defined] + return None - dataset_version.update(**version_values) + version_obj.update(**version_values) logger.debug( "Dataset version updated successfully: %s@%s, " "final_num_objects=%s, final_size=%s, has_preview=%s", dataset.name, version, - dataset_version.num_objects, - dataset_version.size, - bool(getattr(dataset_version, "_preview_data", None)), + version_obj.num_objects, + version_obj.size, + bool(getattr(version_obj, "_preview_data", None)), ) - return dataset_version + return version_obj def _parse_dataset( self, @@ -1771,8 +1796,17 @@ def remove_dataset_version( ) ) - if dataset.versions and len(dataset.versions) == 1: - # had only one version, fully deleting dataset + # Count in DB: in-memory dataset.versions may hold only this + # version (GC fetches versions one-by-one), so its length is + # unreliable. + remaining = next( + self.db.execute( + select(f.count()) + .select_from(dv) + .where(dv.c.dataset_id == dataset.id) + ) + )[0] + if remaining == 0: self.db.execute(self._datasets_delete().where(d.c.id == dataset.id)) dataset.remove_version(version) @@ -1839,6 +1873,7 @@ def get_dataset_versions_to_clean( DatasetStatus.FAILED, DatasetStatus.STALE, DatasetStatus.REMOVING, + DatasetStatus.REMOVING_TOTAL, ] ), or_( @@ -1903,6 +1938,7 @@ def update_dataset_status( error_message="", error_stack="", script_output="", + expected_status: int | None = None, ) -> DatasetRecord: """ Updates dataset status and appropriate fields related to status @@ -1919,29 +1955,43 @@ def update_dataset_status( update_data["error_message"] = error_message update_data["error_stack"] = error_stack - dataset = self.update_dataset(dataset, **update_data) - - if version: - self.update_dataset_version(dataset, version, **update_data) + with self.db.transaction(): + if version: + # Update the version row first. If a status guard was requested + # and the row's status no longer matches, abort before touching + # the dataset-level (denormalized) row. + updated = self.update_dataset_version( + dataset, + version, + expected_status=expected_status, + **update_data, + ) + if expected_status is not None and updated is None: + raise DataChainError( + f"Could not update status of {dataset.name}@{version}: " + f"current status is not {expected_status}" + ) - return dataset + return self.update_dataset(dataset, **update_data) def mark_job_dataset_versions_as_failed(self, job_id: str) -> None: """ - Mark all non-COMPLETE dataset versions created by a job as FAILED. + Finalize dataset versions still in CREATED for a failed job as FAILED. - This is called when a job fails to ensure that any dataset versions - it was creating are marked as failed rather than left in CREATED state. + Only flips the in-flight CREATED state. Terminal and removal states + (COMPLETE, FAILED, REMOVING, REMOVED, REMOVING_TOTAL) must not be + overwritten - otherwise tombstones from a user-issued soft delete + inside the failing job would be resurrected as FAILED and then + wiped by GC. Args: job_id: ID of the failed job whose dataset versions should be marked """ dv = self._datasets_versions - # Update status to FAILED for all non-COMPLETE versions with this job_id update_stmt = ( dv.update() - .where((dv.c.job_id == job_id) & (dv.c.status != DatasetStatus.COMPLETE)) + .where((dv.c.job_id == job_id) & (dv.c.status == DatasetStatus.CREATED)) .values( status=DatasetStatus.FAILED, finished_at=datetime.now(timezone.utc), @@ -2019,7 +2069,9 @@ def _dataset_dependency_nodes_select_columns( """ def get_direct_dataset_dependencies( - self, dataset: DatasetRecord, version: str + self, + dataset: DatasetRecord, + version: str, ) -> list[DatasetDependency | None]: n = self._namespaces p = self._projects @@ -2031,6 +2083,10 @@ def get_direct_dataset_dependencies( select_cols = self._dataset_dependencies_select_columns() + where_clause = (dd.c.source_dataset_id == dataset.id) & ( + dd.c.source_dataset_version_id == dataset_version.id + ) + query = ( self._datasets_dependencies_select(*select_cols) .select_from( @@ -2039,10 +2095,7 @@ def get_direct_dataset_dependencies( .join(p, d.c.project_id == p.c.id, isouter=True) .join(n, p.c.namespace_id == n.c.id, isouter=True) ) - .where( - (dd.c.source_dataset_id == dataset.id) - & (dd.c.source_dataset_version_id == dataset_version.id) - ) + .where(where_clause) ) return [self.dependency_class.parse(*r) for r in self.db.execute(query)] @@ -2843,6 +2896,7 @@ def _get_dataset_version_for_job_ancestry_query( self._projects.c.name == project_name, self._dataset_version_jobs.c.job_id.in_(job_ancestry), self._dataset_version_jobs.c.is_creator.is_(True), + self._datasets_versions.c.status == DatasetStatus.COMPLETE, ) .order_by(desc(self._dataset_version_jobs.c.created_at)) .limit(1) diff --git a/src/datachain/data_storage/sqlite.py b/src/datachain/data_storage/sqlite.py index f5b8c64ee..6d357f57c 100644 --- a/src/datachain/data_storage/sqlite.py +++ b/src/datachain/data_storage/sqlite.py @@ -668,6 +668,7 @@ def _dataset_dependencies_select_columns(self) -> list["SchemaItem"]: self._datasets.c.name, self._datasets_versions.c.version, self._datasets_versions.c.created_at, + self._datasets_versions.c.status, ] def _dataset_dependency_nodes_select_columns( @@ -688,6 +689,7 @@ def _dataset_dependency_nodes_select_columns( dependency_tree_cte.c.source_dataset_id, dependency_tree_cte.c.source_dataset_version_id, dependency_tree_cte.c.depth, + self._datasets_versions.c.status, ] def _jobs_insert(self) -> "Insert": diff --git a/src/datachain/data_storage/warehouse.py b/src/datachain/data_storage/warehouse.py index 1dd1f724e..621237b53 100644 --- a/src/datachain/data_storage/warehouse.py +++ b/src/datachain/data_storage/warehouse.py @@ -469,6 +469,11 @@ def rename_dataset_tables( for version in [v.version for v in dataset_updated.versions]: if not dataset.has_version(version): continue + # Removed versions have no rows table (dropped at soft/hard delete + # time), so renaming would raise mid-loop and strand the rest of + # the dataset in a half-renamed state. + if dataset_updated.get_version(version).is_removed: + continue src = self.dataset_table_name(dataset, version) dest = self.dataset_table_name(dataset_updated, version) if src == dest: diff --git a/src/datachain/dataset.py b/src/datachain/dataset.py index cd825d054..fa4756ef0 100644 --- a/src/datachain/dataset.py +++ b/src/datachain/dataset.py @@ -31,6 +31,7 @@ DATASET_PREFIX = "ds://" QUERY_DATASET_PREFIX = "ds_query_" LISTING_PREFIX = "lst__" +SESSION_DATASET_PREFIX = "session_" DEFAULT_DATASET_VERSION = "1.0.0" DATASET_NAME_RESERVED_CHARS = [".", "@"] @@ -183,6 +184,7 @@ class DatasetDependency: version: str created_at: datetime dependencies: list["DatasetDependency | None"] + removed: bool = False def to_dict(self) -> dict[str, Any]: return { @@ -217,6 +219,7 @@ def parse( dataset_name: str | None, dataset_version: str | None, dataset_version_created_at: datetime | None, + dataset_version_status: int | None = None, ) -> "DatasetDependency | None": from datachain.lib.listing import is_listing_dataset @@ -235,9 +238,10 @@ def parse( namespace_name, project_name, dataset_name, - dataset_version or None, # type: ignore[arg-type] + dataset_version, # type: ignore[arg-type] dataset_version_created_at, # type: ignore[arg-type] [], + removed=dataset_version_status in REMOVED_STATUSES, ) @property @@ -264,7 +268,14 @@ class DatasetStatus: FAILED = 3 COMPLETE = 4 STALE = 6 - REMOVING = 7 + REMOVING = 7 # keep-metadata removal in progress; resumes to REMOVED + REMOVED = 8 + REMOVING_TOTAL = 9 # wipe in progress; resumes to row deletion + + +REMOVED_STATUSES: frozenset[int] = frozenset( + {DatasetStatus.REMOVING, DatasetStatus.REMOVED, DatasetStatus.REMOVING_TOTAL} +) @dataclass @@ -296,6 +307,7 @@ class DatasetVersion: query_script: str = "" job_id: str | None = None content_hash: str | None = None + removed_at: datetime | None = None @classmethod def parse( # noqa: PLR0913 @@ -319,6 +331,7 @@ def parse( # noqa: PLR0913 query_script: str = "", job_id: str | None = None, content_hash: str | None = None, + removed_at: datetime | None = None, *, preview_loaded: bool = True, ): @@ -347,6 +360,7 @@ def parse( # noqa: PLR0913 query_script=query_script, job_id=job_id, content_hash=content_hash, + removed_at=removed_at, _preview_loaded=preview_loaded, ) @@ -373,8 +387,17 @@ def is_final_status(self) -> bool: DatasetStatus.COMPLETE, DatasetStatus.STALE, DatasetStatus.REMOVING, + DatasetStatus.REMOVING_TOTAL, + DatasetStatus.REMOVED, ] + @property + def is_removed(self) -> bool: + """True if the version is in any removal state (in-flight or + terminal): REMOVING, REMOVED, REMOVING_TOTAL. The rows table is + gone or being dropped in all three cases.""" + return self.status in REMOVED_STATUSES + def update(self, **kwargs): for key, value in kwargs.items(): if hasattr(self, key): @@ -577,6 +600,7 @@ def parse( # noqa: PLR0913 version_schema: str | None = None, version_job_id: str | None = None, version_content_hash: str | None = None, + version_removed_at: datetime | None = None, *, versions_loaded: bool = True, preview_loaded: bool = True, @@ -635,6 +659,7 @@ def parse( # noqa: PLR0913 version_query_script or "", version_job_id, version_content_hash, + version_removed_at, preview_loaded=preview_loaded, ) versions_list = [dataset_version] @@ -664,6 +689,14 @@ def parse( # noqa: PLR0913 def full_name(self) -> str: return f"{self.project.namespace.name}.{self.project.name}.{self.name}" + @property + def is_internal(self) -> bool: + """True for non-user-facing datasets (listing ``lst__*`` and + session ``session_*`` intermediates).""" + return self.name.startswith(LISTING_PREFIX) or self.name.startswith( + SESSION_DATASET_PREFIX + ) + def get_schema(self, version: str) -> dict[str, SQLType | type[SQLType]]: return self.get_version(version).schema if version else self.schema @@ -678,15 +711,13 @@ def has_version(self, version: str) -> bool: def is_valid_next_version(self, version: str) -> bool: """ Checks if a number can be a valid next latest version for dataset. - The only rule is that it cannot be lower than current latest version + Compares against the highest version ever used, including REMOVED + ones — once a semver has been claimed it is permanently reserved. """ if not self.versions: return True - return not ( - self.latest_version - and semver.value(self.latest_version) >= semver.value(version) - ) + return self._max_version_value < semver.value(version) def get_version(self, version: str) -> DatasetVersion: if not self.has_version(version): @@ -733,42 +764,64 @@ def uri(self, version: str) -> str: def next_version_major(self) -> str: """ Returns the next auto-incremented version if the major part is being bumped. + Skips past REMOVED versions so deleted semvers are never reclaimed. """ if not self.versions: return "1.0.0" - major, _, _ = semver.parse(self.latest_version) + major, _, _ = semver.parse(self._max_version) return semver.create(major + 1, 0, 0) @property def next_version_minor(self) -> str: """ Returns the next auto-incremented version if the minor part is being bumped. + Skips past REMOVED versions so deleted semvers are never reclaimed. """ if not self.versions: return "1.0.0" - major, minor, _ = semver.parse(self.latest_version) + major, minor, _ = semver.parse(self._max_version) return semver.create(major, minor + 1, 0) @property def next_version_patch(self) -> str: """ Returns the next auto-incremented version if the patch part is being bumped. + Skips past REMOVED versions so deleted semvers are never reclaimed. """ if not self.versions: return "1.0.0" - major, minor, patch = semver.parse(self.latest_version) + major, minor, patch = semver.parse(self._max_version) return semver.create(major, minor, patch + 1) @property - def latest_version(self) -> str: - """Returns latest version of a dataset""" - if not self.versions: - raise DatasetVersionNotFoundError("Dataset has no versions") + def _live_versions(self) -> list[DatasetVersion]: + """Versions excluding REMOVED ones.""" + return [v for v in self.versions if v.status != DatasetStatus.REMOVED] + + @property + def _max_version(self) -> str: + """Highest semver across all versions including REMOVED ones. Used for + collision avoidance — once a semver is claimed it's reserved forever, + even after removal.""" return max(self.versions).version + @property + def _max_version_value(self) -> int: + return semver.value(self._max_version) + + @property + def latest_version(self) -> str: + """Latest non-REMOVED version.""" + live = self._live_versions + if not live: + raise DatasetVersionNotFoundError( + f"Dataset {self.name} has no live versions" + ) + return max(live).version + @property def latest_complete_version(self) -> str | None: """Returns latest COMPLETE version, or None if there isn't one.""" @@ -787,7 +840,9 @@ def latest_major_version(self, major: int) -> str | None: and we call `.latest_major_version(2)` it will return: "2.4.0". If no major version is find with input value, None will be returned """ - versions = [v for v in self.versions if semver.parse(v.version)[0] == major] + versions = [ + v for v in self._live_versions if semver.parse(v.version)[0] == major + ] if not versions: return None return max(versions).version @@ -815,7 +870,7 @@ def latest_compatible_version(self, version_spec: str) -> str | None: # Convert dataset versions to packaging.Version objects # and filter compatible ones compatible_versions = [] - for v in self.versions: + for v in self._live_versions: pkg_version = Version(v.version) if spec_set.contains(pkg_version): compatible_versions.append(v) @@ -950,7 +1005,12 @@ def full_name(self) -> str: return f"{self.project.namespace.name}.{self.project.name}.{self.name}" def latest_version(self) -> DatasetListVersion: - return max(self.versions, key=lambda v: v.version_value) + live = [v for v in self.versions if v.status != DatasetStatus.REMOVED] + if not live: + raise DatasetVersionNotFoundError( + f"Dataset {self.name} has no live versions" + ) + return max(live, key=lambda v: v.version_value) @property def is_bucket_listing(self) -> bool: diff --git a/src/datachain/delta.py b/src/datachain/delta.py index 2a48fec5e..ec8fcb359 100644 --- a/src/datachain/delta.py +++ b/src/datachain/delta.py @@ -393,13 +393,17 @@ def delta_retry_update( for source in delta_sources: source.resolve_listing() - dependencies = catalog.get_dataset_dependencies( - name, - latest_version, - namespace_name=namespace_name, - project_name=project_name, - indirect=False, - ) + dependencies: list[DatasetDependency | None] = [ + d + for d in catalog.get_dataset_dependencies( + name, + latest_version, + namespace_name=namespace_name, + project_name=project_name, + indirect=False, + ) + if d is None or not d.removed + ] latest_dataset = datachain.read_dataset( name, namespace=namespace_name, diff --git a/src/datachain/lib/dc/datasets.py b/src/datachain/lib/dc/datasets.py index ae1e1130a..48d0e3ce3 100644 --- a/src/datachain/lib/dc/datasets.py +++ b/src/datachain/lib/dc/datasets.py @@ -332,6 +332,9 @@ def delete_dataset( """Removes specific dataset version or all dataset versions, depending on a force flag. + The rows table is dropped but the version metadata is kept so the semver + stays reserved and dependents can still resolve lineage. + Args: name: The dataset name, which can be a fully qualified name including the namespace and project. Alternatively, it can be a regular name, in which diff --git a/src/datachain/query/dataset.py b/src/datachain/query/dataset.py index 017d3254b..2a04318a4 100644 --- a/src/datachain/query/dataset.py +++ b/src/datachain/query/dataset.py @@ -3498,7 +3498,7 @@ def _reuse_listing_if_unchanged( if old_fp != new_fp: return None - self.catalog.remove_dataset_version(full_dataset, version) + self.catalog.remove_dataset_version(full_dataset, version, keep_metadata=False) # updating TTL of a bucket listing self.catalog.metastore.update_dataset_version( full_dataset, prev_version, finished_at=datetime.now(timezone.utc) diff --git a/src/datachain/query/session.py b/src/datachain/query/session.py index ee3155607..3fa0c0f92 100644 --- a/src/datachain/query/session.py +++ b/src/datachain/query/session.py @@ -11,6 +11,7 @@ from datachain.catalog import get_catalog from datachain.data_storage import JobQueryType, JobStatus +from datachain.dataset import SESSION_DATASET_PREFIX from datachain.error import DataChainError, JobNotFoundError, TableMissingError if TYPE_CHECKING: @@ -67,7 +68,6 @@ class Session: _JOB_HOOKS_REGISTERED: ClassVar[bool] = False _JOB_FINALIZE_HOOK: ClassVar[Callable[[], None] | None] = None - DATASET_PREFIX = "session_" GLOBAL_SESSION_NAME = "global" SESSION_UUID_LEN = 6 TEMP_TABLE_UUID_LEN = 6 @@ -268,11 +268,11 @@ def generate_temp_dataset_name(self) -> str: return self.get_temp_prefix() + uuid4().hex[: self.TEMP_TABLE_UUID_LEN] def get_temp_prefix(self) -> str: - return f"{self.DATASET_PREFIX}{self.name}_" + return f"{SESSION_DATASET_PREFIX}{self.name}_" @classmethod def is_temp_dataset(cls, name) -> bool: - return name.startswith(cls.DATASET_PREFIX) + return name.startswith(SESSION_DATASET_PREFIX) def _cleanup_temp_datasets(self) -> None: prefix = self.get_temp_prefix() @@ -282,7 +282,10 @@ def _cleanup_temp_datasets(self) -> None: prefix, include_incomplete=True ) ): - self.catalog.remove_dataset(dataset.name, dataset.project, force=True) + # Session datasets are always wiped — never soft-deleted. + self.catalog.remove_dataset( + dataset.name, dataset.project, force=True, keep_metadata=False + ) # suppress error when metastore has been reset during testing except TableMissingError: pass diff --git a/tests/func/checkpoints/test_checkpoint_workflows.py b/tests/func/checkpoints/test_checkpoint_workflows.py index 544576cf1..43018c1a6 100644 --- a/tests/func/checkpoints/test_checkpoint_workflows.py +++ b/tests/func/checkpoints/test_checkpoint_workflows.py @@ -258,20 +258,20 @@ def test_checkpoint_with_deleted_dataset_version(test_session, nums_dataset): catalog.remove_dataset("nums_deleted", version="1.0.0", force=True) + # v1.0.0 is no longer visible via the user-facing read path with pytest.raises(DatasetNotFoundError): - catalog.get_dataset("nums_deleted") + catalog.get_dataset("nums_deleted", include_incomplete=False) # -------------- SECOND RUN: Checkpoint exists but version gone reset_session_job_state() chain.save("nums_deleted") job2_id = test_session.get_or_create_job().id - # Should create a NEW version since old one was deleted + # The REMOVED 1.0.0 slot is reserved forever — the new save auto-bumps to 1.0.1. dataset = catalog.get_dataset("nums_deleted", versions=None) - assert len(dataset.versions) == 1 - assert dataset.latest_version == "1.0.0" + assert dataset.latest_version == "1.0.1" - new_version = dataset.get_version("1.0.0") + new_version = dataset.get_version("1.0.1") assert new_version.job_id == job2_id diff --git a/tests/func/test_dataset_query.py b/tests/func/test_dataset_query.py index 1813bbb01..3df02990e 100644 --- a/tests/func/test_dataset_query.py +++ b/tests/func/test_dataset_query.py @@ -1063,6 +1063,7 @@ def test_dataset_dependencies_one_storage_as_dependency( "version": "1.0.0", "created_at": listing.created_at, "dependencies": [], + "removed": False, } ] @@ -1089,6 +1090,7 @@ def test_dataset_dependencies_one_dataset_as_dependency( "version": "1.0.0", "created_at": dogs_dataset.get_version("1.0.0").created_at, "dependencies": [], + "removed": False, } ] @@ -1103,6 +1105,7 @@ def test_dataset_dependencies_one_dataset_as_dependency( "version": "1.0.0", "created_at": listing.created_at, "dependencies": [], + "removed": False, } ] @@ -1111,9 +1114,14 @@ def test_dataset_dependencies_one_dataset_as_dependency( for d in catalog.get_dataset_dependencies(ds_name, "1.0.0", indirect=indirect) ] == expected + # Removing keeps the dependency record so dependents can still + # render lineage to the removed source - now flagged with removed=True. catalog.remove_dataset(dogs_dataset.name, force=True) - # None means dependency was there but was removed in the meantime - assert catalog.get_dataset_dependencies(ds_name, "1.0.0") == [None] + expected[0]["removed"] = True + assert [ + dataset_dependency_asdict(d) + for d in catalog.get_dataset_dependencies(ds_name, "1.0.0") + ] == expected @pytest.mark.parametrize("method", ["union", "join"]) @@ -1144,6 +1152,7 @@ def test_dataset_dependencies_multiple_direct_dataset_dependencies( "version": "1.0.0", "created_at": listing.created_at, "dependencies": [], + "removed": False, } expected = [ @@ -1156,6 +1165,7 @@ def test_dataset_dependencies_multiple_direct_dataset_dependencies( "version": "1.0.0", "created_at": dogs_dataset.get_version("1.0.0").created_at, "dependencies": [storage_depenedncy], + "removed": False, }, { "id": ANY, @@ -1166,6 +1176,7 @@ def test_dataset_dependencies_multiple_direct_dataset_dependencies( "version": "1.0.0", "created_at": cats_dataset.get_version("1.0.0").created_at, "dependencies": [storage_depenedncy], + "removed": False, }, ] @@ -1177,22 +1188,27 @@ def test_dataset_dependencies_multiple_direct_dataset_dependencies( key=lambda d: d["name"], ) == sorted(expected, key=lambda d: d["name"]) - # check when removing one dependency + # Removing keeps dependency records intact: the dependent's lineage + # still resolves but the removed entry is flagged. catalog.remove_dataset(dogs_dataset.name, force=True) - expected[0] = None - expected[1]["dependencies"] = [] - + expected[0]["removed"] = True assert sorted( ( dataset_dependency_asdict(d) - for d in catalog.get_dataset_dependencies(ds_name, "1.0.0") + for d in catalog.get_dataset_dependencies(ds_name, "1.0.0", indirect=True) ), - key=lambda d: d["name"] if d else "", - ) == sorted(expected, key=lambda d: d["name"] if d else "") + key=lambda d: d["name"], + ) == sorted(expected, key=lambda d: d["name"]) - # check when removing the other dependency catalog.remove_dataset(cats_dataset.name, force=True) - assert catalog.get_dataset_dependencies(ds_name, "1.0.0") == [None, None] + expected[1]["removed"] = True + assert sorted( + ( + dataset_dependency_asdict(d) + for d in catalog.get_dataset_dependencies(ds_name, "1.0.0", indirect=True) + ), + key=lambda d: d["name"], + ) == sorted(expected, key=lambda d: d["name"]) def test_dataset_dependencies_multiple_union( @@ -1218,6 +1234,7 @@ def test_dataset_dependencies_multiple_union( "version": "1.0.0", "created_at": listing.created_at, "dependencies": [], + "removed": False, } expected = [ @@ -1230,6 +1247,7 @@ def test_dataset_dependencies_multiple_union( "version": "1.0.0", "created_at": dogs_dataset.get_version("1.0.0").created_at, "dependencies": [storage_depenedncy], + "removed": False, }, { "id": ANY, @@ -1240,6 +1258,7 @@ def test_dataset_dependencies_multiple_union( "version": "1.0.0", "created_at": cats_dataset.get_version("1.0.0").created_at, "dependencies": [storage_depenedncy], + "removed": False, }, ] diff --git a/tests/func/test_datasets.py b/tests/func/test_datasets.py index 55d67b07a..6e39d6af1 100644 --- a/tests/func/test_datasets.py +++ b/tests/func/test_datasets.py @@ -361,20 +361,21 @@ def test_create_dataset_whole_bucket(listed_bucket, cloud_test_catalog, project) def test_remove_dataset(test_session, saved_dataset): catalog = test_session.catalog + warehouse = catalog.warehouse dataset_version = saved_dataset.get_version("1.0.0") assert dataset_version.num_objects - catalog.remove_dataset(saved_dataset.name, saved_dataset.project, force=True) - with pytest.raises(DatasetNotFoundError): - catalog.get_dataset(saved_dataset.name) + rows_table = warehouse.dataset_table_name(saved_dataset, "1.0.0") + assert table_row_count(warehouse.db, rows_table) is not None - dataset_table_name = catalog.warehouse.dataset_table_name(saved_dataset, "1.0.0") - assert table_row_count(catalog.warehouse.db, dataset_table_name) is None + catalog.remove_dataset(saved_dataset.name, saved_dataset.project, force=True) - assert ( - catalog.metastore.get_direct_dataset_dependencies(saved_dataset, "1.0.0") == [] - ) + # Dataset row stays with REMOVED versions; rows table gone. + ds = catalog.get_dataset(saved_dataset.name, versions=None, include_incomplete=True) + assert not ds._live_versions + assert all(v.status == DatasetStatus.REMOVED for v in ds.versions) + assert table_row_count(warehouse.db, rows_table) is None def test_remove_dataset_with_multiple_versions(test_session, saved_dataset): @@ -384,18 +385,25 @@ def test_remove_dataset_with_multiple_versions(test_session, saved_dataset): updated_dataset, _ = catalog.create_dataset_version( saved_dataset, "2.0.0", columns=columns ) + catalog.metastore.update_dataset_version( + updated_dataset, "2.0.0", status=DatasetStatus.COMPLETE + ) updated_dataset = catalog.get_dataset(saved_dataset.name, versions=None) assert updated_dataset.has_version("2.0.0") assert updated_dataset.has_version("1.0.0") catalog.remove_dataset(updated_dataset.name, saved_dataset.project, force=True) - with pytest.raises(DatasetNotFoundError): - catalog.get_dataset(updated_dataset.name) - assert ( - catalog.metastore.get_direct_dataset_dependencies(updated_dataset, "1.0.0") - == [] + # Both COMPLETE versions become REMOVED tombstones; dataset row stays. + ds = catalog.get_dataset( + updated_dataset.name, versions=None, include_incomplete=True + ) + assert not ds._live_versions + removed = sorted( + (v for v in ds.versions if v.status == DatasetStatus.REMOVED), + key=lambda v: v.version, ) + assert [v.version for v in removed] == ["1.0.0", "2.0.0"] def test_remove_dataset_dataset_not_found(test_session, project): @@ -903,5 +911,6 @@ def test_dataset_storage_dependencies(cloud_test_catalog, cloud_type, indirect): "version": "1.0.0", "created_at": lst_dataset.get_version("1.0.0").created_at, "dependencies": [], + "removed": False, } ] diff --git a/tests/func/test_delta.py b/tests/func/test_delta.py index 8ff836fbb..17609d8ac 100644 --- a/tests/func/test_delta.py +++ b/tests/func/test_delta.py @@ -140,14 +140,9 @@ def record_processing(id: int) -> int: with pytest.raises(DatasetNotFoundError): dc.read_dataset(source_ds, session=test_session, version="1.0.0") - deps_after_removal = catalog.get_dataset_dependencies( - delta_ds, - "1.0.0", - namespace_name=catalog.metastore.default_project.namespace.name, - project_name=catalog.metastore.default_project.name, - indirect=False, - ) - assert deps_after_removal == [None] + # The dep row still points at the now-REMOVED v1.0.0 of the source + # so the dependent's lineage view still resolves. + assert _get_dependencies(catalog, delta_ds, "1.0.0") == [(source_ds, "1.0.0")] dc.read_dataset( source_ds, diff --git a/tests/unit/lib/test_datachain.py b/tests/unit/lib/test_datachain.py index 6991a1025..a1cda8ada 100644 --- a/tests/unit/lib/test_datachain.py +++ b/tests/unit/lib/test_datachain.py @@ -5009,8 +5009,10 @@ def test_delete_dataset_and_create_with_same_name(test_session): chain.save("nums", version="1.0.0") dc.delete_dataset("nums", force=True, session=test_session) assert "nums" not in dc.datasets(session=test_session).to_values("name") - chain.save("nums", version="1.0.0") + # Removed semver is reserved; auto-bump claims the next slot. + new_chain = chain.save("nums") assert "nums" in dc.datasets(session=test_session).to_values("name") + assert new_chain.dataset.latest_version != "1.0.0" def test_union_does_not_break_schema_order(test_session): diff --git a/tests/unit/test_dataset.py b/tests/unit/test_dataset.py index 8d3d63d4b..b3f508d51 100644 --- a/tests/unit/test_dataset.py +++ b/tests/unit/test_dataset.py @@ -411,10 +411,221 @@ def test_preview_raises_when_not_loaded(dataset_record): def test_latest_version_empty_raises(dataset_record): record = replace(dataset_record, _versions=[], _versions_loaded=True) - with pytest.raises(DatasetVersionNotFoundError, match="has no versions"): + with pytest.raises(DatasetVersionNotFoundError, match="has no live versions"): _ = record.latest_version +def _versions_with_statuses(dataset_record, statuses): + return [ + replace( + dataset_record.versions[0], + id=i + 1, + version=f"{i + 1}.0.0", + status=status, + ) + for i, status in enumerate(statuses) + ] + + +def test_latest_version_returns_max_live(dataset_record): + versions = _versions_with_statuses( + dataset_record, + [DatasetStatus.COMPLETE, DatasetStatus.COMPLETE, DatasetStatus.CREATED], + ) + record = replace(dataset_record, _versions=versions, _versions_loaded=True) + assert record.latest_version == "3.0.0" + + +def test_latest_version_skips_removed(dataset_record): + versions = _versions_with_statuses( + dataset_record, + [DatasetStatus.COMPLETE, DatasetStatus.COMPLETE, DatasetStatus.REMOVED], + ) + record = replace(dataset_record, _versions=versions, _versions_loaded=True) + assert record.latest_version == "2.0.0" + + +def test_latest_version_all_removed_raises(dataset_record): + versions = _versions_with_statuses( + dataset_record, [DatasetStatus.REMOVED, DatasetStatus.REMOVED] + ) + record = replace(dataset_record, _versions=versions, _versions_loaded=True) + with pytest.raises(DatasetVersionNotFoundError, match="has no live versions"): + _ = record.latest_version + + +@pytest.mark.parametrize( + "name,expected", + [ + ("lst__s3://bucket/", True), + ("session_abc_123", True), + ("my_dataset", False), + ("ds_session_lookalike", False), + ("lst_no_double_underscore", False), + ], +) +def test_is_internal(dataset_record, name, expected): + record = replace(dataset_record, name=name) + assert record.is_internal is expected + + +@pytest.mark.parametrize( + "status,expected", + [ + (DatasetStatus.REMOVING, True), + (DatasetStatus.REMOVED, True), + (DatasetStatus.REMOVING_TOTAL, True), + (DatasetStatus.COMPLETE, False), + (DatasetStatus.CREATED, False), + (DatasetStatus.FAILED, False), + (DatasetStatus.STALE, False), + (DatasetStatus.PENDING, False), + ], +) +def test_is_removed(dataset_record, status, expected): + version = replace(dataset_record.versions[0], status=status) + assert version.is_removed is expected + + +def _versions_from_pairs(dataset_record, pairs): + return [ + replace( + dataset_record.versions[0], + id=i + 1, + version=version, + status=status, + ) + for i, (version, status) in enumerate(pairs) + ] + + +def test_latest_major_version_returns_max_within_major(dataset_record): + versions = _versions_from_pairs( + dataset_record, + [ + ("1.4.1", DatasetStatus.COMPLETE), + ("2.0.1", DatasetStatus.COMPLETE), + ("2.1.1", DatasetStatus.COMPLETE), + ("2.4.0", DatasetStatus.COMPLETE), + ], + ) + record = replace(dataset_record, _versions=versions, _versions_loaded=True) + assert record.latest_major_version(2) == "2.4.0" + assert record.latest_major_version(1) == "1.4.1" + + +def test_latest_major_version_missing_returns_none(dataset_record): + versions = _versions_from_pairs( + dataset_record, + [("1.0.0", DatasetStatus.COMPLETE), ("2.0.0", DatasetStatus.COMPLETE)], + ) + record = replace(dataset_record, _versions=versions, _versions_loaded=True) + assert record.latest_major_version(3) is None + + +def test_latest_major_version_skips_removed(dataset_record): + versions = _versions_from_pairs( + dataset_record, + [ + ("2.0.0", DatasetStatus.COMPLETE), + ("2.1.0", DatasetStatus.COMPLETE), + ("2.4.0", DatasetStatus.REMOVED), + ], + ) + record = replace(dataset_record, _versions=versions, _versions_loaded=True) + assert record.latest_major_version(2) == "2.1.0" + + +def test_latest_compatible_version_returns_max_matching(dataset_record): + versions = _versions_from_pairs( + dataset_record, + [ + ("1.0.0", DatasetStatus.COMPLETE), + ("1.5.2", DatasetStatus.COMPLETE), + ("2.0.0", DatasetStatus.COMPLETE), + ("2.5.0", DatasetStatus.COMPLETE), + ], + ) + record = replace(dataset_record, _versions=versions, _versions_loaded=True) + assert record.latest_compatible_version(">=1.0.0,<2.0.0") == "1.5.2" + assert record.latest_compatible_version(">=2.0.0") == "2.5.0" + + +def test_latest_compatible_version_no_match_returns_none(dataset_record): + versions = _versions_from_pairs( + dataset_record, + [("1.0.0", DatasetStatus.COMPLETE), ("2.0.0", DatasetStatus.COMPLETE)], + ) + record = replace(dataset_record, _versions=versions, _versions_loaded=True) + assert record.latest_compatible_version(">=3.0.0") is None + + +def test_latest_compatible_version_skips_removed(dataset_record): + versions = _versions_from_pairs( + dataset_record, + [ + ("1.0.0", DatasetStatus.COMPLETE), + ("1.5.0", DatasetStatus.REMOVED), + ], + ) + record = replace(dataset_record, _versions=versions, _versions_loaded=True) + assert record.latest_compatible_version(">=1.0.0,<2.0.0") == "1.0.0" + + +def test_dataset_list_record_latest_version_returns_max_live(dataset_list_record): + assert dataset_list_record.latest_version().version == "2.0.0" + + +def test_dataset_list_record_latest_version_skips_removed(dataset_list_record): + v3 = replace( + dataset_list_record.versions[0], + id=3, + version="3.0.0", + status=DatasetStatus.REMOVED, + ) + record = replace( + dataset_list_record, + versions=[*dataset_list_record.versions, v3], + ) + assert record.latest_version().version == "2.0.0" + + +def test_dataset_list_record_latest_version_all_removed_raises(dataset_list_record): + versions = [ + replace(v, status=DatasetStatus.REMOVED) for v in dataset_list_record.versions + ] + record = replace(dataset_list_record, versions=versions) + with pytest.raises(DatasetVersionNotFoundError, match="has no live versions"): + record.latest_version() + + +@pytest.mark.parametrize( + "prop,expected", + [ + ("next_version_major", "1.0.0"), + ("next_version_minor", "1.0.0"), + ("next_version_patch", "1.0.0"), + ], +) +def test_next_version_empty(dataset_record, prop, expected): + record = replace(dataset_record, _versions=[], _versions_loaded=True) + assert getattr(record, prop) == expected + + +def test_next_version_skips_removed(dataset_record): + # Existing semvers: 1.0.0 (COMPLETE), 2.0.0 (REMOVED). The REMOVED slot is + # the highest one - bumps must skip it. Counter-example mutation that + # ignores REMOVED would compute bumps off the live max (1.0.0) and return + # 2.0.0 / 1.1.0 / 1.0.1; the assertions below force the real reservation. + versions = _versions_with_statuses( + dataset_record, [DatasetStatus.COMPLETE, DatasetStatus.REMOVED] + ) + record = replace(dataset_record, _versions=versions, _versions_loaded=True) + assert record.next_version_major == "3.0.0" + assert record.next_version_minor == "2.1.0" + assert record.next_version_patch == "2.0.1" + + @pytest.mark.parametrize( "statuses,expected", [ diff --git a/tests/unit/test_dataset_status_management.py b/tests/unit/test_dataset_status_management.py index 62444a7a8..6d7f4c434 100644 --- a/tests/unit/test_dataset_status_management.py +++ b/tests/unit/test_dataset_status_management.py @@ -6,9 +6,14 @@ import sqlalchemy as sa import datachain as dc +from datachain import semver from datachain.data_storage import JobStatus -from datachain.dataset import DatasetRecord, DatasetStatus -from datachain.error import DatasetNotFoundError +from datachain.dataset import ( + SESSION_DATASET_PREFIX, + DatasetRecord, + DatasetStatus, +) +from datachain.error import DataChainError, DatasetNotFoundError from datachain.job import Job from datachain.lib.dc.datasets import ( datasets, @@ -16,6 +21,7 @@ move_dataset, read_dataset, ) +from datachain.lib.listing import LISTING_PREFIX from datachain.sql.types import String @@ -53,7 +59,6 @@ def dataset_complete(test_session, job) -> DatasetRecord: def test_mark_job_dataset_versions_as_failed(test_session, job, dataset_created): - """Test that mark_job_dataset_versions_as_failed marks versions as FAILED.""" # Verify initial status is CREATED dataset = test_session.catalog.get_dataset(dataset_created.name, versions=None) dataset_version = dataset.get_version(dataset.latest_version) @@ -73,7 +78,6 @@ def test_mark_job_dataset_versions_as_failed(test_session, job, dataset_created) def test_mark_job_dataset_versions_as_failed_skips_complete( test_session, job, dataset_complete ): - """Test that mark_job_dataset_versions_as_failed skips COMPLETE versions.""" # Verify initial status is COMPLETE dataset = test_session.catalog.get_dataset(dataset_complete.name, versions=None) dataset_version = dataset.get_version(dataset_complete.latest_version) @@ -89,13 +93,35 @@ def test_mark_job_dataset_versions_as_failed_skips_complete( assert dataset_version.status == DatasetStatus.COMPLETE +def test_mark_job_dataset_versions_as_failed_preserves_tombstones( + test_session, job, dataset_complete +): + """A REMOVED tombstone from a soft delete issued inside the failing job + must survive `mark_job_dataset_versions_as_failed` - otherwise the + tombstone gets flipped to FAILED and then wiped by GC, breaking the + soft-delete permanence guarantee.""" + catalog = test_session.catalog + version = dataset_complete.latest_version + catalog.remove_dataset(dataset_complete.name, version=version, keep_metadata=True) + + ds = catalog.get_dataset( + dataset_complete.name, versions=None, include_incomplete=True + ) + assert _find_removed(ds, version) is not None + + catalog.metastore.mark_job_dataset_versions_as_failed(job.id) + + ds = catalog.get_dataset( + dataset_complete.name, versions=None, include_incomplete=True + ) + tombstone = _find_removed(ds, version) + assert tombstone is not None + assert tombstone.status == DatasetStatus.REMOVED + + def test_finalize_job_as_failed_removes_incomplete_dataset_versions( test_session, job, dataset_created, dataset_failed, dataset_complete ): - """ - Test that _finalize_job_as_failed marks dataset versions as FAILED and removes - them right away. - """ from datachain.query.session import Session # Set up Session state as if job is running @@ -128,7 +154,6 @@ def test_finalize_job_as_failed_removes_incomplete_dataset_versions( def test_status_filtering_hides_non_complete_versions( test_session, job, dataset_created, dataset_failed, dataset_complete ): - """Test that non-COMPLETE dataset versions are hidden from queries.""" # Test with include_incomplete=False (what public API/CLI uses) datasets = list(test_session.catalog.ls_datasets()) dataset_names = {d.name for d in datasets} @@ -142,7 +167,6 @@ def test_status_filtering_hides_non_complete_versions( def test_get_dataset_versions_to_clean( test_session, job, dataset_created, dataset_failed, dataset_complete ): - """Test get_dataset_versions_to_clean.""" # Mark job as failed test_session.catalog.metastore.set_job_status(job.id, JobStatus.FAILED) @@ -164,7 +188,6 @@ def test_get_dataset_versions_to_clean( def test_get_dataset_versions_to_clean_skips_running_jobs( test_session, job, dataset_created ): - """Test that gc skips versions whose job is still running.""" # Job is RUNNING — its versions should NOT be returned to_clean = test_session.catalog.metastore.get_dataset_versions_to_clean() assert dataset_created.name not in {ds.name for ds, _ in to_clean} @@ -178,7 +201,6 @@ def test_get_dataset_versions_to_clean_skips_running_jobs( def test_get_dataset_versions_to_clean_scoped_to_job( test_session, job, dataset_created ): - """Test that get_dataset_versions_to_clean with job_id scopes to that job.""" test_session.catalog.metastore.set_job_status(job.id, JobStatus.FAILED) to_clean = test_session.catalog.metastore.get_dataset_versions_to_clean( job_id=job.id @@ -209,6 +231,25 @@ def test_remove_dataset_versions_bulk( test_session.catalog.get_dataset(dataset_failed.name) +def test_remove_dataset_versions_explicit_keep_metadata_tombstones( + test_session, dataset_complete +): + """User-facing bulk delete (``keep_metadata=True``) must tombstone COMPLETE + versions, not wipe them — overrides the GC inference path.""" + catalog = test_session.catalog + version = dataset_complete.latest_version + ds = catalog.get_dataset(dataset_complete.name, versions=None) + vid = ds.get_version(version).id + + n = catalog.remove_dataset_versions(version_ids=[vid], keep_metadata=True) + assert n == 1 + + ds = catalog.get_dataset( + dataset_complete.name, versions=None, include_incomplete=True + ) + assert _find_removed(ds, version) is not None + + def test_remove_dataset_versions_job_id_filter(test_session, job, dataset_created): test_session.catalog.metastore.set_job_status(job.id, JobStatus.FAILED) ds = test_session.catalog.get_dataset(dataset_created.name, versions=None) @@ -263,7 +304,6 @@ def test_get_dataset_versions_to_clean_finds_no_job_id(test_session): def test_cleanup_dataset_versions(test_session, job, dataset_failed): - """Test cleanup_dataset_versions removes datasets and returns IDs.""" # Mark job as failed test_session.catalog.metastore.set_job_status(job.id, JobStatus.FAILED) @@ -279,7 +319,6 @@ def test_cleanup_dataset_versions(test_session, job, dataset_failed): def test_save_sets_complete_status_at_end(test_session, dataset_complete): - """Test that save() sets COMPLETE status only after all operations.""" # Verify status is COMPLETE dataset_version = dataset_complete.get_version(dataset_complete.latest_version) assert dataset_version.status == DatasetStatus.COMPLETE @@ -292,7 +331,6 @@ def test_save_sets_complete_status_at_end(test_session, dataset_complete): def test_public_api_datasets_filters_non_complete( test_session, dataset_created, dataset_failed, dataset_complete ): - """Test that dc.datasets() filters out non-COMPLETE datasets.""" ds_chain = datasets(session=test_session, column="dataset") dataset_names = {ds.name for (ds,) in ds_chain.to_iter("dataset")} @@ -303,7 +341,6 @@ def test_public_api_datasets_filters_non_complete( @pytest.mark.parametrize("is_studio", [True]) def test_public_api_read_dataset_rejects_non_complete(test_session, studio_job): - """Test that dc.read_dataset() rejects non-COMPLETE datasets.""" ds_created = test_session.catalog.create_dataset( "ds_created_read", columns=(sa.Column("name", String),), job_id=studio_job ) @@ -326,7 +363,6 @@ def test_public_api_read_dataset_rejects_non_complete(test_session, studio_job): def test_public_api_delete_dataset_rejects_non_complete( test_session, dataset_created, dataset_failed ): - """Test that dc.delete_dataset() rejects non-COMPLETE datasets.""" # Should raise error for CREATED dataset with pytest.raises(DatasetNotFoundError): delete_dataset(dataset_created.name, session=test_session) @@ -339,7 +375,6 @@ def test_public_api_delete_dataset_rejects_non_complete( def test_public_api_move_dataset_rejects_non_complete( test_session, dataset_created, dataset_failed ): - """Test that dc.move_dataset() rejects non-COMPLETE datasets.""" # Should raise error for CREATED dataset with pytest.raises(DatasetNotFoundError): move_dataset(dataset_created.name, "new_name_created", session=test_session) @@ -358,7 +393,6 @@ def test_public_api_move_dataset_rejects_non_complete( ids=["finished-job", "running-job"], ) def test_cleanup_session_dataset_versions(test_session, job, job_status, should_clean): - """Test that cleanup_dataset_versions also cleans session_* datasets.""" ds = dc.read_values(value=["a", "b"], session=test_session).save( "session_test_abc123" ) @@ -386,7 +420,7 @@ def dataset_marked_for_removal(test_session, job) -> DatasetRecord: test_session.catalog.metastore.db.execute( dv.update() .where(dv.c.dataset_id == dataset.id) - .values(status=DatasetStatus.REMOVING) + .values(status=DatasetStatus.REMOVING_TOTAL) ) return test_session.catalog.get_dataset(dataset.name, include_incomplete=True) @@ -411,3 +445,543 @@ def test_cleanup_dataset_versions_removes_marked_for_removal( with pytest.raises(DatasetNotFoundError): test_session.catalog.get_dataset(dataset_marked_for_removal.name) + + +def _find_removed(ds: DatasetRecord, version: str): + """Find a REMOVED version by its semver.""" + for v in ds.versions: + if v.status == DatasetStatus.REMOVED and v.version == version: + return v + return None + + +def test_remove_keeps_version_row_and_drops_rows_table(test_session, dataset_complete): + catalog = test_session.catalog + warehouse = catalog.warehouse + version = dataset_complete.latest_version + rows_table = warehouse.dataset_table_name(dataset_complete, version) + assert warehouse.db.has_table(rows_table) + + catalog.remove_dataset_version(dataset_complete, version, keep_metadata=True) + + ds = catalog.get_dataset( + dataset_complete.name, versions=None, include_incomplete=True + ) + removed = _find_removed(ds, version) + assert removed is not None + assert removed.status == DatasetStatus.REMOVED + assert removed.removed_at is not None + assert not warehouse.db.has_table(rows_table) + + +def test_remove_preserves_dependencies(test_session, dataset_complete): + catalog = test_session.catalog + metastore = catalog.metastore + + src_version = dataset_complete.latest_version + dep_chain = dc.read_dataset(dataset_complete.name, session=test_session).save( + "ds_dependent" + ) + dep_ds = dep_chain.dataset + assert dep_ds is not None + dep_version = dep_ds.latest_version + assert len(metastore.get_direct_dataset_dependencies(dep_ds, dep_version)) == 1 + + catalog.remove_dataset_version(dataset_complete, src_version, keep_metadata=True) + + deps = metastore.get_direct_dataset_dependencies(dep_ds, dep_version) + assert len(deps) == 1 + assert deps[0] is not None + + +def test_remove_is_idempotent_on_already_removed(test_session, dataset_complete): + catalog = test_session.catalog + version = dataset_complete.latest_version + + catalog.remove_dataset_version(dataset_complete, version, keep_metadata=True) + ds = catalog.get_dataset( + dataset_complete.name, versions=None, include_incomplete=True + ) + first = _find_removed(ds, version) + assert first is not None + first_removed_at = first.removed_at + + # Second call finds the same row already REMOVED → no-op. + catalog.remove_dataset_version(ds, version, keep_metadata=True) + ds = catalog.get_dataset( + dataset_complete.name, versions=None, include_incomplete=True + ) + assert len([v for v in ds.versions if v.status == DatasetStatus.REMOVED]) == 1 + assert _find_removed(ds, version).removed_at == first_removed_at + + +def test_save_after_remove_skips_removed_version(test_session, dataset_complete): + """A removed semver is permanently reserved — the next save auto-bumps + past it instead of reclaiming the slot.""" + catalog = test_session.catalog + name = dataset_complete.name + first_version = dataset_complete.latest_version + + catalog.remove_dataset_version(dataset_complete, first_version, keep_metadata=True) + + new_chain = dc.read_values(value=["new1"], session=test_session).save(name) + assert new_chain.dataset is not None + assert new_chain.dataset.latest_version != first_version + assert semver.value(new_chain.dataset.latest_version) > semver.value(first_version) + + # The old row lives on as a REMOVED record for lineage. + ds = catalog.get_dataset(name, versions=None, include_incomplete=True) + assert _find_removed(ds, first_version) is not None + + +def test_remove_keep_metadata_false_wipes_already_removed_version( + test_session, dataset_complete +): + """keep_metadata=False wipes a REMOVED record completely + (version row gone, dataset row gone if it was the last).""" + catalog = test_session.catalog + name = dataset_complete.name + version = dataset_complete.latest_version + + catalog.remove_dataset_version(dataset_complete, version, keep_metadata=True) + ds = catalog.get_dataset(name, versions=None, include_incomplete=True) + assert _find_removed(ds, version) is not None + + catalog.remove_dataset_version(ds, version, keep_metadata=False) + + with pytest.raises(DatasetNotFoundError): + catalog.get_dataset(name, include_incomplete=True) + + +def test_remove_keep_metadata_false_wipes_live_complete_version( + test_session, dataset_complete +): + """keep_metadata=False wipes a fresh COMPLETE version without leaving a record.""" + catalog = test_session.catalog + name = dataset_complete.name + version = dataset_complete.latest_version + + catalog.remove_dataset_version(dataset_complete, version, keep_metadata=False) + + with pytest.raises(DatasetNotFoundError): + catalog.get_dataset(name, include_incomplete=True) + + +def test_save_explicit_removed_version_rejected(test_session, dataset_complete): + """Saving with an explicit version that matches a REMOVED one must fail.""" + catalog = test_session.catalog + name = dataset_complete.name + version = dataset_complete.latest_version + + catalog.remove_dataset_version(dataset_complete, version, keep_metadata=True) + + with pytest.raises(RuntimeError, match=f"already has version {version}"): + dc.read_values(value=["new1"], session=test_session).save(name, version=version) + + +def test_latest_version_skips_removed(test_session, dataset_complete): + catalog = test_session.catalog + name = dataset_complete.name + v1 = dataset_complete.latest_version + + dc.read_values(value=["v2-a", "v2-b"], session=test_session).save(name) + ds = catalog.get_dataset(name, versions=None, include_incomplete=True) + v2 = ds.latest_version + assert v2 != v1 + + catalog.remove_dataset_version(ds, v2, keep_metadata=True) + + ds = catalog.get_dataset(name, versions=None, include_incomplete=True) + assert ds.latest_version == v1 + + +def test_listing_excludes_removed_only_dataset(test_session, dataset_complete): + catalog = test_session.catalog + name = dataset_complete.name + + catalog.remove_dataset_version( + dataset_complete, dataset_complete.latest_version, keep_metadata=True + ) + + ds_chain = datasets(session=test_session, column="dataset") + assert name not in {ds.name for (ds,) in ds_chain.to_iter("dataset")} + + +def test_read_dataset_after_remove_raises( + test_session, dataset_complete, no_studio_dataset +): + catalog = test_session.catalog + name = dataset_complete.name + + catalog.remove_dataset_version( + dataset_complete, dataset_complete.latest_version, keep_metadata=True + ) + + with pytest.raises(DatasetNotFoundError): + read_dataset(name, session=test_session) + + +def test_janitor_still_hard_deletes_created_version(test_session, job, dataset_created): + """The cleanup path must still hard-delete non-COMPLETE versions — we + don't want REMOVED rows piling up for failed/abandoned saves.""" + catalog = test_session.catalog + catalog.metastore.set_job_status(job.id, JobStatus.FAILED) + + catalog.cleanup_dataset_versions() + + with pytest.raises(DatasetNotFoundError): + catalog.get_dataset(dataset_created.name, include_incomplete=True) + + +def test_remove_non_complete_version_is_hard_delete(test_session, dataset_failed): + catalog = test_session.catalog + name = dataset_failed.name + catalog.remove_dataset_version( + dataset_failed, dataset_failed.latest_version, keep_metadata=False + ) + with pytest.raises(DatasetNotFoundError): + catalog.get_dataset(name, include_incomplete=True) + + +def _make_completed_dataset(catalog, name: str, project=None): + listing = name.startswith(LISTING_PREFIX) + ds = catalog.create_dataset( + name, + project=project, + columns=(sa.Column("name", String),), + listing=listing, + ) + catalog.metastore.update_dataset_version( + ds, ds.latest_version, status=DatasetStatus.COMPLETE + ) + return catalog.get_dataset( + name, + namespace_name=project.namespace.name if project else None, + project_name=project.name if project else None, + versions=None, + include_incomplete=True, + ) + + +def test_listing_dataset_never_keeps_metadata(test_session): + """`lst__*` listing datasets must never keep metadata on remove — they're + internal cache, keeping a record serves nobody and wastes rows.""" + catalog = test_session.catalog + metastore = catalog.metastore + listing_project = metastore.get_project( + metastore.listing_project_name, metastore.system_namespace_name + ) + name = f"{LISTING_PREFIX}internal_test" + ds = _make_completed_dataset(catalog, name, project=listing_project) + + with pytest.raises(DataChainError, match="while keeping metadata"): + catalog.remove_dataset_version(ds, ds.latest_version, keep_metadata=True) + + catalog.remove_dataset_version(ds, ds.latest_version, keep_metadata=False) + with pytest.raises(DatasetNotFoundError): + catalog.get_dataset(name, include_incomplete=True) + + +def test_session_dataset_never_keeps_metadata(test_session): + """`session_*` intermediates are throwaway too — always fully removed.""" + catalog = test_session.catalog + name = f"{SESSION_DATASET_PREFIX}internal_test" + ds = _make_completed_dataset(catalog, name) + + with pytest.raises(DataChainError, match="while keeping metadata"): + catalog.remove_dataset_version(ds, ds.latest_version, keep_metadata=True) + + catalog.remove_dataset_version(ds, ds.latest_version, keep_metadata=False) + with pytest.raises(DatasetNotFoundError): + catalog.get_dataset(name, include_incomplete=True) + + +def test_save_skips_reserved_semver_after_delete(test_session): + """After save -> save -> soft delete latest -> save, the new save must + skip the REMOVED slot and auto-bump past it (semver permanently + reserved).""" + catalog = test_session.catalog + dc.read_values(value=["a"], session=test_session).save("reserve_test") + second = dc.read_values(value=["b"], session=test_session).save("reserve_test") + removed_version = second.dataset.latest_version + catalog.remove_dataset("reserve_test", version=removed_version, keep_metadata=True) + + third = dc.read_values(value=["c"], session=test_session).save("reserve_test") + assert third.dataset.latest_version != removed_version + assert semver.value(third.dataset.latest_version) > semver.value(removed_version) + + +def test_dependency_removed_flag(test_session): + """A dataset dependency pointing at a soft-deleted version is returned + with ``removed=True`` so delta-style consumers can filter it without a + separate query.""" + catalog = test_session.catalog + source = dc.read_values(value=["a", "b"], session=test_session).save("dep_source") + dc.read_dataset("dep_source", session=test_session).save("dep_target") + catalog.remove_dataset(source.dataset.name, version="1.0.0", keep_metadata=True) + + deps = catalog.get_dataset_dependencies("dep_target", "1.0.0") + assert len(deps) == 1 + assert deps[0] is not None + assert deps[0].name == "dep_source" + assert deps[0].removed is True + + +def test_dependency_after_wipe_returns_none(test_session): + """When the source version is fully wiped (``keep_metadata=False``), the + dependency row is left with a broken reference and the lineage view + surfaces it as ``None`` - contrasts with the tombstone path covered by + [[test_dependency_removed_flag]].""" + catalog = test_session.catalog + source = dc.read_values(value=["a", "b"], session=test_session).save("wipe_source") + dc.read_dataset("wipe_source", session=test_session).save("wipe_target") + catalog.remove_dataset(source.dataset.name, version="1.0.0", keep_metadata=False) + + deps = catalog.get_dataset_dependencies("wipe_target", "1.0.0") + assert deps == [None] + + +def test_export_dataset_table_refuses_tombstone(test_session, dataset_complete): + """Exporting a REMOVED tombstone must raise upfront instead of running + and crashing later against the dropped rows table.""" + catalog = test_session.catalog + version = dataset_complete.latest_version + catalog.remove_dataset(dataset_complete.name, version=version, keep_metadata=True) + + with pytest.raises(DatasetNotFoundError): + catalog.export_dataset_table( + bucket="s3://does-not-matter", + name=dataset_complete.name, + version=version, + base_file_name="export", + ) + + +def test_bulk_wipe_does_not_cascade_dataset_row(test_session, dataset_complete): + """A GC-shaped wipe of a single version (whose in-memory DatasetRecord + only carries that one version) must not delete the dataset row when + other versions still exist in DB - otherwise FK cascade takes REMOVED + tombstones and live versions down with it.""" + catalog = test_session.catalog + # Add a second version, then mark it FAILED so GC will pick it up. + columns = tuple( + sa.Column(name, typ) for name, typ in dataset_complete.schema.items() + ) + second, _ = catalog.create_dataset_version( + dataset_complete, "2.0.0", columns=columns + ) + catalog.metastore.update_dataset_version( + second, "2.0.0", status=DatasetStatus.FAILED + ) + + # Soft-delete v1.0.0 to get a tombstone we expect to survive. + catalog.remove_dataset(dataset_complete.name, version="1.0.0", keep_metadata=True) + + # GC wipes v2.0.0 via the bulk path - this builds a single-version + # DatasetRecord per row internally. + ds = catalog.get_dataset( + dataset_complete.name, versions=None, include_incomplete=True + ) + v2_id = ds.get_version("2.0.0").id + catalog.remove_dataset_versions(version_ids=[v2_id]) + + # Dataset row must survive; tombstone must survive. + ds = catalog.get_dataset( + dataset_complete.name, versions=None, include_incomplete=True + ) + assert _find_removed(ds, "1.0.0") is not None + assert not ds.has_version("2.0.0") + + +def test_rename_dataset_across_tombstone(test_session, dataset_complete): + """Renaming a dataset that has a REMOVED tombstone version must succeed: + the tombstone has no rows table, so its rename is skipped, and live + versions are renamed normally.""" + catalog = test_session.catalog + second = dc.read_values(value=["v2a", "v2b"], session=test_session).save( + dataset_complete.name + ) + live_version = second.dataset.latest_version + + catalog.remove_dataset(dataset_complete.name, version="1.0.0", keep_metadata=True) + + catalog.edit_dataset(dataset_complete.name, new_name="ds_complete_renamed") + + renamed = dc.read_dataset( + "ds_complete_renamed", version=live_version, session=test_session + ) + assert sorted(renamed.to_values("value")) == ["v2a", "v2b"] + + +def test_remove_dataset_force_keep_metadata_mixed_versions( + test_session, dataset_complete +): + """`remove_dataset(force=True, keep_metadata=True)` on a mixed-state + dataset tombstones soft-deletable versions and transparently wipes the + rest - keep_metadata is meaningful only where there is semver/lineage + worth preserving.""" + catalog = test_session.catalog + columns = tuple( + sa.Column(name, typ) for name, typ in dataset_complete.schema.items() + ) + updated, _ = catalog.create_dataset_version( + dataset_complete, "2.0.0", columns=columns + ) + catalog.metastore.update_dataset_version( + updated, "2.0.0", status=DatasetStatus.FAILED + ) + + catalog.remove_dataset(dataset_complete.name, force=True, keep_metadata=True) + + ds = catalog.get_dataset( + dataset_complete.name, versions=None, include_incomplete=True + ) + # COMPLETE version -> tombstoned; FAILED version -> wiped. + assert _find_removed(ds, "1.0.0") is not None + assert not ds.has_version("2.0.0") + + +def test_remove_dataset_force_keep_metadata_internal_downgrades_to_wipe(test_session): + """Internal datasets (`lst__*`, `session_*`) have no semver/lineage to + preserve, so `keep_metadata=True` transparently downgrades to a full wipe. + """ + catalog = test_session.catalog + name = f"{SESSION_DATASET_PREFIX}force_test" + ds = _make_completed_dataset(catalog, name) + + catalog.remove_dataset(ds.name, force=True, keep_metadata=True) + + with pytest.raises(DatasetNotFoundError): + catalog.get_dataset(ds.name, include_incomplete=True) + + +def _force_status(catalog, dataset: DatasetRecord, version: str, status: int): + """Put a version into a specific status directly. Simulates a mid-flight + removal that crashed, or another caller having claimed the transition.""" + catalog.metastore.update_dataset_version(dataset, version, status=status) + return catalog.get_dataset(dataset.name, versions=None, include_incomplete=True) + + +def test_gc_resumes_stuck_removing(test_session, dataset_complete): + """A version stuck in REMOVING (previous soft-delete crashed mid-flight) + is resumed to REMOVED by the GC path — _remove_versions picks the soft + path from the current status.""" + catalog = test_session.catalog + version = dataset_complete.latest_version + ds = _force_status(catalog, dataset_complete, version, DatasetStatus.REMOVING) + vid = ds.get_version(version).id + + catalog.remove_dataset_versions(version_ids=[vid]) + + ds = catalog.get_dataset( + dataset_complete.name, versions=None, include_incomplete=True + ) + assert _find_removed(ds, version) is not None + + +def test_gc_skips_removed_tombstones(test_session, dataset_complete): + """A REMOVED version handed to the GC path must be a no-op - the + tombstone has to be preserved, not wiped through inferred + keep_metadata=False.""" + catalog = test_session.catalog + version = dataset_complete.latest_version + ds = _force_status(catalog, dataset_complete, version, DatasetStatus.REMOVED) + vid = ds.get_version(version).id + + catalog.remove_dataset_versions(version_ids=[vid]) + + ds = catalog.get_dataset( + dataset_complete.name, versions=None, include_incomplete=True + ) + assert _find_removed(ds, version) is not None + + +def test_gc_wipes_internal_stuck_in_removing(test_session): + """An internal `session_*` dataset stuck in REMOVING (crashed mid-flight) + must be wiped by the GC path - inference must not pick the soft path + for internal datasets, since they can't be kept as metadata tombstones.""" + catalog = test_session.catalog + name = f"{SESSION_DATASET_PREFIX}gc_stuck_test" + ds = _make_completed_dataset(catalog, name) + version = ds.latest_version + ds = _force_status(catalog, ds, version, DatasetStatus.REMOVING) + vid = ds.get_version(version).id + + catalog.remove_dataset_versions(version_ids=[vid]) + + with pytest.raises(DatasetNotFoundError): + catalog.get_dataset(name, include_incomplete=True) + + +def test_gc_resumes_stuck_removing_total(test_session, dataset_complete): + """A version stuck in REMOVING_TOTAL is resumed to a full wipe by the GC + path — _remove_versions picks the wipe path.""" + catalog = test_session.catalog + version = dataset_complete.latest_version + name = dataset_complete.name + ds = _force_status(catalog, dataset_complete, version, DatasetStatus.REMOVING_TOTAL) + vid = ds.get_version(version).id + + catalog.remove_dataset_versions(version_ids=[vid]) + + with pytest.raises(DatasetNotFoundError): + catalog.get_dataset(name, include_incomplete=True) + + +def test_remove_explicit_keep_on_inflight_wipe_raises(test_session, dataset_complete): + """If a wipe is in flight (REMOVING_TOTAL) and a caller explicitly asks + to keep metadata, raise — don't silently downgrade the in-flight wipe.""" + catalog = test_session.catalog + version = dataset_complete.latest_version + ds = _force_status(catalog, dataset_complete, version, DatasetStatus.REMOVING_TOTAL) + + with pytest.raises(DataChainError, match="while keeping metadata"): + catalog.remove_dataset_version(ds, version, keep_metadata=True) + + +def test_remove_explicit_wipe_on_inflight_keep_raises(test_session, dataset_complete): + """If a soft delete is in flight (REMOVING) and a caller explicitly asks + to wipe, raise — don't silently escalate the in-flight soft delete.""" + catalog = test_session.catalog + version = dataset_complete.latest_version + ds = _force_status(catalog, dataset_complete, version, DatasetStatus.REMOVING) + + with pytest.raises(DataChainError, match="entirely"): + catalog.remove_dataset_version(ds, version, keep_metadata=False) + + +def test_complete_raises_when_version_removed_concurrently( + test_session, dataset_complete +): + """If a version is removed before completion finishes, the guarded + final status flip refuses to stomp it and raises a clean error + instead of silently corrupting state.""" + catalog = test_session.catalog + version = dataset_complete.latest_version + ds = _force_status(catalog, dataset_complete, version, DatasetStatus.REMOVED) + + with pytest.raises(DataChainError, match="Could not update status"): + catalog.metastore.update_dataset_status( + ds, + DatasetStatus.COMPLETE, + version=version, + expected_status=DatasetStatus.CREATED, + ) + + +def test_complete_dataset_version_raises_friendly_when_removed_concurrently( + test_session, dataset_complete +): + """End-to-end: when a version is flipped to REMOVING mid-save (e.g. by GC), + catalog.complete_dataset_version surfaces a user-friendly DataChainError + that explains the concurrent removal and suggests a retry.""" + catalog = test_session.catalog + version = dataset_complete.latest_version + ds = _force_status(catalog, dataset_complete, version, DatasetStatus.REMOVING) + + with pytest.raises(DataChainError, match="deleted concurrently") as exc_info: + catalog.complete_dataset_version(ds, version) + + # original guard error is chained as the cause + assert isinstance(exc_info.value.__cause__, DataChainError) + assert "Could not update status" in str(exc_info.value.__cause__) diff --git a/tests/unit/test_session.py b/tests/unit/test_session.py index 8d6882590..29a75c4f7 100644 --- a/tests/unit/test_session.py +++ b/tests/unit/test_session.py @@ -3,7 +3,7 @@ import pytest import datachain as dc -from datachain.dataset import DatasetStatus +from datachain.dataset import SESSION_DATASET_PREFIX, DatasetStatus from datachain.error import DataChainError, DatasetNotFoundError from datachain.query.session import Session @@ -31,7 +31,7 @@ def test_ephemeral_dataset_naming(catalog, project): session_uuid = f"[0-9a-fA-F]{{{Session.SESSION_UUID_LEN}}}" table_uuid = f"[0-9a-fA-F]{{{Session.TEMP_TABLE_UUID_LEN}}}" - name_prefix = f"{Session.DATASET_PREFIX}{session_name}" + name_prefix = f"{SESSION_DATASET_PREFIX}{session_name}" pattern = rf"^{name_prefix}_{session_uuid}_{table_uuid}$" assert re.match(pattern, ds_tmp.name) is not None @@ -46,7 +46,7 @@ def test_global_session_naming(catalog, project): dc.read_values(name=["a"], session=global_session).save(fqn) tmp_name = global_session.generate_temp_dataset_name() ds_tmp = dc.read_dataset(fqn, session=global_session).save(tmp_name) - global_prefix = f"{Session.DATASET_PREFIX}{Session.GLOBAL_SESSION_NAME}" + global_prefix = f"{SESSION_DATASET_PREFIX}{Session.GLOBAL_SESSION_NAME}" pattern = rf"^{global_prefix}_{session_uuid}_{table_uuid}$" assert re.match(pattern, ds_tmp.name) is not None @@ -79,7 +79,7 @@ def test_ephemeral_dataset_lifecycle(catalog, project): assert ds_tmp.name != "my_test_ds12" assert ds_tmp.name is not None - assert ds_tmp.name.startswith(Session.DATASET_PREFIX) + assert ds_tmp.name.startswith(SESSION_DATASET_PREFIX) assert session_name in ds_tmp.name ds = catalog.get_dataset(ds_tmp.name)