Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -333,20 +333,26 @@ class BigQueryStreamingBufferEmptySensor(BaseSensorOperator):
``UPDATE/MERGE/DELETE statement over table ... would affect rows in the
streaming buffer`` errors.

.. warning::
The sensor reads ``table.streaming_buffer`` from BigQuery's table
metadata, which is eventually consistent. For a short window right
after a streaming insert the buffer metadata is still absent, so the
sensor may report the buffer empty before it actually is. Known
limitation tracked at
https://github.com/apache/airflow/issues/66963
The ``table.streaming_buffer`` metadata BigQuery exposes is eventually
consistent: for a short window right after a streaming insert the rows are
in the buffer but the metadata still reports the buffer as absent. A single
absent reading is therefore ambiguous (truly empty vs. metadata lag), so the
sensor only reports empty after ``empty_confirmations`` consecutive empty
readings, each one ``poke_interval`` apart. This covers a metadata lag of
roughly up to ``(empty_confirmations - 1) * poke_interval`` without hanging
when the table is genuinely empty or the buffer flushes between two pokes.

:param project_id: Google Cloud project containing the table.
:param dataset_id: Dataset of the table to monitor.
:param table_id: Table to monitor.
:param gcp_conn_id: Airflow connection ID for GCP.
:param impersonation_chain: Optional service account to impersonate, or a
chained list of accounts. See the Google provider docs for details.
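:param empty_confirmations: Number of consecutive empty readings (each
``poke_interval`` apart) required before reporting the buffer empty.
Must be at least 1; values above 1 guard against BigQuery's
eventually-consistent streaming-buffer metadata reporting empty too
early after a streaming insert.
NOTE(review): the running count is kept on the sensor instance
(``self._consecutive_empty``), so it presumably does not survive
``mode="reschedule"``, where each poke runs in a fresh task process. If
so, values above 1 could never be reached in that mode -- confirm.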
:param deferrable: Run in deferrable mode using
:class:`BigQueryStreamingBufferEmptyTrigger`.
"""
Expand All @@ -368,6 +374,7 @@ def __init__(
table_id: str,
gcp_conn_id: str = "google_cloud_default",
impersonation_chain: str | Sequence[str] | None = None,
empty_confirmations: int = 2,
deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False),
**kwargs,
) -> None:
Expand All @@ -376,12 +383,17 @@ def __init__(

super().__init__(**kwargs)

if empty_confirmations < 1:
raise ValueError("empty_confirmations must be at least 1")

self.project_id = project_id
self.dataset_id = dataset_id
self.table_id = table_id
self.gcp_conn_id = gcp_conn_id
self.impersonation_chain = impersonation_chain
self.empty_confirmations = empty_confirmations
self.deferrable = deferrable
self._consecutive_empty = 0

def execute(self, context: Context) -> None:
if not self.deferrable:
Expand All @@ -398,6 +410,7 @@ def execute(self, context: Context) -> None:
poll_interval=self.poke_interval,
gcp_conn_id=self.gcp_conn_id,
impersonation_chain=self.impersonation_chain,
empty_confirmations=self.empty_confirmations,
),
method_name="execute_complete",
)
Expand Down Expand Up @@ -426,4 +439,16 @@ def poke(self, context: Context) -> bool:
table = hook.get_client(project_id=self.project_id).get_table(table_ref)
except NotFound as err:
raise ValueError(f"Table {table_uri} not found") from err
return table.streaming_buffer is None

if table.streaming_buffer is not None:
self._consecutive_empty = 0
return False

self._consecutive_empty += 1
self.log.info(
"Streaming buffer reported empty (%s/%s confirmations) for table: %s",
self._consecutive_empty,
self.empty_confirmations,
table_uri,
)
return self._consecutive_empty >= self.empty_confirmations
Original file line number Diff line number Diff line change
Expand Up @@ -891,13 +891,21 @@ class BigQueryStreamingBufferEmptyTrigger(BaseTrigger):
Used by :class:`~airflow.providers.google.cloud.sensors.bigquery.BigQueryStreamingBufferEmptySensor`
in deferrable mode.

The ``streamingBuffer`` table metadata BigQuery returns is eventually
consistent, so a single absent reading can be a false "empty" right after a
streaming insert. The trigger therefore yields success only after
``empty_confirmations`` consecutive empty polls, each ``poll_interval``
apart.

:param project_id: Google Cloud project ID.
:param dataset_id: Dataset of the table to monitor.
:param table_id: Table to monitor.
:param gcp_conn_id: Airflow connection ID for GCP.
:param poll_interval: Seconds between polls.
:param impersonation_chain: Optional service account to impersonate, or a
chained list of accounts.
:param empty_confirmations: Number of consecutive empty polls required
before reporting the buffer empty. Must be at least 1.
"""

def __init__(
Expand All @@ -908,14 +916,18 @@ def __init__(
gcp_conn_id: str,
poll_interval: float = 30.0,
impersonation_chain: str | Sequence[str] | None = None,
empty_confirmations: int = 2,
):
super().__init__()
if empty_confirmations < 1:
raise ValueError("empty_confirmations must be at least 1")
self.project_id = project_id
self.dataset_id = dataset_id
self.table_id = table_id
self.gcp_conn_id = gcp_conn_id
self.poll_interval = poll_interval
self.impersonation_chain = impersonation_chain
self.empty_confirmations = empty_confirmations

def serialize(self) -> tuple[str, dict[str, Any]]:
return (
Expand All @@ -927,6 +939,7 @@ def serialize(self) -> tuple[str, dict[str, Any]]:
"gcp_conn_id": self.gcp_conn_id,
"poll_interval": self.poll_interval,
"impersonation_chain": self.impersonation_chain,
"empty_confirmations": self.empty_confirmations,
},
)

Expand All @@ -938,6 +951,7 @@ def _get_async_hook(self) -> BigQueryTableAsyncHook:

async def run(self) -> AsyncIterator[TriggerEvent]:
table_uri = f"{self.project_id}:{self.dataset_id}.{self.table_id}"
consecutive_empty = 0
try:
hook = self._get_async_hook()
async with ClientSession() as session:
Expand All @@ -951,11 +965,21 @@ async def run(self) -> AsyncIterator[TriggerEvent]:
table_id=self.table_id,
)
if is_empty:
message = f"Streaming buffer is empty for table: {table_uri}"
self.log.info(message)
yield TriggerEvent({"status": "success", "message": message})
return
self.log.info("Streaming buffer not empty, sleeping %ss", self.poll_interval)
consecutive_empty += 1
if consecutive_empty >= self.empty_confirmations:
message = f"Streaming buffer is empty for table: {table_uri}"
self.log.info(message)
yield TriggerEvent({"status": "success", "message": message})
return
self.log.info(
"Streaming buffer reported empty (%s/%s confirmations), sleeping %ss",
consecutive_empty,
self.empty_confirmations,
self.poll_interval,
)
else:
consecutive_empty = 0
self.log.info("Streaming buffer not empty, sleeping %ss", self.poll_interval)
await asyncio.sleep(self.poll_interval)
except Exception as e:
self.log.exception("Error while checking streaming buffer for table %s", table_uri)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
from __future__ import annotations

import os
import time
from datetime import datetime

import pytest
Expand Down Expand Up @@ -109,19 +108,6 @@ def streaming_insert(ds: str | None = None) -> None:
rows=[{"value": 100, "ds": ds}],
fail_on_error=True,
)
# BigQuery's streamingBuffer table metadata is eventually consistent: for
# a few seconds after a streaming insert the row is in the buffer but
# table.streaming_buffer is still None. Wait for the metadata to catch up
# so check_streaming_buffer_empty does not falsely report "empty" before
# the buffer is reported at all. Remove once the sensor handles this
# itself; tracked at https://github.com/apache/airflow/issues/66963
client = hook.get_client(project_id=PROJECT_ID)
table_uri = f"{PROJECT_ID}.{DATASET_NAME}.{TABLE_NAME}"
for _ in range(30):
if client.get_table(table_uri).streaming_buffer is not None:
return
time.sleep(2)
raise RuntimeError("BigQuery streaming buffer metadata did not appear within 60s")

streaming_insert_task = streaming_insert()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,7 @@ def test_poke_returns_true_when_buffer_absent(self, mock_hook):
sensor = _make_streaming_sensor(
gcp_conn_id=TEST_GCP_CONN_ID,
impersonation_chain=TEST_IMPERSONATION_CHAIN,
empty_confirmations=1,
)
mock_table = mock.MagicMock(streaming_buffer=None)
mock_hook.return_value.get_client.return_value.get_table.return_value = mock_table
Expand Down Expand Up @@ -343,6 +344,38 @@ def test_poke_returns_false_when_buffer_present(self, mock_hook):

assert sensor.poke(mock.MagicMock()) is False

@mock.patch("airflow.providers.google.cloud.sensors.bigquery.BigQueryHook")
def test_poke_requires_consecutive_empty_confirmations(self, mock_hook):
    """Check that one empty reading is not enough when two confirmations are required."""
    sensor = _make_streaming_sensor(empty_confirmations=2)
    table_without_buffer = mock.MagicMock(streaming_buffer=None)
    mock_hook.return_value.get_client.return_value.get_table.return_value = table_without_buffer

    # Every poke sees an absent buffer. The first such reading is ambiguous
    # (it may only be metadata lag), so it must not report empty; only the
    # second consecutive empty reading does.
    for expected in (False, True):
        assert sensor.poke(mock.MagicMock()) is expected

@mock.patch("airflow.providers.google.cloud.sensors.bigquery.BigQueryHook")
def test_poke_resets_confirmations_when_buffer_reappears(self, mock_hook):
    """Check that a non-empty reading restarts the run of empty confirmations."""
    sensor = _make_streaming_sensor(empty_confirmations=2)

    # Each entry: (streaming_buffer value the poke sees, expected poke result).
    readings = [
        (None, False),  # 1st empty
        ({"estimatedRows": 5}, False),  # buffer reappears, counter resets
        (None, False),  # empty again, count restarts at 1
        (None, True),  # 2nd consecutive empty
    ]
    table_lookup = mock_hook.return_value.get_client.return_value.get_table
    table_lookup.side_effect = [mock.MagicMock(streaming_buffer=buffer) for buffer, _ in readings]

    for _, expected in readings:
        assert sensor.poke(mock.MagicMock()) is expected

def test_init_rejects_non_positive_empty_confirmations(self):
    """Check that building the sensor with ``empty_confirmations=0`` raises ``ValueError``."""
    expected_error = "empty_confirmations must be at least 1"
    with pytest.raises(ValueError, match=expected_error):
        _make_streaming_sensor(empty_confirmations=0)

@mock.patch("airflow.providers.google.cloud.sensors.bigquery.BigQueryHook")
def test_poke_raises_value_error_when_table_not_found(self, mock_hook):
mock_hook.return_value.get_client.return_value.get_table.side_effect = NotFound("missing")
Expand All @@ -364,7 +397,7 @@ def test_execute_does_not_defer_when_buffer_already_empty(self, mock_defer, mock
streaming_buffer=None
)

_make_streaming_sensor(deferrable=True).execute(mock.MagicMock())
_make_streaming_sensor(deferrable=True, empty_confirmations=1).execute(mock.MagicMock())

mock_defer.assert_not_called()

Expand All @@ -374,6 +407,7 @@ def test_execute_defers_with_trigger_when_buffer_not_empty(self, mock_hook):
gcp_conn_id=TEST_GCP_CONN_ID,
impersonation_chain=TEST_IMPERSONATION_CHAIN,
deferrable=True,
empty_confirmations=3,
)
mock_hook.return_value.get_client.return_value.get_table.return_value = mock.MagicMock(
streaming_buffer={"estimatedRows": 1}
Expand All @@ -391,6 +425,7 @@ def test_execute_defers_with_trigger_when_buffer_not_empty(self, mock_hook):
assert trigger.project_id == TEST_PROJECT_ID
assert trigger.dataset_id == TEST_DATASET_ID
assert trigger.table_id == TEST_TABLE_ID
assert trigger.empty_confirmations == 3

def test_execute_complete_returns_message_on_success(self):
sensor = _make_streaming_sensor(deferrable=True)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1057,8 +1057,19 @@ def test_serialization(self, streaming_buffer_trigger):
"gcp_conn_id": TEST_GCP_CONN_ID,
"poll_interval": POLLING_PERIOD_SECONDS,
"impersonation_chain": TEST_IMPERSONATION_CHAIN,
"empty_confirmations": 2,
}

def test_init_rejects_non_positive_empty_confirmations(self):
    """Check that building the trigger with ``empty_confirmations=0`` raises ``ValueError``."""
    trigger_kwargs = {
        "project_id": TEST_GCP_PROJECT_ID,
        "dataset_id": TEST_DATASET_ID,
        "table_id": TEST_TABLE_ID,
        "gcp_conn_id": TEST_GCP_CONN_ID,
        "empty_confirmations": 0,
    }
    with pytest.raises(ValueError, match="empty_confirmations must be at least 1"):
        BigQueryStreamingBufferEmptyTrigger(**trigger_kwargs)

@mock.patch("airflow.providers.google.cloud.triggers.bigquery.BigQueryTableAsyncHook")
def test_async_hook_receives_impersonation_chain(self, mock_hook_cls, streaming_buffer_trigger):
streaming_buffer_trigger._get_async_hook()
Expand All @@ -1070,12 +1081,54 @@ def test_async_hook_receives_impersonation_chain(self, mock_hook_cls, streaming_
@pytest.mark.asyncio
@mock.patch(f"{_TRIGGER_PATH}._is_streaming_buffer_empty")
@mock.patch(f"{_TRIGGER_PATH}._get_async_hook")
async def test_run_yields_success_when_buffer_empty(
self, _mock_hook, mock_is_empty, streaming_buffer_trigger
):
async def test_run_yields_success_when_buffer_empty(self, _mock_hook, mock_is_empty):
trigger = BigQueryStreamingBufferEmptyTrigger(
project_id=TEST_GCP_PROJECT_ID,
dataset_id=TEST_DATASET_ID,
table_id=TEST_TABLE_ID,
gcp_conn_id=TEST_GCP_CONN_ID,
poll_interval=POLLING_PERIOD_SECONDS,
impersonation_chain=TEST_IMPERSONATION_CHAIN,
empty_confirmations=1,
)
mock_is_empty.return_value = True
actual = await trigger.run().asend(None)

table_uri = f"{TEST_GCP_PROJECT_ID}:{TEST_DATASET_ID}.{TEST_TABLE_ID}"
assert actual == TriggerEvent(
{"status": "success", "message": f"Streaming buffer is empty for table: {table_uri}"}
)

@pytest.mark.asyncio
@mock.patch("airflow.providers.google.cloud.triggers.bigquery.asyncio.sleep", new_callable=AsyncMock)
@mock.patch(f"{_TRIGGER_PATH}._is_streaming_buffer_empty")
@mock.patch(f"{_TRIGGER_PATH}._get_async_hook")
async def test_run_waits_for_consecutive_empty_confirmations(
    self, _mock_hook, mock_is_empty, mock_sleep, streaming_buffer_trigger
):
    """Check that the trigger needs a second consecutive empty poll before yielding success."""
    # The fixture trigger relies on the default empty_confirmations=2, so a
    # single empty reading must not yield success; two in a row must.
    mock_is_empty.side_effect = [True, True]
    table_uri = f"{TEST_GCP_PROJECT_ID}:{TEST_DATASET_ID}.{TEST_TABLE_ID}"
    expected_event = TriggerEvent(
        {"status": "success", "message": f"Streaming buffer is empty for table: {table_uri}"}
    )

    event = await streaming_buffer_trigger.run().asend(None)

    assert mock_is_empty.await_count == 2
    assert event == expected_event

@pytest.mark.asyncio
@mock.patch("airflow.providers.google.cloud.triggers.bigquery.asyncio.sleep", new_callable=AsyncMock)
@mock.patch(f"{_TRIGGER_PATH}._is_streaming_buffer_empty")
@mock.patch(f"{_TRIGGER_PATH}._get_async_hook")
async def test_run_resets_confirmations_when_buffer_reappears(
self, _mock_hook, mock_is_empty, mock_sleep, streaming_buffer_trigger
):
# empty, then non-empty (resets), then two consecutive empties -> success.
mock_is_empty.side_effect = [True, False, True, True]
actual = await streaming_buffer_trigger.run().asend(None)

assert mock_is_empty.await_count == 4
table_uri = f"{TEST_GCP_PROJECT_ID}:{TEST_DATASET_ID}.{TEST_TABLE_ID}"
assert actual == TriggerEvent(
{"status": "success", "message": f"Streaming buffer is empty for table: {table_uri}"}
Expand Down