Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.comet.contrib.delta

import org.apache.spark.sql.types.StructType

/**
* Delta-specific planning info captured by `DeltaScanRule` and carried on the
* `CometDeltaScanMarker` as a field (rather than mutating the scan schema / `relation.options`).
* Read by `CometDeltaNativeScan.convert` when building the native scan operator.
*
* @param analyzedSchema
* the analysis-time Delta schema (`DeltaParquetFileFormat.referenceSchema`), captured while the
* original Delta file format is still present so column-mapping physical names / field-ids
* resolve against the schema the query was analyzed with rather than a re-resolved latest
* snapshot. `None` when the table has no reference schema.
* @param oneTaskPerPartition
* force one file per Spark partition (per-file `_metadata.file_path` projection needs 1:1
* file/partition alignment so per-file synthetic columns aren't dropped).
*/
case class DeltaScanMetadata(analyzedSchema: Option[StructType], oneTaskPerPartition: Boolean)

object DeltaScanMetadata {

/**
* Tag for the native Delta scan implementation, surfaced in `withFallbackReason` messages (e.g.
* "<ScanImpl> does not support encryption config"). Lives here -- the leaf metadata file -- so
* the claim/decline rule (`DeltaScanRule`) can name it without depending on the serde
* (`CometDeltaNativeScan`); the serde points its own usage at this constant.
*/
val ScanImpl: String = "native_delta_compat"
}

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.comet.contrib.delta

import org.apache.hadoop.fs.Path
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Expression, GenericInternalRow}
import org.apache.spark.sql.execution.datasources.{FileIndex, PartitionDirectory}
import org.apache.spark.sql.types.{LongType, StructField, StructType}

