Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.comet.contrib.delta

import org.apache.comet.NativeBase

/**
* Contrib-local JVM handle to the Delta-specific native entry point.
*
* Extends `NativeBase` so the libcomet load triggers on first use of any subclass -- the contrib
* doesn't reload the library itself (there is exactly one libcomet at runtime), but inheriting
* from `NativeBase` ensures the static initializer ordering works the same way as core's
* `org.apache.comet.Native`. The `@native` method below binds to
* `Java_org_apache_comet_contrib_delta_Native_planDeltaScan` exported by the contrib's Rust crate
* (compiled INTO libcomet via the `contrib-delta` Cargo feature on `native/core`).
*/
class Native extends NativeBase {

/**
* Driver-side Delta log replay. Returns a prost-encoded `DeltaScanTaskList` proto (raw bytes)
* which the caller decodes via `DeltaScanTaskList.parseFrom(...)`.
*
* @param tableUrl
* absolute URL or bare path of the Delta table root
* @param snapshotVersion
* `-1` for the latest snapshot, otherwise an exact version
* @param storageOptions
* cloud credentials / endpoint overrides (Hadoop-style keys)
* @param predicateBytes
* prost-encoded Catalyst data filter for kernel-side stats-based file pruning, or an empty
* array for no predicate
* @param columnNames
* logical column names the caller requires (kernel uses this for column-mapping resolution
* before stats-based file pruning).
* @param projectedSchemaIpc
* the query's data-read columns in pure-logical names at every nesting level (Spark
* `requiredSchema` minus partition + synthetic columns), serialized as an Arrow IPC schema
* message (`Schema.serializeAsMessage()`). Drives `scan.with_schema(...)` so the returned
* `DeltaScanTaskList` carries kernel's projected `physical_schema` / `logical_schema`. Empty
* array for no projection (full-table scan; no kernel schemas returned).
* @return
* `byte[]` containing the encoded DeltaScanTaskList
*/
@native def planDeltaScan(
tableUrl: String,
snapshotVersion: Long,
storageOptions: java.util.Map[String, String],
predicateBytes: Array[Byte],
columnNames: Array[String],
projectedSchemaJson: String): Array[Byte]

/**
* Schema-only companion to [[planDeltaScan]] for the batch-file-index read path (file list comes
* from Delta `AddFile`s, but the kernel-read executor still needs kernel's resolved
* physical/logical schemas). Returns a `DeltaScanTaskList` with only `physical_schema` /
* `logical_schema` set (Arrow IPC). `projectedSchemaJson` is the data-read schema as Delta schema
* JSON (`StructType.json`, carrying column-mapping physicalName/id from the analysis-time or
* snapshot schema); empty string => zero data columns, no schemas returned.
*/
@native def planDeltaReadSchemas(
tableUrl: String,
snapshotVersion: Long,
storageOptions: java.util.Map[String, String],
projectedSchemaJson: String): Array[Byte]
}

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.spark.sql.comet

import org.apache.comet.serde.OperatorOuterClass
import org.apache.comet.serde.OperatorOuterClass.Operator

