-
Notifications
You must be signed in to change notification settings - Fork 17.3k
Thread version_data to callbacks #69185
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -19,7 +19,7 @@ | |
|
|
||
| import logging | ||
| from datetime import datetime | ||
| from typing import TYPE_CHECKING | ||
| from typing import TYPE_CHECKING, Any | ||
| from uuid import UUID | ||
|
|
||
| import sqlalchemy as sa | ||
|
|
@@ -235,3 +235,21 @@ def get_version( | |
| def version(self) -> str: | ||
| """A human-friendly representation of the version.""" | ||
| return f"{self.dag_id}-{self.version_number}" | ||
|
|
||
|
|
||
| def resolve_pinned_version_data( | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Minor nit: should this be |
||
| dag_version: DagVersion | None, bundle_version: str | None | ||
| ) -> dict[str, Any] | None: | ||
| """ | ||
| Return a bundle version's ``version_data`` manifest, but only for pinned runs. | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Same here. I would keep only the first line. Leave any detailed explanations for the comments. |
||
|
|
||
| Mirrors the bundle-version pinning rule used when building task and callback | ||
| workloads: ``version_data`` is exposed only when the run is pinned | ||
| (``bundle_version`` is set) and a ``DagVersion`` is available, so the worker | ||
| initializes the bundle against the exact version the run used. Returns ``None`` | ||
| for unpinned runs (which should follow the latest bundle state) and for legacy | ||
| rows without a ``DagVersion``. | ||
| """ | ||
| if dag_version is not None and bundle_version is not None: | ||
| return dag_version.version_data | ||
| return None | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -566,12 +566,18 @@ def _submit_callback_if_necessary() -> None: | |
| if event.task_instance_state in (TaskInstanceState.SUCCESS, TaskInstanceState.FAILED): | ||
| if task_instance.dag_model.relative_fileloc is None: | ||
| raise RuntimeError("relative_fileloc should not be None for a finished task") | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. There may be a reason for it, but from a quick check of the modified files, it looks like this is the only one whose test file didn't get updated. |
||
| from airflow.models.dag_version import resolve_pinned_version_data | ||
|
|
||
| version_data = resolve_pinned_version_data( | ||
| task_instance.dag_version, task_instance.dag_run.bundle_version | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm noticing that other callback code gets But here we're doing I wonder if these two can diverge? |
||
| ) | ||
| request = TaskCallbackRequest( | ||
| filepath=task_instance.dag_model.relative_fileloc, | ||
| ti=task_instance, | ||
| task_callback_type=event.task_instance_state, | ||
| bundle_name=task_instance.dag_model.bundle_name, | ||
| bundle_version=task_instance.dag_run.bundle_version, | ||
| version_data=version_data, | ||
| ) | ||
| log.info("Sending callback: %s", request) | ||
| try: | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -1375,6 +1375,7 @@ def test_collect_results_processes_remaining_files_when_one_persist_fails(self, | |
| "filepath": "dag_callback_dag.py", | ||
| "bundle_name": "testing", | ||
| "bundle_version": None, | ||
| "version_data": None, | ||
| "msg": None, | ||
| "dag_id": "dag_id", | ||
| "run_id": "run_id", | ||
|
|
@@ -1967,7 +1968,28 @@ def test_prepare_callback_bundle_initializes_versioned_bundle(self, mock_bundle_ | |
| bundle.initialize.assert_called_once() | ||
|
|
||
| @mock.patch("airflow.dag_processing.manager.DagBundlesManager") | ||
| def test_prepare_callback_bundle_skips_initialize_for_unversioned_request(self, mock_bundle_manager): | ||
| def test_prepare_callback_bundle_forwards_version_data(self, mock_bundle_manager): | ||
| manager = DagFileProcessorManager(max_runs=1) | ||
| bundle = MagicMock(spec=BaseDagBundle) | ||
| bundle.supports_versioning = True | ||
| mock_bundle_manager.return_value.get_bundle.return_value = bundle | ||
|
|
||
| version_data = {"schema_version": 1, "files": {"dags/my_dag.py": "ver123"}} | ||
| request = DagCallbackRequest( | ||
| filepath="file1.py", | ||
| dag_id="dag1", | ||
| run_id="run1", | ||
| is_failure_callback=False, | ||
| bundle_name="testing", | ||
| bundle_version="some_commit_hash", | ||
| version_data=version_data, | ||
| msg=None, | ||
| ) | ||
|
|
||
| manager.prepare_callback_bundle(request) | ||
| mock_bundle_manager.return_value.get_bundle.assert_called_once_with( | ||
| name="testing", version="some_commit_hash", version_data=version_data | ||
| ) | ||
| manager = DagFileProcessorManager(max_runs=1) | ||
|
Comment on lines
+1992
to
1993
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This looks like it may be a mistake. Did you perhaps merge two tests together? You set |
||
| bundle = MagicMock(spec=BaseDagBundle) | ||
| bundle.supports_versioning = True | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would keep only the first line. The docstring under each parameter is only intended to explain what the field is. Not its inner workings.