feat(amber): add loop-bookkeeping columns to materialized State (dormant)#5900
feat(amber): add loop-bookkeeping columns to materialized State (dormant)#5900aglinxinyuan wants to merge 5 commits into
Conversation
…ant) Extend the cross-region State materialization format from 1 column (content) to 4 columns: content + loop_counter + loop_start_id + loop_start_state_uri. The loop bookkeeping is promoted to first-class columns (never inside the content JSON), and the transport carries them: OutputManager state writer + emit, the Python network sender/receiver, the materialization reader, and the Scala state.toTuple call sites. Dormant on main: to_tuple()/toTuple() and OutputManager.save_state_to_storage_if_needed / emit_state default the loop columns to 0/"", so every existing non-loop caller is unchanged, and fromTuple/from_tuple read only the content column. The columns activate only once the loop operators set them (follow-up PR). State materialization is intra-execution (execution-scoped iceberg URI, recreated fresh each run), so no backward-compatible read of old 1-column data is needed. Extracted from apache#5700 (loop operators); part of apache#4442.
Automated Reviewer SuggestionsBased on the
|
Codecov Report❌ Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## main #5900 +/- ##
============================================
+ Coverage 54.11% 55.44% +1.33%
- Complexity 2819 3024 +205
============================================
Files 1103 1114 +11
Lines 42650 43065 +415
Branches 4588 4632 +44
============================================
+ Hits 23079 23879 +800
+ Misses 18226 17775 -451
- Partials 1345 1411 +66
*This pull request uses carry forward flags. Click here to find out more. ☔ View full report in Codecov by Harness. 🚀 New features to boost your workflow:
|
|
| config | throughput | MB/s | latency | max Δ latest / 7d | |
|---|---|---|---|---|---|
| 🔴 | bs=10 sw=10 sl=64 | 396 | 0.242 | 24,315/38,967/38,967 us | 🔴 +22.5% / 🔴 +158.6% |
| 🟢 | bs=100 sw=10 sl=64 | 966 | 0.59 | 103,802/113,152/113,152 us | 🟢 -17.2% / 🔴 +5.2% |
| 🔴 | bs=1000 sw=10 sl=64 | 1,050 | 0.641 | 951,698/1,024,360/1,024,360 us | 🔴 +8.4% / ⚪ within ±5% |
Baseline details
Latest main 8803d08 from same runner
| config | metric | PR | latest main | 7d avg | Δ latest | Δ 7d |
|---|---|---|---|---|---|---|
| bs=10 sw=10 sl=64 | throughput | 396 tuples/sec | 454 tuples/sec | 777.62 tuples/sec | -12.8% | -49.1% |
| bs=10 sw=10 sl=64 | MB/s | 0.242 MB/s | 0.277 MB/s | 0.475 MB/s | -12.6% | -49.0% |
| bs=10 sw=10 sl=64 | p50 | 24,315 us | 19,841 us | 12,612 us | +22.5% | +92.8% |
| bs=10 sw=10 sl=64 | p95 | 38,967 us | 33,213 us | 15,070 us | +17.3% | +158.6% |
| bs=10 sw=10 sl=64 | p99 | 38,967 us | 33,213 us | 18,360 us | +17.3% | +112.2% |
| bs=100 sw=10 sl=64 | throughput | 966 tuples/sec | 957 tuples/sec | 988.31 tuples/sec | +0.9% | -2.3% |
| bs=100 sw=10 sl=64 | MB/s | 0.59 MB/s | 0.584 MB/s | 0.603 MB/s | +1.0% | -2.2% |
| bs=100 sw=10 sl=64 | p50 | 103,802 us | 103,334 us | 101,066 us | +0.5% | +2.7% |
| bs=100 sw=10 sl=64 | p95 | 113,152 us | 136,621 us | 107,594 us | -17.2% | +5.2% |
| bs=100 sw=10 sl=64 | p99 | 113,152 us | 136,621 us | 115,830 us | -17.2% | -2.3% |
| bs=1000 sw=10 sl=64 | throughput | 1,050 tuples/sec | 1,123 tuples/sec | 1,019 tuples/sec | -6.5% | +3.0% |
| bs=1000 sw=10 sl=64 | MB/s | 0.641 MB/s | 0.685 MB/s | 0.622 MB/s | -6.4% | +3.0% |
| bs=1000 sw=10 sl=64 | p50 | 951,698 us | 891,907 us | 986,982 us | +6.7% | -3.6% |
| bs=1000 sw=10 sl=64 | p95 | 1,024,360 us | 944,812 us | 1,028,491 us | +8.4% | -0.4% |
| bs=1000 sw=10 sl=64 | p99 | 1,024,360 us | 944,812 us | 1,058,493 us | +8.4% | -3.2% |
Raw CSV
config_idx,batch_size,schema_width,string_len,num_batches,total_ms,total_tuples,total_bytes,tuples_per_sec,mb_per_sec,lat_p50_us,lat_p95_us,lat_p99_us
0,10,10,64,20,505.24,200,128000,396,0.242,24315.28,38967.08,38967.08
1,100,10,64,20,2069.55,2000,1280000,966,0.590,103801.78,113152.11,113152.11
2,1000,10,64,20,19048.83,20000,12800000,1050,0.641,951697.90,1024360.22,1024360.22There was a problem hiding this comment.
Pull request overview
This PR updates Amber’s cross-region State materialization wire format from a single content column to a 4-column schema (content, loop_counter, loop_start_id, loop_start_state_uri) so loop bookkeeping is carried as first-class columns rather than embedded in the user state JSON. It touches both Scala and Python runtimes plus tests to keep the change dormant for non-loop workflows via default values.
Changes:
- Extend Scala/Python
Stateschemas andtoTuple/to_tuplewriters to emit the 4-column state tuple with defaults for non-loop callers. - Update Python materialization reader and Python network sender/receiver to read/write the new loop bookkeeping columns on
StateFrame. - Add/adjust Scala + Python tests to pin Arrow vector round-trips and state materialization behavior.
Reviewed changes
Copilot reviewed 17 out of 17 changed files in this pull request and generated 4 comments.
Show a summary per file
| File | Description |
|---|---|
| common/workflow-core/src/main/scala/org/apache/texera/amber/core/state/State.scala | Expands State.schema to 4 columns and updates toTuple to write loop bookkeeping columns with defaults. |
| common/workflow-core/src/test/scala/org/apache/texera/amber/core/state/StateSpec.scala | Updates state tuple tests to align with the new toTuple() signature and schema. |
| common/workflow-core/src/test/scala/org/apache/texera/amber/util/ArrowUtilsSpec.scala | Adds a data-level Arrow encode/decode round-trip test to ensure multi-column State tuples survive Arrow vector conversion. |
| common/workflow-core/src/test/scala/org/apache/texera/amber/storage/result/iceberg/IcebergDocumentSpec.scala | Updates call sites to toTuple() in state-document tests (but some test semantics still use reserved loop keys in JSON). |
| amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/OutputManager.scala | Updates Scala state persistence path to call state.toTuple(). |
| amber/src/main/scala/org/apache/texera/amber/engine/architecture/pythonworker/PythonProxyClient.scala | Updates Scala→Python Arrow send path to call state.toTuple(). |
| amber/src/main/python/core/models/state.py | Expands Python State schema and adds loop-bookkeeping parameters to to_tuple(...). |
| amber/src/main/python/core/models/payload.py | Extends Python StateFrame envelope with loop bookkeeping fields (defaulting to no-loop values). |
| amber/src/main/python/core/architecture/packaging/output_manager.py | Threads loop bookkeeping through save_state_to_storage_if_needed(...) and emit_state(...) and refactors writer setup/close logic. |
| amber/src/main/python/core/runnables/network_sender.py | Serializes StateFrame as a 4-column Arrow table (content + loop bookkeeping columns). |
| amber/src/main/python/core/runnables/network_receiver.py | Deserializes StateFrame from a 4-column Arrow table and populates loop bookkeeping fields. |
| amber/src/main/python/core/storage/runnables/input_port_materialization_reader_runnable.py | Reads loop bookkeeping columns from the state table rows and carries them on emitted StateFrames. |
| amber/src/test/python/core/models/test_state.py | Updates tests to assert the expanded schema and that loop bookkeeping does not leak into the content JSON. |
| amber/src/test/python/core/runnables/test_network_receiver.py | Updates unit test to verify non-zero loop_counter survives Python sender→receiver serialization. |
| amber/src/test/python/core/storage/runnables/test_input_port_materialization_reader_runnable.py | Updates reader test to assert loop bookkeeping values are carried on emitted StateFrames. |
| amber/src/test/python/core/architecture/packaging/test_output_manager.py | Updates tests for new signature and adds a dormancy test ensuring omitted loop args default to 0/empty strings. |
| amber/src/test/python/core/architecture/packaging/test_state_materialization_e2e.py | Updates e2e state materialization test to use sqlite-backed catalog and assert loop_counter column round-trip. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
Address review (Copilot): the materialization round-trip tests used
"loop_counter" as a user-state key (landing in the content JSON), which is
misleading now that loop bookkeeping is a dedicated column. Rename the user key
to "i" and write/assert the loop columns via toTuple(loopCounter = ...) +
row.getField("loop_counter"/"loop_start_id"/"loop_start_state_uri"), matching
how StateSpec/test_state were updated.
Xiao-zhen-Liu
left a comment
There was a problem hiding this comment.
I reviewed the part of the loop-split change that handles the state format. The main logic looks correct:
- State only lives within a single execution, so no pre-existing single-column state can reach the new four-column reader. I checked this across the restart, fault-recovery, and result-reuse paths.
- The non-loop defaults keep existing behavior unchanged.
- The Python and Scala four-column schemas agree on names, order, and types.
The comments below are about test coverage and two design points. I'm not recording an approve / request-changes decision.
One accuracy note on the description: it lists only to_tuple/toTuple and two OutputManager methods, but the change also adds the three fields to the Python StateFrame and widens the wire format to four columns. Behavior is unchanged; just worth stating accurately.
…ip tests Address review feedback on apache#5900: - Add `State.to_columns`, the single column-name -> value mapping for the State wire/storage format, and route both `to_tuple` (iceberg) and the network sender's StateFrame branch through it, so adding a column is a one-line change rather than an edit in every serializer. - e2e materialization test and StateSpec now round-trip all three loop columns (loop_counter, loop_start_id, loop_start_state_uri) with non-default values, not just loop_counter, so a regression in any single column's plumbing is caught. - Document why the e2e deliberately uses a hermetic sqlite catalog while the other iceberg tests use postgres/REST.
There was a problem hiding this comment.
Approving. I verified the four code fixes are in 74eef1d / db66a7f: the e2e now round-trips all three loop columns through real storage with non-default values, StateSpec asserts them via getField, the column mapping now lives in one place (State.to_columns), and the sqlite-catalog difference is documented. The state-format change is correct and isn't active on main yet.
One thing is still open, though: whether loop_start_state_uri belongs in State (see the thread on state.py). content, loop_counter, and loop_start_id aren't in question, but the URI is a write address that stays the same across the whole loop. If that thread lands on 'not in State', it's better to drop the column from this PR than to ship it now and remove it later.
Per review feedback on apache#5900: `loop_start_state_uri` is a write address that stays constant across the whole loop -- it's loop config, not per-iteration State data, and the design landed on delivering it to LoopEnd at setup rather than carrying it in State. Rather than ship the dormant column now and remove it later, drop it here. Reduces the materialized State format from 4 columns to 3 (`content`, `loop_counter`, `loop_start_id`). Removes the column and schema entry, the `StateFrame` field, and the transport/serialization plumbing (network sender/receiver, materialization reader, OutputManager save/emit, Scala `toTuple`), plus the corresponding test coverage. `content` / `loop_counter` / `loop_start_id` are unchanged.
What changes were proposed in this PR?
Extends the cross-region State materialization format from a single
contentcolumn to 3 columns —content,loop_counter,loop_start_id— promoting loop bookkeeping to first-class columns (never inside the content JSON). The transport carries them end to end: theOutputManagerstate writer +emit_state, the Python network sender/receiver, the materialization reader, and the Scalastate.toTuplecall sites. In memory the two loop fields ride on theStateFrameenvelope; they are materialized/serialized as their own columns (parallel tocontent), andfrom_tuple/fromTupleread onlycontentback into theState.The loop-back write address (LoopStart's input-port URI) is intentionally not carried in State. It's constant per-execution config, not per-iteration data, so it will be delivered to Loop End at setup in the loop PR rather than round-tripping through every State row. (An earlier revision of this PR carried a
loop_start_state_uricolumn; it was dropped after review — better than shipping a dormant column and removing it later.)On the Python side the column-name → value mapping is defined once in
State.to_columnsand reused by bothto_tuple(iceberg) and the network sender'sStateFramebranch, so adding a column later is a single-line change in one place rather than an edit in every serializer.Dormant on
main— nothing observable changes without the loop operators:to_tuple()/toTuple()andOutputManager.save_state_to_storage_if_needed/emit_statedefault the two loop columns to0/"", so every existing non-loop caller is unchanged.from_tuple/fromTupleread only thecontentcolumn, so round-trip identity is preserved and the extra columns are inert.No backward-compatible read of old 1-column State is needed: State materialization is intra-execution only — the iceberg state-document URI is execution-scoped (
…/eid/{executionId}/) and recreated fresh each run, and State tuples are never replayed across executions or engine versions, so a 1-column tuple can never reach the 3-column reader.This is the state-format prerequisite the loop operators build on; the columns carry non-default values only once Loop Start/End set them (follow-up PR).
Any related issues, documentation, discussions?
Extracted from #5700 (loop operators) per @Xiao-zhen-Liu's split request; part of #4442 ("Introduce for loop").
How was this PR tested?
test_state.py(loop columns are their own columns, never in content JSON, default to0/""), ScalaStateSpec(both loop columns round-trip through a tuple with non-default values, not justcontent),ArrowUtilsSpec(3-column Arrow vector round-trip),IcebergDocumentSpec(iceberg state-doc round-trip).test_network_receiver.py,test_input_port_materialization_reader_runnable.py, andtest_state_materialization_e2e.py— the e2e (hermetic sqlite catalog) writes non-default values for both loop columns end-to-end and asserts they replay both on theStateFrameand on the raw iceberg row, exercising the real Tuple/Schema/iceberg path.test_output_manager.py::test_defaults_loop_columns_when_omittedpins that a no-loop caller (noloop_counter) still produces a valid 3-column tuple with the loop columns at0/"".workflow-core+ambercompile;StateSpec+ArrowUtilsSpecpass; Python state + transport + e2e tests pass; scalafmt + scalafix + black clean. (IcebergDocumentSpecneeds the iceberg catalog backend, so it runs in CI.)Was this PR authored or co-authored using generative AI tooling?
Co-authored with Claude Opus 4.8 in compliance with ASF.