Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ class GridRunsResponse(BaseModel):
run_type: DagRunType
dag_versions: list[DagVersionResponse] = []
has_missed_deadline: bool
has_note: bool

@computed_field
def duration(self) -> float:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2976,6 +2976,9 @@ components:
has_missed_deadline:
type: boolean
title: Has Missed Deadline
has_note:
type: boolean
title: Has Note
duration:
type: number
title: Duration
Expand All @@ -2991,6 +2994,7 @@ components:
- state
- run_type
- has_missed_deadline
- has_note
- duration
title: GridRunsResponse
description: Base Node serializer for responses.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@
task_group_to_dict_grid,
)
from airflow.models.dag_version import DagVersion
from airflow.models.dagrun import DagRun
from airflow.models.dagrun import DagRun, DagRunNote
from airflow.models.deadline import Deadline
from airflow.models.serialized_dag import SerializedDagModel
from airflow.models.taskinstance import TaskInstance, TaskInstanceNote
Expand Down Expand Up @@ -291,8 +291,9 @@ def get_grid_runs(
.correlate(DagRun)
.label("has_missed_deadline")
)
has_note_subq = exists().where(DagRunNote.dag_run_id == DagRun.id).correlate(DagRun).label("has_note")
base_query = (
select(DagRun, has_missed_deadline)
select(DagRun, has_missed_deadline, has_note_subq)
.where(DagRun.dag_id == dag_id)
.options(
load_only(
Expand Down Expand Up @@ -329,10 +330,10 @@ def get_grid_runs(
return_total_entries=False,
)
results = session.execute(dag_runs_select_filter).unique().all()
dag_runs = [run for run, _ in results]
dag_runs = [run for run, _, _ in results]
attach_dag_versions_to_runs(dag_runs, session=session)
grid_runs = []
for run, has_missed in results:
for run, has_missed, has_note in results:
grid_runs.append(
GridRunsResponse.model_validate(
{
Expand All @@ -346,6 +347,7 @@ def get_grid_runs(
"run_type": run.run_type,
"dag_versions": run.dag_versions,
"has_missed_deadline": has_missed,
"has_note": has_note,
}
)
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9802,14 +9802,18 @@ export const $GridRunsResponse = {
type: 'boolean',
title: 'Has Missed Deadline'
},
has_note: {
type: 'boolean',
title: 'Has Note'
},
duration: {
type: 'number',
title: 'Duration',
readOnly: true
}
},
type: 'object',
required: ['dag_id', 'run_id', 'queued_at', 'start_date', 'end_date', 'run_after', 'state', 'run_type', 'has_missed_deadline', 'duration'],
required: ['dag_id', 'run_id', 'queued_at', 'start_date', 'end_date', 'run_after', 'state', 'run_type', 'has_missed_deadline', 'has_note', 'duration'],
title: 'GridRunsResponse',
description: 'Base Node serializer for responses.'
} as const;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2494,6 +2494,7 @@ export type GridRunsResponse = {
run_type: DagRunType;
dag_versions?: Array<DagVersionResponse>;
has_missed_deadline: boolean;
has_note: boolean;
readonly duration: number;
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ import {
type GridRunWithVersionFlags,
} from "./useGridRunsWithVersionFlags";

const NOTE_GRADIENT =
"linear-gradient(45deg, var(--chakra-colors-color-palette-solid) 92%, var(--chakra-colors-color-palette-emphasized) 92%)";

type Props = {
readonly max: number;
readonly onClick?: () => void;
Expand Down Expand Up @@ -83,6 +86,7 @@ export const Bar = ({ max, onClick, run, showVersionIndicatorMode }: Props) => {
<GridButton
alignItems="center"
color="fg"
colorPalette={run.state ?? "none"}
dagId={dagId}
duration={run.duration}
flexDir="column"
Expand All @@ -93,6 +97,7 @@ export const Bar = ({ max, onClick, run, showVersionIndicatorMode }: Props) => {
runId={run.run_id}
searchParams={search}
state={run.state}
style={run.has_note ? { background: NOTE_GRADIENT } : undefined}
zIndex={1}
>
{run.run_type !== "scheduled" && <RunTypeIcon color="white" runType={run.run_type} size="10px" />}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
from airflow.models.dag import DagModel
from airflow.models.dag_version import DagVersion
from airflow.models.dagbag import DBDagBag
from airflow.models.dagrun import DagRun, DagRunNote
from airflow.models.taskinstance import TaskInstance, TaskInstanceNote
from airflow.providers.standard.operators.empty import EmptyOperator
from airflow.providers.standard.operators.python import PythonOperator
Expand Down Expand Up @@ -76,6 +77,7 @@
"duration": 283996800.0,
"end_date": "2024-12-31T00:00:00Z",
"has_missed_deadline": False,
"has_note": False,
"run_after": "2024-11-30T00:00:00Z",
"run_id": "run_1",
"run_type": "scheduled",
Expand All @@ -97,6 +99,7 @@
"duration": 283996800.0,
"end_date": "2024-12-31T00:00:00Z",
"has_missed_deadline": False,
"has_note": False,
"run_after": "2024-11-30T00:00:00Z",
"run_id": "run_2",
"run_type": "manual",
Expand Down Expand Up @@ -643,6 +646,19 @@ def test_get_grid_runs(self, session, test_client):
assert response.status_code == 200
assert _strip_dag_version_ids(response.json()) == [GRID_RUN_1, GRID_RUN_2]

def test_get_grid_runs_has_note(self, session, test_client):
"""has_note is True when a DagRunNote exists for a dag run, False otherwise."""
run_1 = session.scalar(select(DagRun).where(DagRun.dag_id == DAG_ID, DagRun.run_id == "run_1"))
run_1.dag_run_note = DagRunNote(content="a note on run_1")
session.commit()

response = test_client.get(f"/grid/runs/{DAG_ID}")
assert response.status_code == 200
by_run_id = {run["run_id"]: run for run in response.json()}

assert by_run_id["run_1"]["has_note"] is True
assert by_run_id["run_2"]["has_note"] is False

def test_get_grid_runs_multiple_dag_versions(self, session, test_client):
# run_5_2 is created after version 2 exists, so its task instances run on version 2.
# Reassign one of them to version 1 so the run spans two versions.
Expand Down