Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

import os
import traceback
from typing import TYPE_CHECKING, Any, Literal
from typing import TYPE_CHECKING, Any, Literal, cast

import yaml
from openlineage.client import OpenLineageClient, set_producer
Expand Down Expand Up @@ -50,12 +50,14 @@
get_dag_job_dependency_facet,
get_processing_engine_facet,
)
from airflow.utils.helpers import prune_dict
from airflow.utils.log.logging_mixin import LoggingMixin

if TYPE_CHECKING:
from datetime import datetime

from airflow.providers.openlineage.extractors import OperatorLineage
from airflow.providers.openlineage.plugins.facets import AirflowRunFacet
from airflow.sdk.execution_time.secrets_masker import SecretsMasker, _secrets_masker
from airflow.utils.state import DagRunState
else:
Expand Down Expand Up @@ -172,10 +174,24 @@ def emit(self, event: RunEvent):
event_type = event.eventType.value.lower() if event.eventType else ""
transport_type = f"{self._client.transport.kind}".lower()

team_name = None

facets = event.run.facets or {}
airflow_facet = cast("AirflowRunFacet | None", facets.get("airflow"))

if airflow_facet:
team_name = airflow_facet.dagRun.get("dag_team_name")

try:
with Stats.timer(
"ol.emit.attempts",
tags={"event_type": event_type, "transport_type": transport_type},
tags=prune_dict(
{
"event_type": event_type,
"transport_type": transport_type,
"team_name": team_name,
}
),
):
self._client.emit(redacted_event)
self.log.info(
Expand All @@ -184,7 +200,11 @@ def emit(self, event: RunEvent):
event.run.runId,
)
except Exception as e:
Stats.incr("ol.emit.failed")
Stats.incr(
"ol.emit.failed",
tags=prune_dict({"team_name": team_name}),
)

self.log.warning(
"Failed to emit OpenLineage `%s` event of id `%s` with the following exception: `%s`",
event_type.upper(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
from airflow.providers.openlineage.utils.utils import (
AIRFLOW_V_3_0_PLUS,
AIRFLOW_V_3_2_PLUS,
DagRunInfo,
get_airflow_dag_run_facet,
get_airflow_debug_facet,
get_airflow_job_facet,
Expand All @@ -57,6 +58,7 @@
print_warning,
)
from airflow.settings import configure_orm
from airflow.utils.helpers import prune_dict
from airflow.utils.state import TaskInstanceState

if TYPE_CHECKING:
Expand Down Expand Up @@ -241,9 +243,19 @@ def on_running():
if not doc:
doc, doc_type = get_dag_documentation(dag)

team_name = None
team_name = DagRunInfo.team_name(dagrun)

if controls.extract_operator_metadata:
with Stats.timer(
"ol.extract", tags={"event_type": event_type, "operator_name": operator_name}
"ol.extract",
tags=prune_dict(
{
"event_type": event_type,
"operator_name": operator_name,
"team_name": team_name,
}
),
):
task_metadata = self.extractor_manager.extract_metadata(
dagrun=dagrun,
Expand All @@ -257,6 +269,7 @@ def on_running():
"Skipping OpenLineage operator metadata extraction for task `%s` due to emission_policy.",
task_instance.task_id,
)

task_metadata = OperatorLineage()

redacted_event = self.adapter.start_task(
Expand Down Expand Up @@ -291,10 +304,23 @@ def on_running():
},
)
event_size = len(Serde.to_json(redacted_event).encode("utf-8"))

airflow_facet = redacted_event.run.facets.get("airflow")
team_name = None

if airflow_facet:
team_name = getattr(airflow_facet.dagRun, "dag_team_name", None)

Stats.gauge(
"ol.event.size",
event_size,
tags={"event_type": event_type, "operator_name": operator_name},
tags=prune_dict(
{
"event_type": event_type,
"operator_name": operator_name,
"team_name": team_name,
}
),
)

self._execute(on_running, "on_running", use_fork=True)
Expand Down Expand Up @@ -386,9 +412,19 @@ def on_success():
if not doc:
doc, doc_type = get_dag_documentation(dag)

team_name = None
team_name = DagRunInfo.team_name(dagrun)

if controls.extract_operator_metadata:
with Stats.timer(
"ol.extract", tags={"event_type": event_type, "operator_name": operator_name}
"ol.extract",
tags=prune_dict(
{
"event_type": event_type,
"operator_name": operator_name,
"team_name": team_name,
}
),
):
task_metadata = self.extractor_manager.extract_metadata(
dagrun=dagrun,
Expand All @@ -402,6 +438,7 @@ def on_success():
"Skipping OpenLineage operator metadata extraction for task `%s` due to emission_policy.",
task_instance.task_id,
)

task_metadata = OperatorLineage()

redacted_event = self.adapter.complete_task(
Expand Down Expand Up @@ -435,10 +472,23 @@ def on_success():
},
)
event_size = len(Serde.to_json(redacted_event).encode("utf-8"))

