diff --git a/.github/workflows/pr_build_linux.yml b/.github/workflows/pr_build_linux.yml index 174eae89c6..5aa9241614 100644 --- a/.github/workflows/pr_build_linux.yml +++ b/.github/workflows/pr_build_linux.yml @@ -358,6 +358,7 @@ jobs: org.apache.spark.sql.comet.CometShuffleFallbackStickinessSuite org.apache.spark.sql.comet.PlanDataInjectorSuite org.apache.spark.sql.comet.CometDecimalArithmeticViewSuite + org.apache.spark.sql.comet.CometScanWithPlanDataSuite org.apache.spark.sql.comet.util.UtilsSuite org.apache.comet.objectstore.NativeConfigSuite org.apache.spark.sql.CometToPrettyStringSuite diff --git a/.github/workflows/pr_build_macos.yml b/.github/workflows/pr_build_macos.yml index 2294e00363..b499ac2c87 100644 --- a/.github/workflows/pr_build_macos.yml +++ b/.github/workflows/pr_build_macos.yml @@ -174,6 +174,7 @@ jobs: org.apache.spark.sql.comet.CometShuffleFallbackStickinessSuite org.apache.spark.sql.comet.PlanDataInjectorSuite org.apache.spark.sql.comet.CometDecimalArithmeticViewSuite + org.apache.spark.sql.comet.CometScanWithPlanDataSuite org.apache.spark.sql.comet.util.UtilsSuite org.apache.comet.objectstore.NativeConfigSuite org.apache.spark.sql.CometToPrettyStringSuite diff --git a/spark/src/main/scala/org/apache/comet/rules/CometPlanAdaptiveDynamicPruningFilters.scala b/spark/src/main/scala/org/apache/comet/rules/CometPlanAdaptiveDynamicPruningFilters.scala index 217f8bc314..8d97651078 100644 --- a/spark/src/main/scala/org/apache/comet/rules/CometPlanAdaptiveDynamicPruningFilters.scala +++ b/spark/src/main/scala/org/apache/comet/rules/CometPlanAdaptiveDynamicPruningFilters.scala @@ -83,9 +83,22 @@ case object CometPlanAdaptiveDynamicPruningFilters if icebergScan.runtimeFilters.exists(hasCometSAB) => logDebug("Converting AQE DPP for CometIcebergNativeScanExec") convertIcebergScanDPP(icebergScan, plan) + // Comet scans whose DPP filters live in a @transient field (the contrib's + // CometDeltaNativeScanExec). 
transformExpressions/makeCopy can't rewrite + // them, and a rewritten copy is orphaned when the enclosing native block + // is rebuilt (#3510). The scan's `withDynamicPruningFilters` installs the + // rewrite in place and returns `this`, so it lands on the executing + // instance. + case p: CometScanWithPlanData if p.dynamicPruningFilters.exists(hasCometSAB) => + logDebug(s"Converting AQE DPP for ${p.getClass.getSimpleName} in place") + p.withDynamicPruningFilters(p.dynamicPruningFilters.map(f => convertFilter(f, plan))) case p: SparkPlan if !p.isInstanceOf[CometNativeScanExec] && !p.isInstanceOf[CometIcebergNativeScanExec] + // Contrib trait scans handle their own DPP in the arm above. Exclude them here too so + // a trait scan with a wrapped SAB but empty `dynamicPruningFilters` isn't misrouted to + // the non-Comet path (which mutates expressions in place via makeCopy). + && !p.isInstanceOf[CometScanWithPlanData] && hasWrappedSAB(p) => logDebug(s"Converting AQE DPP for non-Comet node: ${p.nodeName}") convertNonCometNodeDPP(p, plan) diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/CometIcebergNativeScanExec.scala b/spark/src/main/scala/org/apache/spark/sql/comet/CometIcebergNativeScanExec.scala index e08cd5b29d..b0e4a63389 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/CometIcebergNativeScanExec.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometIcebergNativeScanExec.scala @@ -51,7 +51,7 @@ import org.apache.comet.serde.operator.CometIcebergNativeScan * `expressions` walk picks up the contained `DynamicPruningExpression(InSubqueryExec(...))`, and * the standard `prepare -> prepareSubqueries -> waitForSubqueries` lifecycle resolves it. The * lifecycle is invoked via `CometLeafExec.ensureSubqueriesResolved`, called from - * `CometNativeExec.findAllPlanData` before `commonData` is read. + * `PlanDataInjector.findAllPlanData` before `commonData` is read. 
*/ case class CometIcebergNativeScanExec( override val nativeOp: Operator, @@ -69,12 +69,12 @@ case class CometIcebergNativeScanExec( /** * Lazy partition serialization, deferred until execution time. Triggered from `commonData` / - * `perPartitionData` (via `CometNativeExec.findAllPlanData`) and from `LazyIcebergMetric.value` - * (via Iceberg planning metrics). Lazy val semantics ensure single evaluation across entry - * points. + * `perPartitionData` (via `PlanDataInjector.findAllPlanData`) and from + * `LazyIcebergMetric.value` (via Iceberg planning metrics). Lazy val semantics ensure single + * evaluation across entry points. * * DPP InSubqueryExec values must already be resolved by the time this lazy val runs. - * `CometNativeExec.findAllPlanData` calls `ensureSubqueriesResolved` (which invokes Spark's + * `PlanDataInjector.findAllPlanData` calls `ensureSubqueriesResolved` (which invokes Spark's * `prepare -> waitForSubqueries`) before reading `commonData`. The `serializePartitions` call * below reads `originalPlan.runtimeFilters` indirectly through `inputRDD -> filteredPartitions` * and applies the resolved values to Iceberg's runtime filtering. 
`originalPlan.runtimeFilters` diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/CometNativeScanExec.scala b/spark/src/main/scala/org/apache/spark/sql/comet/CometNativeScanExec.scala index 0ce8547563..8287849d5e 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/CometNativeScanExec.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometNativeScanExec.scala @@ -71,7 +71,8 @@ case class CometNativeScanExec( sourceKey: String) // Key for PlanDataInjector to match common+partition data at runtime extends CometLeafExec with DataSourceScanExec - with ShimStreamSourceAwareSparkPlan { + with ShimStreamSourceAwareSparkPlan + with CometScanWithPlanData { override lazy val metadata: Map[String, String] = if (originalPlan != null) originalPlan.metadata else Map.empty @@ -159,7 +160,7 @@ case class CometNativeScanExec( // Outer partitionFilters (wrapper) DPP is resolved by Spark's standard // prepare -> waitForSubqueries lifecycle, triggered explicitly via // CometLeafExec.ensureSubqueriesResolved called from - // CometNativeExec.findAllPlanData before commonData is read. + // PlanDataInjector.findAllPlanData before commonData is read. 
// // Inner scan.partitionFilters holds a SEPARATE InSubqueryExec instance that // Spark's expressions walk does not see (scan is @transient and not a sibling diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala b/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala index af9e1df8a3..357000a471 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala @@ -20,13 +20,16 @@ package org.apache.spark.sql.comet import java.util.Locale +import java.util.ServiceLoader import scala.collection.mutable import scala.collection.mutable.ArrayBuffer import scala.jdk.CollectionConverters._ +import scala.util.control.NonFatal import org.apache.spark.{Partition, TaskContext} import org.apache.spark.broadcast.Broadcast +import org.apache.spark.internal.Logging import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.{Ascending, Attribute, AttributeSet, Expression, ExpressionSet, Generator, NamedExpression, SortOrder} @@ -87,14 +90,34 @@ private[comet] trait PlanDataInjector { /** * Registry and utilities for injecting per-partition planning data into operator trees. */ -private[comet] object PlanDataInjector { - - // Registry of injectors for different operator types - private val injectors: Seq[PlanDataInjector] = Seq( - IcebergPlanDataInjector, - NativeScanPlanDataInjector - // Future: DeltaPlanDataInjector, HudiPlanDataInjector, etc. - ) +private[comet] object PlanDataInjector extends Logging { + + // Registry of injectors for different operator types. The built-in injectors live in core. + // Out-of-tree contribs (e.g. 
contrib-delta's `DeltaPlanDataInjector`) are discovered via the + // standard JDK `ServiceLoader`: a contrib ships a + // `META-INF/services/org.apache.spark.sql.comet.PlanDataInjector` resource naming its + // implementation, so it joins the registry without core holding any compile-time reference or + // contrib-specific code. Default builds carry no such service file, so discovery yields nothing + // and the registry is exactly the built-ins -- zero contrib surface at runtime. + private[comet] val injectors: Seq[PlanDataInjector] = { + val builtin: Seq[PlanDataInjector] = Seq(IcebergPlanDataInjector, NativeScanPlanDataInjector) + val discovered: Seq[PlanDataInjector] = + try { + ServiceLoader.load(classOf[PlanDataInjector], getClass.getClassLoader).asScala.toList + } catch { + // A misbuilt contrib jar -- a malformed service file, or a listed provider that can't be + // instantiated -- surfaces as ServiceConfigurationError while the iterator is forced; the + // strict `toList` above forces it inside this try (2.12's `toSeq` is a lazy Stream). Warn + // and keep the built-ins. NonFatal covers ServiceConfigurationError (not a LinkageError). + case NonFatal(e) => + logWarning( + "Failed to load contrib PlanDataInjector services; " + + "continuing with built-in injectors only", + e) + Seq.empty + } + builtin ++ discovered + } // O(1) lookup by op kind: most operators in any tree don't match any injector, so the per-op // `for (injector <- injectors if injector.canInject(op))` walk was paying N*M canInject calls @@ -153,6 +176,73 @@ private[comet] object PlanDataInjector { codedOutput.checkNoSpaceLeft() bytes } + + /** + * Find all plan nodes with per-partition planning data in the plan tree. Returns two maps keyed + * by a unique identifier: one for common data (shared across partitions) and one for + * per-partition data. 
+ * + * Recognises Iceberg scans (keyed by metadata_location) plus any leaf scan that surfaces its + * data via the [[CometScanWithPlanData]] trait (`CometNativeScanExec` and out-of-tree contrib + * scans such as the Delta contrib's `CometDeltaNativeScanExec`). + * + * Stops at stage boundaries (shuffle exchanges, etc.) because partition indices are only valid + * within the same stage. + * + * @return + * (commonByKey, perPartitionByKey) - common data is shared, per-partition varies + */ + private[comet] def findAllPlanData( + plan: SparkPlan): (Map[String, Array[Byte]], Map[String, Array[Array[Byte]]]) = { + plan match { + case iceberg: CometIcebergNativeScanExec => + // Trigger Spark's standard prepare -> waitForSubqueries lifecycle so DPP + // InSubqueryExec values are resolved before commonData is read. Without this, + // the parent CometNativeExec.executeQuery flow never invokes the scan's + // executeQuery, leaving DPP unresolved and forcing a sync-on-this await inside + // the serializedPartitionData lazy val initializer (a known deadlock surface). + iceberg.ensureSubqueriesResolved() + if (iceberg.commonData.nonEmpty && iceberg.perPartitionData.nonEmpty) { + ( + Map(iceberg.metadataLocation -> iceberg.commonData), + Map(iceberg.metadataLocation -> iceberg.perPartitionData)) + } else { + (Map.empty, Map.empty) + } + + // Generic path for leaf scans that surface planning data via the + // `CometScanWithPlanData` trait. Catches `CometNativeScanExec` and any contrib + // leaf scan (e.g. the Delta contrib's `CometDeltaNativeScanExec`) without + // requiring core to compile-time reference contrib classes. The trait's + // `self: CometLeafExec` self-type guarantees this is also a `CometLeafExec`, so + // the compound pattern always matches and we can drive the subquery lifecycle + // directly -- there is no silent "not a leaf" skip. 
+ case s: CometLeafExec with CometScanWithPlanData => + s.ensureSubqueriesResolved() + if (s.commonData.nonEmpty && s.perPartitionData.nonEmpty) { + (Map(s.sourceKey -> s.commonData), Map(s.sourceKey -> s.perPartitionData)) + } else { + (Map.empty, Map.empty) + } + + // Broadcast stages are boundaries - don't collect per-partition data from inside them. + // After DPP filtering, broadcast scans may have different partition counts than the + // probe side, causing ArrayIndexOutOfBoundsException in CometExecRDD.getPartitions. + case _: BroadcastQueryStageExec | _: CometBroadcastExchangeExec => + (Map.empty, Map.empty) + + // Stage boundaries - stop searching (partition indices won't align after these) + case _: ShuffleQueryStageExec | _: AQEShuffleReadExec | _: CometShuffleExchangeExec | + _: CometUnionExec | _: CometTakeOrderedAndProjectExec | _: CometCoalesceExec | + _: ReusedExchangeExec | _: CometSparkToColumnarExec => + (Map.empty, Map.empty) + + // Continue searching through other operators, combining results from all children + case _ => + val results = plan.children.map(findAllPlanData) + (results.flatMap(_._1).toMap, results.flatMap(_._2).toMap) + } + } } /** @@ -539,7 +629,7 @@ abstract class CometNativeExec extends CometExec { } // Find planning data within this stage (stops at shuffle boundaries). - val (commonByKey, perPartitionByKey) = findAllPlanData(this) + val (commonByKey, perPartitionByKey) = PlanDataInjector.findAllPlanData(this) // Collect the input batches from the child operators. 
Non-shuffle inputs become // RDD[ArrowArrayStream] (one stream per partition, exported via the C Stream Interface @@ -718,8 +808,16 @@ abstract class CometNativeExec extends CometExec { */ def foreachUntilCometInput(plan: SparkPlan)(func: SparkPlan => Unit): Unit = { plan match { - case _: CometNativeScanExec | _: CometScanExec | _: CometBatchScanExec | - _: CometIcebergNativeScanExec | _: CometCsvNativeScanExec | _: ShuffleQueryStageExec | + // Match `CometLeafExec` first so contrib leaf scans (e.g. the Delta + // contrib's `CometDeltaNativeScanExec`) are recognised as input boundaries + // without requiring a core compile-time reference to the contrib class. + // All built-in leaf scans (`CometNativeScanExec`, `CometIcebergNativeScanExec`, + // `CometCsvNativeScanExec`) also extend `CometLeafExec`, so this is a + // strict superset of the previous enumeration -- it just generalises the + // input-boundary concept from "this fixed list" to "any leaf Comet exec". + case _: CometLeafExec => + func(plan) + case _: CometScanExec | _: CometBatchScanExec | _: ShuffleQueryStageExec | _: AQEShuffleReadExec | _: CometShuffleExchangeExec | _: CometUnionExec | _: CometTakeOrderedAndProjectExec | _: CometCoalesceExec | _: ReusedExchangeExec | _: CometBroadcastExchangeExec | _: BroadcastQueryStageExec | @@ -754,63 +852,6 @@ abstract class CometNativeExec extends CometExec { indices.toSet } - /** - * Find all plan nodes with per-partition planning data in the plan tree. Returns two maps keyed - * by a unique identifier: one for common data (shared across partitions) and one for - * per-partition data. - * - * Currently supports Iceberg scans (keyed by metadata_location). Additional scan types can be - * added by extending this method. - * - * Stops at stage boundaries (shuffle exchanges, etc.) because partition indices are only valid - * within the same stage. 
- * - * @return - * (commonByKey, perPartitionByKey) - common data is shared, per-partition varies - */ - private def findAllPlanData( - plan: SparkPlan): (Map[String, Array[Byte]], Map[String, Array[Array[Byte]]]) = { - plan match { - case iceberg: CometIcebergNativeScanExec => - // Trigger Spark's standard prepare -> waitForSubqueries lifecycle so DPP - // InSubqueryExec values are resolved before commonData is read. Without this, - // the parent CometNativeExec.executeQuery flow never invokes the scan's - // executeQuery, leaving DPP unresolved and forcing a sync-on-this await inside - // the serializedPartitionData lazy val initializer (a known deadlock surface). - iceberg.ensureSubqueriesResolved() - if (iceberg.commonData.nonEmpty && iceberg.perPartitionData.nonEmpty) { - ( - Map(iceberg.metadataLocation -> iceberg.commonData), - Map(iceberg.metadataLocation -> iceberg.perPartitionData)) - } else { - (Map.empty, Map.empty) - } - - case nativeScan: CometNativeScanExec => - nativeScan.ensureSubqueriesResolved() - ( - Map(nativeScan.sourceKey -> nativeScan.commonData), - Map(nativeScan.sourceKey -> nativeScan.perPartitionData)) - - // Broadcast stages are boundaries - don't collect per-partition data from inside them. - // After DPP filtering, broadcast scans may have different partition counts than the - // probe side, causing ArrayIndexOutOfBoundsException in CometExecRDD.getPartitions. 
- case _: BroadcastQueryStageExec | _: CometBroadcastExchangeExec => - (Map.empty, Map.empty) - - // Stage boundaries - stop searching (partition indices won't align after these) - case _: ShuffleQueryStageExec | _: AQEShuffleReadExec | _: CometShuffleExchangeExec | - _: CometUnionExec | _: CometTakeOrderedAndProjectExec | _: CometCoalesceExec | - _: ReusedExchangeExec | _: CometSparkToColumnarExec => - (Map.empty, Map.empty) - - // Continue searching through other operators, combining results from all children - case _ => - val results = plan.children.map(findAllPlanData) - (results.flatMap(_._1).toMap, results.flatMap(_._2).toMap) - } - } - /** * Converts this native Comet operator and its children into a native block which can be * executed as a whole (i.e., in a single JNI call) from the native side. @@ -901,6 +942,52 @@ abstract class CometLeafExec extends CometNativeExec with LeafExecNode { } } +/** + * Marker trait for scan execs that surface planning data (a `commonData` block + per-partition + * task bytes keyed by `sourceKey`) so that a parent `CometNativeExec` can find and inject the + * data when the scan is fused into a larger native subtree. + * + * Implemented by `CometNativeScanExec` and the contrib's `CometDeltaNativeScanExec` -- without + * it, [[PlanDataInjector.findAllPlanData]] cannot collect the per-partition tasks and the + * parent's native execution receives an empty input. (`CometIcebergNativeScanExec` does NOT use + * this trait; it has a dedicated `findAllPlanData` case.) + * + * Each implementation also resolves its own DPP subqueries via `ensureSubqueriesResolved` before + * `commonData`/`perPartitionData` are read. 
That method lives on [[CometLeafExec]], so the `self: + * CometLeafExec` self-type below makes "is a leaf scan" a compile-time requirement: an + * implementer cannot forget to extend [[CometLeafExec]] (which would otherwise compile and then + * silently skip subquery resolution -- the deadlock surface `ensureSubqueriesResolved` exists to + * prevent). It also lets [[PlanDataInjector.findAllPlanData]] drive the lifecycle without a + * runtime "not a leaf" fallback. + */ +trait CometScanWithPlanData { self: CometLeafExec => + def sourceKey: String + def commonData: Array[Byte] + def perPartitionData: Array[Array[Byte]] + + // DPP / partition filters that may carry AQE SubqueryAdaptiveBroadcast + // subqueries needing rewrite by CometPlanAdaptiveDynamicPruningFilters. + // Default empty: scans with dedicated handling (CometNativeScanExec, + // CometIcebergNativeScanExec) don't use this path. + def dynamicPruningFilters: Seq[Expression] = Nil + + // Install rewritten DPP filters on this scan. Implementers whose filters live + // in a @transient field (which TreeNode.makeCopy can't carry, #3510) update + // them via a transient side-channel and return `this` -- so the optimizer + // rule's rewrite lands on the SAME instance that executes, instead of a copy + // that gets dropped when the enclosing native block is rebuilt. Only called + // when `dynamicPruningFilters` is non-empty, so the default is never reached + // for scans that leave it empty. + // + // TODO(#3510): once TreeNode.makeCopy preserves @transient fields, the + // mutate-and-return-`this` workaround can collapse back to a normal copy, mirroring + // the matching TODO in CometPlanAdaptiveDynamicPruningFilters. 
+ def withDynamicPruningFilters(filters: Seq[Expression]): SparkPlan = + throw new UnsupportedOperationException( + s"${getClass.getSimpleName} exposes dynamicPruningFilters but does not " + + "override withDynamicPruningFilters") +} + abstract class CometUnaryExec extends CometNativeExec with UnaryExecNode abstract class CometBinaryExec extends CometNativeExec with BinaryExecNode diff --git a/spark/src/test/scala/org/apache/spark/sql/comet/CometScanWithPlanDataSuite.scala b/spark/src/test/scala/org/apache/spark/sql/comet/CometScanWithPlanDataSuite.scala new file mode 100644 index 0000000000..0a3fc25a60 --- /dev/null +++ b/spark/src/test/scala/org/apache/spark/sql/comet/CometScanWithPlanDataSuite.scala @@ -0,0 +1,187 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.spark.sql.comet + +import java.io.File +import java.net.URLClassLoader +import java.nio.charset.StandardCharsets +import java.nio.file.Files +import java.util.ServiceLoader + +import scala.jdk.CollectionConverters._ + +import org.scalatest.funsuite.AnyFunSuite + +import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression} +import org.apache.spark.sql.execution.SparkPlan + +import org.apache.comet.serde.OperatorOuterClass.Operator + +/** + * Unit coverage for the core SPI that lets out-of-tree contrib leaf scans (e.g. the Delta + * contrib's `CometDeltaNativeScanExec`) participate in Comet native planning without core holding + * a compile-time reference to them: + * + * - the [[CometScanWithPlanData]] trait contract and its collection through + * [[PlanDataInjector.findAllPlanData]], and + * - the `ServiceLoader`-based discovery of contrib [[PlanDataInjector]]s in + * [[PlanDataInjector]]'s registry. + * + * Deliberately does not exercise the per-op injector registry mechanics; that surface is owned by + * `PlanDataInjectorSuite`. + */ +class CometScanWithPlanDataSuite extends AnyFunSuite { + + /** + * Minimal `CometLeafExec with CometScanWithPlanData` (the trait's `self: CometLeafExec` + * self-type forces the leaf base). It opts into none of the optional DPP behaviour and records + * whether the subquery-resolution lifecycle hook was driven, so a test can confirm + * `findAllPlanData` routes a non-`CometNativeScanExec` trait scan through it. + */ + private case class StubScan( + override val sourceKey: String = "stub-key", + common: Array[Byte] = Array.emptyByteArray, + perPart: Array[Array[Byte]] = Array.empty) + extends CometLeafExec + with CometScanWithPlanData { + + @transient var subqueriesResolved: Boolean = false + + // Real CometLeafExec.ensureSubqueriesResolved drives Spark's prepare/waitForSubqueries, which + // needs a live session; for this pure unit test we record the call instead. 
The behaviour + // under test is that findAllPlanData *invokes* the hook for a trait scan, not the hook itself + // (CometLeafExec.ensureSubqueriesResolved is exercised by the operator suites end to end). + override def ensureSubqueriesResolved(): Unit = { subqueriesResolved = true } + + // findAllPlanData only reads the trait members + the lifecycle hook; originalPlan (used by + // CometExec for output/ordering/partitioning) is never touched on this path. + override def output: Seq[Attribute] = Seq.empty + override def originalPlan: SparkPlan = null + override def nativeOp: Operator = Operator.getDefaultInstance + override def serializedPlanOpt: SerializedPlan = SerializedPlan(None) + override def commonData: Array[Byte] = common + override def perPartitionData: Array[Array[Byte]] = perPart + } + + test( + "CometScanWithPlanData defaults: no DPP filters and withDynamicPruningFilters is unsupported") { + val scan = StubScan() + assert(scan.dynamicPruningFilters == Nil, "a scan must expose no DPP filters by default") + // The default exists only so the DPP rule never calls it for scans that leave + // dynamicPruningFilters empty; if a scan does expose filters it must override this. + val ex = intercept[UnsupportedOperationException] { + scan.withDynamicPruningFilters(Seq.empty[Expression]) + } + assert( + ex.getMessage.contains("withDynamicPruningFilters"), + s"default failure must name the un-overridden method, got: ${ex.getMessage}") + } + + test("findAllPlanData collects data from a non-CometNativeScanExec trait scan") { + // The whole point of the SPI: a brand-new leaf scan that core has never heard of, surfacing + // its data only through CometScanWithPlanData, is collected by findAllPlanData. This exercises + // the generic trait arm independently of CometNativeScanExec / CometIcebergNativeScanExec. 
+ val common = Array[Byte](1, 2, 3) + val perPart = Array(Array[Byte](10), Array[Byte](20)) + val scan = StubScan("contrib-key", common, perPart) + + val (commonByKey, perPartitionByKey) = PlanDataInjector.findAllPlanData(scan) + + assert(commonByKey.keySet == Set("contrib-key")) + assert(commonByKey("contrib-key") sameElements common) + assert(perPartitionByKey.keySet == Set("contrib-key")) + assert(perPartitionByKey("contrib-key").map(_.toSeq).toSeq == perPart.map(_.toSeq).toSeq) + assert( + scan.subqueriesResolved, + "findAllPlanData must drive the scan's subquery-resolution lifecycle before reading data") + } + + test("findAllPlanData skips a trait scan that surfaces no data") { + // Mirrors the Iceberg arm: an implementation that returns empty common/per-partition data + // contributes nothing, rather than an empty-array entry that would fail downstream injection. + val scan = StubScan("empty-key") + val (commonByKey, perPartitionByKey) = PlanDataInjector.findAllPlanData(scan) + assert(commonByKey.isEmpty) + assert(perPartitionByKey.isEmpty) + } + + test("PlanDataInjector registry contains only built-in injectors on a default build") { + // No contrib ships a META-INF/services/...PlanDataInjector on the default test classpath, so + // ServiceLoader discovery adds nothing and the registry is exactly the two built-ins. 
+ val injectors = PlanDataInjector.injectors + assert( + injectors.contains(IcebergPlanDataInjector), + "registry must keep the built-in Iceberg injector") + assert( + injectors.contains(NativeScanPlanDataInjector), + "registry must keep the built-in native-scan injector") + assert( + injectors.forall(i => i == IcebergPlanDataInjector || i == NativeScanPlanDataInjector), + s"default build must carry no contrib injectors, got: ${injectors.map(_.getClass.getName)}") + } + + test("a contrib PlanDataInjector is discovered via ServiceLoader (META-INF/services)") { + // Proves the discovery contract a contrib relies on: dropping a service file naming a + // PlanDataInjector implementation makes it visible to ServiceLoader -- no core change. We use + // an isolated child classloader carrying only the service file so the global registry (asserted + // built-ins-only above) is untouched. + // The scalatest plugin points java.io.tmpdir at spark/target/tmp, which may not exist yet; + // ensure it before createTempDirectory (which requires its parent to exist). 
+ val baseTmp = new File(System.getProperty("java.io.tmpdir")) + baseTmp.mkdirs() + val svcDir = Files.createTempDirectory(baseTmp.toPath, "comet-spi-test").toFile + try { + val servicesDir = new File(svcDir, "META-INF/services") + assert(servicesDir.mkdirs(), s"could not create $servicesDir") + Files.write( + new File(servicesDir, classOf[PlanDataInjector].getName).toPath, + (classOf[TestStubPlanDataInjector].getName + "\n").getBytes(StandardCharsets.UTF_8)) + + val loader = new URLClassLoader(Array(svcDir.toURI.toURL), getClass.getClassLoader) + val discovered = + ServiceLoader.load(classOf[PlanDataInjector], loader).asScala.toSeq + assert( + discovered.exists(_.isInstanceOf[TestStubPlanDataInjector]), + s"ServiceLoader should discover the stub, got: ${discovered.map(_.getClass.getName)}") + } finally { + def del(f: File): Unit = { + Option(f.listFiles()).foreach(_.foreach(del)) + f.delete() + } + del(svcDir) + } + } +} + +/** + * Top-level public no-arg `PlanDataInjector` so `ServiceLoader` can instantiate it (Scala + * `object`s have a private constructor and would not be instantiable). Inert: it claims no + * operator kind. + */ +class TestStubPlanDataInjector extends PlanDataInjector { + override def opStructCase: Operator.OpStructCase = Operator.OpStructCase.OPSTRUCT_NOT_SET + override def canInject(op: Operator): Boolean = false + override def getKey(op: Operator): Option[String] = None + override def inject( + op: Operator, + commonBytes: Array[Byte], + partitionBytes: Array[Byte]): Operator = + op +}