/**
* Wraps a Delta `FileIndex` and augments each per-file `PartitionDirectory` with two synthetic
* partition values drawn from the corresponding Delta `AddFile`:
*
* - `baseRowIdColumnName` <- `AddFile.baseRowId`
* - `defaultRowCommitVersionColumnName` <- `AddFile.defaultRowCommitVersion`
*
* Used by Comet's row-tracking phase 3: `_row_id_` and `_row_commit_version_` values that are
* still null in their materialised physical columns get synthesised by the outer Project as:
*
* row_id = coalesce(materialised_row_id, baseRowIdCol + _tmp_metadata_row_index)
* row_commit_version = coalesce(materialised_row_commit_version, defaultRowCommitVersionCol)
*
* once the scan can see the per-file values as constant columns.
*
* The map from file basename (tail of `AddFile.path`) to `RowTrackingFileInfo` is supplied by the
* caller (via reflection on the delegate's `matchingFiles` API) so we don't need a compile-time
* dep on spark-delta. Each listed directory entry is split into one `PartitionDirectory` per file
* so each file's values travel with it.
*/
class RowTrackingAugmentedFileIndex(
delegate: FileIndex,
infoByFileName: Map[String, DeltaReflection.RowTrackingFileInfo],
baseRowIdColumnName: String,
defaultRowCommitVersionColumnName: String)
extends FileIndex {

override def rootPaths: Seq[Path] = delegate.rootPaths

override def inputFiles: Array[String] = delegate.inputFiles

override def refresh(): Unit = delegate.refresh()

override def sizeInBytes: Long = delegate.sizeInBytes

/** Appends both synthetic columns as Long, nullable partition columns. */
override def partitionSchema: StructType =
delegate.partitionSchema
.add(StructField(baseRowIdColumnName, LongType, nullable = true))
.add(StructField(defaultRowCommitVersionColumnName, LongType, nullable = true))

/**
* Delegates listing to the underlying FileIndex, then splits each returned `PartitionDirectory`
* into one-per-file directories, each carrying the original partition values PLUS the per-file
* baseRowId and defaultRowCommitVersion.
*
* The per-file split is unavoidable for correctness: `AddFile.baseRowId` is unique per file, so
* two files that share a Delta partition cannot share a `PartitionDirectory` once we inject the
* per-file synthetic columns. Scheduling parallelism is unaffected -- `FileSourceScanExec`
* flattens all PDs' files into `PartitionedFile`s and bin-packs them by `maxSplitBytes`, so PD
* granularity only governs how partition values get serialised with each file, not the number
* of tasks.
*/
override def listFiles(
partitionFilters: Seq[Expression],
dataFilters: Seq[Expression]): Seq[PartitionDirectory] = {
val underlying = delegate.listFiles(partitionFilters, dataFilters)
underlying.flatMap { pd =>
pd.files.map { fileStatus =>
val info = infoByFileName.getOrElse(
fileStatus.getPath.getName,
DeltaReflection.RowTrackingFileInfo(None, None))
// Use `pd.copy(...)` rather than `PartitionDirectory.apply(...)` so this
// compiles against both Spark 3.x (files: Seq[FileStatus]) and Spark 4.0
// (files: Seq[FileStatusWithMetadata]) without a per-version shim -- we
// round-trip the same element type we got from `pd.files`.
pd.copy(values = augmentPartitionValues(pd.values, info), files = Seq(fileStatus))
}
}
}

private def augmentPartitionValues(
original: InternalRow,
info: DeltaReflection.RowTrackingFileInfo): InternalRow = {
val n = original.numFields
val values = new Array[Any](n + 2)
var i = 0
while (i < n) {
values(i) = original.get(i, delegate.partitionSchema.fields(i).dataType)
i += 1
}
values(n) = info.baseRowId.map(Long.box).orNull
values(n + 1) = info.defaultRowCommitVersion.map(Long.box).orNull
new GenericInternalRow(values)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.spark.sql.comet

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
import org.apache.spark.sql.execution.{FileSourceScanExec, LeafExecNode}
import org.apache.spark.sql.execution.datasources.HadoopFsRelation
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.vectorized.ColumnarBatch

import org.apache.comet.contrib.delta.DeltaScanMetadata

/**
* Planning-time marker the Delta contrib's `DeltaScanRule` produces for a Delta scan it can
* accelerate. Mirrors how Iceberg uses `CometBatchScanExec` with `nativeIcebergScanMetadata`:
*
* - it wraps the ORIGINAL, untouched `FileSourceScanExec` (so its planning `logicalLink` is
* intact -- no rebuild, no AQE `setLogicalLinkForNewQueryStage` workaround), and
* - it carries the Delta-specific planning info as a [[DeltaScanMetadata]] FIELD (which survives
* node copies/AQE re-planning, unlike a `TreeNodeTag`), instead of mutating the scan's schema
* or smuggling it through `relation.options`.
*
* `CometExecRule` detects it by type (via `DeltaIntegration.isDeltaScanMarker`) and converts it to a
* `CometDeltaNativeScanExec` through the contrib serde. If conversion declines, the marker executes
* by delegating to the wrapped scan (i.e. vanilla Spark Delta read), so leaving it in the plan is
* safe.
*
* The accessors mirror the `FileSourceScanExec`/`CometScanExec` surface the serde reads
* (`relation`, `requiredSchema`, `partitionFilters`, `output`, `wrapped`) so the serde body is
* unchanged apart from reading metadata from [[deltaMetadata]].
*/
case class CometDeltaScanMarker(originalScan: FileSourceScanExec, deltaMetadata: DeltaScanMetadata)
extends LeafExecNode {

override def output: Seq[Attribute] = originalScan.output

def relation: HadoopFsRelation = originalScan.relation

def requiredSchema: StructType = originalScan.requiredSchema

def partitionFilters: Seq[Expression] = originalScan.partitionFilters

def dataFilters: Seq[Expression] = originalScan.dataFilters

/**
* Data filters Comet can push down -- delegated to `CometScanExec`'s logic (drops dynamic-pruning
* and array null-check filters). A transient `CometScanExec` is built just to reuse that lazy val.
*/
def supportedDataFilters: Seq[Expression] =
CometScanExec(originalScan, originalScan.relation.sparkSession).supportedDataFilters

/** The original scan; used as the produced exec's `originalPlan` (retains the logicalLink). */
def wrapped: FileSourceScanExec = originalScan

override def supportsColumnar: Boolean = originalScan.supportsColumnar

// The marker is normally converted by CometExecRule before execution. If conversion declines and
// the marker is left in the plan, fall back to the wrapped (vanilla Spark) Delta scan.
override protected def doExecute(): RDD[InternalRow] = originalScan.execute()

override protected def doExecuteColumnar(): RDD[ColumnarBatch] = originalScan.executeColumnar()
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.comet.contrib.delta

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, input_file_name}

/**
* Coverage for the contrib-delta CLAIM/DECLINE layer (`DeltaScanRule` + `CometDeltaScanMarker`)
* that this unit introduces, independent of the native read path (the serde/exec land later).
*
* On this build there is no `CometDeltaNativeScan` serde, so `CometExecRule`'s `scanHandler`
* lookup returns `None` and a planted `CometDeltaScanMarker` is left in the plan executing as a
* vanilla Delta fallback. That makes the marker's PRESENCE the observable signal that the rule
* claimed the scan, and its absence the signal that the rule declined -- exactly what these tests
* assert. The native-read assertions live with the serde/exec unit.
*/
class CometDeltaMarkerSuite extends CometDeltaTestBase {

test("DeltaScanRule plants the marker on a plain Delta read (claim path active)") {
assume(deltaSparkAvailable, "io.delta.spark not on the test classpath")
withDeltaTable("marker-planted") { tablePath =>
spark.range(0, 100).toDF("id").write.format("delta").save(tablePath)
val df = spark.read.format("delta").load(tablePath)
// Red-green vs the A.2 build: with `DeltaScanRule$` absent (A.2 bridge only) no marker is
// planted; this unit supplies the rule, so the marker appears (then falls back to vanilla).
assertMarkerPlanted(df)
}
}

test("marker is planted on a filtered/projected read and the fallback stays result-correct") {
assume(deltaSparkAvailable, "io.delta.spark not on the test classpath")
withDeltaTable("marker-fallback-correct") { tablePath =>
spark.range(0, 100).selectExpr("id", "id * 2 as v").write.format("delta").save(tablePath)
val query = (df: DataFrame) => df.filter("id > 10").select("id", "v")
// Assert the rule actually CLAIMS this query shape (catches a claim-path regression, not just
// a result mismatch -- a disengaged claim path would still match rows since both sides run
// vanilla), AND that the marker's vanilla fallback returns identical rows.
assertMarkerPlanted(query(spark.read.format("delta").load(tablePath)))
assertResultsMatchVanilla(tablePath, query)
}
}

test("DeltaScanRule declines an input_file_name() projection (no marker, vanilla read)") {
assume(deltaSparkAvailable, "io.delta.spark not on the test classpath")
withDeltaTable("decline-input-file-name") { tablePath =>
spark.range(0, 50).toDF("id").write.format("delta").save(tablePath)
// `input_file_name()` forces a fall back to vanilla (per-file provenance the native scan
// can't surface), so the rule declines and plants no marker.
val df = spark.read.format("delta").load(tablePath).select(col("id"), input_file_name())
assertNoMarker(df)
assert(df.count() == 50L, "declined read must still return all rows via vanilla Spark")
}
}
}
Loading