Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/pr_build_linux.yml
Original file line number Diff line number Diff line change
Expand Up @@ -358,6 +358,7 @@ jobs:
org.apache.spark.sql.comet.CometShuffleFallbackStickinessSuite
org.apache.spark.sql.comet.PlanDataInjectorSuite
org.apache.spark.sql.comet.CometDecimalArithmeticViewSuite
org.apache.spark.sql.comet.CometScanWithPlanDataSuite
org.apache.spark.sql.comet.util.UtilsSuite
org.apache.comet.objectstore.NativeConfigSuite
org.apache.spark.sql.CometToPrettyStringSuite
Expand Down
1 change: 1 addition & 0 deletions .github/workflows/pr_build_macos.yml
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@ jobs:
org.apache.spark.sql.comet.CometShuffleFallbackStickinessSuite
org.apache.spark.sql.comet.PlanDataInjectorSuite
org.apache.spark.sql.comet.CometDecimalArithmeticViewSuite
org.apache.spark.sql.comet.CometScanWithPlanDataSuite
org.apache.spark.sql.comet.util.UtilsSuite
org.apache.comet.objectstore.NativeConfigSuite
org.apache.spark.sql.CometToPrettyStringSuite
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,9 +83,22 @@ case object CometPlanAdaptiveDynamicPruningFilters
if icebergScan.runtimeFilters.exists(hasCometSAB) =>
logDebug("Converting AQE DPP for CometIcebergNativeScanExec")
convertIcebergScanDPP(icebergScan, plan)
// Comet scans whose DPP filters live in a @transient field (the contrib's
// CometDeltaNativeScanExec). transformExpressions/makeCopy can't rewrite
// them, and a rewritten copy is orphaned when the enclosing native block
// is rebuilt (#3510). The scan's `withDynamicPruningFilters` installs the
// rewrite in place and returns `this`, so it lands on the executing
// instance.
case p: CometScanWithPlanData if p.dynamicPruningFilters.exists(hasCometSAB) =>
logDebug(s"Converting AQE DPP for ${p.getClass.getSimpleName} in place")
p.withDynamicPruningFilters(p.dynamicPruningFilters.map(f => convertFilter(f, plan)))
case p: SparkPlan

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not blocking for this part, just flagging for later: this arm excludes CometNativeScanExec and CometIcebergNativeScanExec by explicit isInstanceOf checks but not contrib trait scans. A future Delta scan with a wrapped SAB but empty dynamicPruningFilters would fall through to convertNonCometNodeDPP. That's the contrib's concern in a later part, worth keeping in mind so the exclusion list doesn't silently misroute trait scans.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks — addressed pre-emptively in dfde184 rather than deferring, since it was cheap and forward-safe: the non-Comet arm now also excludes CometScanWithPlanData (&& !p.isInstanceOf[CometScanWithPlanData]), so a future trait scan with a wrapped SAB but empty dynamicPruningFilters can't be misrouted to convertNonCometNodeDPP. No behavior change today (CometNativeScanExec was already excluded by name).

