[POC] Let TriggerDagRunOperator own its execution via a new accessor #69135

Draft
1fanwang wants to merge 2 commits into apache:main from 1fanwang:triggerdagrun-owns-execution

1fanwang (Contributor) commented Jun 29, 2026

POC for the [DISCUSS] Let TriggerDagRunOperator own its execution logic thread on the dev list — opened to help the discussion assess the accessor approach, not as a merge-ready change (see Back-compat & scope below).

Why

On Airflow 3, TriggerDagRunOperator.execute() raises DagRunTriggerException and the task runner does the trigger and the synchronous wait loop. So the trigger-and-wait contract (and any crash recovery) lives in two places — the operator and the runner — and can drift. The poll half is already a first-class Task SDK accessor (ti.get_dagrun_state()); only the trigger half was missing, which is why it had to go through the exception side channel.
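
A toy model of the split described above may help make it concrete. It uses stand-in classes rather than Airflow's real ones: `FakeApi`, `Operator`, `run_task`, and the fields on the exception are all hypothetical, chosen only to show that the operator signals intent by raising while the runner owns both the trigger and the wait loop.

```python
from dataclasses import dataclass, field


class DagRunTriggerException(Exception):
    """Stand-in for the real exception; here it only carries the request."""

    def __init__(self, trigger_dag_id: str, wait_for_completion: bool):
        super().__init__(trigger_dag_id)
        self.trigger_dag_id = trigger_dag_id
        self.wait_for_completion = wait_for_completion


@dataclass
class FakeApi:
    """Hypothetical execution API: each poll advances the child run one step."""

    states: list = field(default_factory=lambda: ["running", "running", "success"])
    triggered: list = field(default_factory=list)

    def trigger(self, dag_id: str) -> str:
        run_id = f"manual__{len(self.triggered)}"
        self.triggered.append((dag_id, run_id))
        return run_id

    def get_state(self, dag_id: str, run_id: str) -> str:
        # Consume scripted states until only the terminal one is left.
        return self.states.pop(0) if len(self.states) > 1 else self.states[0]


class Operator:
    def execute(self) -> None:
        # The operator does no work itself; it hands the request to the runner.
        raise DagRunTriggerException("e2e_child", wait_for_completion=True)


def run_task(op: Operator, api: FakeApi) -> str:
    """Runner side: catch the exception, then trigger and wait."""
    try:
        op.execute()
    except DagRunTriggerException as req:
        run_id = api.trigger(req.trigger_dag_id)
        if not req.wait_for_completion:
            return "success"
        while (state := api.get_state(req.trigger_dag_id, run_id)) == "running":
            pass  # a real runner would sleep between polls
        return state
    return "success"
```

In this shape, any change to the wait or recovery behaviour has to be made in `run_task`, away from the operator that documents it, which is the drift risk the PR describes.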

What

  • Add ti.trigger_dag_run(), the counterpart to ti.get_dagrun_state(). It uses the same execution-API endpoint and scoped token the runner already uses, so it adds no new authz surface.
  • On Airflow 3.3+ non-deferrable runs, TriggerDagRunOperator does the submit and poll itself and subclasses ResumableJobMixin directly, so the durability contract lives in one place.
  • Add an opt-in durable flag (default False): on a synchronous wait_for_completion, the triggered run id is persisted before polling, so a worker crash mid-wait reconnects to the in-flight run on retry instead of triggering a duplicate.
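
The persist-before-poll idea in the last bullet can be sketched with stand-ins. This is a minimal sketch under assumptions: the PR names `ti.trigger_dag_run()` and `ti.get_dagrun_state()` but does not give their signatures, so the arguments used here are guesses; `FakeTI`, `WorkerCrash`, and the plain `dict` standing in for the task state store are hypothetical, and ResumableJobMixin is not modelled.

```python
class WorkerCrash(Exception):
    """Simulates the worker process dying mid-wait."""


class FakeTI:
    """Hypothetical task-instance accessors; the signatures are assumptions."""

    def __init__(self, crash_on_poll=None):
        self.runs = {}            # run_id -> polls left before the run succeeds
        self.trigger_calls = 0
        self.polls = 0
        self.crash_on_poll = crash_on_poll

    def trigger_dag_run(self, dag_id):
        self.trigger_calls += 1
        run_id = f"manual__{self.trigger_calls}"
        self.runs[run_id] = 3
        return run_id

    def get_dagrun_state(self, dag_id, run_id):
        self.polls += 1
        if self.polls == self.crash_on_poll:
            raise WorkerCrash
        self.runs[run_id] -= 1
        return "success" if self.runs[run_id] <= 0 else "running"


def execute(ti, store, dag_id, durable=False):
    """Operator-owned submit and poll; `store` must survive across retries."""
    run_id = store.get("run_id") if durable else None
    if run_id is None:
        run_id = ti.trigger_dag_run(dag_id)
        if durable:
            store["run_id"] = run_id  # persist BEFORE the first poll
    while ti.get_dagrun_state(dag_id, run_id) == "running":
        pass  # a real operator would sleep for poke_interval here
    return run_id
```

Calling `execute` twice against the same `store`, with the first call interrupted by `WorkerCrash`, triggers once when `durable=True` and twice when it is left at the default.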

Deferrable keeps the DagRunTriggerException path (it still needs the triggerer handoff), and Airflow < 3.3 / Airflow 2 fall back to that path too.
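
The routing described above reduces to a small decision. The function below is only an illustration: `execution_path` and its string labels are hypothetical, and the PR description does not show how the provider actually detects the Airflow version.

```python
def execution_path(airflow_version: tuple, deferrable: bool) -> str:
    """Pick which code path runs the trigger, per the rules stated above."""
    if airflow_version < (3, 0):
        return "airflow2"       # Airflow 2 keeps its existing path
    if deferrable or airflow_version < (3, 3):
        return "exception"      # DagRunTriggerException handled by the runner
    return "accessor"           # operator owns submit and poll


for version, deferrable in [((2, 11), False), ((3, 2), False), ((3, 4), True), ((3, 4), False)]:
    print(version, deferrable, execution_path(version, deferrable))
```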

Tests

$ pytest providers/standard/tests/unit/standard/operators/test_trigger_dagrun.py
41 passed, 41 skipped     # the 41 skips are Airflow-2-only, skipped on 3.4
$ pytest task-sdk/tests/task_sdk/execution_time/test_task_runner.py -k trigger_dag_run_returns
2 passed
  • New unit tests for the accessor and the operator-owned path: trigger, skip/reset, sync wait success/failure, fire-and-forget, and the four durable states (persist-before-poll, reconnect, short-circuit on an already-succeeded prior run, resubmit after a failed one).
  • The existing synchronous-path tests are re-homed to assert the accessor calls instead of the raised exception (the operator now owns that execution). Deferrable / Airflow-2 tests are unchanged.
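
The four durable states listed above can be read as one decision over two inputs: the persisted run id, if any, and the prior run's state. The sketch below is not the PR's code; `Action` and `next_action` are hypothetical names, and treating any state other than success or failed as still in flight is an assumption of this sketch.

```python
from enum import Enum
from typing import Optional


class Action(Enum):
    SUBMIT = "submit"                # no prior run: trigger, persist the id, poll
    RECONNECT = "reconnect"          # prior run in flight: poll it, no new trigger
    SHORT_CIRCUIT = "short_circuit"  # prior run already succeeded: return at once
    RESUBMIT = "resubmit"            # prior run failed: trigger a fresh run


def next_action(stored_run_id: Optional[str], prior_state: Optional[str]) -> Action:
    """Decide what a retry should do given what the previous attempt persisted."""
    if stored_run_id is None:
        return Action.SUBMIT
    if prior_state == "success":
        return Action.SHORT_CIRCUIT
    if prior_state == "failed":
        return Action.RESUBMIT
    return Action.RECONNECT
```

Each unit test named in the first bullet then corresponds to one branch of this function.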

E2E — live, Airflow 3.4 standalone (LocalExecutor + Postgres)

Two DAGs: a parent TriggerDagRunOperator(wait_for_completion=True, durable=True, poke_interval=3) triggers a child whose only task sleeps 45s.

1. Clean path (no crash) — proves the operator owns its execution via the accessors; the runner's DagRunTriggerException path is never taken.

| t | parent task | child runs |
|---|---|---|
| 8s | running (try 1) | 1 (running) |
| 48s | success (try 1) | 1 (success) |

Parent task log shows the operator's own poll loop (trigger_dagrun.py), and the side channel is unused:

Waiting for e2e_child on manual__…920653+00:00 to reach an allowed state [SUCCESS] ...
DagRunTriggerException occurrences in the task log: 0

2. Worker SIGKILL mid-wait (durable reconnect) — the parent's worker process is kill -9ed while it polls; the scheduler retries the task.

| t | parent task | child runs created by this parent |
|---|---|---|
| 6s | running (try 1), triggered child | 1 |
| | SIGKILL the parent worker pid | 1 |
| 9s | up_for_retry (try 1) | 1 |
| 12s | running (try 2) | 1 (no new trigger) |
| 42s | success (try 2) | 1 |

On the retry the operator reconnects to the same run id instead of triggering a duplicate:

attempt=2: Reconnecting to existing job
attempt=2: Waiting for e2e_child on manual__…957584+00:00 ...   <- same run id as attempt=1
attempt=2: Triggering events: 0 | DagRunTriggerException: 0
parent task: success (try_number=2)   child dag runs created: 1  (delta=1 -> reconnected, not duplicated)

Raw state progression (crash run)

child runs before crash test: 1     (left over from the clean run above)
[t=3s]  parent=(none)       try=0          | child_total=1
[t=6s]  parent=running      try=1 pid=62403| child_total=2   <- triggered + polling
>>> SIGKILL parent worker pid=62403 (simulate crash mid-wait)
[t=9s]  parent=up_for_retry try=1          | child_total=2
[t=12s] parent=running      try=2 pid=62469| child_total=2   <- retry; NO new child
[t=15s..39s] parent=running try=2          | child_total=2   <- reconnect-polling the in-flight run
[t=42s] parent=success      try=2          | child_total=2
=== child delta=1 (1=reconnect/no-dup, 2=DUPLICATE) ===
manual__2026-…T16:51:09.920653+00:00 = success   (clean run's child)
manual__2026-…T16:54:29.957584+00:00 = success   (crash run's child — single run despite the crash+retry)

Back-compat & scope

TriggerDagRunOperator lives in the standard provider, which still supports Airflow 2.11 and 3.0–3.3, while the accessor and ResumableJobMixin are 3.3+ only. The operator therefore owns its execution on 3.3+ and falls back to the existing DagRunTriggerException path on 3.0–3.2 and to the Airflow-2 path on 2.11. That is net-new duplication for as long as those versions are supported, and durability (which needs task_state_store) applies on 3.3+ only. This PR shows the 3.3+ end-state, not a drop-in for the full supported range. The interim-vs-end-state trade-off (and a provider min_version policy) is under discussion on the dev list. The lower-duplication interim that keeps one runner path is #68952 / #68955, which share the mixin's contract rather than have the operator own execution.

Refs

This is the accessor-based approach discussed on the [DISCUSS] thread — one option alongside the hand-rolled-in-the-runner durability in #68936 and the share-the-core refactor in #68952 / #68955. Which direction to take is still under discussion.


Was generative AI tooling used to co-author this PR?
  • Yes (please specify the tool below)

Airflow already exposes the dag-run poll half to task code as ti.get_dagrun_state(); the trigger half had no first-class accessor and was only reachable through the DagRunTriggerException side channel. Adding the symmetric ti.trigger_dag_run() routes a trigger through the same execution-API endpoint and scoped token the task runner already uses, so an operator can own its trigger-and-wait execution directly.
…ble reconnect

On Airflow 3, TriggerDagRunOperator.execute() raised DagRunTriggerException and the task runner did the trigger and the wait loop, so the synchronous wait-and-reconnect contract was duplicated between the operator and the runner and could drift. With the new ti.trigger_dag_run() accessor the operator does the submit and poll itself and reuses ResumableJobMixin directly, keeping that contract in one place. The opt-in durable flag persists the triggered run id before polling so a worker crash mid-wait reconnects to the in-flight run on retry instead of triggering a duplicate. Deferrable still needs the triggerer handoff, so it keeps the exception path; Airflow < 3.3 and Airflow 2 are unchanged.
1fanwang force-pushed the triggerdagrun-owns-execution branch from 35cc421 to ea1d794 on June 29, 2026 17:25
1fanwang changed the title from "Make TriggerDagRunOperator own its execution and add durable reconnect" to "[POC] Let TriggerDagRunOperator own its execution via a new accessor" on Jun 29, 2026
1fanwang marked this pull request as draft on June 29, 2026 17:28