airflow_facet = redacted_event.run.facets.get("airflow")
team_name = None

if airflow_facet:
team_name = getattr(airflow_facet.dagRun, "dag_team_name", None)

Stats.gauge(
"ol.event.size",
event_size,
tags={"event_type": event_type, "operator_name": operator_name},
tags=prune_dict(
{
"event_type": event_type,
"operator_name": operator_name,
"team_name": team_name,
}
),
)

self._execute(on_success, "on_success", use_fork=True)
Expand Down Expand Up @@ -545,9 +595,19 @@ def on_failure():
if not doc:
doc, doc_type = get_dag_documentation(dag)

team_name = None
team_name = DagRunInfo.team_name(dagrun)

if controls.extract_operator_metadata:
with Stats.timer(
"ol.extract", tags={"event_type": event_type, "operator_name": operator_name}
"ol.extract",
tags=prune_dict(
{
"event_type": event_type,
"operator_name": operator_name,
"team_name": team_name,
}
),
):
task_metadata = self.extractor_manager.extract_metadata(
dagrun=dagrun,
Expand Down Expand Up @@ -595,10 +655,23 @@ def on_failure():
},
)
event_size = len(Serde.to_json(redacted_event).encode("utf-8"))

airflow_facet = redacted_event.run.facets.get("airflow")
team_name = None

if airflow_facet:
team_name = getattr(airflow_facet.dagRun, "dag_team_name", None)

Stats.gauge(
"ol.event.size",
event_size,
tags={"event_type": event_type, "operator_name": operator_name},
tags=prune_dict(
{
"event_type": event_type,
"operator_name": operator_name,
"team_name": team_name,
}
),
)

self._execute(on_failure, "on_failure", use_fork=True)
Expand Down Expand Up @@ -681,9 +754,19 @@ def on_skipped():
if not doc:
doc, doc_type = get_dag_documentation(dag)

team_name = None
team_name = DagRunInfo.team_name(dagrun)

if controls.extract_operator_metadata:
with Stats.timer(
"ol.extract", tags={"event_type": event_type, "operator_name": operator_name}
"ol.extract",
tags=prune_dict(
{
"event_type": event_type,
"operator_name": operator_name,
"team_name": team_name,
}
),
):
task_metadata = self.extractor_manager.extract_metadata(
dagrun=dagrun,
Expand Down Expand Up @@ -730,10 +813,23 @@ def on_skipped():
},
)
event_size = len(Serde.to_json(redacted_event).encode("utf-8"))

airflow_facet = redacted_event.run.facets.get("airflow")
team_name = None

if airflow_facet:
team_name = getattr(airflow_facet.dagRun, "dag_team_name", None)

Stats.gauge(
"ol.event.size",
event_size,
tags={"event_type": event_type, "operator_name": operator_name},
tags=prune_dict(
{
"event_type": event_type,
"operator_name": operator_name,
"team_name": team_name,
}
),
)

self._execute(on_skipped, "on_skipped", use_fork=True)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
from contextlib import suppress
from functools import wraps
from importlib import metadata
from typing import TYPE_CHECKING, Any
from typing import TYPE_CHECKING, Any, ClassVar

