Reuse the resumable core for TriggerDagRunOperator's durable wait#68955
Open
1fanwang wants to merge 7 commits into
Open
Reuse the resumable core for TriggerDagRunOperator's durable wait#689551fanwang wants to merge 7 commits into
1fanwang wants to merge 7 commits into
Conversation
With wait_for_completion the trigger-and-wait runs in the task runner. A worker crash while polling makes the retry recompute a fresh run_id and trigger a duplicate child run (or fail with DagRunAlreadyExists), even though the run the first attempt started is healthy and still running. The opt-in durable flag persists the triggered run_id to task_state_store before polling, so the retry reconnects to the in-flight run instead of resubmitting.
TriggerDagRunOperator's durable wait lives in the task runner (it raises DagRunTriggerException and is polled there), not in execute(), so it cannot use ResumableJobMixin and re-implements the persist-and-reconnect logic by hand. Lift the mixin's core into a standalone resume_or_submit() so the same implementation can be driven from the runner now and the triggerer later, instead of duplicated per integration point.
The durable synchronous wait previously hand-rolled the persist-and-reconnect logic in the task runner, duplicating what ResumableJobMixin already implements. Drive the shared resume_or_submit core with runner callbacks instead, so the durability primitive has one implementation across the operator (mixin) and the runner.
806c55a to
3f5c29c
Compare
Member
|
@1fanwang — the check-newsfragment-pr-number check is failing: the newsfragment file needs to match this PR's number. Rename it to See the PR quality criteria. Automated first-pass triage note drafted by an AI-assisted tool — may get things wrong; once addressed, a real Apache Airflow maintainer takes the next look. (why automated) Drafted-by: Claude Code (Opus 4.8); reviewed by @potiuk before posting |
The newsfragment number (68936) does not match this PR (68955), so check-newsfragment-pr-number fails. The durable-flag note belongs with the PR that lands the feature, not this stacked change.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Why
#68936 added a crash-safe
durablewait toTriggerDagRunOperator, but it hand-rolled thepersist-and-reconnect logic in the task runner — a duplicate of what
ResumableJobMixinalreadyimplements. #68952 lifted that logic into a reusable
resume_or_submitcore. This PR makes therunner consume that core, so the durability primitive has one implementation instead of two.
What
_handle_trigger_dag_run's durable path now drivesresume_or_submitthrough runner callbacks —submit(sendTriggerDagRun, raising onDagRunAlreadyExists),get_status(GetDagRunState),poll(the wait loop, raising on a failed state so the retry policy still fires),get_result(the run-id XCom). The hand-rolled
_evaluate_prior_triggered_runand the inline persist/decisionare deleted. No behaviour change.
This is the proof that the #68952 extraction generalises: one operator that fits
ResumableJobMixindirectly (Spark, Livy) and one whose wait lives in the runner (TriggerDagRun) now share the same
durability core.
Tests
The full
TriggerDagRunOperatorrunner suite passes unchanged — thedurablereconnect tests nowexercise the
resume_or_submitpath, and the non-durable / deferrable / conflict tests areuntouched. That unchanged suite is the behaviour-preservation proof.
End-to-end (live, Breeze, on this PR's code)
Same crash scenario as #68936, re-run on the refactored code: a parent
TriggerDagRunOperator(durable=True, wait_for_completion=True)triggers a child Dag; the parent'sworker is
SIGKILLed mid-wait; the scheduler retries. Attempt 2 reconnects through the sharedcore — the log line is
resume_or_submit's own "Reconnecting to existing job", which proves thedurable path now runs the framework primitive rather than the deleted hand-rolled logic.
Raw
Risk
Behaviour-preserving refactor. The durable path's persist + three-state reconnect is now the shared
core (covered by #68952's tests); the runner only supplies the bindings.
Was generative AI tooling used to co-author this PR?
Related, from the dev-list thread "Let TriggerDagRunOperator own its execution logic" — three ways to make the synchronous wait crash-safe:
ResumableJobMixin's core out so the runner drives the wait through one shared contract — works on 2.11 and 3.0-3.3.ti.trigger_dag_run()accessor so the operator owns its execution and uses the mixin directly — 3.3+, falls back below.Still open which way to go, and whether to revisit a
min_versionfloor for providers.