test(workflow-operator): add unit test coverage for interval-join, sleep, and cache-source executors#6098
test(workflow-operator): add unit test coverage for interval-join, sleep, and cache-source executors#6098aglinxinyuan wants to merge 2 commits into
Conversation
…eep, and cache-source executors - IntervalJoinOpExec: extend the existing spec to cover the YEAR/MONTH/HOUR/MINUTE/SECOND timestamp interval units and the unsupported-key-type WorkflowRuntimeException - SleepOpExec: new spec (construct + pass-through processTuple) - CacheSourceOpExec: new spec covering the non-result-URI and non-vfs-URI rejections (the Iceberg-backed read path is integration-only)
Automated Reviewer SuggestionsBased on the
|
There was a problem hiding this comment.
Pull request overview
This PR increases unit test coverage in common/workflow-operator by adding/expanding ScalaTest specs for three operator executors (Interval Join, Sleep, Cache Source), without changing production code. The goal is to exercise previously uncovered branches identified by Codecov and to pin constructor/validation behavior at the unit-test level where possible.
Changes:
- Extend
IntervalOpExecSpecto cover additional timestamp interval units and unsupported key-type error behavior. - Add
SleepOpExecSpecto cover JSON-based construction andprocessTuplepass-through behavior whensleepTime = 0. - Add
CacheSourceOpExecSpecto cover constructor validation errors for invalid/non-result and non-VFS URIs.
Reviewed changes
Copilot reviewed 3 out of 3 changed files in this pull request and generated 3 comments.
| File | Description |
|---|---|
| common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/source/cache/CacheSourceOpExecSpec.scala | Adds unit tests for constructor URI validation error cases in CacheSourceOpExec. |
| common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/sleep/SleepOpExecSpec.scala | Adds unit tests for SleepOpExec deserialization and tuple pass-through behavior. |
| common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/intervalJoin/IntervalOpExecSpec.scala | Extends interval-join executor tests to cover additional interval units and unsupported key-type rejection. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
Codecov Report✅ All modified and coverable lines are covered by tests. Additional details and impacted files@@ Coverage Diff @@
## main #6098 +/- ##
============================================
+ Coverage 59.06% 59.14% +0.07%
- Complexity 3203 3208 +5
============================================
Files 1132 1132
Lines 43674 43674
Branches 4734 4734
============================================
+ Hits 25797 25831 +34
+ Misses 16448 16404 -44
- Partials 1429 1439 +10
*This pull request uses carry forward flags. Click here to find out more. ☔ View full report in Codecov by Harness. 🚀 New features to boost your workflow:
|
|
| config | throughput | MB/s | latency | max Δ latest / 7d | |
|---|---|---|---|---|---|
| 🔴 | bs=10 sw=10 sl=64 | 392 | 0.24 | 24,147/35,416/35,416 us | 🔴 +16.6% / 🔴 +131.0% |
| 🔴 | bs=100 sw=10 sl=64 | 805 | 0.492 | 124,959/137,033/137,033 us | 🟢 -10.5% / 🔴 +26.5% |
| ⚪ | bs=1000 sw=10 sl=64 | 916 | 0.559 | 1,090,013/1,137,275/1,137,275 us | ⚪ within ±5% / 🔴 +10.1% |
Baseline details
Latest main 4c9d30a from same runner
| config | metric | PR | latest main | 7d avg | Δ latest | Δ 7d |
|---|---|---|---|---|---|---|
| bs=10 sw=10 sl=64 | throughput | 392 tuples/sec | 426 tuples/sec | 772.08 tuples/sec | -8.0% | -49.2% |
| bs=10 sw=10 sl=64 | MB/s | 0.24 MB/s | 0.26 MB/s | 0.471 MB/s | -7.7% | -49.1% |
| bs=10 sw=10 sl=64 | p50 | 24,147 us | 20,701 us | 12,745 us | +16.6% | +89.5% |
| bs=10 sw=10 sl=64 | p95 | 35,416 us | 37,635 us | 15,330 us | -5.9% | +131.0% |
| bs=10 sw=10 sl=64 | p99 | 35,416 us | 37,635 us | 19,054 us | -5.9% | +85.9% |
| bs=100 sw=10 sl=64 | throughput | 805 tuples/sec | 825 tuples/sec | 982.64 tuples/sec | -2.4% | -18.1% |
| bs=100 sw=10 sl=64 | MB/s | 0.492 MB/s | 0.504 MB/s | 0.6 MB/s | -2.4% | -18.0% |
| bs=100 sw=10 sl=64 | p50 | 124,959 us | 118,251 us | 101,961 us | +5.7% | +22.6% |
| bs=100 sw=10 sl=64 | p95 | 137,033 us | 153,154 us | 108,335 us | -10.5% | +26.5% |
| bs=100 sw=10 sl=64 | p99 | 137,033 us | 153,154 us | 114,379 us | -10.5% | +19.8% |
| bs=1000 sw=10 sl=64 | throughput | 916 tuples/sec | 923 tuples/sec | 1,013 tuples/sec | -0.8% | -9.6% |
| bs=1000 sw=10 sl=64 | MB/s | 0.559 MB/s | 0.564 MB/s | 0.618 MB/s | -0.9% | -9.6% |
| bs=1000 sw=10 sl=64 | p50 | 1,090,013 us | 1,082,552 us | 993,573 us | +0.7% | +9.7% |
| bs=1000 sw=10 sl=64 | p95 | 1,137,275 us | 1,109,389 us | 1,032,489 us | +2.5% | +10.1% |
| bs=1000 sw=10 sl=64 | p99 | 1,137,275 us | 1,109,389 us | 1,065,526 us | +2.5% | +6.7% |
Raw CSV
config_idx,batch_size,schema_width,string_len,num_batches,total_ms,total_tuples,total_bytes,tuples_per_sec,mb_per_sec,lat_p50_us,lat_p95_us,lat_p99_us
0,10,10,64,20,509.58,200,128000,392,0.240,24147.14,35416.09,35416.09
1,100,10,64,20,2483.37,2000,1280000,805,0.492,124959.38,137033.27,137033.27
2,1000,10,64,20,21826.01,20000,12800000,916,0.559,1090013.06,1137274.75,1137274.75…ec specs - IntervalOpExec: wrap the per-unit loop body in try/finally so the executor is always closed - SleepOpExec: drop the unnecessary asInstanceOf[Tuple] casts and compare emitted tuples directly
What changes were proposed in this PR?
Add unit test coverage for three
workflow-operatorexecutors, selected from the Codecov report. No production-code changes.IntervalJoinOpExec.scalaWorkflowRuntimeExceptionSleepOpExec.scalaprocessTuple(sleepTime 0)CacheSourceOpExec.scalaThe
CacheSourceOpExecresult-storage read path needs a live Iceberg catalog and stays in the amber integration suite. Likewise theIntervalJoincase Noneinterval arm requires a JSON payload with the field absent (Jackson maps an explicitNonetoSome(null)), so it is left to integration.Any related issues, documentation, discussions?
Follow-up to the review feedback on #6043: prioritize tests that fill uncovered code paths.
How was this PR tested?
sbt "WorkflowOperator/testOnly *IntervalOpExecSpec *SleepOpExecSpec *CacheSourceOpExecSpec"— 19 tests, all greensbt "WorkflowOperator/Test/scalafmtCheck"andsbt "WorkflowOperator/scalafixAll --check"— cleanWas this PR authored or co-authored using generative AI tooling?
Generated-by: Claude Code (Opus 4.8 [1M context])