/**
* `PlanDataInjector` for the typed `OpStruct::DeltaScan` operator.
*
* The contrib serialises the Delta scan in two parts to keep the closure sent to every
* task small:
* - At planning time `CometDeltaNativeScan.convert` emits a `DeltaScan` proto with
* the `common` block (schemas, table root, filters, ...) and NO tasks; this lands
* in the `Operator` tree as the typed variant `OpStruct.delta_scan`.
* - Per partition, `CometDeltaNativeScanExec` puts the partition's `DeltaScan`
* (tasks-only) bytes into `perPartitionByKey` under a `sourceKey` derived from
* the common block.
*
* Core's `PlanDataInjector.injectPlanData` discovers this object via the reflective
* `Class.forName("org.apache.spark.sql.comet.DeltaPlanDataInjector")` lookup added to
* `PlanDataInjector.injectors`; default builds get no DeltaPlanDataInjector class on
* the classpath and the injector list is unchanged.
*
* Without this injection the native side decodes a tasks-empty `DeltaScan` -> `EmptyExec`
* (0 rows) for every Delta scan.
*/
object DeltaPlanDataInjector extends PlanDataInjector {

override val opStructCase: Operator.OpStructCase = Operator.OpStructCase.DELTA_SCAN

override def canInject(op: Operator): Boolean = {
if (!op.hasDeltaScan) return false
// The common-only proto produced at planning time has zero tasks. After injection
// the operator carries the partition's tasks -- skip those (idempotent canInject).
//
// Note: a CDF read always has zero tasks (it carries a version sub-range, not files), so this
// stays true even after the CDF branch in `inject` runs. That's intentionally NOT idempotent-
// guarded the way the task branch is, and it's safe because `PlanDataInjector.injectPlanData`
// walks each operator exactly once per partition (CometExecRDD.compute -> one inject per op).
op.getDeltaScan.getTasksCount == 0
}

override def getKey(op: Operator): Option[String] =
Some(CometDeltaNativeScanExec.computeSourceKey(op))

override def inject(
op: Operator,
commonBytes: Array[Byte],
partitionBytes: Array[Byte]): Operator = {
// `partitionBytes` is the serialised `DeltaScan` that packs only this partition's
// tasks (no common block) to avoid duplicating schemas across partitions. Splice
// the partition's tasks into the original common-only envelope.
val partitionScan = OperatorOuterClass.DeltaScan.parseFrom(partitionBytes)
val originalScan = op.getDeltaScan
val mergedScanBuilder = OperatorOuterClass.DeltaScan
.newBuilder(originalScan)
.addAllTasks(partitionScan.getTasksList)
// CDF version-range split: a Change Data Feed read carries no tasks; instead the per-partition
// DeltaScan packs this partition's inclusive cdf sub-range in a minimal common (cdf_read marks
// it). Splice that [start, end] over the shared common's full range so each partition's native
// TableChanges read covers only its slice. Regular (non-CDF) per-partition bytes set no common,
// so this is skipped and only the task list is merged.
if (partitionScan.hasCommon && partitionScan.getCommon.getCdfRead) {
val pc = partitionScan.getCommon
val mergedCommon = originalScan.getCommon.toBuilder
mergedCommon.setCdfStartVersion(pc.getCdfStartVersion)
if (pc.hasCdfEndVersion) mergedCommon.setCdfEndVersion(pc.getCdfEndVersion)
else mergedCommon.clearCdfEndVersion()
mergedScanBuilder.setCommon(mergedCommon.build())
}
op.toBuilder.setDeltaScan(mergedScanBuilder.build()).build()
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.comet.contrib.delta

// Deterministic mirror of DeltaColumnMappingSuite "column mapping batch scan should detect
// physical name changes" (id mode). df2 is analyzed before the table is overwritten with new
// physical names/field-ids; reading it afterward (schema-on-read check off) must yield NULLs.
// Native-only fresh collect (no vanilla-first collect, which would cache the pinned snapshot
// and mask the bug).
class CometDeltaColumnMappingPhysicalNameReproSuite extends CometDeltaTestBase {

test("column mapping batch scan should detect physical name changes [id]") {
assume(deltaSparkAvailable, "delta-spark not on the test classpath; skipping")
withSQLConf("spark.databricks.delta.properties.defaults.columnMapping.mode" -> "id") {
withDeltaTable("cm_physical_name") { tablePath =>
spark.range(10).toDF("id").write.format("delta").save(tablePath)
val df2 = spark.read.format("delta").load(tablePath)
df2.queryExecution.analyzed
withSQLConf(
"spark.databricks.delta.columnMapping.reuseColumnMetadataDuringOverwrite" -> "false") {
spark.range(10).toDF("id")
.write.format("delta").option("overwriteSchema", "true").mode("overwrite")
.save(tablePath)
}
withSQLConf("spark.databricks.delta.checkLatestSchemaOnRead" -> "false") {
val rows = df2.collect()
val nonNull = rows.count(!_.isNullAt(0))
assert(
rows.length == 10 && nonNull == 0,
s"stale physical name should read NULL: ${rows.length} rows, $nonNull non-null " +
s"(sample=${rows.take(5).map(r => if (r.isNullAt(0)) "null" else r.getLong(0)).toSeq})")
}
}
}
}
}
Loading