Add durable option to TriggerDagRunOperator to reconnect on retry #68936

Open
1fanwang wants to merge 4 commits into apache:main from 1fanwang:durable-trigger-dagrun

@1fanwang 1fanwang commented Jun 24, 2026

Contributor

Why

When TriggerDagRunOperator runs with wait_for_completion=True (synchronous,
non-deferrable), a worker crash while it is polling makes the retry misbehave
in one of two ways.

If no logical_date or trigger_run_id is set, the operator derives a fresh
run_id from utcnow() on retry, so the runner triggers a second child run
instead of reconnecting to the one the first attempt already started. If the
run_id is fixed, the retry fails with DagRunAlreadyExists. In both cases a
transient worker blip mid-wait either duplicates the downstream work or fails
a task whose triggered run is healthy and still running.
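
To make the first failure mode concrete, here is a minimal sketch of why a retry ends up with a different run_id. The helper `derive_run_id` is a hypothetical stand-in for the operator's fallback, assumed to produce ids of the form `manual__<timestamp>`; it is not the operator's actual code, and the timestamps are arbitrary example values.

```python
from datetime import datetime, timedelta, timezone


def derive_run_id(now: datetime) -> str:
    """Hypothetical stand-in: build a run_id from the current time."""
    return f"manual__{now.isoformat()}"


# Attempt 1 triggers the child run, then the worker is killed mid-wait.
attempt_1_time = datetime(2026, 1, 1, 8, 17, 47, tzinfo=timezone.utc)
first_run_id = derive_run_id(attempt_1_time)

# Attempt 2 starts later, so the recomputed run_id no longer matches.
attempt_2_time = attempt_1_time + timedelta(minutes=2)
second_run_id = derive_run_id(attempt_2_time)

# Different ids mean the retry has no handle on the first run
# and ends up triggering a second one.
ids_differ = first_run_id != second_run_id
```

Because the id depends on wall-clock time, nothing in the retry ties it back to the run the first attempt created unless that id is stored somewhere between attempts.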

This is the same duplicate-job-on-retry problem ResumableJobMixin
(durable=True, Airflow 3.3) solves for submit-and-poll operators. The mixin
targets operators whose execute() does the submit+poll; on Airflow 3 the
trigger/wait happens in the task runner (the operator raises
DagRunTriggerException), so the same persist-then-reconnect contract is
applied where the wait actually lives.

What

Add an opt-in durable flag to TriggerDagRunOperator (default False, no
behavior change). When durable=True and waiting synchronously:

  • the triggered run_id is persisted to task_state_store before polling
    starts;
  • on retry the runner reads it back and:
    • reconnects and resumes the wait if the run is still active;
    • returns success without resubmitting if it already finished in an
      allowed state;
    • triggers a fresh run if the prior one failed, is gone, or its state is
      unreadable.
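
The retry contract above can be sketched as a small decision function. This is an illustration only, not the code in this PR: a plain dict stands in for task_state_store, `decide_on_retry` and `Action` are hypothetical names, and the state strings are assumed to mirror Airflow's dag run states (queued, running, success, failed).

```python
from enum import Enum
from typing import Callable, Optional, Set


class Action(Enum):
    RECONNECT = "reconnect"          # resume waiting on the prior run
    SHORT_CIRCUIT = "short_circuit"  # prior run already finished in an allowed state
    TRIGGER_FRESH = "trigger_fresh"  # no usable prior run, so submit a new one


# Assumed set of states that count as "still active".
ACTIVE_STATES = {"queued", "running"}


def decide_on_retry(
    store: dict,
    key: str,
    get_state: Callable[[str], Optional[str]],
    allowed_states: Set[str],
) -> Action:
    """Pick what a retry should do, given what the prior attempt persisted."""
    prior_run_id = store.get(key)
    if prior_run_id is None:
        # Nothing was persisted (first attempt, or crash before persisting).
        return Action.TRIGGER_FRESH
    try:
        state = get_state(prior_run_id)
    except Exception:
        # State is unreadable: fall back to a fresh trigger.
        return Action.TRIGGER_FRESH
    if state in ACTIVE_STATES:
        return Action.RECONNECT
    if state in allowed_states:
        return Action.SHORT_CIRCUIT
    # Failed, deleted (state is None), or otherwise not usable.
    return Action.TRIGGER_FRESH


# Example: the prior attempt persisted "run_1", which is still running.
store = {"trigger_child": "run_1"}
states = {"run_1": "running"}
decision = decide_on_retry(store, "trigger_child", states.get, {"success"})
```

In this example `decision` is `Action.RECONNECT`; swapping the stored state to "success" would short-circuit, and "failed" or a missing run would fall through to a fresh trigger.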

Scoped to the synchronous wait; the deferrable path is unchanged and left for a
follow-up. Crash recovery is silently disabled when task_state_store is
unavailable (degrades to today's behavior).

Tests

  • Four unit tests covering the contract: persist-before-poll, reconnect to a
    running prior run, short-circuit on an already-succeeded prior run, resubmit
    after a failed prior run.
  • Existing TriggerDagRunOperator tests are untouched and pass — opt-in means
    no behavior change on the default path.

End-to-end (live, Breeze built from main)

A parent TriggerDagRunOperator(wait_for_completion=True) triggers a child Dag
that sleeps; the parent's worker is SIGKILLed while it polls, and the scheduler
retries the task.

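Configuration          | Child runs from one parent task | Parent task on retry
-----------------------|---------------------------------|------------------------------------------
durable=False (today)  | 2 (a duplicate child run)       | re-triggers a fresh run
durable=True (this PR) | 1                               | reconnects to the in-flight run, succeeds

Raw before / after
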
BEFORE (durable=False) — worker killed mid-wait, scheduler retried (try_number=2):
  child dag runs:
    manual__2026-...T08:17:47Z  success   <- attempt 1
    manual__2026-...T08:19:46Z  success   <- attempt 2, DUPLICATE
  => 2 runs from one parent task

AFTER (durable=True) — same crash, try_number=2:
  attempt=2.log: "Reconnecting to run triggered on a prior attempt."
                 run_id=manual__2026-...T08:53:51Z
  parent run: success   (trigger_child succeeded on try_number=2)
  child dag runs:
    manual__2026-...T08:53:51Z  success   <- single run, reconnected
  => 1 run from one parent task

Risk

Opt-in, so the default path is unchanged. The new path adds two
task_state_store round-trips (one read, one write) per durable run, only when
durable=True. A run deleted between attempts is treated as "resubmit fresh".
The submit→persist gap is the same small window documented on ResumableJobMixin:
a crash between triggering and persisting falls back to a fresh trigger on retry.
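
The submit→persist window can be illustrated with a small simulation. It is a sketch under stated assumptions: `run_attempt` and `WorkerCrash` are hypothetical names, a dict stands in for task_state_store, and a list records which child runs were created. It models only the ordering of "trigger, then persist", not the runner's real code.

```python
class WorkerCrash(Exception):
    """Simulates a hard worker crash (e.g. SIGKILL) at a chosen point."""


def run_attempt(store, triggered, attempt, crash_after_trigger=False):
    """One attempt of a hypothetical trigger-then-persist sequence."""
    prior = store.get("run_id")
    if prior is not None:
        return prior                  # reconnect to the persisted run
    run_id = f"run_from_attempt_{attempt}"
    triggered.append(run_id)          # step 1: the child run is created
    if crash_after_trigger:
        raise WorkerCrash             # crash lands before step 2
    store["run_id"] = run_id          # step 2: persist the run_id
    return run_id


# Case A: crash inside the gap, so nothing was persisted.
gap_store, gap_runs = {}, []
try:
    run_attempt(gap_store, gap_runs, 1, crash_after_trigger=True)
except WorkerCrash:
    pass
run_attempt(gap_store, gap_runs, 2)   # retry triggers a second run

# Case B: crash happens later, while polling, after the persist.
poll_store, poll_runs = {}, []
run_attempt(poll_store, poll_runs, 1) # attempt 1 persisted, then "crashed" mid-wait
reconnected = run_attempt(poll_store, poll_runs, 2)
```

Case A ends with two child runs, which matches the fallback described above; case B ends with one, because the retry finds the persisted id and reconnects.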

Open question

Default is False to preserve behavior; ResumableJobMixin defaults durable=True.
Worth deciding whether this should eventually default on for synchronous waits.


Was generative AI tooling used to co-author this PR?
  • Yes (please specify the tool below)


Related, from the dev-list thread "Let TriggerDagRunOperator own its execution logic" — three ways to make the synchronous wait crash-safe:

Still open which way to go, and whether to revisit a min_version floor for providers.

1fanwang added 4 commits June 24, 2026 11:43
With wait_for_completion the trigger-and-wait runs in the task runner. A worker
crash while polling makes the retry recompute a fresh run_id and trigger a
duplicate child run (or fail with DagRunAlreadyExists), even though the run the
first attempt started is healthy and still running. The opt-in durable flag
persists the triggered run_id to task_state_store before polling, so the retry
reconnects to the in-flight run instead of resubmitting.

@seanghaeli seanghaeli left a comment

Contributor

Looks good. A follow-up to handle deferrable=True will definitely be useful.

@potiuk potiuk added the "ready for maintainer review" label Jun 25, 2026

raphaelauv commented Jun 28, 2026

Contributor

It looks like the run_id is not saved before triggering the dag_run,

so in the worst case:

  1. the dag_run is triggered
  2. the worker hard-crashes
  3. the run_id is never saved (because it is only saved if the code in _handle_trigger_dag_run runs)

Is this the case? Thanks

@amoghrajesh amoghrajesh left a comment

Contributor

Thanks for your efforts on this @1fanwang, requesting changes for the time being to avoid accidental merge till we arrive at a consensus on https://lists.apache.org/thread/q5b7qsjotop023smr1521o2wfmdyrlhr