import attrs
from openlineage.client.facet_v2 import (
Expand All @@ -50,6 +50,7 @@
BaseOperator,
BaseSensorOperator,
MappedOperator,
conf as airflow_conf,
)
from airflow.providers.openlineage import (
__version__ as OPENLINEAGE_PROVIDER_VERSION,
Expand All @@ -72,6 +73,7 @@
from airflow.providers.openlineage.version_compat import (
AIRFLOW_V_3_0_PLUS,
AIRFLOW_V_3_2_PLUS,
AIRFLOW_V_3_3_PLUS,
get_base_airflow_version_tuple,
)
from airflow.serialization.serialized_objects import SerializedBaseOperator, SerializedDAG
Expand All @@ -86,6 +88,9 @@
if not AIRFLOW_V_3_0_PLUS:
from airflow.utils.session import NEW_SESSION, provide_session

if AIRFLOW_V_3_3_PLUS:
from airflow.models.dagbundle import DagBundleModel

if TYPE_CHECKING:
from typing import TypeAlias

Expand Down Expand Up @@ -980,9 +985,12 @@ class DagRunInfo(InfoJsonEncodable):
"dag_bundle_version": lambda dagrun: DagRunInfo.dag_version_info(dagrun, "bundle_version"),
"dag_version_id": lambda dagrun: DagRunInfo.dag_version_info(dagrun, "version_id"),
"dag_version_number": lambda dagrun: DagRunInfo.dag_version_info(dagrun, "version_number"),
"dag_team_name": lambda dagrun: DagRunInfo.team_name(dagrun) if AIRFLOW_V_3_3_PLUS else None,
"deadlines": lambda dagrun: DagRunInfo.deadlines(dagrun),
}

_team_name_cache: ClassVar[dict[str, str | None]] = {}

@classmethod
def duration(cls, dagrun: DagRun) -> float | None:
if not getattr(dagrun, "end_date", None) or not isinstance(dagrun.end_date, datetime.datetime):
Expand Down Expand Up @@ -1053,6 +1061,21 @@ def dag_version_info(cls, dagrun: DagRun, key: str) -> str | int | None:
return current_version.version_number
raise ValueError(f"Unsupported key: {key}`")

@classmethod
def team_name(cls, dagrun: DagRun) -> str | None:
    """
    Resolve the team name associated with the bundle of the given DagRun.

    ``None`` is returned without any lookup when Airflow is older than 3.3, when the
    ``[core] multi_team`` option is not enabled, or when the bundle name reported by
    ``dag_version_info`` is not a string.

    Results are memoised per bundle name in ``_team_name_cache``; whatever
    ``DagBundleModel.get_team_name`` returns (including ``None``) is stored and reused.

    :param dagrun: DagRun whose bundle name is used for the lookup.
    :return: The cached or freshly resolved team name, or ``None``.
    """
    # The version check comes first so the config option is only read on Airflow 3.3+.
    multi_team_enabled = AIRFLOW_V_3_3_PLUS and airflow_conf.getboolean(
        "core", "multi_team", fallback=False
    )
    if not multi_team_enabled:
        return None

    bundle = cls.dag_version_info(dagrun, "bundle_name")
    if not isinstance(bundle, str):
        return None

    known_teams = cls._team_name_cache
    if bundle in known_teams:
        return known_teams[bundle]

    # First time this bundle is seen: resolve once and remember the answer.
    resolved = DagBundleModel.get_team_name(bundle)
    known_teams[bundle] = resolved
    return resolved


class TaskInstanceInfo(InfoJsonEncodable):
"""Defines encoding TaskInstance object to JSON."""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ def get_base_airflow_version_tuple() -> tuple[int, int, int]:

# Version gates: each flag is True when the base Airflow version tuple is at least
# the listed (major, minor, patch) release. Tuples compare element by element.
AIRFLOW_V_3_0_PLUS = get_base_airflow_version_tuple() >= (3, 0, 0)
AIRFLOW_V_3_2_PLUS = get_base_airflow_version_tuple() >= (3, 2, 0)
# Used by the provider utils to guard the DagBundleModel import and the team-name lookup.
AIRFLOW_V_3_3_PLUS = get_base_airflow_version_tuple() >= (3, 3, 0)


__all__ = ["AIRFLOW_V_3_0_PLUS", "AIRFLOW_V_3_2_PLUS"]
__all__ = ["AIRFLOW_V_3_0_PLUS", "AIRFLOW_V_3_2_PLUS", "AIRFLOW_V_3_3_PLUS"]
Loading
Loading