diff --git a/src/iceberg/CMakeLists.txt b/src/iceberg/CMakeLists.txt index 1bf1f10db..f862c8974 100644 --- a/src/iceberg/CMakeLists.txt +++ b/src/iceberg/CMakeLists.txt @@ -108,6 +108,7 @@ set(ICEBERG_SOURCES update/merging_snapshot_update.cc update/overwrite_files.cc update/pending_update.cc + update/rewrite_manifests.cc update/row_delta.cc update/set_snapshot.cc update/snapshot_manager.cc diff --git a/src/iceberg/manifest/rolling_manifest_writer.cc b/src/iceberg/manifest/rolling_manifest_writer.cc index 1648ca86a..a2d651e74 100644 --- a/src/iceberg/manifest/rolling_manifest_writer.cc +++ b/src/iceberg/manifest/rolling_manifest_writer.cc @@ -54,6 +54,13 @@ Status RollingManifestWriter::WriteExistingEntry( return {}; } +Status RollingManifestWriter::WriteExistingEntry(const ManifestEntry& entry) { + ICEBERG_ASSIGN_OR_RAISE(auto* writer, CurrentWriter()); + ICEBERG_RETURN_UNEXPECTED(writer->WriteExistingEntry(entry)); + current_file_rows_++; + return {}; +} + Status RollingManifestWriter::WriteDeletedEntry( std::shared_ptr file, int64_t data_sequence_number, std::optional file_sequence_number) { diff --git a/src/iceberg/manifest/rolling_manifest_writer.h b/src/iceberg/manifest/rolling_manifest_writer.h index 6e0211f55..ed0331832 100644 --- a/src/iceberg/manifest/rolling_manifest_writer.h +++ b/src/iceberg/manifest/rolling_manifest_writer.h @@ -76,6 +76,9 @@ class ICEBERG_EXPORT RollingManifestWriter { int64_t data_sequence_number, std::optional file_sequence_number = std::nullopt); + /// \brief Add an existing entry while preserving snapshot and sequence fields. + Status WriteExistingEntry(const ManifestEntry& entry); + /// \brief Add a delete entry for a file. /// /// \param file a deleted data file diff --git a/src/iceberg/manifest/v3_metadata.cc b/src/iceberg/manifest/v3_metadata.cc index 9148e0676..9026ae891 100644 --- a/src/iceberg/manifest/v3_metadata.cc +++ b/src/iceberg/manifest/v3_metadata.cc @@ -194,13 +194,20 @@ Status ManifestFileAdapterV3::Init() { } Status ManifestFileAdapterV3::Append(const ManifestFile& file) { - ICEBERG_RETURN_UNEXPECTED(AppendInternal(file)); if (WrapFirstRowId(file)) { if (!next_row_id_.has_value()) { return InvalidManifestList("Missing next-row-id for file: {}", file.manifest_path); } - next_row_id_ = next_row_id_.value() + file.existing_rows_count.value_or(0) + - file.added_rows_count.value_or(0); + if (!file.existing_rows_count.has_value() || !file.added_rows_count.has_value()) { + return InvalidManifestList("Missing row counts for file: {}", file.manifest_path); + } + } + + ICEBERG_RETURN_UNEXPECTED(AppendInternal(file)); + + if (WrapFirstRowId(file)) { + next_row_id_ = next_row_id_.value() + file.existing_rows_count.value() + + file.added_rows_count.value(); } return {}; } diff --git a/src/iceberg/meson.build b/src/iceberg/meson.build index 1fa15fa12..9d2d825f3 100644 --- a/src/iceberg/meson.build +++ b/src/iceberg/meson.build @@ -133,6 +133,7 @@ iceberg_sources = files( 'update/merging_snapshot_update.cc', 'update/overwrite_files.cc', 'update/pending_update.cc', + 'update/rewrite_manifests.cc', 'update/row_delta.cc', 'update/set_snapshot.cc', 'update/snapshot_manager.cc', diff --git a/src/iceberg/table.cc b/src/iceberg/table.cc index 9dbc5acf7..2b2440f2f 100644 --- a/src/iceberg/table.cc +++ b/src/iceberg/table.cc @@ -36,6 +36,7 @@ #include "iceberg/update/fast_append.h" #include "iceberg/update/merge_append.h" #include "iceberg/update/overwrite_files.h" +#include "iceberg/update/rewrite_manifests.h" #include "iceberg/update/row_delta.h" #include "iceberg/update/set_snapshot.h" #include "iceberg/update/snapshot_manager.h" @@ -245,6 +246,12 @@ Result> Table::NewOverwrite() { return OverwriteFiles::Make(name().name, std::move(ctx)); } +Result> Table::NewRewriteManifests() { + ICEBERG_ASSIGN_OR_RAISE( + auto ctx, TransactionContext::Make(shared_from_this(), TransactionKind::kUpdate)); + return RewriteManifests::Make(name().name, std::move(ctx)); +} + Result> Table::NewUpdateStatistics() { ICEBERG_ASSIGN_OR_RAISE( auto ctx, TransactionContext::Make(shared_from_this(), TransactionKind::kUpdate)); @@ -360,6 +367,10 @@ Result> StaticTable::NewOverwrite() { return NotSupported("Cannot create an overwrite for a static table"); } +Result> StaticTable::NewRewriteManifests() { + return NotSupported("Cannot create a rewrite manifests for a static table"); +} + Result> StaticTable::NewSnapshotManager() { return NotSupported("Cannot create a snapshot manager for a static table"); } diff --git a/src/iceberg/table.h b/src/iceberg/table.h index 64ed21ef8..e777e4ca8 100644 --- a/src/iceberg/table.h +++ b/src/iceberg/table.h @@ -188,6 +188,9 @@ class ICEBERG_EXPORT Table : public std::enable_shared_from_this { /// \brief Create a new OverwriteFiles to overwrite data files and commit the changes. virtual Result> NewOverwrite(); + /// \brief Create a new RewriteManifests to rewrite manifest layout. + virtual Result> NewRewriteManifests(); + /// \brief Create a new SnapshotManager to manage snapshots and snapshot references. virtual Result> NewSnapshotManager(); @@ -263,6 +266,8 @@ class ICEBERG_EXPORT StaticTable : public Table { Result> NewOverwrite() override; + Result> NewRewriteManifests() override; + Result> NewSnapshotManager() override; private: diff --git a/src/iceberg/test/CMakeLists.txt b/src/iceberg/test/CMakeLists.txt index fcbc22126..8bd47476c 100644 --- a/src/iceberg/test/CMakeLists.txt +++ b/src/iceberg/test/CMakeLists.txt @@ -234,6 +234,7 @@ if(ICEBERG_BUILD_BUNDLE) merge_append_test.cc merging_snapshot_update_test.cc name_mapping_update_test.cc + rewrite_manifests_test.cc row_delta_test.cc snapshot_manager_test.cc transaction_test.cc diff --git a/src/iceberg/test/merge_append_test.cc b/src/iceberg/test/merge_append_test.cc index 601d9476e..7af0463d3 100644 --- a/src/iceberg/test/merge_append_test.cc +++ b/src/iceberg/test/merge_append_test.cc @@ -88,10 +88,9 @@ class MergeAppendTestBase : public UpdateTestBase { auto metadata_location = std::format("{}/metadata/00001-{}.metadata.json", table_location_, Uuid::GenerateV7().ToString()); ICEBERG_UNWRAP_OR_FAIL( - auto metadata, ReadTableMetadataFromResource("TableMetadataV2ValidMinimal.json")); + auto metadata, ReadTableMetadataFromResource("TableMetadataV3ValidMinimal.json")); metadata->format_version = format_version; metadata->location = table_location_; - metadata->next_row_id = TableMetadata::kInitialRowId; ASSERT_THAT(TableMetadataUtil::Write(*file_io_, metadata_location, *metadata), IsOk()); diff --git a/src/iceberg/test/rewrite_manifests_test.cc b/src/iceberg/test/rewrite_manifests_test.cc new file mode 100644 index 000000000..01e1bb11d --- /dev/null +++ b/src/iceberg/test/rewrite_manifests_test.cc @@ -0,0 +1,1872 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "iceberg/update/rewrite_manifests.h" + +#include +#include +#include +#include +#include +#include +#include + +#include +#include + +#include "iceberg/avro/avro_register.h" +#include "iceberg/constants.h" +#include "iceberg/expression/expressions.h" +#include "iceberg/partition_spec.h" +#include "iceberg/schema.h" +#include "iceberg/table_metadata.h" +#include "iceberg/table_properties.h" +#include "iceberg/test/executor.h" +#include "iceberg/test/mock_catalog.h" +#include "iceberg/test/update_test_base.h" +#include "iceberg/transform.h" +#include "iceberg/update/delete_files.h" +#include "iceberg/update/fast_append.h" +#include "iceberg/update/row_delta.h" +#include "iceberg/update/update_partition_spec.h" +#include "iceberg/update/update_properties.h" +#include "iceberg/util/macros.h" + +namespace iceberg { + +class RewriteManifestsTest : public UpdateTestBase, + public ::testing::WithParamInterface { + protected: + static void SetUpTestSuite() { avro::RegisterAll(); } + + void SetUp() override { + table_ident_ = TableIdentifier{.name = TableName()}; + table_location_ = "/warehouse/" + TableName(); + + InitializeFileIO(); + RegisterMinimalTable(GetParam()); + + ICEBERG_UNWRAP_OR_FAIL(spec_, table_->spec()); + ICEBERG_UNWRAP_OR_FAIL(schema_, table_->schema()); + file_a_ = MakeDataFile("a", 10, 100, 1); + file_b_ = MakeDataFile("b", 20, 200, 2); + file_c_ = MakeDataFile("c", 30, 300, 3); + file_d_ = MakeDataFile("d", 40, 400, 4); + } + + void RegisterMinimalTable(int8_t format_version) { + auto metadata_location = std::format("{}/metadata/00001-{}.metadata.json", + table_location_, Uuid::GenerateV7().ToString()); + ICEBERG_UNWRAP_OR_FAIL( + auto metadata, ReadTableMetadataFromResource("TableMetadataV3ValidMinimal.json")); + metadata->format_version = format_version; + metadata->location = table_location_; + + ASSERT_THAT(TableMetadataUtil::Write(*file_io_, metadata_location, *metadata), + IsOk()); + ICEBERG_UNWRAP_OR_FAIL(table_, + catalog_->RegisterTable(table_ident_, metadata_location)); + } + + std::shared_ptr MakeDataFile(const std::string& name, int64_t record_count, + int64_t size, int64_t partition_value) { + return MakeDataFile(name, spec_, record_count, size, + {Literal::Long(partition_value)}); + } + + std::shared_ptr MakeDataFile(const std::string& name, + std::shared_ptr spec, + int64_t record_count, int64_t size, + std::vector partition_values) { + auto file = std::make_shared(); + file->content = DataFile::Content::kData; + file->file_path = std::format("{}/data/{}.parquet", table_location_, name); + file->file_format = FileFormatType::kParquet; + file->partition = PartitionValues(std::move(partition_values)); + file->record_count = record_count; + file->file_size_in_bytes = size; + file->partition_spec_id = spec->spec_id(); + return file; + } + + std::shared_ptr MakeDeleteFile( + const std::string& name, int64_t partition_value, + std::optional referenced_data_file = std::nullopt) { + auto file = MakeDataFile(name, 7, 50, partition_value); + file->content = DataFile::Content::kPositionDeletes; + file->file_path = std::format("{}/delete/{}.parquet", table_location_, name); + if (table_->metadata()->format_version >= 3) { + file->file_format = FileFormatType::kPuffin; + file->referenced_data_file = referenced_data_file.value_or(file_a_->file_path); + file->content_offset = 0; + file->content_size_in_bytes = 10; + } + return file; + } + + std::shared_ptr MakeEqualityDeleteFile(const std::string& name, + int64_t partition_value) { + auto file = MakeDataFile(name, /*record_count=*/1, /*size=*/10, partition_value); + file->content = DataFile::Content::kEqualityDeletes; + file->file_path = std::format("{}/delete/{}.parquet", table_location_, name); + file->equality_ids = {1}; + return file; + } + + Status AppendFiles(std::vector> files) { + ICEBERG_ASSIGN_OR_RAISE(auto append, table_->NewFastAppend()); + for (const auto& file : files) { + append->AppendFile(file); + } + ICEBERG_RETURN_UNEXPECTED(append->Commit()); + return table_->Refresh(); + } + + Status AppendDeleteFiles(std::vector> files) { + ICEBERG_ASSIGN_OR_RAISE(auto row_delta, table_->NewRowDelta()); + for (const auto& file : files) { + row_delta->AddDeletes(file); + } + ICEBERG_RETURN_UNEXPECTED(row_delta->Commit()); + return table_->Refresh(); + } + + Status RemoveDeleteFiles(std::vector> files) { + ICEBERG_ASSIGN_OR_RAISE(auto row_delta, table_->NewRowDelta()); + for (const auto& file : files) { + row_delta->RemoveDeletes(file); + } + ICEBERG_RETURN_UNEXPECTED(row_delta->Commit()); + return table_->Refresh(); + } + + void SetSnapshotIdInheritanceEnabled() { + ICEBERG_UNWRAP_OR_FAIL(auto props, table_->NewUpdateProperties()); + props->Set(std::string(TableProperties::kSnapshotIdInheritanceEnabled.key()), "true"); + EXPECT_THAT(props->Commit(), IsOk()); + EXPECT_THAT(table_->Refresh(), IsOk()); + } + + void SetManifestMergeEnabled(bool enabled) { + ICEBERG_UNWRAP_OR_FAIL(auto props, table_->NewUpdateProperties()); + props->Set(std::string(TableProperties::kManifestMergeEnabled.key()), + enabled ? "true" : "false"); + EXPECT_THAT(props->Commit(), IsOk()); + EXPECT_THAT(table_->Refresh(), IsOk()); + } + + void SetCommitRetryProperties(int32_t retries) { + ICEBERG_UNWRAP_OR_FAIL(auto props, table_->NewUpdateProperties()); + props->Set(std::string(TableProperties::kCommitNumRetries.key()), + std::to_string(retries)); + props->Set(std::string(TableProperties::kCommitMinRetryWaitMs.key()), "1"); + props->Set(std::string(TableProperties::kCommitMaxRetryWaitMs.key()), "1"); + props->Set(std::string(TableProperties::kCommitTotalRetryTimeMs.key()), "1000"); + EXPECT_THAT(props->Commit(), IsOk()); + EXPECT_THAT(table_->Refresh(), IsOk()); + } + + void BindTableWithFailingCommits(int failures) { + auto mock_catalog = std::make_shared<::testing::NiceMock>(); + std::weak_ptr<::testing::NiceMock> weak_catalog = mock_catalog; + + ON_CALL(*mock_catalog, LoadTable(::testing::_)) + .WillByDefault([this, weak_catalog](const TableIdentifier& identifier) + -> Result> { + ICEBERG_ASSIGN_OR_RAISE(auto loaded, catalog_->LoadTable(identifier)); + auto catalog = weak_catalog.lock(); + ICEBERG_PRECHECK(catalog != nullptr, "Mock catalog expired"); + return Table::Make(loaded->name(), loaded->metadata(), + std::string(loaded->metadata_file_location()), loaded->io(), + catalog); + }); + + ON_CALL(*mock_catalog, UpdateTable(::testing::_, ::testing::_, ::testing::_)) + .WillByDefault( + [this, weak_catalog, failures]( + const TableIdentifier& identifier, + const std::vector>& requirements, + const std::vector>& updates) mutable + -> Result> { + if (failures-- > 0) { + return CommitFailed("Injected failure"); + } + ICEBERG_ASSIGN_OR_RAISE( + auto updated, catalog_->UpdateTable(identifier, requirements, updates)); + auto catalog = weak_catalog.lock(); + ICEBERG_PRECHECK(catalog != nullptr, "Mock catalog expired"); + return Table::Make(updated->name(), updated->metadata(), + std::string(updated->metadata_file_location()), + updated->io(), catalog); + }); + + ICEBERG_UNWRAP_OR_FAIL(auto bound_table, + Table::Make(table_->name(), table_->metadata(), + std::string(table_->metadata_file_location()), + table_->io(), mock_catalog)); + table_ = std::move(bound_table); + mock_catalog_ = std::move(mock_catalog); + } + + void ValidateSummary(const std::shared_ptr& snapshot, int replaced, int kept, + int created, int entry_count) { + ASSERT_NE(snapshot, nullptr); + const auto& summary = snapshot->summary; + EXPECT_THAT(summary, ::testing::Contains(::testing::Pair( + std::string(SnapshotSummaryFields::kManifestsReplaced), + std::to_string(replaced)))); + EXPECT_THAT(summary, ::testing::Contains(::testing::Pair( + std::string(SnapshotSummaryFields::kManifestsKept), + std::to_string(kept)))); + EXPECT_THAT(summary, ::testing::Contains(::testing::Pair( + std::string(SnapshotSummaryFields::kManifestsCreated), + std::to_string(created)))); + EXPECT_THAT(summary, ::testing::Contains(::testing::Pair( + std::string(SnapshotSummaryFields::kEntriesProcessed), + std::to_string(entry_count)))); + } + + void MatchNumberOfManifestFileWithSpecId(std::span manifests, + int32_t partition_spec_id, + int expected_manifest_count) { + const auto matched_manifest_count = std::ranges::count_if( + manifests, [partition_spec_id](const ManifestFile& manifest) { + return manifest.partition_spec_id == partition_spec_id; + }); + EXPECT_EQ(matched_manifest_count, expected_manifest_count); + } + + bool FileExists(const std::string& path) { + auto input_file = file_io_->NewInputFile(path); + if (!input_file.has_value()) { + return false; + } + return input_file.value()->Size().has_value(); + } + + Result ManifestContainingPath( + std::span manifests, const std::string& path) { + for (const auto& manifest : manifests) { + ICEBERG_ASSIGN_OR_RAISE(auto entries, ReadManifestEntries(manifest)); + for (const auto& entry : entries) { + if (entry.data_file != nullptr && entry.data_file->file_path == path) { + return &manifest; + } + } + } + return NotFound("Manifest containing path {} is not found", path); + } + + void ExpectManifestEntryContents(const ManifestFile& manifest, + std::vector expected_paths, + std::vector expected_contents) { + ASSERT_EQ(expected_contents.size(), expected_paths.size()); + + ICEBERG_UNWRAP_OR_FAIL(auto entries, ReadManifestEntries(manifest)); + ASSERT_EQ(entries.size(), expected_paths.size()); + + std::vector> actual; + actual.reserve(entries.size()); + for (const auto& entry : entries) { + ASSERT_NE(entry.data_file, nullptr); + actual.emplace_back(entry.data_file->file_path, entry.data_file->content); + } + + std::vector> expected; + expected.reserve(expected_paths.size()); + for (size_t i = 0; i < expected_paths.size(); ++i) { + expected.emplace_back(std::move(expected_paths[i]), expected_contents[i]); + } + EXPECT_THAT(actual, ::testing::UnorderedElementsAreArray(expected)); + } + + Result> CurrentManifests() { + ICEBERG_RETURN_UNEXPECTED(table_->Refresh()); + ICEBERG_ASSIGN_OR_RAISE(auto snapshot, table_->current_snapshot()); + SnapshotCache cache(snapshot.get()); + ICEBERG_ASSIGN_OR_RAISE(auto manifests, cache.DataManifests(file_io_)); + return std::vector(manifests.begin(), manifests.end()); + } + + Result> CurrentDeleteManifests() { + ICEBERG_RETURN_UNEXPECTED(table_->Refresh()); + ICEBERG_ASSIGN_OR_RAISE(auto snapshot, table_->current_snapshot()); + SnapshotCache cache(snapshot.get()); + ICEBERG_ASSIGN_OR_RAISE(auto manifests, cache.DeleteManifests(file_io_)); + return std::vector(manifests.begin(), manifests.end()); + } + + Result WriteExistingManifest( + const std::string& name, int64_t snapshot_id, + const std::vector>& files, + ManifestContent content = ManifestContent::kData, + std::shared_ptr manifest_spec = nullptr, + int64_t data_sequence_number = TableMetadata::kInitialSequenceNumber, + std::optional file_sequence_number = std::nullopt) { + auto path = std::format("{}/metadata/{}.avro", table_location_, name); + auto spec = manifest_spec != nullptr ? std::move(manifest_spec) : spec_; + std::vector> files_to_write; + files_to_write.reserve(files.size()); + std::optional first_row_id = std::nullopt; + if (table_->metadata()->format_version >= 3 && content == ManifestContent::kData) { + first_row_id = TableMetadata::kInitialRowId; + ICEBERG_ASSIGN_OR_RAISE(auto current_manifests, CurrentManifests()); + std::unordered_map> current_files_by_path; + for (const auto& manifest : current_manifests) { + ICEBERG_ASSIGN_OR_RAISE(auto entries, ReadManifestEntries(manifest)); + for (const auto& entry : entries) { + if (entry.IsAlive() && entry.data_file != nullptr) { + current_files_by_path.emplace(entry.data_file->file_path, + std::make_shared(*entry.data_file)); + } + } + } + for (const auto& file : files) { + auto file_to_write = file; + if (!file_to_write->first_row_id.has_value()) { + if (auto it = current_files_by_path.find(file->file_path); + it != current_files_by_path.end()) { + file_to_write = it->second; + } + } + files_to_write.push_back(std::move(file_to_write)); + } + } else { + files_to_write = files; + } + ICEBERG_ASSIGN_OR_RAISE( + auto writer, + ManifestWriter::MakeWriter(table_->metadata()->format_version, kInvalidSnapshotId, + path, file_io_, spec, schema_, content, first_row_id)); + for (const auto& file : files_to_write) { + ICEBERG_RETURN_UNEXPECTED(writer->WriteExistingEntry( + file, snapshot_id, data_sequence_number, file_sequence_number)); + } + ICEBERG_RETURN_UNEXPECTED(writer->Close()); + return writer->ToManifestFile(); + } + + struct ExistingManifestEntry { + std::shared_ptr file; + int64_t snapshot_id; + int64_t data_sequence_number; + int64_t file_sequence_number; + }; + + Result WriteExistingDeleteManifest( + const std::string& name, const std::vector& entries) { + auto path = std::format("{}/metadata/{}.avro", table_location_, name); + ICEBERG_ASSIGN_OR_RAISE( + auto writer, ManifestWriter::MakeWriter(table_->metadata()->format_version, + kInvalidSnapshotId, path, file_io_, spec_, + schema_, ManifestContent::kDeletes)); + for (const auto& entry : entries) { + ICEBERG_RETURN_UNEXPECTED(writer->WriteExistingEntry(entry.file, entry.snapshot_id, + entry.data_sequence_number, + entry.file_sequence_number)); + } + ICEBERG_RETURN_UNEXPECTED(writer->Close()); + return writer->ToManifestFile(); + } + + Result WriteExistingManifest( + const std::string& name, + const std::vector, int64_t>>& files) { + auto path = std::format("{}/metadata/{}.avro", table_location_, name); + std::vector, int64_t>> files_to_write; + files_to_write.reserve(files.size()); + std::optional first_row_id = std::nullopt; + if (table_->metadata()->format_version >= 3) { + first_row_id = TableMetadata::kInitialRowId; + ICEBERG_ASSIGN_OR_RAISE(auto current_manifests, CurrentManifests()); + std::unordered_map> current_files_by_path; + for (const auto& manifest : current_manifests) { + ICEBERG_ASSIGN_OR_RAISE(auto entries, ReadManifestEntries(manifest)); + for (const auto& entry : entries) { + if (entry.IsAlive() && entry.data_file != nullptr) { + current_files_by_path.emplace(entry.data_file->file_path, + std::make_shared(*entry.data_file)); + } + } + } + for (const auto& [file, snapshot_id] : files) { + auto file_to_write = file; + if (!file_to_write->first_row_id.has_value()) { + if (auto it = current_files_by_path.find(file->file_path); + it != current_files_by_path.end()) { + file_to_write = it->second; + } + } + files_to_write.emplace_back(std::move(file_to_write), snapshot_id); + } + } else { + files_to_write = files; + } + ICEBERG_ASSIGN_OR_RAISE( + auto writer, ManifestWriter::MakeWriter( + table_->metadata()->format_version, kInvalidSnapshotId, path, + file_io_, spec_, schema_, ManifestContent::kData, first_row_id)); + for (const auto& [file, snapshot_id] : files_to_write) { + ICEBERG_RETURN_UNEXPECTED(writer->WriteExistingEntry( + file, snapshot_id, TableMetadata::kInitialSequenceNumber)); + } + ICEBERG_RETURN_UNEXPECTED(writer->Close()); + return writer->ToManifestFile(); + } + + Result WriteAddedManifest( + const std::string& name, const std::vector>& files, + std::optional entry_snapshot_id = std::nullopt, + std::optional data_sequence_number = std::nullopt) { + auto path = std::format("{}/metadata/{}.avro", table_location_, name); + ICEBERG_ASSIGN_OR_RAISE( + auto writer, ManifestWriter::MakeWriter(table_->metadata()->format_version, + entry_snapshot_id, path, file_io_, spec_, + schema_, ManifestContent::kData)); + for (const auto& file : files) { + ICEBERG_RETURN_UNEXPECTED(writer->WriteAddedEntry(file, data_sequence_number)); + } + ICEBERG_RETURN_UNEXPECTED(writer->Close()); + ICEBERG_ASSIGN_OR_RAISE(auto manifest, writer->ToManifestFile()); + manifest.added_snapshot_id = kInvalidSnapshotId; + return manifest; + } + + Result WriteDeletedManifest( + const std::string& name, const std::vector>& files, + std::optional entry_snapshot_id = std::nullopt, + int64_t data_sequence_number = TableMetadata::kInitialSequenceNumber) { + auto path = std::format("{}/metadata/{}.avro", table_location_, name); + ICEBERG_ASSIGN_OR_RAISE( + auto writer, ManifestWriter::MakeWriter(table_->metadata()->format_version, + entry_snapshot_id, path, file_io_, spec_, + schema_, ManifestContent::kData)); + for (const auto& file : files) { + ICEBERG_RETURN_UNEXPECTED(writer->WriteDeletedEntry(file, data_sequence_number)); + } + ICEBERG_RETURN_UNEXPECTED(writer->Close()); + return writer->ToManifestFile(); + } + + std::shared_ptr spec_; + std::shared_ptr schema_; + std::shared_ptr file_a_; + std::shared_ptr file_b_; + std::shared_ptr file_c_; + std::shared_ptr file_d_; + std::shared_ptr<::testing::NiceMock> mock_catalog_; +}; + +TEST_P(RewriteManifestsTest, RewriteManifestsAppendedDirectly) { + SetSnapshotIdInheritanceEnabled(); + ICEBERG_UNWRAP_OR_FAIL(auto new_manifest, + WriteAddedManifest("manifest-file-1", {file_a_})); + + ICEBERG_UNWRAP_OR_FAIL(auto append, table_->NewFastAppend()); + append->AppendManifest(new_manifest); + EXPECT_THAT(append->Commit(), IsOk()); + EXPECT_THAT(table_->Refresh(), IsOk()); + ICEBERG_UNWRAP_OR_FAIL(auto append_snapshot, table_->current_snapshot()); + const int64_t append_snapshot_id = append_snapshot->snapshot_id; + + ICEBERG_UNWRAP_OR_FAIL(auto before, CurrentManifests()); + ASSERT_EQ(before.size(), 1U); + + ICEBERG_UNWRAP_OR_FAIL(auto rewrite, table_->NewRewriteManifests()); + rewrite->ClusterBy([](const DataFile&) { return ""; }); + EXPECT_THAT(rewrite->Commit(), IsOk()); + + ICEBERG_UNWRAP_OR_FAIL(auto manifests, CurrentManifests()); + ASSERT_EQ(manifests.size(), 1U); + ExpectManifestEntriesWithSnapshotIds(manifests[0], {file_a_->file_path}, + {ManifestStatus::kExisting}, {append_snapshot_id}); +} + +TEST_P(RewriteManifestsTest, RewriteManifestsWithScanExecutor) { + SetSnapshotIdInheritanceEnabled(); + ICEBERG_UNWRAP_OR_FAIL(auto new_manifest, + WriteAddedManifest("manifest-file-1", {file_a_})); + + ICEBERG_UNWRAP_OR_FAIL(auto append, table_->NewFastAppend()); + append->AppendManifest(new_manifest); + EXPECT_THAT(append->Commit(), IsOk()); + EXPECT_THAT(table_->Refresh(), IsOk()); + + ICEBERG_UNWRAP_OR_FAIL(auto before, CurrentManifests()); + ASSERT_EQ(before.size(), 1U); + + test::ThreadExecutor executor; + ICEBERG_UNWRAP_OR_FAIL(auto rewrite, table_->NewRewriteManifests()); + rewrite->ClusterBy([](const DataFile&) { return ""; }).ScanManifestsWith(executor); + EXPECT_THAT(rewrite->Commit(), IsOk()); + + EXPECT_GT(executor.submit_count(), 0); + ICEBERG_UNWRAP_OR_FAIL(auto manifests, CurrentManifests()); + ASSERT_EQ(manifests.size(), 1U); + ExpectManifestEntries(manifests[0], {file_a_->file_path}, {ManifestStatus::kExisting}); +} + +TEST_P(RewriteManifestsTest, RewriteManifestsGeneratedAndAppendedDirectly) { + SetSnapshotIdInheritanceEnabled(); + ICEBERG_UNWRAP_OR_FAIL(auto new_manifest, + WriteAddedManifest("manifest-file-1", {file_a_})); + + ICEBERG_UNWRAP_OR_FAIL(auto append_manifest, table_->NewFastAppend()); + append_manifest->AppendManifest(new_manifest); + EXPECT_THAT(append_manifest->Commit(), IsOk()); + EXPECT_THAT(table_->Refresh(), IsOk()); + ICEBERG_UNWRAP_OR_FAIL(auto manifest_append_snapshot, table_->current_snapshot()); + const int64_t manifest_append_id = manifest_append_snapshot->snapshot_id; + + ASSERT_THAT(AppendFiles({file_b_}), IsOk()); + ICEBERG_UNWRAP_OR_FAIL(auto file_append_snapshot, table_->current_snapshot()); + const int64_t file_append_id = file_append_snapshot->snapshot_id; + + ICEBERG_UNWRAP_OR_FAIL(auto before, CurrentManifests()); + ASSERT_EQ(before.size(), 2U); + + ICEBERG_UNWRAP_OR_FAIL(auto rewrite, table_->NewRewriteManifests()); + rewrite->ClusterBy([](const DataFile&) { return ""; }); + EXPECT_THAT(rewrite->Commit(), IsOk()); + + ICEBERG_UNWRAP_OR_FAIL(auto manifests, CurrentManifests()); + ASSERT_EQ(manifests.size(), 1U); + ExpectManifestEntriesWithSnapshotIds( + manifests[0], {file_a_->file_path, file_b_->file_path}, + {ManifestStatus::kExisting, ManifestStatus::kExisting}, + {manifest_append_id, file_append_id}); +} + +TEST_P(RewriteManifestsTest, RewriteManifestsWithoutClusterBy) { + ASSERT_THAT(AppendFiles({file_a_}), IsOk()); + ICEBERG_UNWRAP_OR_FAIL(auto append_snapshot, table_->current_snapshot()); + const int64_t append_id = append_snapshot->snapshot_id; + ICEBERG_UNWRAP_OR_FAIL(auto before, CurrentManifests()); + ASSERT_EQ(before.size(), 1U); + + ICEBERG_UNWRAP_OR_FAIL(auto rewrite, table_->NewRewriteManifests()); + EXPECT_THAT(rewrite->Commit(), IsOk()); + + ICEBERG_UNWRAP_OR_FAIL(auto manifests, CurrentManifests()); + ASSERT_EQ(manifests.size(), 1U); + EXPECT_NE(manifests[0].manifest_path, before[0].manifest_path); + ExpectManifestEntriesWithSnapshotIds(manifests[0], {file_a_->file_path}, + {ManifestStatus::kExisting}, {append_id}); +} + +TEST_P(RewriteManifestsTest, RewriteIfWithoutClusterBy) { + ASSERT_THAT(AppendFiles({file_a_}), IsOk()); + ICEBERG_UNWRAP_OR_FAIL(auto append_snapshot_a, table_->current_snapshot()); + const int64_t append_id_a = append_snapshot_a->snapshot_id; + ASSERT_THAT(AppendFiles({file_b_}), IsOk()); + ICEBERG_UNWRAP_OR_FAIL(auto append_snapshot_b, table_->current_snapshot()); + const int64_t append_id_b = append_snapshot_b->snapshot_id; + ASSERT_THAT(AppendFiles({file_c_}), IsOk()); + ICEBERG_UNWRAP_OR_FAIL(auto append_snapshot_c, table_->current_snapshot()); + const int64_t append_id_c = append_snapshot_c->snapshot_id; + + ICEBERG_UNWRAP_OR_FAIL(auto rewrite, table_->NewRewriteManifests()); + rewrite->RewriteIf([&](const ManifestFile& manifest) { + auto entries_result = ReadManifestEntries(manifest); + if (!entries_result.has_value() || entries_result.value().empty()) { + return false; + } + return entries_result.value()[0].data_file->file_path != file_a_->file_path; + }); + EXPECT_THAT(rewrite->Commit(), IsOk()); + + ICEBERG_UNWRAP_OR_FAIL(auto manifests, CurrentManifests()); + ASSERT_EQ(manifests.size(), 2U); + + ICEBERG_UNWRAP_OR_FAIL(auto kept_manifest, + ManifestContainingPath(manifests, file_a_->file_path)); + ICEBERG_UNWRAP_OR_FAIL(auto rewritten_manifest, + ManifestContainingPath(manifests, file_b_->file_path)); + + ExpectManifestEntriesWithSnapshotIds(*kept_manifest, {file_a_->file_path}, + {ManifestStatus::kAdded}, {append_id_a}); + ExpectManifestEntriesWithSnapshotIds( + *rewritten_manifest, {file_b_->file_path, file_c_->file_path}, + {ManifestStatus::kExisting, ManifestStatus::kExisting}, {append_id_b, append_id_c}); +} + +TEST_P(RewriteManifestsTest, ReplaceManifestsSeparate) { + ASSERT_THAT(AppendFiles({file_a_, file_b_}), IsOk()); + ICEBERG_UNWRAP_OR_FAIL(auto append_snapshot, table_->current_snapshot()); + const int64_t append_id = append_snapshot->snapshot_id; + ICEBERG_UNWRAP_OR_FAIL(auto before, CurrentManifests()); + ASSERT_EQ(before.size(), 1U); + + ICEBERG_UNWRAP_OR_FAIL(auto rewrite, table_->NewRewriteManifests()); + rewrite->ClusterBy([](const DataFile& file) { return file.file_path; }); + EXPECT_THAT(rewrite->Commit(), IsOk()); + + ICEBERG_UNWRAP_OR_FAIL(auto manifests, CurrentManifests()); + ASSERT_EQ(manifests.size(), 2U); + ICEBERG_UNWRAP_OR_FAIL(auto manifest_a, + ManifestContainingPath(manifests, file_a_->file_path)); + ICEBERG_UNWRAP_OR_FAIL(auto manifest_b, + ManifestContainingPath(manifests, file_b_->file_path)); + ExpectManifestEntriesWithSnapshotIds(*manifest_a, {file_a_->file_path}, + {ManifestStatus::kExisting}, {append_id}); + ExpectManifestEntriesWithSnapshotIds(*manifest_b, {file_b_->file_path}, + {ManifestStatus::kExisting}, {append_id}); +} + +TEST_P(RewriteManifestsTest, ReplaceManifestsConsolidate) { + ASSERT_THAT(AppendFiles({file_a_}), IsOk()); + ICEBERG_UNWRAP_OR_FAIL(auto append_snapshot_a, table_->current_snapshot()); + const int64_t append_id_a = append_snapshot_a->snapshot_id; + ASSERT_THAT(AppendFiles({file_b_}), IsOk()); + ICEBERG_UNWRAP_OR_FAIL(auto append_snapshot_b, table_->current_snapshot()); + const int64_t append_id_b = append_snapshot_b->snapshot_id; + ICEBERG_UNWRAP_OR_FAIL(auto before, CurrentManifests()); + ASSERT_EQ(before.size(), 2U); + + ICEBERG_UNWRAP_OR_FAIL(auto rewrite, table_->NewRewriteManifests()); + rewrite->ClusterBy([](const DataFile&) { return "file"; }); + EXPECT_THAT(rewrite->Commit(), IsOk()); + + ICEBERG_UNWRAP_OR_FAIL(auto manifests, CurrentManifests()); + ASSERT_EQ(manifests.size(), 1U); + ExpectManifestEntriesWithSnapshotIds( + manifests[0], {file_a_->file_path, file_b_->file_path}, + {ManifestStatus::kExisting, ManifestStatus::kExisting}, {append_id_a, append_id_b}); +} + +TEST_P(RewriteManifestsTest, ReplaceManifestsWithFilter) { + ASSERT_THAT(AppendFiles({file_a_}), IsOk()); + ICEBERG_UNWRAP_OR_FAIL(auto append_snapshot_a, table_->current_snapshot()); + const int64_t append_id_a = append_snapshot_a->snapshot_id; + ASSERT_THAT(AppendFiles({file_b_}), IsOk()); + ICEBERG_UNWRAP_OR_FAIL(auto append_snapshot_b, table_->current_snapshot()); + const int64_t append_id_b = append_snapshot_b->snapshot_id; + ASSERT_THAT(AppendFiles({file_c_}), IsOk()); + ICEBERG_UNWRAP_OR_FAIL(auto append_snapshot_c, table_->current_snapshot()); + const int64_t append_id_c = append_snapshot_c->snapshot_id; + ICEBERG_UNWRAP_OR_FAIL(auto before, CurrentManifests()); + ASSERT_EQ(before.size(), 3U); + + ICEBERG_UNWRAP_OR_FAIL(auto rewrite, table_->NewRewriteManifests()); + rewrite->ClusterBy([](const DataFile&) { return "file"; }); + rewrite->RewriteIf([&](const ManifestFile& manifest) { + auto entries_result = ReadManifestEntries(manifest); + if (!entries_result.has_value() || entries_result.value().empty()) { + return false; + } + return entries_result.value()[0].data_file->file_path != file_a_->file_path; + }); + EXPECT_THAT(rewrite->Commit(), IsOk()); + + ICEBERG_UNWRAP_OR_FAIL(auto manifests, CurrentManifests()); + ASSERT_EQ(manifests.size(), 2U); + + const ManifestFile* kept_manifest = nullptr; + const ManifestFile* rewritten_manifest = nullptr; + for (const auto& manifest : manifests) { + ICEBERG_UNWRAP_OR_FAIL(auto entries, ReadManifestEntries(manifest)); + ASSERT_FALSE(entries.empty()); + ASSERT_NE(entries[0].data_file, nullptr); + if (entries[0].data_file->file_path == file_a_->file_path) { + kept_manifest = &manifest; + } else { + rewritten_manifest = &manifest; + } + } + + ASSERT_NE(rewritten_manifest, nullptr); + ExpectManifestEntriesWithSnapshotIds( + *rewritten_manifest, {file_b_->file_path, file_c_->file_path}, + {ManifestStatus::kExisting, ManifestStatus::kExisting}, {append_id_b, append_id_c}); + + ASSERT_NE(kept_manifest, nullptr); + ExpectManifestEntriesWithSnapshotIds(*kept_manifest, {file_a_->file_path}, + {ManifestStatus::kAdded}, {append_id_a}); +} + +TEST_P(RewriteManifestsTest, ReplaceManifestsMaxSize) { + ASSERT_THAT(AppendFiles({file_a_, file_b_}), IsOk()); + ICEBERG_UNWRAP_OR_FAIL(auto append_snapshot, table_->current_snapshot()); + const int64_t append_snapshot_id = append_snapshot->snapshot_id; + + ICEBERG_UNWRAP_OR_FAIL(auto before, CurrentManifests()); + ASSERT_EQ(before.size(), 1U); + + ICEBERG_UNWRAP_OR_FAIL(auto props, table_->NewUpdateProperties()); + props->Set(std::string(TableProperties::kManifestTargetSizeBytes.key()), "1"); + EXPECT_THAT(props->Commit(), IsOk()); + EXPECT_THAT(table_->Refresh(), IsOk()); + + ICEBERG_UNWRAP_OR_FAIL(auto rewrite, table_->NewRewriteManifests()); + rewrite->ClusterBy([](const DataFile&) { return "file"; }); + EXPECT_THAT(rewrite->Commit(), IsOk()); + + ICEBERG_UNWRAP_OR_FAIL(auto manifests, CurrentManifests()); + ASSERT_EQ(manifests.size(), 2U); + std::ranges::sort(manifests, {}, &ManifestFile::manifest_path); + ExpectManifestEntriesWithSnapshotIds(manifests[0], {file_a_->file_path}, + {ManifestStatus::kExisting}, {append_snapshot_id}); + ExpectManifestEntriesWithSnapshotIds(manifests[1], {file_b_->file_path}, + {ManifestStatus::kExisting}, {append_snapshot_id}); +} + +TEST_P(RewriteManifestsTest, ConcurrentRewriteManifest) { + ASSERT_THAT(AppendFiles({file_a_}), IsOk()); + ICEBERG_UNWRAP_OR_FAIL(auto append_snapshot_a, table_->current_snapshot()); + const int64_t append_id_a = append_snapshot_a->snapshot_id; + ASSERT_THAT(AppendFiles({file_b_}), IsOk()); + ICEBERG_UNWRAP_OR_FAIL(auto append_snapshot_b, table_->current_snapshot()); + const int64_t append_id_b = append_snapshot_b->snapshot_id; + ICEBERG_UNWRAP_OR_FAIL(auto before, CurrentManifests()); + ASSERT_EQ(before.size(), 2U); + + ICEBERG_UNWRAP_OR_FAIL(auto rewrite, table_->NewRewriteManifests()); + rewrite->ClusterBy([](const DataFile&) { return "file"; }); + EXPECT_THAT(static_cast(*rewrite).Apply(), IsOk()); + + ICEBERG_UNWRAP_OR_FAIL(auto concurrent, table_->NewRewriteManifests()); + concurrent->ClusterBy([](const DataFile&) { return "file"; }); + concurrent->RewriteIf([&](const ManifestFile& manifest) { + auto entries_result = ReadManifestEntries(manifest); + if (!entries_result.has_value() || entries_result.value().empty()) { + return false; + } + return entries_result.value()[0].data_file->file_path != file_a_->file_path; + }); + EXPECT_THAT(concurrent->Commit(), IsOk()); + ICEBERG_UNWRAP_OR_FAIL(auto after_concurrent, CurrentManifests()); + ASSERT_EQ(after_concurrent.size(), 2U); + + EXPECT_THAT(rewrite->Commit(), IsOk()); + + ICEBERG_UNWRAP_OR_FAIL(auto manifests, CurrentManifests()); + ASSERT_EQ(manifests.size(), 1U); + ExpectManifestEntriesWithSnapshotIds( + manifests[0], {file_a_->file_path, file_b_->file_path}, + {ManifestStatus::kExisting, ManifestStatus::kExisting}, {append_id_a, append_id_b}); +} + +TEST_P(RewriteManifestsTest, AppendDuringRewriteManifest) { + ASSERT_THAT(AppendFiles({file_a_}), IsOk()); + ICEBERG_UNWRAP_OR_FAIL(auto append_snapshot_a, table_->current_snapshot()); + const int64_t append_id_a = append_snapshot_a->snapshot_id; + + ICEBERG_UNWRAP_OR_FAIL(auto rewrite, table_->NewRewriteManifests()); + rewrite->ClusterBy([](const DataFile&) { return "file"; }); + EXPECT_THAT(static_cast(*rewrite).Apply(), IsOk()); + + ASSERT_THAT(AppendFiles({file_b_}), IsOk()); + ICEBERG_UNWRAP_OR_FAIL(auto append_snapshot_b, table_->current_snapshot()); + const int64_t append_id_b = append_snapshot_b->snapshot_id; + ICEBERG_UNWRAP_OR_FAIL(auto after_append, CurrentManifests()); + ASSERT_EQ(after_append.size(), 2U); + + // commit the rewrite manifests in progress + EXPECT_THAT(rewrite->Commit(), IsOk()); + + // the rewrite should only affect the first manifest, so we will end up with 2 manifests + // even though we have a single cluster key, rewritten one should be the first in the + // list + ICEBERG_UNWRAP_OR_FAIL(auto manifests, CurrentManifests()); + ASSERT_EQ(manifests.size(), 2U); + ExpectManifestEntriesWithSnapshotIds(manifests[0], {file_a_->file_path}, + {ManifestStatus::kExisting}, {append_id_a}); + ExpectManifestEntriesWithSnapshotIds(manifests[1], {file_b_->file_path}, + {ManifestStatus::kAdded}, {append_id_b}); +} + +TEST_P(RewriteManifestsTest, RewriteManifestDuringAppend) { + ASSERT_THAT(AppendFiles({file_a_}), IsOk()); + ICEBERG_UNWRAP_OR_FAIL(auto append_snapshot, table_->current_snapshot()); + const int64_t append_snapshot_id = append_snapshot->snapshot_id; + + ICEBERG_UNWRAP_OR_FAIL(auto append, table_->NewFastAppend()); + append->AppendFile(file_b_); + EXPECT_THAT(static_cast(*append).Apply(), IsOk()); + + // rewrite the manifests - only affects the first + ICEBERG_UNWRAP_OR_FAIL(auto rewrite, table_->NewRewriteManifests()); + rewrite->ClusterBy([](const DataFile&) { return "file"; }); + EXPECT_THAT(rewrite->Commit(), IsOk()); + + ICEBERG_UNWRAP_OR_FAIL(auto rewritten_manifests, CurrentManifests()); + ASSERT_EQ(rewritten_manifests.size(), 1U); + + // commit the append in progress + EXPECT_THAT(append->Commit(), IsOk()); + EXPECT_THAT(table_->Refresh(), IsOk()); + ICEBERG_UNWRAP_OR_FAIL(auto file_append_snapshot, table_->current_snapshot()); + const int64_t file_append_id = file_append_snapshot->snapshot_id; + + ICEBERG_UNWRAP_OR_FAIL(auto manifests, CurrentManifests()); + ASSERT_EQ(manifests.size(), 2U); + ExpectManifestEntriesWithSnapshotIds(manifests[0], {file_b_->file_path}, + {ManifestStatus::kAdded}, {file_append_id}); + ExpectManifestEntriesWithSnapshotIds(manifests[1], {file_a_->file_path}, + {ManifestStatus::kExisting}, {append_snapshot_id}); +} + +TEST_P(RewriteManifestsTest, BasicManifestReplacement) { + ASSERT_THAT(AppendFiles({file_a_, file_b_}), IsOk()); + ICEBERG_UNWRAP_OR_FAIL(auto first_snapshot, table_->current_snapshot()); + const int64_t first_snapshot_id = first_snapshot->snapshot_id; + ICEBERG_UNWRAP_OR_FAIL(auto first_snapshot_manifests, CurrentManifests()); + ASSERT_EQ(first_snapshot_manifests.size(), 1U); + + ASSERT_THAT(AppendFiles({file_c_, file_d_}), IsOk()); + ICEBERG_UNWRAP_OR_FAIL(auto second_snapshot, table_->current_snapshot()); + const int64_t second_snapshot_id = second_snapshot->snapshot_id; + + ICEBERG_UNWRAP_OR_FAIL( + auto manifest_a, WriteExistingManifest("rewrite-a", first_snapshot_id, {file_a_})); + ICEBERG_UNWRAP_OR_FAIL( + auto manifest_b, WriteExistingManifest("rewrite-b", first_snapshot_id, {file_b_})); + + ICEBERG_UNWRAP_OR_FAIL(auto rewrite, table_->NewRewriteManifests()); + rewrite->DeleteManifest(first_snapshot_manifests[0]) + .AddManifest(manifest_a) + .AddManifest(manifest_b); + EXPECT_THAT(rewrite->Commit(), IsOk()); + + ICEBERG_UNWRAP_OR_FAIL(auto manifests, CurrentManifests()); + ASSERT_EQ(manifests.size(), 3U); + if (GetParam() == 1) { + EXPECT_NE(manifests[0].manifest_path, manifest_a.manifest_path); + EXPECT_NE(manifests[1].manifest_path, manifest_b.manifest_path); + } else { + EXPECT_EQ(manifests[0].manifest_path, manifest_a.manifest_path); + EXPECT_EQ(manifests[1].manifest_path, manifest_b.manifest_path); + } + ExpectManifestEntriesWithSnapshotIds(manifests[0], {file_a_->file_path}, + {ManifestStatus::kExisting}, {first_snapshot_id}); + ExpectManifestEntriesWithSnapshotIds(manifests[1], {file_b_->file_path}, + {ManifestStatus::kExisting}, {first_snapshot_id}); + ExpectManifestEntriesWithSnapshotIds(manifests[2], + {file_c_->file_path, file_d_->file_path}, + {ManifestStatus::kAdded, ManifestStatus::kAdded}, + {second_snapshot_id, second_snapshot_id}); + + ICEBERG_UNWRAP_OR_FAIL(auto after, table_->current_snapshot()); + ValidateSummary(after, /*replaced=*/1, /*kept=*/1, /*created=*/2, + /*entry_count=*/0); +} + +TEST_P(RewriteManifestsTest, BasicManifestReplacementWithSnapshotIdInheritance) { + SetSnapshotIdInheritanceEnabled(); + + ASSERT_THAT(AppendFiles({file_a_, file_b_}), IsOk()); + ICEBERG_UNWRAP_OR_FAIL(auto first_snapshot, table_->current_snapshot()); + const int64_t first_snapshot_id = first_snapshot->snapshot_id; + ICEBERG_UNWRAP_OR_FAIL(auto first_snapshot_manifests, CurrentManifests()); + ASSERT_EQ(first_snapshot_manifests.size(), 1U); + + ASSERT_THAT(AppendFiles({file_c_, file_d_}), IsOk()); + ICEBERG_UNWRAP_OR_FAIL(auto second_snapshot, table_->current_snapshot()); + const int64_t second_snapshot_id = second_snapshot->snapshot_id; + + ICEBERG_UNWRAP_OR_FAIL( + auto manifest_a, + WriteExistingManifest("rewrite-inherit-a", first_snapshot_id, {file_a_})); + ICEBERG_UNWRAP_OR_FAIL( + auto manifest_b, + WriteExistingManifest("rewrite-inherit-b", first_snapshot_id, {file_b_})); + + ICEBERG_UNWRAP_OR_FAIL(auto rewrite, table_->NewRewriteManifests()); + rewrite->DeleteManifest(first_snapshot_manifests[0]) + .AddManifest(manifest_a) + .AddManifest(manifest_b); + EXPECT_THAT(rewrite->Commit(), IsOk()); + + ICEBERG_UNWRAP_OR_FAIL(auto manifests, CurrentManifests()); + ASSERT_EQ(manifests.size(), 3U); + EXPECT_EQ(manifests[0].manifest_path, manifest_a.manifest_path); + EXPECT_EQ(manifests[1].manifest_path, manifest_b.manifest_path); + ExpectManifestEntriesWithSnapshotIds(manifests[0], {file_a_->file_path}, + {ManifestStatus::kExisting}, {first_snapshot_id}); + ExpectManifestEntriesWithSnapshotIds(manifests[1], {file_b_->file_path}, + {ManifestStatus::kExisting}, {first_snapshot_id}); + ExpectManifestEntriesWithSnapshotIds(manifests[2], + {file_c_->file_path, file_d_->file_path}, + {ManifestStatus::kAdded, ManifestStatus::kAdded}, + {second_snapshot_id, second_snapshot_id}); + + ICEBERG_UNWRAP_OR_FAIL(auto after, table_->current_snapshot()); + ValidateSummary(after, /*replaced=*/1, /*kept=*/1, /*created=*/2, + /*entry_count=*/0); + + // validate that any subsequent operation does not fail + ICEBERG_UNWRAP_OR_FAIL(auto delete_files, table_->NewDeleteFiles()); + delete_files->DeleteFromRowFilter(Expressions::AlwaysTrue()); + EXPECT_THAT(delete_files->Commit(), IsOk()); +} + +TEST_P(RewriteManifestsTest, WithMultiplePartitionSpec) { + ASSERT_THAT(AppendFiles({file_a_, file_b_}), IsOk()); + ICEBERG_UNWRAP_OR_FAIL(auto initial_manifests, CurrentManifests()); + ASSERT_EQ(initial_manifests.size(), 1U); + const int32_t initial_spec_id = initial_manifests[0].partition_spec_id; + ICEBERG_UNWRAP_OR_FAIL(auto initial_spec, + table_->metadata()->PartitionSpecById(initial_spec_id)); + ASSERT_EQ(initial_spec->fields().size(), 1U); + const int32_t initial_partition_field_id = initial_spec->fields()[0].field_id(); + + // Build the new spec using the table's schema, which uses fresh partition IDs. + ICEBERG_UNWRAP_OR_FAIL(auto update_spec, table_->NewUpdatePartitionSpec()); + update_spec->AddField(Expressions::Bucket("y", 16)) + .AddField(Expressions::Bucket("z", 4)); + EXPECT_THAT(update_spec->Commit(), IsOk()); + EXPECT_THAT(table_->Refresh(), IsOk()); + ICEBERG_UNWRAP_OR_FAIL(auto new_spec, table_->spec()); + ASSERT_EQ(new_spec->fields().size(), 3U); + EXPECT_EQ(new_spec->fields()[1].name(), "y_bucket_16"); + EXPECT_EQ(new_spec->fields()[1].transform()->ToString(), "bucket[16]"); + EXPECT_GT(new_spec->fields()[1].field_id(), initial_partition_field_id); + EXPECT_EQ(new_spec->fields()[2].name(), "z_bucket_4"); + EXPECT_EQ(new_spec->fields()[2].transform()->ToString(), "bucket[4]"); + EXPECT_GT(new_spec->fields()[2].field_id(), new_spec->fields()[1].field_id()); + + std::vector partition_y{Literal::Long(/*x=*/5L), + Literal::Int(/*y_bucket_16=*/2), + Literal::Int(/*z_bucket_4=*/3)}; + auto file_y = MakeDataFile("y", new_spec, /*record_count=*/1, /*size=*/10, + std::move(partition_y)); + std::vector partition_z{Literal::Long(/*x=*/7L), + Literal::Int(/*y_bucket_16=*/2), + Literal::Int(/*z_bucket_4=*/4)}; + auto file_z = MakeDataFile("z", new_spec, /*record_count=*/1, /*size=*/10, + std::move(partition_z)); + ASSERT_THAT(AppendFiles({file_y}), IsOk()); + ASSERT_THAT(AppendFiles({file_z}), IsOk()); + + ICEBERG_UNWRAP_OR_FAIL(auto before, CurrentManifests()); + ASSERT_EQ(before.size(), 3U); + + ICEBERG_UNWRAP_OR_FAIL(auto rewrite, table_->NewRewriteManifests()); + // try to cluster in 1 manifest file, but because of 2 partition specs + // we should still have 2 manifest files. + rewrite->ClusterBy([](const DataFile&) { return "file"; }); + EXPECT_THAT(rewrite->Commit(), IsOk()); + + ICEBERG_UNWRAP_OR_FAIL(auto manifests, CurrentManifests()); + ASSERT_EQ(manifests.size(), 2U); + EXPECT_NE(manifests[1].partition_spec_id, manifests[0].partition_spec_id); + MatchNumberOfManifestFileWithSpecId(manifests, initial_spec_id, + /*expected_manifest_count=*/1); + MatchNumberOfManifestFileWithSpecId(manifests, new_spec->spec_id(), + /*expected_manifest_count=*/1); + + EXPECT_EQ(manifests[0].existing_files_count.value_or(-1), 2); + EXPECT_EQ(manifests[1].existing_files_count.value_or(-1), 2); +} + +TEST_P(RewriteManifestsTest, ManifestSizeWithMultiplePartitionSpec) { + ASSERT_THAT(AppendFiles({file_a_, file_b_}), IsOk()); + ICEBERG_UNWRAP_OR_FAIL(auto initial_manifests, CurrentManifests()); + ASSERT_EQ(initial_manifests.size(), 1U); + const int32_t initial_spec_id = initial_manifests[0].partition_spec_id; + + // Build the new spec using the table's schema, which uses fresh partition IDs. + ICEBERG_UNWRAP_OR_FAIL(auto update_spec, table_->NewUpdatePartitionSpec()); + update_spec->AddField(Expressions::Bucket("y", 16)) + .AddField(Expressions::Bucket("z", 4)); + EXPECT_THAT(update_spec->Commit(), IsOk()); + EXPECT_THAT(table_->Refresh(), IsOk()); + ICEBERG_UNWRAP_OR_FAIL(auto new_spec, table_->spec()); + + std::vector partition_y{Literal::Long(/*x=*/5L), + Literal::Int(/*y_bucket_16=*/2), + Literal::Int(/*z_bucket_4=*/3)}; + auto file_y = MakeDataFile("y", new_spec, /*record_count=*/1, /*size=*/10, + std::move(partition_y)); + ASSERT_THAT(AppendFiles({file_y}), IsOk()); + + std::vector partition_z{Literal::Long(/*x=*/7L), + Literal::Int(/*y_bucket_16=*/2), + Literal::Int(/*z_bucket_4=*/4)}; + auto file_z = MakeDataFile("z", new_spec, /*record_count=*/1, /*size=*/10, + std::move(partition_z)); + ASSERT_THAT(AppendFiles({file_z}), IsOk()); + + ICEBERG_UNWRAP_OR_FAIL(auto before, CurrentManifests()); + ASSERT_EQ(before.size(), 3U); + + ICEBERG_UNWRAP_OR_FAIL(auto props, table_->NewUpdateProperties()); + props->Set(std::string(TableProperties::kManifestTargetSizeBytes.key()), "1"); + EXPECT_THAT(props->Commit(), IsOk()); + EXPECT_THAT(table_->Refresh(), IsOk()); + + ICEBERG_UNWRAP_OR_FAIL(auto rewrite, table_->NewRewriteManifests()); + rewrite->ClusterBy([](const DataFile&) { return "file"; }); + EXPECT_THAT(rewrite->Commit(), IsOk()); + + ICEBERG_UNWRAP_OR_FAIL(auto manifests, CurrentManifests()); + ASSERT_EQ(manifests.size(), 4U); + + MatchNumberOfManifestFileWithSpecId(manifests, initial_spec_id, + /*expected_manifest_count=*/2); + MatchNumberOfManifestFileWithSpecId(manifests, new_spec->spec_id(), + /*expected_manifest_count=*/2); + + EXPECT_EQ(manifests[0].existing_files_count.value_or(-1), 1); + EXPECT_EQ(manifests[1].existing_files_count.value_or(-1), 1); + EXPECT_EQ(manifests[2].existing_files_count.value_or(-1), 1); + EXPECT_EQ(manifests[3].existing_files_count.value_or(-1), 1); +} + +TEST_P(RewriteManifestsTest, ManifestReplacementConcurrentAppend) { + ASSERT_THAT(AppendFiles({file_a_, file_b_}), IsOk()); + ICEBERG_UNWRAP_OR_FAIL(auto first_snapshot, table_->current_snapshot()); + const int64_t first_snapshot_id = first_snapshot->snapshot_id; + ICEBERG_UNWRAP_OR_FAIL(auto first_snapshot_manifests, CurrentManifests()); + ASSERT_EQ(first_snapshot_manifests.size(), 1U); + + ICEBERG_UNWRAP_OR_FAIL( + auto manifest_a, + WriteExistingManifest("concurrent-append-a", first_snapshot_id, {file_a_})); + ICEBERG_UNWRAP_OR_FAIL( + auto manifest_b, + WriteExistingManifest("concurrent-append-b", first_snapshot_id, {file_b_})); + + ICEBERG_UNWRAP_OR_FAIL(auto rewrite, table_->NewRewriteManifests()); + rewrite->DeleteManifest(first_snapshot_manifests[0]) + .AddManifest(manifest_a) + .AddManifest(manifest_b); + + ASSERT_THAT(AppendFiles({file_c_, file_d_}), IsOk()); + ICEBERG_UNWRAP_OR_FAIL(auto second_snapshot, table_->current_snapshot()); + const int64_t second_snapshot_id = second_snapshot->snapshot_id; + ICEBERG_UNWRAP_OR_FAIL(auto after_append, CurrentManifests()); + ASSERT_EQ(after_append.size(), 2U); + + EXPECT_THAT(rewrite->Commit(), IsOk()); + + ICEBERG_UNWRAP_OR_FAIL(auto manifests, CurrentManifests()); + ASSERT_EQ(manifests.size(), 3U); + ExpectManifestEntriesWithSnapshotIds(manifests[0], {file_a_->file_path}, + {ManifestStatus::kExisting}, {first_snapshot_id}); + ExpectManifestEntriesWithSnapshotIds(manifests[1], {file_b_->file_path}, + {ManifestStatus::kExisting}, {first_snapshot_id}); + ExpectManifestEntriesWithSnapshotIds(manifests[2], + {file_c_->file_path, file_d_->file_path}, + {ManifestStatus::kAdded, ManifestStatus::kAdded}, + {second_snapshot_id, second_snapshot_id}); + + ICEBERG_UNWRAP_OR_FAIL(auto after, table_->current_snapshot()); + ValidateSummary(after, /*replaced=*/1, /*kept=*/1, /*created=*/2, + /*entry_count=*/0); +} + +TEST_P(RewriteManifestsTest, ManifestReplacementConcurrentDelete) { + ASSERT_THAT(AppendFiles({file_a_, file_b_}), IsOk()); + ICEBERG_UNWRAP_OR_FAIL(auto first_snapshot, table_->current_snapshot()); + const int64_t first_snapshot_id = first_snapshot->snapshot_id; + ICEBERG_UNWRAP_OR_FAIL(auto first_snapshot_manifests, CurrentManifests()); + ASSERT_EQ(first_snapshot_manifests.size(), 1U); + + ASSERT_THAT(AppendFiles({file_c_, file_d_}), IsOk()); + ICEBERG_UNWRAP_OR_FAIL(auto second_snapshot, table_->current_snapshot()); + const int64_t second_snapshot_id = second_snapshot->snapshot_id; + + ICEBERG_UNWRAP_OR_FAIL( + auto manifest_a, + WriteExistingManifest("concurrent-delete-a", first_snapshot_id, {file_a_})); + ICEBERG_UNWRAP_OR_FAIL( + auto manifest_b, + WriteExistingManifest("concurrent-delete-b", first_snapshot_id, {file_b_})); + + ICEBERG_UNWRAP_OR_FAIL(auto rewrite, table_->NewRewriteManifests()); + rewrite->DeleteManifest(first_snapshot_manifests[0]) + .AddManifest(manifest_a) + .AddManifest(manifest_b); + + ICEBERG_UNWRAP_OR_FAIL(auto delete_files, table_->NewDeleteFiles()); + delete_files->DeleteFile(file_c_); + EXPECT_THAT(delete_files->Commit(), IsOk()); + EXPECT_THAT(table_->Refresh(), IsOk()); + ICEBERG_UNWRAP_OR_FAIL(auto third_snapshot, table_->current_snapshot()); + const int64_t third_snapshot_id = third_snapshot->snapshot_id; + + EXPECT_THAT(rewrite->Commit(), IsOk()); + + ICEBERG_UNWRAP_OR_FAIL(auto manifests, CurrentManifests()); + ASSERT_EQ(manifests.size(), 3U); + ExpectManifestEntriesWithSnapshotIds(manifests[0], {file_a_->file_path}, + {ManifestStatus::kExisting}, {first_snapshot_id}); + ExpectManifestEntriesWithSnapshotIds(manifests[1], {file_b_->file_path}, + {ManifestStatus::kExisting}, {first_snapshot_id}); + ExpectManifestEntriesWithSnapshotIds( + manifests[2], {file_c_->file_path, file_d_->file_path}, + {ManifestStatus::kDeleted, ManifestStatus::kExisting}, + {third_snapshot_id, second_snapshot_id}); + + ICEBERG_UNWRAP_OR_FAIL(auto after, table_->current_snapshot()); + ValidateSummary(after, /*replaced=*/1, /*kept=*/1, /*created=*/2, + /*entry_count=*/0); +} + +TEST_P(RewriteManifestsTest, ManifestReplacementConcurrentConflictingDelete) { + ASSERT_THAT(AppendFiles({file_a_, file_b_}), IsOk()); + ICEBERG_UNWRAP_OR_FAIL(auto first_snapshot, table_->current_snapshot()); + const int64_t first_snapshot_id = first_snapshot->snapshot_id; + ICEBERG_UNWRAP_OR_FAIL(auto first_snapshot_manifests, CurrentManifests()); + ASSERT_EQ(first_snapshot_manifests.size(), 1U); + + ICEBERG_UNWRAP_OR_FAIL( + auto manifest_a, + WriteExistingManifest("conflicting-delete-a", first_snapshot_id, {file_a_})); + ICEBERG_UNWRAP_OR_FAIL( + auto manifest_b, + WriteExistingManifest("conflicting-delete-b", first_snapshot_id, {file_b_})); + + ICEBERG_UNWRAP_OR_FAIL(auto rewrite, table_->NewRewriteManifests()); + rewrite->DeleteManifest(first_snapshot_manifests[0]) + .AddManifest(manifest_a) + .AddManifest(manifest_b); + + ICEBERG_UNWRAP_OR_FAIL(auto delete_files, table_->NewDeleteFiles()); + delete_files->DeleteFile(file_a_); + EXPECT_THAT(delete_files->Commit(), IsOk()); + EXPECT_THAT(table_->Refresh(), IsOk()); + + auto result = rewrite->Commit(); + EXPECT_THAT(result, IsError(ErrorKind::kValidationFailed)); + EXPECT_THAT(result, HasErrorMessage("could not be found in the latest snapshot")); +} + +TEST_P(RewriteManifestsTest, ManifestReplacementCombinedWithRewrite) { + ASSERT_THAT(AppendFiles({file_a_}), IsOk()); + ICEBERG_UNWRAP_OR_FAIL(auto first_snapshot, table_->current_snapshot()); + const int64_t first_snapshot_id = first_snapshot->snapshot_id; + ICEBERG_UNWRAP_OR_FAIL(auto first_manifests, CurrentManifests()); + ASSERT_EQ(first_manifests.size(), 1U); + + ASSERT_THAT(AppendFiles({file_b_}), IsOk()); + ICEBERG_UNWRAP_OR_FAIL(auto second_snapshot, table_->current_snapshot()); + const int64_t second_snapshot_id = second_snapshot->snapshot_id; + ASSERT_THAT(AppendFiles({file_c_}), IsOk()); + ASSERT_THAT(AppendFiles({file_d_}), IsOk()); + + ICEBERG_UNWRAP_OR_FAIL( + auto new_manifest, + WriteExistingManifest("combined-a", first_snapshot_id, {file_a_})); + + ICEBERG_UNWRAP_OR_FAIL(auto rewrite, table_->NewRewriteManifests()); + rewrite->DeleteManifest(first_manifests[0]) + .AddManifest(new_manifest) + .ClusterBy([](const DataFile&) { return "const-value"; }) + .RewriteIf([&](const ManifestFile& manifest) { + auto entries_result = ReadManifestEntries(manifest); + if (!entries_result.has_value() || entries_result.value().empty()) { + return false; + } + return entries_result.value()[0].data_file->file_path != file_b_->file_path; + }); + EXPECT_THAT(rewrite->Commit(), IsOk()); + + ICEBERG_UNWRAP_OR_FAIL(auto manifests, CurrentManifests()); + ASSERT_EQ(manifests.size(), 3U); + + ExpectManifestEntriesWithSnapshotIds(manifests[1], {file_a_->file_path}, + {ManifestStatus::kExisting}, {first_snapshot_id}); + + ExpectManifestEntriesWithSnapshotIds(manifests[2], {file_b_->file_path}, + {ManifestStatus::kAdded}, {second_snapshot_id}); + + ICEBERG_UNWRAP_OR_FAIL(auto snapshot, table_->current_snapshot()); + ValidateSummary(snapshot, /*replaced=*/3, /*kept=*/1, /*created=*/2, + /*entry_count=*/2); +} + +TEST_P(RewriteManifestsTest, ManifestReplacementCombinedWithRewriteConcurrentDelete) { + SetManifestMergeEnabled(false); + + ASSERT_THAT(AppendFiles({file_a_}), IsOk()); + ICEBERG_UNWRAP_OR_FAIL(auto first_snapshot, table_->current_snapshot()); + const int64_t first_snapshot_id = first_snapshot->snapshot_id; + ICEBERG_UNWRAP_OR_FAIL(auto first_snapshot_manifests, CurrentManifests()); + ASSERT_EQ(first_snapshot_manifests.size(), 1U); + + ASSERT_THAT(AppendFiles({file_b_}), IsOk()); + ICEBERG_UNWRAP_OR_FAIL(auto second_snapshot, table_->current_snapshot()); + const int64_t second_snapshot_id = second_snapshot->snapshot_id; + + ASSERT_THAT(AppendFiles({file_c_}), IsOk()); + ASSERT_EQ(table_->snapshots().size(), 3U); + + ICEBERG_UNWRAP_OR_FAIL( + auto new_manifest, + WriteExistingManifest("combined-concurrent-delete-a", first_snapshot_id, {file_a_}, + ManifestContent::kData, nullptr, + first_snapshot->sequence_number)); + + ICEBERG_UNWRAP_OR_FAIL(auto rewrite, table_->NewRewriteManifests()); + rewrite->DeleteManifest(first_snapshot_manifests[0]) + .AddManifest(new_manifest) + .ClusterBy([](const DataFile&) { return "const-value"; }); + + EXPECT_THAT(static_cast(*rewrite).Apply(), IsOk()); + + ICEBERG_UNWRAP_OR_FAIL(auto delete_files, table_->NewDeleteFiles()); + delete_files->DeleteFile(file_c_); + EXPECT_THAT(delete_files->Commit(), IsOk()); + EXPECT_THAT(table_->Refresh(), IsOk()); + + EXPECT_THAT(rewrite->Commit(), IsOk()); + + ICEBERG_UNWRAP_OR_FAIL(auto manifests, CurrentManifests()); + ASSERT_EQ(manifests.size(), 2U); + + ExpectManifestEntriesWithSnapshotIds(manifests[0], {file_b_->file_path}, + {ManifestStatus::kExisting}, {second_snapshot_id}); + + ExpectManifestEntriesWithSnapshotIds(manifests[1], {file_a_->file_path}, + {ManifestStatus::kExisting}, {first_snapshot_id}); + + ICEBERG_UNWRAP_OR_FAIL(auto snapshot, table_->current_snapshot()); + ValidateSummary(snapshot, /*replaced=*/3, /*kept=*/0, /*created=*/2, + /*entry_count=*/1); +} + +TEST_P(RewriteManifestsTest, InvalidUsage) { + ASSERT_THAT(AppendFiles({file_a_}), IsOk()); + ICEBERG_UNWRAP_OR_FAIL(auto snapshot, table_->current_snapshot()); + ICEBERG_UNWRAP_OR_FAIL(auto manifests, CurrentManifests()); + ASSERT_EQ(manifests.size(), 1U); + const auto manifest = manifests[0]; + + ICEBERG_UNWRAP_OR_FAIL( + auto invalid_added_manifest, + WriteAddedManifest("manifest-file-2", {file_a_}, snapshot->snapshot_id, + snapshot->sequence_number)); + + ICEBERG_UNWRAP_OR_FAIL(auto rewrite_added, table_->NewRewriteManifests()); + rewrite_added->DeleteManifest(manifest).AddManifest(invalid_added_manifest); + auto added_result = rewrite_added->Commit(); + EXPECT_THAT(added_result, IsError(ErrorKind::kValidationFailed)); + EXPECT_THAT(added_result, HasErrorMessage("Cannot add manifest with added files")); + + ICEBERG_UNWRAP_OR_FAIL( + auto invalid_deleted_manifest, + WriteDeletedManifest("manifest-file-3", {file_a_}, snapshot->snapshot_id, + snapshot->sequence_number)); + + ICEBERG_UNWRAP_OR_FAIL(auto rewrite_deleted, table_->NewRewriteManifests()); + rewrite_deleted->DeleteManifest(manifest).AddManifest(invalid_deleted_manifest); + auto deleted_result = rewrite_deleted->Commit(); + EXPECT_THAT(deleted_result, IsError(ErrorKind::kValidationFailed)); + EXPECT_THAT(deleted_result, HasErrorMessage("Cannot add manifest with deleted files")); + + ICEBERG_UNWRAP_OR_FAIL(auto rewrite_missing, table_->NewRewriteManifests()); + rewrite_missing->DeleteManifest(manifest); + auto missing_result = rewrite_missing->Commit(); + EXPECT_THAT(missing_result, IsError(ErrorKind::kValidationFailed)); + EXPECT_THAT(missing_result, + HasErrorMessage("Replaced and created manifests must have the same")); +} + +TEST_P(RewriteManifestsTest, ManifestReplacementFailure) { + ASSERT_THAT(AppendFiles({file_a_}), IsOk()); + ICEBERG_UNWRAP_OR_FAIL(auto first_snapshot, table_->current_snapshot()); + const int64_t first_snapshot_id = first_snapshot->snapshot_id; + ICEBERG_UNWRAP_OR_FAIL(auto first_snapshot_manifests, CurrentManifests()); + ASSERT_EQ(first_snapshot_manifests.size(), 1U); + const auto first_snapshot_manifest = first_snapshot_manifests[0]; + + ASSERT_THAT(AppendFiles({file_b_}), IsOk()); + ICEBERG_UNWRAP_OR_FAIL(auto second_snapshot, table_->current_snapshot()); + const int64_t second_snapshot_id = second_snapshot->snapshot_id; + ICEBERG_UNWRAP_OR_FAIL(auto second_snapshot_manifests, CurrentManifests()); + ASSERT_EQ(second_snapshot_manifests.size(), 2U); + const auto second_snapshot_manifest = second_snapshot_manifests[0]; + + ICEBERG_UNWRAP_OR_FAIL( + auto new_manifest, + WriteExistingManifest("manifest-file", {{file_a_, first_snapshot_id}, + {file_b_, second_snapshot_id}})); + EXPECT_TRUE(FileExists(new_manifest.manifest_path)); + + SetCommitRetryProperties(1); + BindTableWithFailingCommits(/*failures=*/5); + + ICEBERG_UNWRAP_OR_FAIL(auto rewrite, table_->NewRewriteManifests()); + rewrite->DeleteManifest(first_snapshot_manifest) + .DeleteManifest(second_snapshot_manifest) + .AddManifest(new_manifest); + + auto result = rewrite->Commit(); + EXPECT_THAT(result, IsError(ErrorKind::kCommitFailed)); + EXPECT_THAT(result, HasErrorMessage("Injected failure")); + EXPECT_TRUE(FileExists(new_manifest.manifest_path)); +} + +TEST_P(RewriteManifestsTest, ManifestReplacementFailureWithSnapshotIdInheritance) { + SetSnapshotIdInheritanceEnabled(); + + ASSERT_THAT(AppendFiles({file_a_}), IsOk()); + ICEBERG_UNWRAP_OR_FAIL(auto first_snapshot, table_->current_snapshot()); + const int64_t first_snapshot_id = first_snapshot->snapshot_id; + ICEBERG_UNWRAP_OR_FAIL(auto first_snapshot_manifests, CurrentManifests()); + ASSERT_EQ(first_snapshot_manifests.size(), 1U); + const auto first_snapshot_manifest = first_snapshot_manifests[0]; + + ASSERT_THAT(AppendFiles({file_b_}), IsOk()); + ICEBERG_UNWRAP_OR_FAIL(auto second_snapshot, table_->current_snapshot()); + const int64_t second_snapshot_id = second_snapshot->snapshot_id; + ICEBERG_UNWRAP_OR_FAIL(auto second_snapshot_manifests, CurrentManifests()); + ASSERT_EQ(second_snapshot_manifests.size(), 2U); + const auto second_snapshot_manifest = second_snapshot_manifests[0]; + + ICEBERG_UNWRAP_OR_FAIL( + auto new_manifest, + WriteExistingManifest("manifest-file", {{file_a_, first_snapshot_id}, + {file_b_, second_snapshot_id}})); + EXPECT_TRUE(FileExists(new_manifest.manifest_path)); + + SetCommitRetryProperties(1); + BindTableWithFailingCommits(/*failures=*/5); + + ICEBERG_UNWRAP_OR_FAIL(auto rewrite, table_->NewRewriteManifests()); + rewrite->DeleteManifest(first_snapshot_manifest) + .DeleteManifest(second_snapshot_manifest) + .AddManifest(new_manifest); + + auto result = rewrite->Commit(); + EXPECT_THAT(result, IsError(ErrorKind::kCommitFailed)); + EXPECT_THAT(result, HasErrorMessage("Injected failure")); + EXPECT_TRUE(FileExists(new_manifest.manifest_path)); +} + +TEST_P(RewriteManifestsTest, RewriteManifestsOnBranchUnsupported) { + ASSERT_THAT(AppendFiles({file_a_, file_b_}), IsOk()); + + ICEBERG_UNWRAP_OR_FAIL(auto manifests, CurrentManifests()); + ASSERT_EQ(manifests.size(), 1U); + + ICEBERG_UNWRAP_OR_FAIL(auto rewrite, table_->NewRewriteManifests()); + rewrite->SetTargetBranch("someBranch"); + auto result = rewrite->Commit(); + EXPECT_THAT(result, IsError(ErrorKind::kValidationFailed)); + EXPECT_THAT(result, + HasErrorMessage("Cannot commit to branch someBranch: RewriteManifests " + "does not support branch commits")); +} + +TEST_P(RewriteManifestsTest, RewriteDataManifestsPreservesDeletes) { + if (GetParam() == 1) { + GTEST_SKIP() << "Delete manifests require format version 2 or higher"; + } + + ASSERT_THAT(AppendFiles({file_a_, file_b_}), IsOk()); + ICEBERG_UNWRAP_OR_FAIL(auto append_snapshot, table_->current_snapshot()); + const int64_t append_snapshot_id = append_snapshot->snapshot_id; + const int64_t append_snapshot_sequence_number = append_snapshot->sequence_number; + + auto delete_a = MakeDeleteFile("a-pos-deletes", 1, file_a_->file_path); + auto delete_a2 = MakeEqualityDeleteFile("a2-deletes", 1); + ASSERT_THAT(AppendDeleteFiles({delete_a, delete_a2}), IsOk()); + ICEBERG_UNWRAP_OR_FAIL(auto delete_snapshot, table_->current_snapshot()); + const int64_t delete_snapshot_id = delete_snapshot->snapshot_id; + const int64_t delete_snapshot_sequence_number = delete_snapshot->sequence_number; + + ICEBERG_UNWRAP_OR_FAIL(auto data_before, CurrentManifests()); + ICEBERG_UNWRAP_OR_FAIL(auto deletes_before, CurrentDeleteManifests()); + ASSERT_EQ(data_before.size(), 1U); + ASSERT_EQ(deletes_before.size(), 1U); + const std::string delete_manifest_path = deletes_before[0].manifest_path; + + ICEBERG_UNWRAP_OR_FAIL(auto rewrite, table_->NewRewriteManifests()); + rewrite->ClusterBy([](const DataFile& file) { return file.file_path; }); + EXPECT_THAT(rewrite->Commit(), IsOk()); + + ICEBERG_UNWRAP_OR_FAIL(auto data_manifests, CurrentManifests()); + ASSERT_EQ(data_manifests.size(), 2U); + ICEBERG_UNWRAP_OR_FAIL(auto data_manifest_a, + ManifestContainingPath(data_manifests, file_a_->file_path)); + ICEBERG_UNWRAP_OR_FAIL(auto data_manifest_b, + ManifestContainingPath(data_manifests, file_b_->file_path)); + ExpectManifestEntriesWithSequenceNumbers( + *data_manifest_a, {file_a_->file_path}, {ManifestStatus::kExisting}, + {append_snapshot_id}, {append_snapshot_sequence_number}, + {append_snapshot_sequence_number}); + ExpectManifestEntriesWithSequenceNumbers( + *data_manifest_b, {file_b_->file_path}, {ManifestStatus::kExisting}, + {append_snapshot_id}, {append_snapshot_sequence_number}, + {append_snapshot_sequence_number}); + + ICEBERG_UNWRAP_OR_FAIL(auto delete_manifests, CurrentDeleteManifests()); + ASSERT_EQ(delete_manifests.size(), 1U); + EXPECT_EQ(delete_manifests[0].manifest_path, delete_manifest_path); + ExpectManifestEntriesWithSequenceNumbers( + delete_manifests[0], {delete_a->file_path, delete_a2->file_path}, + {ManifestStatus::kAdded, ManifestStatus::kAdded}, + {delete_snapshot_id, delete_snapshot_id}, + {delete_snapshot_sequence_number, delete_snapshot_sequence_number}, + {delete_snapshot_sequence_number, delete_snapshot_sequence_number}); + ICEBERG_UNWRAP_OR_FAIL(auto delete_entries, ReadManifestEntries(delete_manifests[0])); + std::unordered_map delete_content_by_path; + for (const auto& entry : delete_entries) { + ASSERT_NE(entry.data_file, nullptr); + delete_content_by_path.emplace(entry.data_file->file_path, entry.data_file->content); + } + EXPECT_EQ(delete_content_by_path.at(delete_a->file_path), + DataFile::Content::kPositionDeletes); + EXPECT_EQ(delete_content_by_path.at(delete_a2->file_path), + DataFile::Content::kEqualityDeletes); +} + +TEST_P(RewriteManifestsTest, ReplaceDeleteManifestsOnly) { + if (GetParam() == 1) { + GTEST_SKIP() << "Delete manifests require format version 2 or higher"; + } + + ASSERT_THAT(AppendFiles({file_a_, file_b_}), IsOk()); + ICEBERG_UNWRAP_OR_FAIL(auto append_snapshot, table_->current_snapshot()); + const int64_t append_snapshot_id = append_snapshot->snapshot_id; + const int64_t append_snapshot_sequence_number = append_snapshot->sequence_number; + + auto delete_a = MakeDeleteFile("replace-delete-a", 1, file_a_->file_path); + auto delete_a2 = MakeEqualityDeleteFile("replace-delete-a2", 1); + ASSERT_THAT(AppendDeleteFiles({delete_a, delete_a2}), IsOk()); + ICEBERG_UNWRAP_OR_FAIL(auto delete_snapshot, table_->current_snapshot()); + const int64_t delete_snapshot_id = delete_snapshot->snapshot_id; + const int64_t delete_snapshot_sequence_number = delete_snapshot->sequence_number; + ICEBERG_UNWRAP_OR_FAIL(auto original_delete_manifests, CurrentDeleteManifests()); + ASSERT_EQ(original_delete_manifests.size(), 1U); + + ICEBERG_UNWRAP_OR_FAIL( + auto new_delete_manifest_a, + WriteExistingManifest("delete-manifest-file-1", delete_snapshot_id, {delete_a}, + ManifestContent::kDeletes, nullptr, + delete_snapshot_sequence_number, + delete_snapshot_sequence_number)); + ICEBERG_UNWRAP_OR_FAIL( + auto new_delete_manifest_a2, + WriteExistingManifest("delete-manifest-file-2", delete_snapshot_id, {delete_a2}, + ManifestContent::kDeletes, nullptr, + delete_snapshot_sequence_number, + delete_snapshot_sequence_number)); + + ICEBERG_UNWRAP_OR_FAIL(auto rewrite, table_->NewRewriteManifests()); + rewrite->DeleteManifest(original_delete_manifests[0]) + .AddManifest(new_delete_manifest_a) + .AddManifest(new_delete_manifest_a2); + EXPECT_THAT(rewrite->Commit(), IsOk()); + + ICEBERG_UNWRAP_OR_FAIL(auto data_manifests, CurrentManifests()); + ASSERT_EQ(data_manifests.size(), 1U); + ExpectManifestEntriesWithSequenceNumbers( + data_manifests[0], {file_a_->file_path, file_b_->file_path}, + {ManifestStatus::kAdded, ManifestStatus::kAdded}, + {append_snapshot_id, append_snapshot_id}, + {append_snapshot_sequence_number, append_snapshot_sequence_number}, + {append_snapshot_sequence_number, append_snapshot_sequence_number}); + + ICEBERG_UNWRAP_OR_FAIL(auto delete_manifests, CurrentDeleteManifests()); + ASSERT_EQ(delete_manifests.size(), 2U); + ExpectManifestEntriesWithSequenceNumbers( + delete_manifests[0], {delete_a->file_path}, {ManifestStatus::kExisting}, + {delete_snapshot_id}, {delete_snapshot_sequence_number}, + {delete_snapshot_sequence_number}); + ExpectManifestEntriesWithSequenceNumbers( + delete_manifests[1], {delete_a2->file_path}, {ManifestStatus::kExisting}, + {delete_snapshot_id}, {delete_snapshot_sequence_number}, + {delete_snapshot_sequence_number}); + ICEBERG_UNWRAP_OR_FAIL(auto delete_entries_a, ReadManifestEntries(delete_manifests[0])); + ASSERT_EQ(delete_entries_a.size(), 1U); + ASSERT_NE(delete_entries_a[0].data_file, nullptr); + EXPECT_EQ(delete_entries_a[0].data_file->content, DataFile::Content::kPositionDeletes); + ICEBERG_UNWRAP_OR_FAIL(auto delete_entries_a2, + ReadManifestEntries(delete_manifests[1])); + ASSERT_EQ(delete_entries_a2.size(), 1U); + ASSERT_NE(delete_entries_a2[0].data_file, nullptr); + EXPECT_EQ(delete_entries_a2[0].data_file->content, DataFile::Content::kEqualityDeletes); +} + +TEST_P(RewriteManifestsTest, ReplaceDataAndDeleteManifests) { + if (GetParam() == 1) { + GTEST_SKIP() << "Delete manifests require format version 2 or higher"; + } + + ASSERT_THAT(AppendFiles({file_a_, file_b_}), IsOk()); + ICEBERG_UNWRAP_OR_FAIL(auto append_snapshot, table_->current_snapshot()); + const int64_t append_snapshot_id = append_snapshot->snapshot_id; + const int64_t append_snapshot_sequence_number = append_snapshot->sequence_number; + + auto delete_a = MakeDeleteFile("replace-data-delete-a", 1, file_a_->file_path); + auto delete_a2 = MakeEqualityDeleteFile("replace-data-delete-a2", 1); + ASSERT_THAT(AppendDeleteFiles({delete_a, delete_a2}), IsOk()); + ICEBERG_UNWRAP_OR_FAIL(auto delete_snapshot, table_->current_snapshot()); + const int64_t delete_snapshot_id = delete_snapshot->snapshot_id; + const int64_t delete_snapshot_sequence_number = delete_snapshot->sequence_number; + ICEBERG_UNWRAP_OR_FAIL(auto original_data_manifests, CurrentManifests()); + ICEBERG_UNWRAP_OR_FAIL(auto original_delete_manifests, CurrentDeleteManifests()); + ASSERT_EQ(original_data_manifests.size(), 1U); + ASSERT_EQ(original_delete_manifests.size(), 1U); + + ICEBERG_UNWRAP_OR_FAIL(auto new_data_manifest_a, + WriteExistingManifest("manifest-file-1", append_snapshot_id, + {file_a_}, ManifestContent::kData, nullptr, + append_snapshot_sequence_number, + append_snapshot_sequence_number)); + ICEBERG_UNWRAP_OR_FAIL(auto new_data_manifest_b, + WriteExistingManifest("manifest-file-2", append_snapshot_id, + {file_b_}, ManifestContent::kData, nullptr, + append_snapshot_sequence_number, + append_snapshot_sequence_number)); + ICEBERG_UNWRAP_OR_FAIL( + auto new_delete_manifest_a, + WriteExistingManifest("delete-manifest-file-1", delete_snapshot_id, {delete_a}, + ManifestContent::kDeletes, nullptr, + delete_snapshot_sequence_number, + delete_snapshot_sequence_number)); + ICEBERG_UNWRAP_OR_FAIL( + auto new_delete_manifest_a2, + WriteExistingManifest("delete-manifest-file-2", delete_snapshot_id, {delete_a2}, + ManifestContent::kDeletes, nullptr, + delete_snapshot_sequence_number, + delete_snapshot_sequence_number)); + + ICEBERG_UNWRAP_OR_FAIL(auto rewrite, table_->NewRewriteManifests()); + rewrite->DeleteManifest(original_data_manifests[0]) + .AddManifest(new_data_manifest_a) + .AddManifest(new_data_manifest_b) + .DeleteManifest(original_delete_manifests[0]) + .AddManifest(new_delete_manifest_a) + .AddManifest(new_delete_manifest_a2); + EXPECT_THAT(rewrite->Commit(), IsOk()); + + ICEBERG_UNWRAP_OR_FAIL(auto data_manifests, CurrentManifests()); + ASSERT_EQ(data_manifests.size(), 2U); + ICEBERG_UNWRAP_OR_FAIL(auto data_manifest_a, + ManifestContainingPath(data_manifests, file_a_->file_path)); + ICEBERG_UNWRAP_OR_FAIL(auto data_manifest_b, + ManifestContainingPath(data_manifests, file_b_->file_path)); + ExpectManifestEntriesWithSequenceNumbers( + *data_manifest_a, {file_a_->file_path}, {ManifestStatus::kExisting}, + {append_snapshot_id}, {append_snapshot_sequence_number}, + {append_snapshot_sequence_number}); + ExpectManifestEntriesWithSequenceNumbers( + *data_manifest_b, {file_b_->file_path}, {ManifestStatus::kExisting}, + {append_snapshot_id}, {append_snapshot_sequence_number}, + {append_snapshot_sequence_number}); + + ICEBERG_UNWRAP_OR_FAIL(auto delete_manifests, CurrentDeleteManifests()); + ASSERT_EQ(delete_manifests.size(), 2U); + ExpectManifestEntriesWithSequenceNumbers( + delete_manifests[0], {delete_a->file_path}, {ManifestStatus::kExisting}, + {delete_snapshot_id}, {delete_snapshot_sequence_number}, + {delete_snapshot_sequence_number}); + ExpectManifestEntryContents(delete_manifests[0], {delete_a->file_path}, + {DataFile::Content::kPositionDeletes}); + ExpectManifestEntriesWithSequenceNumbers( + delete_manifests[1], {delete_a2->file_path}, {ManifestStatus::kExisting}, + {delete_snapshot_id}, {delete_snapshot_sequence_number}, + {delete_snapshot_sequence_number}); + ExpectManifestEntryContents(delete_manifests[1], {delete_a2->file_path}, + {DataFile::Content::kEqualityDeletes}); +} + +TEST_P(RewriteManifestsTest, DeleteManifestReplacementConcurrentAppend) { + if (GetParam() == 1) { + GTEST_SKIP() << "Delete manifests require format version 2 or higher"; + } + + ASSERT_THAT(AppendFiles({file_a_, file_b_}), IsOk()); + ICEBERG_UNWRAP_OR_FAIL(auto append_snapshot, table_->current_snapshot()); + const int64_t append_snapshot_id = append_snapshot->snapshot_id; + const int64_t append_snapshot_sequence_number = append_snapshot->sequence_number; + + auto delete_a = MakeDeleteFile("concurrent-append-delete-a", 1, file_a_->file_path); + auto delete_a2 = MakeEqualityDeleteFile("concurrent-append-delete-a2", 1); + ASSERT_THAT(AppendDeleteFiles({delete_a, delete_a2}), IsOk()); + ICEBERG_UNWRAP_OR_FAIL(auto delete_snapshot, table_->current_snapshot()); + const int64_t delete_snapshot_id = delete_snapshot->snapshot_id; + const int64_t delete_snapshot_sequence_number = delete_snapshot->sequence_number; + ICEBERG_UNWRAP_OR_FAIL(auto original_delete_manifests, CurrentDeleteManifests()); + ASSERT_EQ(original_delete_manifests.size(), 1U); + + ICEBERG_UNWRAP_OR_FAIL( + auto new_delete_manifest_a, + WriteExistingManifest("delete-manifest-file-1", delete_snapshot_id, {delete_a}, + ManifestContent::kDeletes, nullptr, + delete_snapshot_sequence_number, + delete_snapshot_sequence_number)); + ICEBERG_UNWRAP_OR_FAIL( + auto new_delete_manifest_a2, + WriteExistingManifest("delete-manifest-file-2", delete_snapshot_id, {delete_a2}, + ManifestContent::kDeletes, nullptr, + delete_snapshot_sequence_number, + delete_snapshot_sequence_number)); + + ICEBERG_UNWRAP_OR_FAIL(auto rewrite, table_->NewRewriteManifests()); + rewrite->DeleteManifest(original_delete_manifests[0]) + .AddManifest(new_delete_manifest_a) + .AddManifest(new_delete_manifest_a2); + + ASSERT_THAT(AppendFiles({file_c_, file_d_}), IsOk()); + ICEBERG_UNWRAP_OR_FAIL(auto concurrent_snapshot, table_->current_snapshot()); + const int64_t concurrent_snapshot_id = concurrent_snapshot->snapshot_id; + const int64_t concurrent_snapshot_sequence_number = + concurrent_snapshot->sequence_number; + ICEBERG_UNWRAP_OR_FAIL(auto data_before_rewrite, CurrentManifests()); + ICEBERG_UNWRAP_OR_FAIL(auto deletes_before_rewrite, CurrentDeleteManifests()); + ASSERT_EQ(data_before_rewrite.size(), 2U); + ASSERT_EQ(deletes_before_rewrite.size(), 1U); + + EXPECT_THAT(rewrite->Commit(), IsOk()); + EXPECT_THAT(table_->Refresh(), IsOk()); + ICEBERG_UNWRAP_OR_FAIL(auto rewrite_snapshot, table_->current_snapshot()); + ValidateSummary(rewrite_snapshot, /*replaced=*/1, /*kept=*/2, /*created=*/2, + /*entry_count=*/0); + + ICEBERG_UNWRAP_OR_FAIL(auto data_manifests, CurrentManifests()); + ASSERT_EQ(data_manifests.size(), 2U); + ICEBERG_UNWRAP_OR_FAIL(auto original_data_manifest, + ManifestContainingPath(data_manifests, file_a_->file_path)); + ICEBERG_UNWRAP_OR_FAIL(auto concurrent_data_manifest, + ManifestContainingPath(data_manifests, file_c_->file_path)); + ExpectManifestEntriesWithSequenceNumbers( + *original_data_manifest, {file_a_->file_path, file_b_->file_path}, + {ManifestStatus::kAdded, ManifestStatus::kAdded}, + {append_snapshot_id, append_snapshot_id}, + {append_snapshot_sequence_number, append_snapshot_sequence_number}, + {append_snapshot_sequence_number, append_snapshot_sequence_number}); + ExpectManifestEntriesWithSequenceNumbers( + *concurrent_data_manifest, {file_c_->file_path, file_d_->file_path}, + {ManifestStatus::kAdded, ManifestStatus::kAdded}, + {concurrent_snapshot_id, concurrent_snapshot_id}, + {concurrent_snapshot_sequence_number, concurrent_snapshot_sequence_number}, + {concurrent_snapshot_sequence_number, concurrent_snapshot_sequence_number}); + + ICEBERG_UNWRAP_OR_FAIL(auto delete_manifests, CurrentDeleteManifests()); + ASSERT_EQ(delete_manifests.size(), 2U); + ExpectManifestEntriesWithSequenceNumbers( + delete_manifests[0], {delete_a->file_path}, {ManifestStatus::kExisting}, + {delete_snapshot_id}, {delete_snapshot_sequence_number}, + {delete_snapshot_sequence_number}); + ExpectManifestEntryContents(delete_manifests[0], {delete_a->file_path}, + {DataFile::Content::kPositionDeletes}); + ExpectManifestEntriesWithSequenceNumbers( + delete_manifests[1], {delete_a2->file_path}, {ManifestStatus::kExisting}, + {delete_snapshot_id}, {delete_snapshot_sequence_number}, + {delete_snapshot_sequence_number}); + ExpectManifestEntryContents(delete_manifests[1], {delete_a2->file_path}, + {DataFile::Content::kEqualityDeletes}); +} + +TEST_P(RewriteManifestsTest, DeleteManifestReplacementConcurrentDeleteFileRemoval) { + if (GetParam() == 1) { + GTEST_SKIP() << "Delete manifests require format version 2 or higher"; + } + + ASSERT_THAT(AppendFiles({file_a_, file_b_}), IsOk()); + ICEBERG_UNWRAP_OR_FAIL(auto append_snapshot, table_->current_snapshot()); + const int64_t append_snapshot_id = append_snapshot->snapshot_id; + const int64_t append_snapshot_sequence_number = append_snapshot->sequence_number; + + auto delete_a = MakeDeleteFile("concurrent-removal-delete-a", 1, file_a_->file_path); + auto delete_a2 = MakeEqualityDeleteFile("concurrent-removal-delete-a2", 1); + ASSERT_THAT(AppendDeleteFiles({delete_a, delete_a2}), IsOk()); + ICEBERG_UNWRAP_OR_FAIL(auto first_delete_snapshot, table_->current_snapshot()); + const int64_t first_delete_snapshot_id = first_delete_snapshot->snapshot_id; + const int64_t first_delete_snapshot_sequence_number = + first_delete_snapshot->sequence_number; + ICEBERG_UNWRAP_OR_FAIL(auto first_delete_manifests, CurrentDeleteManifests()); + ASSERT_EQ(first_delete_manifests.size(), 1U); + const auto original_delete_manifest = first_delete_manifests[0]; + + auto delete_b = MakeDeleteFile("concurrent-removal-delete-b", 2, file_b_->file_path); + auto delete_c2 = MakeEqualityDeleteFile("concurrent-removal-delete-c2", 3); + ASSERT_THAT(AppendDeleteFiles({delete_b, delete_c2}), IsOk()); + ICEBERG_UNWRAP_OR_FAIL(auto second_delete_snapshot, table_->current_snapshot()); + const int64_t second_delete_snapshot_id = second_delete_snapshot->snapshot_id; + const int64_t second_delete_snapshot_sequence_number = + second_delete_snapshot->sequence_number; + + ICEBERG_UNWRAP_OR_FAIL( + auto new_delete_manifest_a, + WriteExistingManifest("delete-concurrent-removal-a", first_delete_snapshot_id, + {delete_a}, ManifestContent::kDeletes, nullptr, + first_delete_snapshot_sequence_number, + first_delete_snapshot_sequence_number)); + ICEBERG_UNWRAP_OR_FAIL( + auto new_delete_manifest_a2, + WriteExistingManifest("delete-concurrent-removal-a2", first_delete_snapshot_id, + {delete_a2}, ManifestContent::kDeletes, nullptr, + first_delete_snapshot_sequence_number, + first_delete_snapshot_sequence_number)); + + ICEBERG_UNWRAP_OR_FAIL(auto rewrite, table_->NewRewriteManifests()); + rewrite->DeleteManifest(original_delete_manifest) + .AddManifest(new_delete_manifest_a) + .AddManifest(new_delete_manifest_a2); + + ASSERT_THAT(RemoveDeleteFiles({delete_b}), IsOk()); + ICEBERG_UNWRAP_OR_FAIL(auto concurrent_snapshot, table_->current_snapshot()); + const int64_t concurrent_snapshot_id = concurrent_snapshot->snapshot_id; + ICEBERG_UNWRAP_OR_FAIL(auto data_before_rewrite, CurrentManifests()); + ICEBERG_UNWRAP_OR_FAIL(auto deletes_before_rewrite, CurrentDeleteManifests()); + ASSERT_EQ(data_before_rewrite.size(), 1U); + ASSERT_EQ(deletes_before_rewrite.size(), 2U); + + EXPECT_THAT(rewrite->Commit(), IsOk()); + EXPECT_THAT(table_->Refresh(), IsOk()); + ICEBERG_UNWRAP_OR_FAIL(auto rewrite_snapshot, table_->current_snapshot()); + ValidateSummary(rewrite_snapshot, /*replaced=*/1, /*kept=*/2, /*created=*/2, + /*entry_count=*/0); + + ICEBERG_UNWRAP_OR_FAIL(auto data_manifests, CurrentManifests()); + ASSERT_EQ(data_manifests.size(), 1U); + ExpectManifestEntriesWithSequenceNumbers( + data_manifests[0], {file_a_->file_path, file_b_->file_path}, + {ManifestStatus::kAdded, ManifestStatus::kAdded}, + {append_snapshot_id, append_snapshot_id}, + {append_snapshot_sequence_number, append_snapshot_sequence_number}, + {append_snapshot_sequence_number, append_snapshot_sequence_number}); + + ICEBERG_UNWRAP_OR_FAIL(auto delete_manifests, CurrentDeleteManifests()); + ASSERT_EQ(delete_manifests.size(), 3U); + ICEBERG_UNWRAP_OR_FAIL(auto delete_manifest_a, + ManifestContainingPath(delete_manifests, delete_a->file_path)); + ICEBERG_UNWRAP_OR_FAIL(auto delete_manifest_a2, + ManifestContainingPath(delete_manifests, delete_a2->file_path)); + ICEBERG_UNWRAP_OR_FAIL(auto delete_manifest_b, + ManifestContainingPath(delete_manifests, delete_b->file_path)); + ExpectManifestEntriesWithSequenceNumbers( + *delete_manifest_a, {delete_a->file_path}, {ManifestStatus::kExisting}, + {first_delete_snapshot_id}, {first_delete_snapshot_sequence_number}, + {first_delete_snapshot_sequence_number}); + ExpectManifestEntryContents(*delete_manifest_a, {delete_a->file_path}, + {DataFile::Content::kPositionDeletes}); + ExpectManifestEntriesWithSequenceNumbers( + *delete_manifest_a2, {delete_a2->file_path}, {ManifestStatus::kExisting}, + {first_delete_snapshot_id}, {first_delete_snapshot_sequence_number}, + {first_delete_snapshot_sequence_number}); + ExpectManifestEntryContents(*delete_manifest_a2, {delete_a2->file_path}, + {DataFile::Content::kEqualityDeletes}); + ExpectManifestEntriesWithSequenceNumbers( + *delete_manifest_b, {delete_b->file_path, delete_c2->file_path}, + {ManifestStatus::kDeleted, ManifestStatus::kExisting}, + {concurrent_snapshot_id, second_delete_snapshot_id}, + {second_delete_snapshot_sequence_number, second_delete_snapshot_sequence_number}, + {second_delete_snapshot_sequence_number, second_delete_snapshot_sequence_number}); + ExpectManifestEntryContents( + *delete_manifest_b, {delete_b->file_path, delete_c2->file_path}, + {DataFile::Content::kPositionDeletes, DataFile::Content::kEqualityDeletes}); +} + +TEST_P(RewriteManifestsTest, DeleteManifestReplacementConflictingDeleteFileRemoval) { + if (GetParam() == 1) { + GTEST_SKIP() << "Delete manifests require format version 2 or higher"; + } + + ASSERT_THAT(AppendFiles({file_a_, file_b_, file_c_}), IsOk()); + + auto delete_a = MakeDeleteFile("conflicting-removal-delete-a", 1, file_a_->file_path); + auto delete_a2 = MakeEqualityDeleteFile("conflicting-removal-delete-a2", 1); + ASSERT_THAT(AppendDeleteFiles({delete_a, delete_a2}), IsOk()); + ICEBERG_UNWRAP_OR_FAIL(auto delete_snapshot, table_->current_snapshot()); + const int64_t delete_snapshot_id = delete_snapshot->snapshot_id; + const int64_t delete_snapshot_sequence_number = delete_snapshot->sequence_number; + ICEBERG_UNWRAP_OR_FAIL(auto original_delete_manifests, CurrentDeleteManifests()); + ASSERT_EQ(original_delete_manifests.size(), 1U); + const auto original_delete_manifest = original_delete_manifests[0]; + + ICEBERG_UNWRAP_OR_FAIL( + auto new_delete_manifest_a, + WriteExistingManifest("delete-conflicting-removal-a", delete_snapshot_id, + {delete_a}, ManifestContent::kDeletes, nullptr, + delete_snapshot_sequence_number, + delete_snapshot_sequence_number)); + ICEBERG_UNWRAP_OR_FAIL( + auto new_delete_manifest_a2, + WriteExistingManifest("delete-conflicting-removal-a2", delete_snapshot_id, + {delete_a2}, ManifestContent::kDeletes, nullptr, + delete_snapshot_sequence_number, + delete_snapshot_sequence_number)); + + ICEBERG_UNWRAP_OR_FAIL(auto rewrite, table_->NewRewriteManifests()); + rewrite->DeleteManifest(original_delete_manifest) + .AddManifest(new_delete_manifest_a) + .AddManifest(new_delete_manifest_a2); + + ASSERT_THAT(RemoveDeleteFiles({delete_a}), IsOk()); + + auto result = rewrite->Commit(); + EXPECT_THAT(result, IsError(ErrorKind::kValidationFailed)); + EXPECT_THAT(result, HasErrorMessage(std::format( + "Deleted manifest {} could not be found in the latest snapshot", + original_delete_manifest.manifest_path))); +} + +TEST_P(RewriteManifestsTest, DeleteManifestReplacementFailure) { + if (GetParam() == 1) { + GTEST_SKIP() << "Delete manifests require format version 2 or higher"; + } + + ASSERT_THAT(AppendFiles({file_a_}), IsOk()); + + auto delete_a = MakeDeleteFile("failure-delete-a", 1, file_a_->file_path); + ASSERT_THAT(AppendDeleteFiles({delete_a}), IsOk()); + ICEBERG_UNWRAP_OR_FAIL(auto first_delete_snapshot, table_->current_snapshot()); + const int64_t first_delete_snapshot_id = first_delete_snapshot->snapshot_id; + const int64_t first_delete_snapshot_sequence_number = + first_delete_snapshot->sequence_number; + + auto delete_a2 = MakeEqualityDeleteFile("failure-delete-a2", 1); + ASSERT_THAT(AppendDeleteFiles({delete_a2}), IsOk()); + ICEBERG_UNWRAP_OR_FAIL(auto second_delete_snapshot, table_->current_snapshot()); + const int64_t second_delete_snapshot_id = second_delete_snapshot->snapshot_id; + const int64_t second_delete_snapshot_sequence_number = + second_delete_snapshot->sequence_number; + ICEBERG_UNWRAP_OR_FAIL(auto original_delete_manifests, CurrentDeleteManifests()); + ASSERT_EQ(original_delete_manifests.size(), 2U); + + ICEBERG_UNWRAP_OR_FAIL( + auto new_delete_manifest, + WriteExistingDeleteManifest( + "delete-manifest-file", + {{delete_a, first_delete_snapshot_id, first_delete_snapshot_sequence_number, + first_delete_snapshot_sequence_number}, + {delete_a2, second_delete_snapshot_id, second_delete_snapshot_sequence_number, + second_delete_snapshot_sequence_number}})); + EXPECT_TRUE(FileExists(new_delete_manifest.manifest_path)); + + SetCommitRetryProperties(1); + BindTableWithFailingCommits(/*failures=*/5); + + ICEBERG_UNWRAP_OR_FAIL(auto rewrite, table_->NewRewriteManifests()); + for (const auto& original_delete_manifest : original_delete_manifests) { + rewrite->DeleteManifest(original_delete_manifest); + } + rewrite->AddManifest(new_delete_manifest); + + auto result = rewrite->Commit(); + EXPECT_THAT(result, IsError(ErrorKind::kCommitFailed)); + EXPECT_THAT(result, HasErrorMessage("Injected failure")); + EXPECT_TRUE(FileExists(new_delete_manifest.manifest_path)); +} + +INSTANTIATE_TEST_SUITE_P(RewriteManifestVersions, RewriteManifestsTest, + ::testing::Values(1, 2, 3), + [](const ::testing::TestParamInfo& info) { + return "V" + std::to_string(info.param); + }); + +} // namespace iceberg diff --git a/src/iceberg/test/update_test_base.h b/src/iceberg/test/update_test_base.h index b0ad3d20f..53ee001e5 100644 --- a/src/iceberg/test/update_test_base.h +++ b/src/iceberg/test/update_test_base.h @@ -21,13 +21,18 @@ #include #include +#include #include +#include +#include #include #include #include "iceberg/arrow/arrow_io_internal.h" #include "iceberg/catalog/memory/in_memory_catalog.h" +#include "iceberg/manifest/manifest_entry.h" +#include "iceberg/manifest/manifest_reader.h" #include "iceberg/result.h" #include "iceberg/snapshot.h" #include "iceberg/table.h" @@ -135,6 +140,114 @@ class UpdateTestBase : public ::testing::Test { EXPECT_THAT(result, HasErrorMessage(message)); } + Result> ReadManifestEntries(const ManifestFile& manifest) { + return ReadManifestEntries(std::span(&manifest, 1)); + } + + Result> ReadManifestEntries( + std::span manifests) { + std::vector result; + ICEBERG_ASSIGN_OR_RAISE(auto schema, table_->metadata()->Schema()); + for (const auto& manifest : manifests) { + ICEBERG_ASSIGN_OR_RAISE( + auto spec, table_->metadata()->PartitionSpecById(manifest.partition_spec_id)); + ICEBERG_ASSIGN_OR_RAISE(auto reader, + ManifestReader::Make(manifest, file_io_, schema, spec)); + ICEBERG_ASSIGN_OR_RAISE(auto entries, reader->Entries()); + result.insert(result.end(), entries.begin(), entries.end()); + } + return result; + } + + void ExpectManifestEntries(const ManifestFile& manifest, + std::vector expected_paths, + std::vector expected_statuses) { + ASSERT_EQ(expected_statuses.size(), expected_paths.size()); + + ICEBERG_UNWRAP_OR_FAIL(auto entries, ReadManifestEntries(manifest)); + ASSERT_EQ(entries.size(), expected_paths.size()); + + std::vector> actual; + actual.reserve(entries.size()); + for (const auto& entry : entries) { + ASSERT_NE(entry.data_file, nullptr); + actual.emplace_back(entry.data_file->file_path, entry.status); + } + + std::vector> expected; + expected.reserve(expected_paths.size()); + for (size_t i = 0; i < expected_paths.size(); ++i) { + expected.emplace_back(std::move(expected_paths[i]), expected_statuses[i]); + } + EXPECT_THAT(actual, ::testing::UnorderedElementsAreArray(expected)); + } + + void ExpectManifestEntriesWithSnapshotIds(const ManifestFile& manifest, + std::vector expected_paths, + std::vector expected_statuses, + std::vector expected_snapshot_ids) { + ASSERT_EQ(expected_statuses.size(), expected_paths.size()); + ASSERT_EQ(expected_snapshot_ids.size(), expected_paths.size()); + + ICEBERG_UNWRAP_OR_FAIL(auto entries, ReadManifestEntries(manifest)); + ASSERT_EQ(entries.size(), expected_paths.size()); + + std::vector> actual; + actual.reserve(entries.size()); + for (const auto& entry : entries) { + ASSERT_NE(entry.data_file, nullptr); + ASSERT_TRUE(entry.snapshot_id.has_value()); + actual.emplace_back(entry.data_file->file_path, entry.status, + entry.snapshot_id.value()); + } + + std::vector> expected; + expected.reserve(expected_paths.size()); + for (size_t i = 0; i < expected_paths.size(); ++i) { + expected.emplace_back(std::move(expected_paths[i]), expected_statuses[i], + expected_snapshot_ids[i]); + } + EXPECT_THAT(actual, ::testing::UnorderedElementsAreArray(expected)); + } + + void ExpectManifestEntriesWithSequenceNumbers( + const ManifestFile& manifest, std::vector expected_paths, + std::vector expected_statuses, + std::vector expected_snapshot_ids, + std::vector expected_data_sequence_numbers, + std::vector expected_file_sequence_numbers) { + ASSERT_EQ(expected_statuses.size(), expected_paths.size()); + ASSERT_EQ(expected_snapshot_ids.size(), expected_paths.size()); + ASSERT_EQ(expected_data_sequence_numbers.size(), expected_paths.size()); + ASSERT_EQ(expected_file_sequence_numbers.size(), expected_paths.size()); + + ICEBERG_UNWRAP_OR_FAIL(auto entries, ReadManifestEntries(manifest)); + ASSERT_EQ(entries.size(), expected_paths.size()); + + std::vector> + actual; + actual.reserve(entries.size()); + for (const auto& entry : entries) { + ASSERT_NE(entry.data_file, nullptr); + ASSERT_TRUE(entry.snapshot_id.has_value()); + ASSERT_TRUE(entry.sequence_number.has_value()); + ASSERT_TRUE(entry.file_sequence_number.has_value()); + actual.emplace_back(entry.data_file->file_path, entry.status, + entry.snapshot_id.value(), entry.sequence_number.value(), + entry.file_sequence_number.value()); + } + + std::vector> + expected; + expected.reserve(expected_paths.size()); + for (size_t i = 0; i < expected_paths.size(); ++i) { + expected.emplace_back(std::move(expected_paths[i]), expected_statuses[i], + expected_snapshot_ids[i], expected_data_sequence_numbers[i], + expected_file_sequence_numbers[i]); + } + EXPECT_THAT(actual, ::testing::UnorderedElementsAreArray(expected)); + } + TableIdentifier table_ident_; std::string table_location_; std::shared_ptr file_io_; diff --git a/src/iceberg/transaction.cc b/src/iceberg/transaction.cc index 169e7ec90..df4501e0f 100644 --- a/src/iceberg/transaction.cc +++ b/src/iceberg/transaction.cc @@ -38,6 +38,7 @@ #include "iceberg/update/merge_append.h" #include "iceberg/update/overwrite_files.h" #include "iceberg/update/pending_update.h" +#include "iceberg/update/rewrite_manifests.h" #include "iceberg/update/row_delta.h" #include "iceberg/update/set_snapshot.h" #include "iceberg/update/snapshot_manager.h" @@ -521,6 +522,13 @@ Result> Transaction::NewOverwrite() { return overwrite; } +Result> Transaction::NewRewriteManifests() { + ICEBERG_ASSIGN_OR_RAISE(std::shared_ptr rewrite_manifests, + RewriteManifests::Make(ctx_->table->name().name, ctx_)); + ICEBERG_RETURN_UNEXPECTED(AddUpdate(rewrite_manifests)); + return rewrite_manifests; +} + Result> Transaction::NewUpdateStatistics() { ICEBERG_ASSIGN_OR_RAISE(std::shared_ptr update_statistics, UpdateStatistics::Make(ctx_)); diff --git a/src/iceberg/transaction.h b/src/iceberg/transaction.h index 49b607d60..655be3e16 100644 --- a/src/iceberg/transaction.h +++ b/src/iceberg/transaction.h @@ -118,6 +118,9 @@ class ICEBERG_EXPORT Transaction : public std::enable_shared_from_this> NewOverwrite(); + /// \brief Create a new RewriteManifests to rewrite manifest layout. + Result> NewRewriteManifests(); + /// \brief Create a new SnapshotManager to manage snapshots. Result> NewSnapshotManager(); diff --git a/src/iceberg/type_fwd.h b/src/iceberg/type_fwd.h index 870badba0..39c387bb6 100644 --- a/src/iceberg/type_fwd.h +++ b/src/iceberg/type_fwd.h @@ -245,6 +245,7 @@ class FastAppend; class MergeAppend; class OverwriteFiles; class PendingUpdate; +class RewriteManifests; class RowDelta; class SetSnapshot; class SnapshotManager; diff --git a/src/iceberg/update/meson.build b/src/iceberg/update/meson.build index 4ba4168d4..6a455dbd7 100644 --- a/src/iceberg/update/meson.build +++ b/src/iceberg/update/meson.build @@ -24,6 +24,7 @@ install_headers( 'merging_snapshot_update.h', 'overwrite_files.h', 'pending_update.h', + 'rewrite_manifests.h', 'row_delta.h', 'set_snapshot.h', 'snapshot_manager.h', diff --git a/src/iceberg/update/pending_update.cc b/src/iceberg/update/pending_update.cc index 4b3000652..eaccbc596 100644 --- a/src/iceberg/update/pending_update.cc +++ b/src/iceberg/update/pending_update.cc @@ -35,6 +35,10 @@ Status PendingUpdate::Commit() { if (!ctx_->transaction) { // Table-created path: no transaction exists yet, create a temporary one. ICEBERG_ASSIGN_OR_RAISE(auto txn, Transaction::Make(ctx_)); + auto self = weak_from_this().lock(); + if (self) { + ICEBERG_RETURN_UNEXPECTED(txn->AddUpdate(self)); + } auto apply_status = txn->Apply(*this); if (!apply_status.has_value()) { std::ignore = Finalize(std::unexpected(apply_status.error())); @@ -43,11 +47,15 @@ Status PendingUpdate::Commit() { auto commit_result = txn->Commit(); if (!commit_result.has_value()) { - std::ignore = Finalize(std::unexpected(commit_result.error())); + if (!self) { + std::ignore = Finalize(std::unexpected(commit_result.error())); + } return std::unexpected(commit_result.error()); } - std::ignore = Finalize(commit_result.value()->metadata().get()); + if (!self) { + std::ignore = Finalize(commit_result.value()->metadata().get()); + } return {}; } auto txn = ctx_->transaction->lock(); diff --git a/src/iceberg/update/pending_update.h b/src/iceberg/update/pending_update.h index 19998ddb3..5e3415160 100644 --- a/src/iceberg/update/pending_update.h +++ b/src/iceberg/update/pending_update.h @@ -38,7 +38,8 @@ namespace iceberg { /// /// \note Implementations are expected to use builder pattern and errors /// should be handled by the ErrorCollector base class. -class ICEBERG_EXPORT PendingUpdate : public ErrorCollector { +class ICEBERG_EXPORT PendingUpdate : public ErrorCollector, + public std::enable_shared_from_this { public: enum class Kind : uint8_t { kExpireSnapshots, diff --git a/src/iceberg/update/rewrite_manifests.cc b/src/iceberg/update/rewrite_manifests.cc new file mode 100644 index 000000000..68f88a477 --- /dev/null +++ b/src/iceberg/update/rewrite_manifests.cc @@ -0,0 +1,499 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "iceberg/update/rewrite_manifests.h" + +#include +#include +#include +#include +#include +#include +#include +#include + +#include "iceberg/constants.h" +#include "iceberg/inheritable_metadata.h" +#include "iceberg/manifest/manifest_entry.h" +#include "iceberg/manifest/manifest_list.h" +#include "iceberg/manifest/manifest_reader.h" +#include "iceberg/manifest/manifest_writer.h" +#include "iceberg/snapshot.h" +#include "iceberg/table.h" // IWYU pragma: keep +#include "iceberg/table_metadata.h" +#include "iceberg/transaction.h" +#include "iceberg/util/executor_util_internal.h" +#include "iceberg/util/macros.h" + +namespace iceberg { + +namespace { + +void SetSnapshotId(ManifestFile& manifest, int64_t snapshot_id) { + manifest.added_snapshot_id = snapshot_id; +} + +struct RewriteCandidate { + ManifestFile manifest; + std::shared_ptr spec; +}; + +struct ManifestEntries { + ManifestFile manifest; + std::shared_ptr spec; + std::vector entries; +}; + +struct RewriteWriter { + std::shared_ptr spec; + ManifestContent content; + std::unique_ptr writer; +}; + +} // namespace + +Result> RewriteManifests::Make( + std::string table_name, std::shared_ptr ctx) { + ICEBERG_PRECHECK(!table_name.empty(), "Table name cannot be empty"); + ICEBERG_PRECHECK(ctx != nullptr, "Cannot create RewriteManifests without a context"); + return std::shared_ptr( + new RewriteManifests(std::move(table_name), std::move(ctx))); +} + +RewriteManifests::RewriteManifests(std::string table_name, + std::shared_ptr ctx) + : SnapshotUpdate(std::move(ctx)), table_name_(std::move(table_name)) {} + +RewriteManifests& RewriteManifests::ClusterBy(ClusterByFunc func) { + ICEBERG_BUILDER_CHECK(static_cast(func), "Cluster function cannot be null"); + cluster_by_func_ = std::move(func); + return *this; +} + +RewriteManifests& RewriteManifests::RewriteIf(RewritePredicate predicate) { + ICEBERG_BUILDER_CHECK(static_cast(predicate), "Rewrite predicate cannot be null"); + predicate_ = std::move(predicate); + return *this; +} + +RewriteManifests& RewriteManifests::DeleteManifest(const ManifestFile& manifest) { + auto [_, inserted] = deleted_manifest_paths_.insert(manifest.manifest_path); + if (inserted) { + deleted_manifests_.push_back(manifest); + } + return *this; +} + +RewriteManifests& RewriteManifests::AddManifest(const ManifestFile& manifest) { + // Reject added/deleted files unconditionally, matching Java's checkArgument. A + // missing count is treated as non-zero (has_*_files defaults to true), so the + // error is reported at the AddManifest call site rather than deferred to Apply. + ICEBERG_BUILDER_CHECK(!manifest.has_added_files(), + "Cannot add manifest with added files"); + ICEBERG_BUILDER_CHECK(!manifest.has_deleted_files(), + "Cannot add manifest with deleted files"); + ICEBERG_BUILDER_CHECK(manifest.added_snapshot_id == kInvalidSnapshotId, + "Snapshot id must be assigned during commit"); + ICEBERG_BUILDER_CHECK(manifest.sequence_number == kInvalidSequenceNumber, + "Sequence number must be assigned during commit"); + + if (can_inherit_snapshot_id()) { + added_manifests_.push_back(manifest); + } else { + // The manifest must be rewritten with this update's snapshot ID. CopyManifest + // also validates that the manifest only contains existing entries. + ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto copied_manifest, CopyManifest(manifest)); + rewritten_added_manifests_.push_back(std::move(copied_manifest)); + } + return *this; +} + +Status RewriteManifests::ValidateTargetBranch(const std::string& branch) const { + return NotSupported( + "Cannot commit to branch {}: RewriteManifests does not support branch commits", + branch); +} + +std::string RewriteManifests::operation() { return DataOperation::kReplace; } + +Result> RewriteManifests::Apply( + const TableMetadata& /*metadata_to_update*/, + const std::shared_ptr& snapshot) { + ICEBERG_PRECHECK(snapshot != nullptr, + "Cannot rewrite manifests without a current snapshot"); + + SnapshotCache cached_snapshot(snapshot.get()); + ICEBERG_ASSIGN_OR_RAISE(auto current_manifests, + cached_snapshot.Manifests(ctx_->table->io())); + + std::unordered_set current_manifest_paths; + current_manifest_paths.reserve(current_manifests.size()); + for (const auto& manifest : current_manifests) { + current_manifest_paths.insert(manifest.manifest_path); + } + + ICEBERG_RETURN_UNEXPECTED( + ValidateDeletedManifests(current_manifest_paths, snapshot->snapshot_id)); + + if (RequiresRewrite(current_manifest_paths)) { + ICEBERG_ASSIGN_OR_RAISE(auto rewritten, Rewrite(current_manifests)); + new_manifests_ = std::move(rewritten); + } else { + // Keep any existing manifests as-is that were not processed. Previously + // created manifests in new_manifests_ are reused across commit retries. + kept_manifests_.clear(); + for (const auto& manifest : current_manifests) { + if (!rewritten_manifest_paths_.contains(manifest.manifest_path) && + !deleted_manifest_paths_.contains(manifest.manifest_path)) { + kept_manifests_.push_back(manifest); + } + } + } + + ICEBERG_RETURN_UNEXPECTED(ValidateActiveFiles()); + + std::vector manifests; + manifests.reserve(new_manifests_.size() + added_manifests_.size() + + rewritten_added_manifests_.size() + kept_manifests_.size()); + + const int64_t snapshot_id = SnapshotId(); + for (auto& manifest : new_manifests_) { + SetSnapshotId(manifest, snapshot_id); + manifests.push_back(manifest); + } + for (auto& manifest : added_manifests_) { + SetSnapshotId(manifest, snapshot_id); + manifests.push_back(manifest); + } + for (auto& manifest : rewritten_added_manifests_) { + SetSnapshotId(manifest, snapshot_id); + manifests.push_back(manifest); + } + // Kept manifests are carried over unchanged, matching Java which adds them + // as-is without recomputing counts. + for (const auto& manifest : kept_manifests_) { + manifests.push_back(manifest); + } + + manifest_count_summary_ = BuildManifestCountSummary( + manifests, + static_cast(rewritten_manifests_.size() + deleted_manifests_.size())); + return manifests; +} + +std::unordered_map RewriteManifests::Summary() { + summary_.Clear(); + summary_.SetPartitionSummaryLimit(0); + for (const auto& [property, value] : custom_summary_properties_) { + summary_.Set(property, value); + } + summary_.Merge(manifest_count_summary_); + summary_.Set(SnapshotSummaryFields::kEntriesProcessed, std::to_string(entry_count_)); + return summary_.Build(); +} + +void RewriteManifests::SetSummaryProperty(const std::string& property, + const std::string& value) { + custom_summary_properties_[property] = value; + SnapshotUpdate::SetSummaryProperty(property, value); +} + +Status RewriteManifests::CleanUncommitted( + const std::unordered_set& committed) { + if (committed.empty() && !cleanup_all_) { + return {}; + } + ICEBERG_RETURN_UNEXPECTED(DeleteUncommitted(new_manifests_, committed, + /*clear=*/false)); + ICEBERG_RETURN_UNEXPECTED(DeleteUncommitted(rewritten_added_manifests_, committed, + /*clear=*/false)); + return {}; +} + +Status RewriteManifests::Finalize(Result commit_result) { + if (!commit_result.has_value() && + commit_result.error().kind != ErrorKind::kCommitStateUnknown) { + cleanup_all_ = true; + } + auto status = SnapshotUpdate::Finalize(std::move(commit_result)); + cleanup_all_ = false; + return status; +} + +bool RewriteManifests::RequiresRewrite( + const std::unordered_set& current_manifest_paths) const { + const bool has_direct_replacements = !deleted_manifests_.empty() || + !added_manifests_.empty() || + !rewritten_added_manifests_.empty(); + if (!cluster_by_func_ && !predicate_ && has_direct_replacements) { + // Manifests are deleted and added directly, so don't rewrite unrelated manifests + // unless a clustering function or predicate explicitly requests it. + return false; + } + if (rewritten_manifests_.empty()) { + // nothing yet processed so perform a full rewrite + return true; + } + + // if any processed manifest is not in the current manifest list, perform a full rewrite + return std::ranges::any_of(rewritten_manifests_, [&](const ManifestFile& manifest) { + return !current_manifest_paths.contains(manifest.manifest_path); + }); +} + +bool RewriteManifests::MatchesPredicate(const ManifestFile& manifest) const { + return !predicate_ || predicate_(manifest); +} + +Status RewriteManifests::ValidateDeletedManifests( + const std::unordered_set& current_manifest_paths, + int64_t current_snapshot_id) const { + for (const auto& manifest : deleted_manifests_) { + if (!current_manifest_paths.contains(manifest.manifest_path)) { + return ValidationFailed( + "Deleted manifest {} could not be found in the latest snapshot {}", + manifest.manifest_path, current_snapshot_id); + } + } + return {}; +} + +Status RewriteManifests::ValidateActiveFiles() const { + // Compare the number of active (added + existing) files between created and + // replaced manifests using the persisted manifest counts, mirroring Java's + // BaseRewriteManifests.validateFilesCounts. This avoids re-reading manifest + // entries on every apply, including commit retries. + auto accumulate_active_files = [](const std::vector& manifests, + int64_t& active_files) -> Status { + for (const auto& manifest : manifests) { + if (!manifest.added_files_count.has_value() || + !manifest.existing_files_count.has_value()) { + return ValidationFailed("Missing file counts in {}", manifest.manifest_path); + } + active_files += manifest.added_files_count.value(); + active_files += manifest.existing_files_count.value(); + } + return {}; + }; + + int64_t created_active_files = 0; + ICEBERG_RETURN_UNEXPECTED( + accumulate_active_files(new_manifests_, created_active_files)); + ICEBERG_RETURN_UNEXPECTED( + accumulate_active_files(added_manifests_, created_active_files)); + ICEBERG_RETURN_UNEXPECTED( + accumulate_active_files(rewritten_added_manifests_, created_active_files)); + + int64_t replaced_active_files = 0; + ICEBERG_RETURN_UNEXPECTED( + accumulate_active_files(rewritten_manifests_, replaced_active_files)); + ICEBERG_RETURN_UNEXPECTED( + accumulate_active_files(deleted_manifests_, replaced_active_files)); + + if (created_active_files != replaced_active_files) { + return ValidationFailed( + "Replaced and created manifests must have the same number of active files: {} " + "(new), {} (old)", + created_active_files, replaced_active_files); + } + return {}; +} + +Result RewriteManifests::CopyManifest(const ManifestFile& manifest) { + ICEBERG_ASSIGN_OR_RAISE(auto schema, base().Schema()); + ICEBERG_ASSIGN_OR_RAISE(auto spec, + base().PartitionSpecById(manifest.partition_spec_id)); + // For a rewritten manifest all entries must already carry explicit snapshot ids. + // Use empty inheritable metadata so reading throws if any snapshot id is missing, + // and existing snapshot ids are preserved (matching Java copyRewriteManifest). + ICEBERG_ASSIGN_OR_RAISE(auto inheritable_metadata, InheritableMetadataFactory::Empty()); + ICEBERG_ASSIGN_OR_RAISE( + auto reader, + ManifestReader::Make(manifest.manifest_path, manifest.manifest_length, + ctx_->table->io(), schema, spec, + std::move(inheritable_metadata), manifest.first_row_id, + /*is_committed=*/false)); + ICEBERG_ASSIGN_OR_RAISE(auto entries, reader->Entries()); + + ICEBERG_ASSIGN_OR_RAISE( + auto writer, + ManifestWriter::MakeWriter(base().format_version, SnapshotId(), ManifestPath(), + ctx_->table->io(), std::move(spec), std::move(schema), + manifest.content, manifest.first_row_id)); + for (const auto& entry : entries) { + // A rewritten added manifest may only contain existing entries. + if (entry.status == ManifestStatus::kAdded) { + return ValidationFailed("Cannot add manifest with added files"); + } + if (entry.status == ManifestStatus::kDeleted) { + return ValidationFailed("Cannot add manifest with deleted files"); + } + ICEBERG_RETURN_UNEXPECTED(writer->WriteExistingEntry(entry)); + } + ICEBERG_RETURN_UNEXPECTED(writer->Close()); + return writer->ToManifestFile(); +} + +Result> RewriteManifests::Rewrite( + std::span current_manifests) { + ResetRewriteState(); + + using WriterKey = std::pair; + struct WriterKeyHash { + size_t operator()(const WriterKey& key) const { + size_t seed = std::hash{}(key.first); + seed ^= std::hash{}(key.second) + 0x9e3779b9 + (seed << 6) + (seed >> 2); + return seed; + } + }; + + ICEBERG_ASSIGN_OR_RAISE(auto schema, base().Schema()); + std::vector rewrite_candidates; + rewrite_candidates.reserve(current_manifests.size()); + + for (const auto& manifest : current_manifests) { + if (deleted_manifest_paths_.contains(manifest.manifest_path)) { + continue; + } + if (manifest.content == ManifestContent::kDeletes || !MatchesPredicate(manifest)) { + kept_manifests_.push_back(manifest); + continue; + } + + rewritten_manifests_.push_back(manifest); + rewritten_manifest_paths_.insert(manifest.manifest_path); + ICEBERG_ASSIGN_OR_RAISE(auto spec, + base().PartitionSpecById(manifest.partition_spec_id)); + rewrite_candidates.push_back( + RewriteCandidate{.manifest = manifest, .spec = std::move(spec)}); + } + + auto file_io = ctx_->table->io(); + ICEBERG_ASSIGN_OR_RAISE( + auto manifest_entries, + ParallelCollect( + plan_executor(), rewrite_candidates, + [&](const RewriteCandidate& candidate) -> Result> { + ICEBERG_ASSIGN_OR_RAISE( + auto reader, ManifestReader::Make(candidate.manifest, file_io, schema, + candidate.spec)); + ICEBERG_ASSIGN_OR_RAISE(auto entries, reader->LiveEntries()); + std::vector result; + result.push_back(ManifestEntries{.manifest = candidate.manifest, + .spec = candidate.spec, + .entries = std::move(entries)}); + return result; + })); + + std::unordered_map writers; + + auto close_writer = + [](RewriteWriter& rewrite_writer) -> Result> { + if (rewrite_writer.writer == nullptr) { + return std::nullopt; + } + ICEBERG_RETURN_UNEXPECTED(rewrite_writer.writer->Close()); + ICEBERG_ASSIGN_OR_RAISE(auto manifest_file, rewrite_writer.writer->ToManifestFile()); + rewrite_writer.writer.reset(); + return manifest_file; + }; + + auto new_writer = [this, &schema](const RewriteWriter& rewrite_writer) + -> Result> { + std::optional first_row_id = std::nullopt; + if (base().format_version >= 3 && rewrite_writer.content == ManifestContent::kData) { + // Rewritten manifests contain existing files only. Use a non-null manifest + // first_row_id so v3 manifest-list writing does not assign new row IDs for + // existing rows. + first_row_id = 0; + } + return ManifestWriter::MakeWriter(base().format_version, SnapshotId(), ManifestPath(), + ctx_->table->io(), rewrite_writer.spec, schema, + rewrite_writer.content, first_row_id); + }; + + std::vector result; + for (const auto& manifest_entry : manifest_entries) { + for (const auto& entry : manifest_entry.entries) { + ICEBERG_PRECHECK(entry.data_file != nullptr, + "Manifest entry in {} is missing data_file", + manifest_entry.manifest.manifest_path); + auto key = + WriterKey{cluster_by_func_ ? cluster_by_func_(*entry.data_file) : std::string{}, + manifest_entry.manifest.partition_spec_id}; + + auto writer_it = writers.find(key); + if (writer_it == writers.end()) { + auto [inserted_it, _] = writers.emplace( + key, RewriteWriter{.spec = manifest_entry.spec, + .content = manifest_entry.manifest.content}); + writer_it = inserted_it; + } + + auto& rewrite_writer = writer_it->second; + if (rewrite_writer.writer == nullptr) { + ICEBERG_ASSIGN_OR_RAISE(rewrite_writer.writer, new_writer(rewrite_writer)); + } else { + ICEBERG_ASSIGN_OR_RAISE(auto length, rewrite_writer.writer->length()); + if (length >= target_manifest_size_bytes()) { + ICEBERG_ASSIGN_OR_RAISE(auto manifest_file, close_writer(rewrite_writer)); + if (manifest_file.has_value()) { + result.push_back(std::move(manifest_file).value()); + } + ICEBERG_ASSIGN_OR_RAISE(rewrite_writer.writer, new_writer(rewrite_writer)); + } + } + + ICEBERG_RETURN_UNEXPECTED(rewrite_writer.writer->WriteExistingEntry(entry)); + ++entry_count_; + } + } + + for (auto& [_, writer] : writers) { + ICEBERG_ASSIGN_OR_RAISE(auto manifest_file, close_writer(writer)); + if (manifest_file.has_value()) { + result.push_back(std::move(manifest_file).value()); + } + } + return result; +} + +Status RewriteManifests::DeleteUncommitted( + std::vector& manifests, + const std::unordered_set& committed, bool clear) { + for (const auto& manifest : manifests) { + if (!committed.contains(manifest.manifest_path)) { + std::ignore = DeleteFile(manifest.manifest_path); + } + } + if (clear) { + manifests.clear(); + } + return {}; +} + +void RewriteManifests::ResetRewriteState() { + std::ignore = DeleteUncommitted(new_manifests_, {}, /*clear=*/true); + entry_count_ = 0; + kept_manifests_.clear(); + rewritten_manifests_.clear(); + rewritten_manifest_paths_.clear(); +} + +} // namespace iceberg diff --git a/src/iceberg/update/rewrite_manifests.h b/src/iceberg/update/rewrite_manifests.h new file mode 100644 index 000000000..194c001ed --- /dev/null +++ b/src/iceberg/update/rewrite_manifests.h @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +/// \file iceberg/update/rewrite_manifests.h + +#include +#include +#include +#include +#include +#include +#include + +#include "iceberg/iceberg_export.h" +#include "iceberg/result.h" +#include "iceberg/type_fwd.h" +#include "iceberg/update/snapshot_update.h" + +namespace iceberg { + +/// \brief API for rewriting manifests for a table. +/// +/// This API accumulates manifest files, produces a new snapshot of the table +/// described only by the manifest files that were added, and commits that snapshot +/// as the current. +/// +/// This API can be used to rewrite matching manifests according to a clustering +/// function as well as to replace specific manifests. Manifests that are deleted +/// or added directly are ignored during the rewrite process. The set of active +/// files in replaced manifests must be the same as in new manifests. +/// +/// When committing, these changes will be applied to the latest table snapshot. +/// Commit conflicts will be resolved by applying the changes to the new latest +/// snapshot and reattempting the commit. +class ICEBERG_EXPORT RewriteManifests : public SnapshotUpdate { + public: + using ClusterByFunc = std::function; + using RewritePredicate = std::function; + + static Result> Make( + std::string table_name, std::shared_ptr ctx); + + /// \brief Group an existing data file by a cluster key. + /// + /// The cluster key determines which data file will be associated with a + /// particular manifest. All data files with the same cluster key will be written + /// to the same manifest unless the manifest is large and split into multiple + /// files. Manifests deleted via DeleteManifest or added via AddManifest are + /// ignored during the rewrite process. + RewriteManifests& ClusterBy(ClusterByFunc func); + + /// \brief Determine which existing manifest files should be rewritten. + /// + /// Manifests that do not match the predicate are kept as-is. If this is not + /// called and no predicate is set, all manifests will be rewritten. + RewriteManifests& RewriteIf(RewritePredicate predicate); + + /// \brief Delete a manifest file from the table. + RewriteManifests& DeleteManifest(const ManifestFile& manifest); + + /// \brief Add a manifest file to the table. + /// + /// The added manifest cannot contain new or deleted files. + /// + /// By default, the manifest will be rewritten to ensure all entries have + /// explicit snapshot IDs. In that case, it is always the responsibility of the + /// caller to manage the lifecycle of the original manifest. + /// + /// If manifest entries are allowed to inherit the snapshot ID assigned on + /// commit, the manifest should never be deleted manually if the commit succeeds + /// as it will become part of the table metadata and will be cleaned up on + /// expiry. If the manifest gets merged with others while preparing a new + /// snapshot, it will be deleted automatically if this operation is successful. + /// If the commit fails, the manifest will never be deleted and it is up to the + /// caller whether to delete or reuse it. + RewriteManifests& AddManifest(const ManifestFile& manifest); + + std::string operation() override; + + Result> Apply( + const TableMetadata& metadata_to_update, + const std::shared_ptr& snapshot) override; + std::unordered_map Summary() override; + void SetSummaryProperty(const std::string& property, const std::string& value) override; + Status CleanUncommitted(const std::unordered_set& committed) override; + Status Finalize(Result commit_result) override; + + private: + explicit RewriteManifests(std::string table_name, + std::shared_ptr ctx); + + bool RequiresRewrite( + const std::unordered_set& current_manifest_paths) const; + bool MatchesPredicate(const ManifestFile& manifest) const; + Status ValidateDeletedManifests( + const std::unordered_set& current_manifest_paths, + int64_t current_snapshot_id) const; + Status ValidateActiveFiles() const; + + Result CopyManifest(const ManifestFile& manifest); + Result> Rewrite( + std::span current_manifests); + + Status DeleteUncommitted(std::vector& manifests, + const std::unordered_set& committed, bool clear); + void ResetRewriteState(); + Status ValidateTargetBranch(const std::string& branch) const override; + + private: + std::string table_name_; + ClusterByFunc cluster_by_func_; + RewritePredicate predicate_; + + std::vector deleted_manifests_; + std::unordered_set deleted_manifest_paths_; + std::vector added_manifests_; + std::vector rewritten_added_manifests_; + + std::vector kept_manifests_; + std::vector new_manifests_; + std::vector rewritten_manifests_; + std::unordered_set rewritten_manifest_paths_; + int64_t entry_count_{0}; + + std::unordered_map custom_summary_properties_; + SnapshotSummaryBuilder manifest_count_summary_; + bool cleanup_all_{false}; +}; + +} // namespace iceberg diff --git a/src/iceberg/update/snapshot_update.h b/src/iceberg/update/snapshot_update.h index fa8dcbf12..0c6db9bd9 100644 --- a/src/iceberg/update/snapshot_update.h +++ b/src/iceberg/update/snapshot_update.h @@ -101,6 +101,11 @@ class ICEBERG_EXPORT SnapshotUpdate : public PendingUpdate { return self.AddError(ErrorKind::kInvalidArgument, "Branch name cannot be empty"); } + if (auto status = static_cast(self).ValidateTargetBranch(branch); + !status) [[unlikely]] { + return self.AddError(status.error()); + } + if (auto ref_it = self.base().refs.find(branch); ref_it != self.base().refs.end()) { if (ref_it->second->type() != SnapshotRefType::kBranch) { return self.AddError(ErrorKind::kInvalidArgument, @@ -197,6 +202,9 @@ class ICEBERG_EXPORT SnapshotUpdate : public PendingUpdate { return {}; }; + /// \brief Validate whether this update can target the given branch. + virtual Status ValidateTargetBranch(const std::string& branch) const { return {}; } + /// \brief Apply the update's changes to the given metadata and snapshot. /// /// \param metadata_to_update The base table metadata to apply changes to @@ -233,6 +241,7 @@ class ICEBERG_EXPORT SnapshotUpdate : public PendingUpdate { std::string ManifestPath(); std::string ManifestListPath(); + OptionalExecutor plan_executor() const { return plan_executor_; } SnapshotSummaryBuilder& summary_builder() { return summary_; } SnapshotSummaryBuilder BuildManifestCountSummary( std::span manifests, int32_t replaced_manifests_count);