diff --git a/providers/google/src/airflow/providers/google/cloud/sensors/bigquery.py b/providers/google/src/airflow/providers/google/cloud/sensors/bigquery.py index 5954d30abdbae..dea6fb8b9a6b8 100644 --- a/providers/google/src/airflow/providers/google/cloud/sensors/bigquery.py +++ b/providers/google/src/airflow/providers/google/cloud/sensors/bigquery.py @@ -334,12 +334,11 @@ class BigQueryStreamingBufferEmptySensor(BaseSensorOperator): streaming buffer`` errors. .. warning:: - The sensor reads ``table.streaming_buffer`` from BigQuery's table - metadata, which is eventually consistent. For a short window right - after a streaming insert the buffer metadata is still absent, so the - sensor may report the buffer empty before it actually is. Known - limitation tracked at - https://github.com/apache/airflow/issues/66963 + BigQuery's ``table.streaming_buffer`` metadata is eventually consistent. + The sensor mitigates this by requiring multiple consecutive empty + observations before reporting the buffer empty (configurable via + ``empty_confirmations``), which delays success by ``empty_confirmations - 1`` + poke intervals (one with the default) compared to earlier releases. :param project_id: Google Cloud project containing the table. :param dataset_id: Dataset of the table to monitor. @@ -347,6 +346,9 @@ class BigQueryStreamingBufferEmptySensor(BaseSensorOperator): :param gcp_conn_id: Airflow connection ID for GCP. :param impersonation_chain: Optional service account to impersonate, or a chained list of accounts. See the Google provider docs for details. + :param empty_confirmations: Number of consecutive empty readings required + before considering the streaming buffer empty. Must be at least 1. + Defaults to 2. :param deferrable: Run in deferrable mode using :class:`BigQueryStreamingBufferEmptyTrigger`. 
""" @@ -368,6 +370,7 @@ def __init__( table_id: str, gcp_conn_id: str = "google_cloud_default", impersonation_chain: str | Sequence[str] | None = None, + empty_confirmations: int = 2, deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False), **kwargs, ) -> None: @@ -376,12 +379,17 @@ def __init__( super().__init__(**kwargs) + if empty_confirmations < 1: + raise ValueError("empty_confirmations must be at least 1") + self.project_id = project_id self.dataset_id = dataset_id self.table_id = table_id self.gcp_conn_id = gcp_conn_id self.impersonation_chain = impersonation_chain + self.empty_confirmations = empty_confirmations self.deferrable = deferrable + self._consecutive_empty = 0 def execute(self, context: Context) -> None: if not self.deferrable: @@ -398,6 +406,7 @@ def execute(self, context: Context) -> None: poll_interval=self.poke_interval, gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain, + empty_confirmations=self.empty_confirmations, ), method_name="execute_complete", ) @@ -426,4 +435,18 @@ def poke(self, context: Context) -> bool: table = hook.get_client(project_id=self.project_id).get_table(table_ref) except NotFound as err: raise ValueError(f"Table {table_uri} not found") from err - return table.streaming_buffer is None + + if table.streaming_buffer is not None: + self._consecutive_empty = 0 + return False + + self._consecutive_empty += 1 + self.log.info( + "Streaming buffer reported empty (%s/%s confirmations) for table: %s", + self._consecutive_empty, + self.empty_confirmations, + table_uri, + ) + if self._consecutive_empty >= self.empty_confirmations: + return True + return False diff --git a/providers/google/src/airflow/providers/google/cloud/triggers/bigquery.py b/providers/google/src/airflow/providers/google/cloud/triggers/bigquery.py index e11059dfbb897..5b2c98e566adf 100644 --- a/providers/google/src/airflow/providers/google/cloud/triggers/bigquery.py +++ 
b/providers/google/src/airflow/providers/google/cloud/triggers/bigquery.py @@ -898,6 +898,8 @@ class BigQueryStreamingBufferEmptyTrigger(BaseTrigger): :param poll_interval: Seconds between polls. :param impersonation_chain: Optional service account to impersonate, or a chained list of accounts. + :param empty_confirmations: Number of consecutive empty polls required + before reporting the buffer empty. Must be at least 1. """ def __init__( @@ -908,14 +910,18 @@ def __init__( gcp_conn_id: str, poll_interval: float = 30.0, impersonation_chain: str | Sequence[str] | None = None, + empty_confirmations: int = 2, ): super().__init__() + if empty_confirmations < 1: + raise ValueError("empty_confirmations must be at least 1") self.project_id = project_id self.dataset_id = dataset_id self.table_id = table_id self.gcp_conn_id = gcp_conn_id self.poll_interval = poll_interval self.impersonation_chain = impersonation_chain + self.empty_confirmations = empty_confirmations def serialize(self) -> tuple[str, dict[str, Any]]: return ( @@ -927,6 +933,7 @@ def serialize(self) -> tuple[str, dict[str, Any]]: "gcp_conn_id": self.gcp_conn_id, "poll_interval": self.poll_interval, "impersonation_chain": self.impersonation_chain, + "empty_confirmations": self.empty_confirmations, }, ) @@ -938,6 +945,12 @@ def _get_async_hook(self) -> BigQueryTableAsyncHook: async def run(self) -> AsyncIterator[TriggerEvent]: table_uri = f"{self.project_id}:{self.dataset_id}.{self.table_id}" + # The ``streamingBuffer`` table metadata BigQuery returns is eventually + # consistent, so a single absent reading can be a false "empty" right + # after a streaming insert. Yield success only after + # ``empty_confirmations`` consecutive empty polls, each ``poll_interval`` + # apart. 
+ consecutive_empty = 0 try: hook = self._get_async_hook() async with ClientSession() as session: @@ -951,11 +964,21 @@ async def run(self) -> AsyncIterator[TriggerEvent]: table_id=self.table_id, ) if is_empty: - message = f"Streaming buffer is empty for table: {table_uri}" - self.log.info(message) - yield TriggerEvent({"status": "success", "message": message}) - return - self.log.info("Streaming buffer not empty, sleeping %ss", self.poll_interval) + consecutive_empty += 1 + if consecutive_empty >= self.empty_confirmations: + message = f"Streaming buffer is empty for table: {table_uri}" + self.log.info(message) + yield TriggerEvent({"status": "success", "message": message}) + return + self.log.info( + "Streaming buffer reported empty (%s/%s confirmations), sleeping %ss", + consecutive_empty, + self.empty_confirmations, + self.poll_interval, + ) + else: + consecutive_empty = 0 + self.log.info("Streaming buffer not empty, sleeping %ss", self.poll_interval) await asyncio.sleep(self.poll_interval) except Exception as e: self.log.exception("Error while checking streaming buffer for table %s", table_uri) diff --git a/providers/google/tests/system/google/cloud/bigquery/example_bigquery_streaming_buffer_sensor.py b/providers/google/tests/system/google/cloud/bigquery/example_bigquery_streaming_buffer_sensor.py index 68035ac944ee8..9caa7ca180b8d 100644 --- a/providers/google/tests/system/google/cloud/bigquery/example_bigquery_streaming_buffer_sensor.py +++ b/providers/google/tests/system/google/cloud/bigquery/example_bigquery_streaming_buffer_sensor.py @@ -26,7 +26,6 @@ from __future__ import annotations import os -import time from datetime import datetime import pytest @@ -109,19 +108,6 @@ def streaming_insert(ds: str | None = None) -> None: rows=[{"value": 100, "ds": ds}], fail_on_error=True, ) - # BigQuery's streamingBuffer table metadata is eventually consistent: for - # a few seconds after a streaming insert the row is in the buffer but - # table.streaming_buffer is 
still None. Wait for the metadata to catch up - # so check_streaming_buffer_empty does not falsely report "empty" before - # the buffer is reported at all. Remove once the sensor handles this - # itself; tracked at https://github.com/apache/airflow/issues/66963 - client = hook.get_client(project_id=PROJECT_ID) - table_uri = f"{PROJECT_ID}.{DATASET_NAME}.{TABLE_NAME}" - for _ in range(30): - if client.get_table(table_uri).streaming_buffer is not None: - return - time.sleep(2) - raise RuntimeError("BigQuery streaming buffer metadata did not appear within 60s") streaming_insert_task = streaming_insert() diff --git a/providers/google/tests/unit/google/cloud/sensors/test_bigquery.py b/providers/google/tests/unit/google/cloud/sensors/test_bigquery.py index 1881c360e5edc..f90ab5d916196 100644 --- a/providers/google/tests/unit/google/cloud/sensors/test_bigquery.py +++ b/providers/google/tests/unit/google/cloud/sensors/test_bigquery.py @@ -313,6 +313,7 @@ def test_poke_returns_true_when_buffer_absent(self, mock_hook): sensor = _make_streaming_sensor( gcp_conn_id=TEST_GCP_CONN_ID, impersonation_chain=TEST_IMPERSONATION_CHAIN, + empty_confirmations=1, ) mock_table = mock.MagicMock(streaming_buffer=None) mock_hook.return_value.get_client.return_value.get_table.return_value = mock_table @@ -343,6 +344,50 @@ def test_poke_returns_false_when_buffer_present(self, mock_hook): assert sensor.poke(mock.MagicMock()) is False + @pytest.mark.parametrize( + ("empty_confirmations", "expected_results"), + [ + pytest.param(2, [False, True], id="default"), + pytest.param(3, [False, False, True], id="three_confirmations"), + ], + ) + @mock.patch("airflow.providers.google.cloud.sensors.bigquery.BigQueryHook") + def test_poke_requires_consecutive_empty_confirmations( + self, + mock_hook, + empty_confirmations, + expected_results, + ): + sensor = _make_streaming_sensor(empty_confirmations=empty_confirmations) + mock_hook.return_value.get_client.return_value.get_table.return_value = 
mock.MagicMock( + streaming_buffer=None + ) + + # A single absent reading is ambiguous (metadata lag), so the sensor only + # reports empty after the configured number of consecutive empty readings. + for expected in expected_results: + assert sensor.poke(mock.MagicMock()) is expected + + @mock.patch("airflow.providers.google.cloud.sensors.bigquery.BigQueryHook") + def test_poke_resets_confirmations_when_buffer_reappears(self, mock_hook): + sensor = _make_streaming_sensor(empty_confirmations=2) + get_table = mock_hook.return_value.get_client.return_value.get_table + get_table.side_effect = [ + mock.MagicMock(streaming_buffer=None), + mock.MagicMock(streaming_buffer={"estimatedRows": 5}), + mock.MagicMock(streaming_buffer=None), + mock.MagicMock(streaming_buffer=None), + ] + + assert sensor.poke(mock.MagicMock()) is False # 1st empty + assert sensor.poke(mock.MagicMock()) is False # buffer reappears, counter resets + assert sensor.poke(mock.MagicMock()) is False # empty again, count restarts at 1 + assert sensor.poke(mock.MagicMock()) is True # 2nd consecutive empty + + def test_init_rejects_non_positive_empty_confirmations(self): + with pytest.raises(ValueError, match="empty_confirmations must be at least 1"): + _make_streaming_sensor(empty_confirmations=0) + @mock.patch("airflow.providers.google.cloud.sensors.bigquery.BigQueryHook") def test_poke_raises_value_error_when_table_not_found(self, mock_hook): mock_hook.return_value.get_client.return_value.get_table.side_effect = NotFound("missing") @@ -364,7 +409,7 @@ def test_execute_does_not_defer_when_buffer_already_empty(self, mock_defer, mock streaming_buffer=None ) - _make_streaming_sensor(deferrable=True).execute(mock.MagicMock()) + _make_streaming_sensor(deferrable=True, empty_confirmations=1).execute(mock.MagicMock()) mock_defer.assert_not_called() @@ -374,6 +419,7 @@ def test_execute_defers_with_trigger_when_buffer_not_empty(self, mock_hook): gcp_conn_id=TEST_GCP_CONN_ID, 
impersonation_chain=TEST_IMPERSONATION_CHAIN, deferrable=True, + empty_confirmations=3, ) mock_hook.return_value.get_client.return_value.get_table.return_value = mock.MagicMock( streaming_buffer={"estimatedRows": 1} @@ -391,6 +437,7 @@ def test_execute_defers_with_trigger_when_buffer_not_empty(self, mock_hook): assert trigger.project_id == TEST_PROJECT_ID assert trigger.dataset_id == TEST_DATASET_ID assert trigger.table_id == TEST_TABLE_ID + assert trigger.empty_confirmations == 3 def test_execute_complete_returns_message_on_success(self): sensor = _make_streaming_sensor(deferrable=True) diff --git a/providers/google/tests/unit/google/cloud/triggers/test_bigquery.py b/providers/google/tests/unit/google/cloud/triggers/test_bigquery.py index 25a29b8118d1c..89838e21c9a91 100644 --- a/providers/google/tests/unit/google/cloud/triggers/test_bigquery.py +++ b/providers/google/tests/unit/google/cloud/triggers/test_bigquery.py @@ -1057,8 +1057,19 @@ def test_serialization(self, streaming_buffer_trigger): "gcp_conn_id": TEST_GCP_CONN_ID, "poll_interval": POLLING_PERIOD_SECONDS, "impersonation_chain": TEST_IMPERSONATION_CHAIN, + "empty_confirmations": 2, } + def test_init_rejects_non_positive_empty_confirmations(self): + with pytest.raises(ValueError, match="empty_confirmations must be at least 1"): + BigQueryStreamingBufferEmptyTrigger( + project_id=TEST_GCP_PROJECT_ID, + dataset_id=TEST_DATASET_ID, + table_id=TEST_TABLE_ID, + gcp_conn_id=TEST_GCP_CONN_ID, + empty_confirmations=0, + ) + @mock.patch("airflow.providers.google.cloud.triggers.bigquery.BigQueryTableAsyncHook") def test_async_hook_receives_impersonation_chain(self, mock_hook_cls, streaming_buffer_trigger): streaming_buffer_trigger._get_async_hook() @@ -1070,12 +1081,77 @@ def test_async_hook_receives_impersonation_chain(self, mock_hook_cls, streaming_ @pytest.mark.asyncio @mock.patch(f"{_TRIGGER_PATH}._is_streaming_buffer_empty") @mock.patch(f"{_TRIGGER_PATH}._get_async_hook") - async def 
test_run_yields_success_when_buffer_empty( - self, _mock_hook, mock_is_empty, streaming_buffer_trigger - ): + async def test_run_yields_success_when_buffer_empty(self, _mock_hook, mock_is_empty): + trigger = BigQueryStreamingBufferEmptyTrigger( + project_id=TEST_GCP_PROJECT_ID, + dataset_id=TEST_DATASET_ID, + table_id=TEST_TABLE_ID, + gcp_conn_id=TEST_GCP_CONN_ID, + poll_interval=POLLING_PERIOD_SECONDS, + impersonation_chain=TEST_IMPERSONATION_CHAIN, + empty_confirmations=1, + ) mock_is_empty.return_value = True + actual = await trigger.run().asend(None) + + table_uri = f"{TEST_GCP_PROJECT_ID}:{TEST_DATASET_ID}.{TEST_TABLE_ID}" + assert actual == TriggerEvent( + {"status": "success", "message": f"Streaming buffer is empty for table: {table_uri}"} + ) + + @pytest.mark.asyncio + @pytest.mark.parametrize( + ("empty_confirmations", "side_effect", "expected_polls"), + [ + pytest.param(2, [True, True], 2, id="default"), + pytest.param(3, [True, True, True], 3, id="three_confirmations"), + ], + ) + @mock.patch("airflow.providers.google.cloud.triggers.bigquery.asyncio.sleep", new_callable=AsyncMock) + @mock.patch(f"{_TRIGGER_PATH}._is_streaming_buffer_empty") + @mock.patch(f"{_TRIGGER_PATH}._get_async_hook") + async def test_run_waits_for_consecutive_empty_confirmations( + self, + _mock_hook, + mock_is_empty, + mock_sleep, + empty_confirmations, + side_effect, + expected_polls, + ): + # A single empty reading must not yield success; success only after + # ``empty_confirmations`` consecutive empty polls. 
+ mock_is_empty.side_effect = side_effect + + trigger = BigQueryStreamingBufferEmptyTrigger( + project_id=TEST_GCP_PROJECT_ID, + dataset_id=TEST_DATASET_ID, + table_id=TEST_TABLE_ID, + gcp_conn_id=TEST_GCP_CONN_ID, + empty_confirmations=empty_confirmations, + ) + + actual = await trigger.run().asend(None) + + assert mock_is_empty.await_count == expected_polls + + table_uri = f"{TEST_GCP_PROJECT_ID}:{TEST_DATASET_ID}.{TEST_TABLE_ID}" + assert actual == TriggerEvent( + {"status": "success", "message": f"Streaming buffer is empty for table: {table_uri}"} + ) + + @pytest.mark.asyncio + @mock.patch("airflow.providers.google.cloud.triggers.bigquery.asyncio.sleep", new_callable=AsyncMock) + @mock.patch(f"{_TRIGGER_PATH}._is_streaming_buffer_empty") + @mock.patch(f"{_TRIGGER_PATH}._get_async_hook") + async def test_run_resets_confirmations_when_buffer_reappears( + self, _mock_hook, mock_is_empty, mock_sleep, streaming_buffer_trigger + ): + # empty, then non-empty (resets), then two consecutive empties -> success. + mock_is_empty.side_effect = [True, False, True, True] actual = await streaming_buffer_trigger.run().asend(None) + assert mock_is_empty.await_count == 4 table_uri = f"{TEST_GCP_PROJECT_ID}:{TEST_DATASET_ID}.{TEST_TABLE_ID}" assert actual == TriggerEvent( {"status": "success", "message": f"Streaming buffer is empty for table: {table_uri}"}