if !p.isInstanceOf[CometNativeScanExec]
&& !p.isInstanceOf[CometIcebergNativeScanExec]
// Contrib trait scans handle their own DPP in the arm above. Exclude them here too so
// a trait scan with a wrapped SAB but empty `dynamicPruningFilters` isn't misrouted to
// the non-Comet path (which mutates expressions in place via makeCopy).
&& !p.isInstanceOf[CometScanWithPlanData]
&& hasWrappedSAB(p) =>
logDebug(s"Converting AQE DPP for non-Comet node: ${p.nodeName}")
convertNonCometNodeDPP(p, plan)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ import org.apache.comet.serde.operator.CometIcebergNativeScan
* `expressions` walk picks up the contained `DynamicPruningExpression(InSubqueryExec(...))`, and
* the standard `prepare -> prepareSubqueries -> waitForSubqueries` lifecycle resolves it. The
* lifecycle is invoked via `CometLeafExec.ensureSubqueriesResolved`, called from
* `CometNativeExec.findAllPlanData` before `commonData` is read.
* `PlanDataInjector.findAllPlanData` before `commonData` is read.
*/
case class CometIcebergNativeScanExec(
override val nativeOp: Operator,
Expand All @@ -69,12 +69,12 @@ case class CometIcebergNativeScanExec(

/**
* Lazy partition serialization, deferred until execution time. Triggered from `commonData` /
* `perPartitionData` (via `CometNativeExec.findAllPlanData`) and from `LazyIcebergMetric.value`
* (via Iceberg planning metrics). Lazy val semantics ensure single evaluation across entry
* points.
* `perPartitionData` (via `PlanDataInjector.findAllPlanData`) and from
* `LazyIcebergMetric.value` (via Iceberg planning metrics). Lazy val semantics ensure single
* evaluation across entry points.
*
* DPP InSubqueryExec values must already be resolved by the time this lazy val runs.
* `CometNativeExec.findAllPlanData` calls `ensureSubqueriesResolved` (which invokes Spark's
* `PlanDataInjector.findAllPlanData` calls `ensureSubqueriesResolved` (which invokes Spark's
* `prepare -> waitForSubqueries`) before reading `commonData`. The `serializePartitions` call
* below reads `originalPlan.runtimeFilters` indirectly through `inputRDD -> filteredPartitions`
* and applies the resolved values to Iceberg's runtime filtering. `originalPlan.runtimeFilters`
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,8 @@ case class CometNativeScanExec(
sourceKey: String) // Key for PlanDataInjector to match common+partition data at runtime
extends CometLeafExec
with DataSourceScanExec
with ShimStreamSourceAwareSparkPlan {
with ShimStreamSourceAwareSparkPlan
with CometScanWithPlanData {

override lazy val metadata: Map[String, String] =
if (originalPlan != null) originalPlan.metadata else Map.empty
Expand Down Expand Up @@ -159,7 +160,7 @@ case class CometNativeScanExec(
// Outer partitionFilters (wrapper) DPP is resolved by Spark's standard
// prepare -> waitForSubqueries lifecycle, triggered explicitly via
// CometLeafExec.ensureSubqueriesResolved called from
// CometNativeExec.findAllPlanData before commonData is read.
// PlanDataInjector.findAllPlanData before commonData is read.
//
// Inner scan.partitionFilters holds a SEPARATE InSubqueryExec instance that
// Spark's expressions walk does not see (scan is @transient and not a sibling
Expand Down
223 changes: 155 additions & 68 deletions spark/src/main/scala/org/apache/spark/sql/comet/operators.scala
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,16 @@
package org.apache.spark.sql.comet

import java.util.Locale
import java.util.ServiceLoader

import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
import scala.jdk.CollectionConverters._
import scala.util.control.NonFatal

import org.apache.spark.{Partition, TaskContext}
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Ascending, Attribute, AttributeSet, Expression, ExpressionSet, Generator, NamedExpression, SortOrder}
Expand Down Expand Up @@ -87,14 +90,34 @@ private[comet] trait PlanDataInjector {
/**
* Registry and utilities for injecting per-partition planning data into operator trees.
*/
private[comet] object PlanDataInjector {

// Registry of injectors for different operator types
private val injectors: Seq[PlanDataInjector] = Seq(
IcebergPlanDataInjector,
NativeScanPlanDataInjector
// Future: DeltaPlanDataInjector, HudiPlanDataInjector, etc.
)
private[comet] object PlanDataInjector extends Logging {

// Registry of injectors for different operator types. The built-in injectors live in core.
// Out-of-tree contribs (e.g. contrib-delta's `DeltaPlanDataInjector`) are discovered via the
// standard JDK `ServiceLoader`: a contrib ships a
// `META-INF/services/org.apache.spark.sql.comet.PlanDataInjector` resource naming its
// implementation, so it joins the registry without core holding any compile-time reference or
// contrib-specific code. Default builds carry no such service file, so discovery yields nothing
// and the registry is exactly the built-ins -- zero contrib surface at runtime.
private[comet] val injectors: Seq[PlanDataInjector] = {
val builtin: Seq[PlanDataInjector] = Seq(IcebergPlanDataInjector, NativeScanPlanDataInjector)
val discovered: Seq[PlanDataInjector] =
try {
ServiceLoader.load(classOf[PlanDataInjector], getClass.getClassLoader).asScala.toSeq
} catch {
// A misbuilt contrib jar -- a malformed service file, or a listed provider that can't be
// instantiated -- surfaces as ServiceConfigurationError while the iterator is forced.
// Warn so it's diagnosable, then continue with the built-ins so the planner stays alive.
// NonFatal covers ServiceConfigurationError (it is not a LinkageError).
case NonFatal(e) =>
logWarning(
"Failed to load contrib PlanDataInjector services; " +
"continuing with built-in injectors only",
e)
Seq.empty
}
builtin ++ discovered
}

// O(1) lookup by op kind: most operators in any tree don't match any injector, so the per-op
// `for (injector <- injectors if injector.canInject(op))` walk was paying N*M canInject calls
Expand Down Expand Up @@ -153,6 +176,73 @@ private[comet] object PlanDataInjector {
codedOutput.checkNoSpaceLeft()
bytes
}

/**
* Find all plan nodes with per-partition planning data in the plan tree. Returns two maps keyed
* by a unique identifier: one for common data (shared across partitions) and one for
* per-partition data.
*
* Recognises Iceberg scans (keyed by metadata_location) plus any leaf scan that surfaces its
* data via the [[CometScanWithPlanData]] trait (`CometNativeScanExec` and out-of-tree contrib
* scans such as the Delta contrib's `CometDeltaNativeScanExec`).
*
* Stops at stage boundaries (shuffle exchanges, etc.) because partition indices are only valid
* within the same stage.
*
* @return
* (commonByKey, perPartitionByKey) - common data is shared, per-partition varies
*/
private[comet] def findAllPlanData(
plan: SparkPlan): (Map[String, Array[Byte]], Map[String, Array[Array[Byte]]]) = {
plan match {
case iceberg: CometIcebergNativeScanExec =>
// Trigger Spark's standard prepare -> waitForSubqueries lifecycle so DPP
// InSubqueryExec values are resolved before commonData is read. Without this,
// the parent CometNativeExec.executeQuery flow never invokes the scan's
// executeQuery, leaving DPP unresolved and forcing a sync-on-this await inside
// the serializedPartitionData lazy val initializer (a known deadlock surface).
iceberg.ensureSubqueriesResolved()
if (iceberg.commonData.nonEmpty && iceberg.perPartitionData.nonEmpty) {
(
Map(iceberg.metadataLocation -> iceberg.commonData),
Map(iceberg.metadataLocation -> iceberg.perPartitionData))
} else {
(Map.empty, Map.empty)
}

// Generic path for leaf scans that surface planning data via the
// `CometScanWithPlanData` trait. Catches `CometNativeScanExec` and any contrib
// leaf scan (e.g. the Delta contrib's `CometDeltaNativeScanExec`) without
// requiring core to compile-time reference contrib classes. The trait's
// `self: CometLeafExec` self-type guarantees this is also a `CometLeafExec`, so
// the compound pattern always matches and we can drive the subquery lifecycle
// directly -- there is no silent "not a leaf" skip.
case s: CometLeafExec with CometScanWithPlanData =>
s.ensureSubqueriesResolved()
if (s.commonData.nonEmpty && s.perPartitionData.nonEmpty) {
(Map(s.sourceKey -> s.commonData), Map(s.sourceKey -> s.perPartitionData))
} else {
(Map.empty, Map.empty)
}

// Broadcast stages are boundaries - don't collect per-partition data from inside them.
// After DPP filtering, broadcast scans may have different partition counts than the
// probe side, causing ArrayIndexOutOfBoundsException in CometExecRDD.getPartitions.
case _: BroadcastQueryStageExec | _: CometBroadcastExchangeExec =>
(Map.empty, Map.empty)

// Stage boundaries - stop searching (partition indices won't align after these)
case _: ShuffleQueryStageExec | _: AQEShuffleReadExec | _: CometShuffleExchangeExec |
_: CometUnionExec | _: CometTakeOrderedAndProjectExec | _: CometCoalesceExec |
_: ReusedExchangeExec | _: CometSparkToColumnarExec =>
(Map.empty, Map.empty)

// Continue searching through other operators, combining results from all children
case _ =>
val results = plan.children.map(findAllPlanData)
(results.flatMap(_._1).toMap, results.flatMap(_._2).toMap)
}
}
}

/**
Expand Down Expand Up @@ -539,7 +629,7 @@ abstract class CometNativeExec extends CometExec {
}

// Find planning data within this stage (stops at shuffle boundaries).
val (commonByKey, perPartitionByKey) = findAllPlanData(this)
val (commonByKey, perPartitionByKey) = PlanDataInjector.findAllPlanData(this)

// Collect the input batches from the child operators. Non-shuffle inputs become
// RDD[ArrowArrayStream] (one stream per partition, exported via the C Stream Interface
Expand Down Expand Up @@ -718,8 +808,16 @@ abstract class CometNativeExec extends CometExec {
*/
def foreachUntilCometInput(plan: SparkPlan)(func: SparkPlan => Unit): Unit = {
plan match {
case _: CometNativeScanExec | _: CometScanExec | _: CometBatchScanExec |
_: CometIcebergNativeScanExec | _: CometCsvNativeScanExec | _: ShuffleQueryStageExec |
// Match `CometLeafExec` first so contrib leaf scans (e.g. the Delta
// contrib's `CometDeltaNativeScanExec`) are recognised as input boundaries
// without requiring a core compile-time reference to the contrib class.
// All built-in leaf scans (`CometNativeScanExec`, `CometIcebergNativeScanExec`,
// `CometCsvNativeScanExec`) also extend `CometLeafExec`, so this is a
// strict superset of the previous enumeration -- it just generalises the
// input-boundary concept from "this fixed list" to "any leaf Comet exec".
case _: CometLeafExec =>
func(plan)
case _: CometScanExec | _: CometBatchScanExec | _: ShuffleQueryStageExec |
_: AQEShuffleReadExec | _: CometShuffleExchangeExec | _: CometUnionExec |
_: CometTakeOrderedAndProjectExec | _: CometCoalesceExec | _: ReusedExchangeExec |
_: CometBroadcastExchangeExec | _: BroadcastQueryStageExec |
Expand Down Expand Up @@ -754,63 +852,6 @@ abstract class CometNativeExec extends CometExec {
indices.toSet
}

/**
* Find all plan nodes with per-partition planning data in the plan tree. Returns two maps keyed
* by a unique identifier: one for common data (shared across partitions) and one for
* per-partition data.
*
* Currently supports Iceberg scans (keyed by metadata_location). Additional scan types can be
* added by extending this method.
*
* Stops at stage boundaries (shuffle exchanges, etc.) because partition indices are only valid
* within the same stage.
*
* @return
* (commonByKey, perPartitionByKey) - common data is shared, per-partition varies
*/
private def findAllPlanData(
plan: SparkPlan): (Map[String, Array[Byte]], Map[String, Array[Array[Byte]]]) = {
plan match {
case iceberg: CometIcebergNativeScanExec =>
// Trigger Spark's standard prepare -> waitForSubqueries lifecycle so DPP
// InSubqueryExec values are resolved before commonData is read. Without this,
// the parent CometNativeExec.executeQuery flow never invokes the scan's
// executeQuery, leaving DPP unresolved and forcing a sync-on-this await inside
// the serializedPartitionData lazy val initializer (a known deadlock surface).
iceberg.ensureSubqueriesResolved()
if (iceberg.commonData.nonEmpty && iceberg.perPartitionData.nonEmpty) {
(
Map(iceberg.metadataLocation -> iceberg.commonData),
Map(iceberg.metadataLocation -> iceberg.perPartitionData))
} else {
(Map.empty, Map.empty)
}

case nativeScan: CometNativeScanExec =>
nativeScan.ensureSubqueriesResolved()
(
Map(nativeScan.sourceKey -> nativeScan.commonData),
Map(nativeScan.sourceKey -> nativeScan.perPartitionData))

// Broadcast stages are boundaries - don't collect per-partition data from inside them.
// After DPP filtering, broadcast scans may have different partition counts than the
// probe side, causing ArrayIndexOutOfBoundsException in CometExecRDD.getPartitions.
case _: BroadcastQueryStageExec | _: CometBroadcastExchangeExec =>
(Map.empty, Map.empty)

// Stage boundaries - stop searching (partition indices won't align after these)
case _: ShuffleQueryStageExec | _: AQEShuffleReadExec | _: CometShuffleExchangeExec |
_: CometUnionExec | _: CometTakeOrderedAndProjectExec | _: CometCoalesceExec |
_: ReusedExchangeExec | _: CometSparkToColumnarExec =>
(Map.empty, Map.empty)

// Continue searching through other operators, combining results from all children
case _ =>
val results = plan.children.map(findAllPlanData)
(results.flatMap(_._1).toMap, results.flatMap(_._2).toMap)
}
}

/**
* Converts this native Comet operator and its children into a native block which can be
* executed as a whole (i.e., in a single JNI call) from the native side.
Expand Down Expand Up @@ -901,6 +942,52 @@ abstract class CometLeafExec extends CometNativeExec with LeafExecNode {
}
}

/**
* Marker trait for scan execs that surface planning data (a `commonData` block + per-partition
* task bytes keyed by `sourceKey`) so that a parent `CometNativeExec` can find and inject the
* data when the scan is fused into a larger native subtree.
*
* Implemented by `CometNativeScanExec` and the contrib's `CometDeltaNativeScanExec` -- without
* it, [[PlanDataInjector.findAllPlanData]] cannot collect the per-partition tasks and the
* parent's native execution receives an empty input. (`CometIcebergNativeScanExec` does NOT use
* this trait; it has a dedicated `findAllPlanData` case.)
*
* Each implementation also resolves its own DPP subqueries via `ensureSubqueriesResolved` before
* `commonData`/`perPartitionData` are read. That method lives on [[CometLeafExec]], so the `self:
* CometLeafExec` self-type below makes "is a leaf scan" a compile-time requirement: an
* implementer cannot forget to extend [[CometLeafExec]] (which would otherwise compile and then
* silently skip subquery resolution -- the deadlock surface `ensureSubqueriesResolved` exists to
* prevent). It also lets [[PlanDataInjector.findAllPlanData]] drive the lifecycle without a
* runtime "not a leaf" fallback.
*/
trait CometScanWithPlanData { self: CometLeafExec =>
def sourceKey: String
def commonData: Array[Byte]
def perPartitionData: Array[Array[Byte]]

// DPP / partition filters that may carry AQE SubqueryAdaptiveBroadcast
// subqueries needing rewrite by CometPlanAdaptiveDynamicPruningFilters.
// Default empty: scans with dedicated handling (CometNativeScanExec,
// CometIcebergNativeScanExec) don't use this path.
def dynamicPruningFilters: Seq[Expression] = Nil

// Install rewritten DPP filters on this scan. Implementers whose filters live
// in a @transient field (which TreeNode.makeCopy can't carry, #3510) update
// them via a transient side-channel and return `this` -- so the optimizer
// rule's rewrite lands on the SAME instance that executes, instead of a copy
// that gets dropped when the enclosing native block is rebuilt. Only called
// when `dynamicPruningFilters` is non-empty, so the default is never reached
// for scans that leave it empty.
//
// TODO(#3510): once TreeNode.makeCopy preserves @transient fields, the
// mutate-and-return-`this` workaround can collapse back to a normal copy, mirroring
// the matching TODO in CometPlanAdaptiveDynamicPruningFilters.
def withDynamicPruningFilters(filters: Seq[Expression]): SparkPlan =

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The mutate-and-return-this shape is a reasonable workaround given @transient filters can't survive makeCopy, and the comment explains it well. Since CometPlanAdaptiveDynamicPruningFilters already carries a TODO(#3510) for the day makeCopy is fixed, could we mirror that marker here? It would help this collapse back into a normal copy rather than lingering once the underlying issue is resolved.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done in dfde184 — mirrored the marker: withDynamicPruningFilters now carries a TODO(#3510) noting the mutate-and-return-this collapses back to a normal copy once makeCopy preserves @transient fields, pointing at the matching TODO in CometPlanAdaptiveDynamicPruningFilters.

throw new UnsupportedOperationException(
s"${getClass.getSimpleName} exposes dynamicPruningFilters but does not " +
"override withDynamicPruningFilters")
}

abstract class CometUnaryExec extends CometNativeExec with UnaryExecNode

abstract class CometBinaryExec extends CometNativeExec with BinaryExecNode
Expand Down
Loading