feat: core SPI for contrib leaf scans (CometScanWithPlanData) [Delta contrib split, part 1]#4700
feat: core SPI for contrib leaf scans (CometScanWithPlanData) [Delta contrib split, part 1]#4700schenksj wants to merge 3 commits into
Conversation
Introduce a small extension contract so out-of-tree Comet contrib leaf scans (Delta, and future Hudi/etc.) can participate in native planning without core holding a compile-time reference to them -- mirroring the Iceberg-precedent of keeping the data-source-specific code at the edge. What this adds: - `trait CometScanWithPlanData` (`sourceKey` / `commonData` / `perPartitionData`, plus optional `dynamicPruningFilters` / `withDynamicPruningFilters` for scans whose DPP filters live in a @transient field). `CometNativeScanExec` now mixes it in. - `CometNativeExec.foreachUntilCometInput` matches `case _: CometLeafExec` (a strict superset of the previous fixed scan enumeration -- all built-in leaf scans already extend `CometLeafExec`), so any leaf Comet exec is recognised as an input boundary. - `PlanDataInjector.findAllPlanData` collects per-partition planning data via the trait instead of a hardcoded `CometNativeScanExec` match. - `PlanDataInjector`'s registry gains one reflective `DeltaPlanDataInjector$` slot, appended only when a contrib bundled it (`-Pcontrib-delta`). Default builds get a `ClassNotFoundException` -> `None` and an unchanged injectors list, so there is zero contrib surface at runtime. - `CometPlanAdaptiveDynamicPruningFilters` rewrites AQE DPP filters in place for trait scans whose filters can't survive `makeCopy` (apache#3510). Inert by construction: with no contrib on the classpath this is behavior- preserving (the leaf match is a superset; the trait match catches the same `CometNativeScanExec`; the reflective slot resolves to nothing). Tests: `CometScanWithPlanDataSuite` (trait-contract defaults + reflective-slot graceful absence). Verified `CometJoinSuite` (native scan fusion / DPP) stays green. First unit of the Delta-contrib PR split (tracking: apache#4366).
…ewed clean (fork #4)
| // class with no MODULE$ field) AND `Foo$.class` (the module class that | ||
| // does have MODULE$). The trailing `$` selects the module class. | ||
| // scalastyle:off classforname | ||
| val cls = Class.forName("org.apache.spark.sql.comet.DeltaPlanDataInjector$") |
There was a problem hiding this comment.
This would require every other contrib to have to add their own block of code. Consider ServiceLoader (META-INF/services/...PlanDataInjector) so contribs are discoverable without modifying core.
There was a problem hiding this comment.
Done in dfde184. Switched to java.util.ServiceLoader: core now does ServiceLoader.load(classOf[PlanDataInjector], ...) and appends whatever it finds to the built-ins, so a contrib just ships a META-INF/services/org.apache.spark.sql.comet.PlanDataInjector file naming its impl — no core edit. The later contrib-delta part will register class DeltaPlanDataInjector extends PlanDataInjector (a class, not a Scala object, so ServiceLoader can instantiate it via its no-arg ctor). Default builds carry no such service file, so the registry stays exactly the built-ins. Added a discovery test (isolated child classloader carrying just the service file) so the global registry is untouched.
| // or a CCE on the PlanDataInjector cast. That's a misbuilt contrib jar, not | ||
| // a default build -- warn so it's diagnosable, then still decline so the | ||
| // rest of the planner stays alive. | ||
| case e: Throwable => |
There was a problem hiding this comment.
Maybe use scala.util.control.NonFatal instead?
There was a problem hiding this comment.
Done in dfde184 — the discovery block now catches scala.util.control.NonFatal, which covers the ServiceConfigurationError thrown when forcing the loader iterator over a misbuilt contrib jar (it is an Error but not a LinkageError, so NonFatal matches), while still letting truly fatal errors through.
| case s: CometScanWithPlanData => | ||
| s match { | ||
| case leaf: CometLeafExec => leaf.ensureSubqueriesResolved() | ||
| case _ => // no DPP lifecycle to drive |
There was a problem hiding this comment.
The iceberg path handles if (commonData.empty && perPartitionData.empty) by returning (Map.empty, Map.empty) but this path doesn't. An implementation returning an empty array will probably cause an exception downstream. Better to handle it here
There was a problem hiding this comment.
Good catch — done in dfde184. The generic trait arm now mirrors the Iceberg arm: if (s.commonData.nonEmpty && s.perPartitionData.nonEmpty) it contributes its entry, else returns (Map.empty, Map.empty) so an empty-array scan adds nothing rather than an entry that fails downstream injection. Added a regression test ("findAllPlanData skips a trait scan that surfaces no data").
| * Deliberately does not exercise the per-op injector registry mechanics; that surface is owned by | ||
| * `PlanDataInjectorSuite`. | ||
| */ | ||
| class CometScanWithPlanDataSuite extends AnyFunSuite { |
There was a problem hiding this comment.
This really needs a test that implements a stub/mock CometLeafExec with CometScanWithPlanData wired through findAllPlanData. That would confirm the new code path works independently of the existing scan classes.
There was a problem hiding this comment.
Done in dfde184. Added StubScan — a CometLeafExec with CometScanWithPlanData that core has never heard of — and a test that runs it through PlanDataInjector.findAllPlanData and asserts its commonData/perPartitionData are collected under its sourceKey (and that the subquery-resolution lifecycle hook is driven). To make this testable I relocated findAllPlanData from CometNativeExec (where it was a private instance method using no instance state) into the PlanDataInjector object — which also makes the existing PlanDataInjector.findAllPlanData scaladoc references correct.
| * Each implementation also resolves its own DPP subqueries via `ensureSubqueriesResolved` | ||
| * (overridden from [[CometLeafExec]]) before `commonData`/`perPartitionData` are read. | ||
| */ | ||
| trait CometScanWithPlanData { |
There was a problem hiding this comment.
The scaladoc says each implementation resolves its own DPP subqueries via ensureSubqueriesResolved (overridden from CometLeafExec), but the trait doesn't extend CometLeafExec, and findAllPlanData guards the call with a runtime case leaf: CometLeafExec => ... case _ =>. An implementer that forgets to also extend CometLeafExec compiles fine and then silently skips subquery resolution, which is the deadlock surface ensureSubqueriesResolved exists to prevent. Would it be worth making the requirement explicit with a self-type (trait CometScanWithPlanData { self: CometLeafExec => ... }), or at least a scaladoc note that implementers must extend CometLeafExec? That also pairs well with the suggested stub test: a CometLeafExec with CometScanWithPlanData stub exercises the real lifecycle path rather than the case _ => fallback.
There was a problem hiding this comment.
Done in dfde184 — went with the self-type: trait CometScanWithPlanData { self: CometLeafExec => ... }. Now "is a leaf scan" is a compile-time requirement (an impl that forgets to extend CometLeafExec no longer compiles), and findAllPlanData matches the compound type case s: CometLeafExec with CometScanWithPlanData and drives ensureSubqueriesResolved() directly — the runtime case _ => silent-skip is gone. The stub test you suggested now exercises this exact path (CometLeafExec with CometScanWithPlanData through findAllPlanData).
| // that gets dropped when the enclosing native block is rebuilt. Only called | ||
| // when `dynamicPruningFilters` is non-empty, so the default is never reached | ||
| // for scans that leave it empty. | ||
| def withDynamicPruningFilters(filters: Seq[Expression]): SparkPlan = |
There was a problem hiding this comment.
The mutate-and-return-this shape is a reasonable workaround given @transient filters can't survive makeCopy, and the comment explains it well. Since CometPlanAdaptiveDynamicPruningFilters already carries a TODO(#3510) for the day makeCopy is fixed, could we mirror that marker here? It would help this collapse back into a normal copy rather than lingering once the underlying issue is resolved.
There was a problem hiding this comment.
Done in dfde184 — mirrored the marker: withDynamicPruningFilters now carries a TODO(#3510) noting the mutate-and-return-this collapses back to a normal copy once makeCopy preserves @transient fields, pointing at the matching TODO in CometPlanAdaptiveDynamicPruningFilters.
| case p: CometScanWithPlanData if p.dynamicPruningFilters.exists(hasCometSAB) => | ||
| logDebug(s"Converting AQE DPP for ${p.getClass.getSimpleName} in place") | ||
| p.withDynamicPruningFilters(p.dynamicPruningFilters.map(f => convertFilter(f, plan))) | ||
| case p: SparkPlan |
There was a problem hiding this comment.
Not blocking for this part, just flagging for later: this arm excludes CometNativeScanExec and CometIcebergNativeScanExec by explicit isInstanceOf checks but not contrib trait scans. A future Delta scan with a wrapped SAB but empty dynamicPruningFilters would fall through to convertNonCometNodeDPP. That's the contrib's concern in a later part, worth keeping in mind so the exclusion list doesn't silently misroute trait scans.
There was a problem hiding this comment.
Thanks — addressed pre-emptively in dfde184 rather than deferring, since it was cheap and forward-safe: the non-Comet arm now also excludes CometScanWithPlanData (&& !p.isInstanceOf[CometScanWithPlanData]), so a future trait scan with a wrapped SAB but empty dynamicPruningFilters can't be misrouted to convertNonCometNodeDPP. No behavior change today (CometNativeScanExec was already excluded by name).
parthchandra:
- Discover contrib PlanDataInjectors via java.util.ServiceLoader
(META-INF/services) instead of a hardcoded reflective DeltaPlanDataInjector$
slot, so contribs are discoverable without editing core.
- Catch via scala.util.control.NonFatal (covers ServiceConfigurationError).
- findAllPlanData's generic trait arm now mirrors the Iceberg arm: empty
common/per-partition data contributes nothing instead of an empty entry
that would fail downstream injection.
- Add a stub CometLeafExec with CometScanWithPlanData wired through
findAllPlanData, confirming the new path works independently of the
existing scan classes; plus a ServiceLoader discovery test.
andygrove:
- Make CometScanWithPlanData a self-type `{ self: CometLeafExec => }`, so
"is a leaf scan" is a compile-time requirement and findAllPlanData drives
the subquery lifecycle with no silent "not a leaf" fallback.
- Mirror the TODO(apache#3510) marker on withDynamicPruningFilters.
- Pre-emptively exclude CometScanWithPlanData from the non-Comet DPP arm in
CometPlanAdaptiveDynamicPruningFilters so a future trait scan with a wrapped
SAB but empty dynamicPruningFilters isn't misrouted.
Also relocate findAllPlanData from CometNativeExec into the PlanDataInjector
object (it uses no instance state), which fixes the now-correct
PlanDataInjector.findAllPlanData scaladoc reference and makes it unit-testable.
CometScanWithPlanDataSuite 5/5 on Spark 4.1/2.13; spark module also compiles
clean under Scala 2.12 (Spark 3.4).
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_01BtErWgRQKCDRAg8Mk6qR4G
|
Thanks for the thorough reviews @parthchandra and @andygrove — all points addressed in dfde184 (replied inline on each thread). Summary:
I also relocated The later contrib-delta part will ship Verified: (Disclosure: these changes and this response were written with help from AI — Claude Code.) |
findAllPlanData moved from CometNativeExec into the PlanDataInjector object; update the now-stale `CometNativeExec.findAllPlanData` references in the CometNativeScanExec and CometIcebergNativeScanExec doc comments to `PlanDataInjector.findAllPlanData`. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com> Claude-Session: https://claude.ai/code/session_01BtErWgRQKCDRAg8Mk6qR4G
Part 1 of the Delta Lake contrib PR breakup. The native Delta scan work (delta-kernel-rs, Iceberg-style contrib) was first posted as a single ~27k-line tracking PR, #4366, which is impractical to review as one unit. This is the first of a sequence of small, independently-reviewable, independently-mergeable PRs that reconstruct that work. The full sequence and its dependency graph live in #4366.
This first slice touches core only. It adds a small extension contract that lets out-of-tree Comet contrib leaf scans (Delta now... others later?) take part in native planning without core holding a compile-time reference to them. This is the same "the edge keeps the source-specific code" shape Iceberg already uses. It ships no Delta code and is inert on default builds.
Changes
trait CometScanWithPlanData—sourceKey/commonData/perPartitionData, plus optionaldynamicPruningFilters/withDynamicPruningFilters(for scans whose DPP filters live in a@transientfield thatTreeNode.makeCopycannot carry, Epic: CometNativeScan improvements (per-partition serde, cleanup, DPP, AQE DPP, V2 operator) #3510).CometNativeScanExecmixes it in.foreachUntilCometInputnow matchescase _: CometLeafExec. This is a strict superset of the previous fixed scan list: the three leaf scans it replaces (CometNativeScanExec,CometIcebergNativeScanExec,CometCsvNativeScanExec) are exactly the classes that extendCometLeafExec.PlanDataInjector.findAllPlanDatacollects per-partition planning data via the trait instead of a hardcodedCometNativeScanExecmatch.PlanDataInjectorregistry gains one reflectiveDeltaPlanDataInjector$slot, appended to the existinginjectorsByKindregistry (perf: O(1) PlanDataInjector lookup by op kind #4535) only when a contrib bundled the class (-Pcontrib-delta). Default builds getClassNotFoundException -> Noneand an unchanged registry. A class that is present but fails to bind (a misbuilt contrib jar) is logged, not silently swallowed.CometPlanAdaptiveDynamicPruningFiltersrewrites AQE DPP filters in place for trait scans whose filters cannot survivemakeCopy.What this part deliberately does NOT do yet
perPartitionFilePathson the trait. That member only feedsFAILED_READ_FILEerror conversion and lands in a later part, after feat: surface native parquet read failures as FAILED_READ_FILE #4536 (now merged).DeltaPlanDataInjectoron the classpath yet, so the reflective slot resolves to nothing. This part is inert.Why it is safe on default builds
With no contrib on the classpath the change is behavior-preserving. The leaf match is a proven superset of the old enumeration. The trait match catches the same
CometNativeScanExecand still drives its subquery resolution. The reflective slot resolves toNone. And the new DPP arm never fires becauseCometNativeScanExecleavesdynamicPruningFiltersempty.Verification
CometScanWithPlanDataSuite(new): trait-contract defaults plus reflective-slot graceful absence. 2/2.CometJoinSuite(native scan fusion and the DPP path): 28/28.Roadmap
This is part 1 of the breakup. Subsequent parts add the build gate and inert wiring, the Rust planning and read path, the Scala claim/decline and execution, Change Data Feed reads, the test battery, and docs. Each later part is gated behind
-Pcontrib-delta, so every intermediate state onmainis safe for default builds. Tracking umbrella: #4366.🤖 AI disclosure: this PR was prepared with assistance from Claude Code (Claude Opus 4.8), under the submitter's review and direction.