Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 32 additions & 0 deletions services/hackbot-pulse-listener/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
FROM python:3.14-slim AS builder

COPY --from=ghcr.io/astral-sh/uv:latest /uv /uvx /bin/

ENV UV_PROJECT_ENVIRONMENT=/opt/venv

WORKDIR /app

# Install external deps without building workspace members.
RUN --mount=type=cache,target=/root/.cache/uv \
--mount=type=bind,source=pyproject.toml,target=pyproject.toml \
--mount=type=bind,source=uv.lock,target=uv.lock \
--mount=type=bind,source=VERSION,target=VERSION \
uv sync --frozen --no-dev --no-install-workspace --package hackbot-pulse-listener

RUN --mount=type=cache,target=/root/.cache/uv \
--mount=type=bind,target=/app,rw \
uv sync --locked --no-dev --no-editable --package hackbot-pulse-listener

FROM python:3.14-slim AS base

COPY --from=builder /opt/venv /opt/venv
WORKDIR /app

ENV PYTHONUNBUFFERED=1
ENV PYTHONDONTWRITEBYTECODE=1
ENV PATH="/opt/venv/bin:$PATH"

RUN useradd --create-home --shell /bin/bash app
USER app

CMD ["python", "-m", "app"]
38 changes: 38 additions & 0 deletions services/hackbot-pulse-listener/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
# Hackbot Pulse Listener

Listens to Taskcluster build-failure pulse messages, and for failed **Firefox build
tasks** triggers the `build-repair` hackbot agent through the hackbot-api. Notification
about the run outcome is handled separately (in the API or another service).

## How it works

1. Consume `task-failed` messages from `pulse.mozilla.org`.
2. Keep only **build-kind** tasks (`tags.kind == "build"`) on a watched `project`
(`WATCHED_REPOS`, default `try`). Build tasks don't run tests, so a failure is a
compilation/link error.
3. Fetch the task definition to read `GECKO_HEAD_REV` (the revision is not in the message).
4. Dedupe by revision with an in-memory TTL cache, so only one agent run is triggered per
revision even when many build tasks fail for the same push.
5. `POST /agents/build-repair/runs`.

The dedupe cache is in-memory (reset on restart).

## Run locally

```bash
export PULSE_USER=... PULSE_PASSWORD=... # https://pulseguardian.mozilla.org
export HACKBOT_API_URL=https://hackbot-api.../ HACKBOT_API_KEY=...
export WATCHED_REPOS=try
export DRY_RUN=true # log intended calls, don't POST
uv run --package hackbot-pulse-listener python -m app
```

## Test

```bash
uv run --package hackbot-pulse-listener pytest services/hackbot-pulse-listener/tests
```

## Deploy

Cloud Run worker pool (no HTTP). See `deploy.sh`.
Empty file.
39 changes: 39 additions & 0 deletions services/hackbot-pulse-listener/app/__main__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
import logging
import signal

from app import consumer
from app.config import settings

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def main() -> None:
if settings.sentry_dsn:
import sentry_sdk

sentry_sdk.init(dsn=settings.sentry_dsn, environment=settings.environment)

if not (settings.pulse_user and settings.pulse_password):
logger.warning("PULSE_USER/PULSE_PASSWORD not set; listener will not start")
return

consumer_obj = consumer.build_consumer()

def shutdown(signum, _frame):
logger.info("Received signal %s; shutting down", signum)
consumer_obj.should_stop = True

signal.signal(signal.SIGTERM, shutdown)
signal.signal(signal.SIGINT, shutdown)

logger.info(
"Listening for build failures on %s; watched repos: %s",
", ".join(consumer.EXCHANGES),
sorted(settings.watched_repos_set),
)
consumer_obj.run()


if __name__ == "__main__":
main()
25 changes: 25 additions & 0 deletions services/hackbot-pulse-listener/app/client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
import logging

import httpx

from app.config import settings

logger = logging.getLogger(__name__)

_TIMEOUT = httpx.Timeout(30.0)


def _headers() -> dict[str, str]:
return {"X-API-Key": settings.hackbot_api_key}


def trigger_run(inputs: dict) -> str | None:
"""Create a build-repair run. Returns the run id, or None in dry-run mode."""
if settings.dry_run:
logger.info("[dry-run] would trigger %s run: %s", settings.agent_name, inputs)
return None

url = f"{settings.hackbot_api_url}/agents/{settings.agent_name}/runs"
resp = httpx.post(url, json=inputs, headers=_headers(), timeout=_TIMEOUT)
resp.raise_for_status()
return resp.json()["run_id"]
41 changes: 41 additions & 0 deletions services/hackbot-pulse-listener/app/config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
from pydantic_settings import BaseSettings


class Settings(BaseSettings):
# Pulse (https://pulseguardian.mozilla.org)
pulse_user: str = ""
pulse_password: str = ""
taskcluster_root_url: str = "https://firefox-ci-tc.services.mozilla.com"

# hackbot-api
hackbot_api_url: str = ""
hackbot_api_key: str = ""
agent_name: str = "build-repair"

# Failure filtering and agent inputs.
# ``watched_repos`` is a comma-separated list of Taskcluster ``project`` tags.
watched_repos: str = "try,autoland"
run_try_push: bool = False
model: str | None = None
max_turns: int | None = None

# Dedupe (in-memory, by git revision)
dedupe_ttl_seconds: int = 6 * 60 * 60
dedupe_max_size: int = 4096

dry_run: bool = False
environment: str = "development"
sentry_dsn: str | None = None

model_config = {
"env_file": ".env",
"env_file_encoding": "utf-8",
"extra": "ignore",
}

@property
def watched_repos_set(self) -> set[str]:
return {r.strip() for r in self.watched_repos.split(",") if r.strip()}


settings = Settings()
112 changes: 112 additions & 0 deletions services/hackbot-pulse-listener/app/consumer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
import logging

from cachetools import TTLCache
from kombu import Connection, Exchange, Queue
from kombu.mixins import ConsumerMixin

from app import client, taskcluster
from app.config import settings

logger = logging.getLogger(__name__)

CONNECTION_URL = "amqp://{}:{}@pulse.mozilla.org:5671/?ssl=1"

EXCHANGES = ("exchange/taskcluster-queue/v1/task-failed",)

# In-memory dedupe of git revisions already handed to the agent. Only the
# single consumer thread touches it, so no lock is needed.
_seen: TTLCache = TTLCache(
maxsize=settings.dedupe_max_size, ttl=settings.dedupe_ttl_seconds
)


def process(body: dict) -> str | None:
"""Handle one Taskcluster failure message. Returns the triggered run id."""
tags = (body.get("task") or {}).get("tags") or {}

if tags.get("kind") != "build":
return None

project = tags.get("project")
if project not in settings.watched_repos_set:
return None

task_id = body["status"]["taskId"]
task_name = tags.get("label") or task_id

revision = taskcluster.get_revision(task_id)
if not revision:
logger.warning("No GECKO_HEAD_REV for task %s; skipping", task_id)
return None

if revision in _seen:
logger.info("Revision %s already processed; skipping", revision)
return None
_seen[revision] = True

inputs: dict = {
"git_commit": revision,
"failure_tasks": {task_name: task_id},
"run_try_push": settings.run_try_push,
}
if settings.model:
inputs["model"] = settings.model
if settings.max_turns is not None:
inputs["max_turns"] = settings.max_turns

try:
run_id = client.trigger_run(inputs)
except Exception:
logger.exception("Failed to trigger build-repair run for %s", revision)
_seen.pop(revision, None)
return None

logger.info("Triggered build-repair run %s for %s@%s", run_id, project, revision)
return run_id


def make_handler():
def on_message(body, message):
try:
process(body)
except Exception:
logger.exception("Error handling pulse message")
finally:
message.ack()

return on_message


def _build_queues(user: str) -> list[Queue]:
queues = []
for exchange in EXCHANGES:
suffix = exchange.rsplit("/", 1)[-1]
queues.append(
Queue(
name=f"queue/{user}/build-repair-{suffix}",
exchange=Exchange(exchange, type="topic", no_declare=True),
routing_key="#",
durable=True,
auto_delete=True,
)
)
return queues


class BuildFailureConsumer(ConsumerMixin):
def __init__(self, connection, queues, on_message):
self.connection = connection
self.queues = queues
self.on_message = on_message

def get_consumers(self, Consumer, channel):
return [Consumer(queues=self.queues, callbacks=[self.on_message])]


def build_consumer() -> BuildFailureConsumer:
connection = Connection(
CONNECTION_URL.format(settings.pulse_user, settings.pulse_password)
)
return BuildFailureConsumer(
connection, _build_queues(settings.pulse_user), make_handler()
)
26 changes: 26 additions & 0 deletions services/hackbot-pulse-listener/app/taskcluster.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
import logging

import taskcluster

from app.config import settings

logger = logging.getLogger(__name__)

_queue: taskcluster.Queue | None = None


def _get_queue() -> taskcluster.Queue:
global _queue
if _queue is None:
_queue = taskcluster.Queue({"rootUrl": settings.taskcluster_root_url})
return _queue


def get_revision(task_id: str) -> str | None:
"""Return the GECKO_HEAD_REV (git SHA) for a task, or None if unavailable.

The revision is not in the pulse message, so we fetch the full task
definition. Task definitions are public, so no credentials are needed.
"""
task = _get_queue().task(task_id)
return task.get("payload", {}).get("env", {}).get("GECKO_HEAD_REV")
79 changes: 79 additions & 0 deletions services/hackbot-pulse-listener/deploy.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
#!/usr/bin/env bash
#
# Deploy the build-failure Pulse listener to Cloud Run as a worker pool.
#
# A worker pool runs an always-on, non-request workload (no HTTP port). This
# service consumes Taskcluster build-failure pulse messages and triggers the
# build-repair hackbot agent via the hackbot-api.
#
# Prereqs (one-time):
# gcloud auth login
# gcloud config set project <PROJECT_ID>
# gcloud services enable run.googleapis.com artifactregistry.googleapis.com \
# cloudbuild.googleapis.com secretmanager.googleapis.com
#
# Secrets (one-time) — store in Secret Manager:
# printf '%s' '<pulse password>' | gcloud secrets create pulse-password --data-file=-
# # HACKBOT_API_KEY reuses the existing shared `external-api-key` secret.
#
# Usage:
# PROJECT=my-proj REGION=us-central1 \
# HACKBOT_API_URL=https://hackbot-api-xxxx.run.app \
# PULSE_USER=my-pulse-user \
# ./deploy.sh
set -euo pipefail

PROJECT="${PROJECT:?set PROJECT to your GCP project id}"
REGION="${REGION:-us-central1}"
SERVICE="${SERVICE:-hackbot-pulse-listener}"
REPO="${REPO:-hackbot}"
HACKBOT_API_URL="${HACKBOT_API_URL:?set HACKBOT_API_URL to the hackbot-api base URL}"
PULSE_USER="${PULSE_USER:?set PULSE_USER (https://pulseguardian.mozilla.org)}"
WATCHED_REPOS="${WATCHED_REPOS:-try}"

SA_NAME="${SA_NAME:-hackbot-pulse-listener-run}"
SA_EMAIL="${SA_EMAIL:-${SA_NAME}@${PROJECT}.iam.gserviceaccount.com}"

PULSE_SECRET="${PULSE_SECRET:-pulse-password}"
API_KEY_SECRET="${API_KEY_SECRET:-external-api-key}"

IMAGE="${REGION}-docker.pkg.dev/${PROJECT}/${REPO}/${SERVICE}:latest"
# Build context is the repo root (the Dockerfile needs the workspace lock files).
ROOT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")/../.." && pwd)"

echo "==> Ensuring runtime service account '${SA_EMAIL}' exists"
gcloud iam service-accounts describe "${SA_EMAIL}" >/dev/null 2>&1 || \
gcloud iam service-accounts create "${SA_NAME}" \
--display-name="Hackbot Pulse Listener (Cloud Run runtime)"

echo "==> Granting the SA read access to its secrets"
for s in "${PULSE_SECRET}" "${API_KEY_SECRET}"; do
gcloud secrets add-iam-policy-binding "$s" \
--member="serviceAccount:${SA_EMAIL}" \
--role=roles/secretmanager.secretAccessor >/dev/null
done

echo "==> Ensuring Artifact Registry repo '${REPO}' exists in ${REGION}"
gcloud artifacts repositories describe "${REPO}" --location="${REGION}" >/dev/null 2>&1 || \
gcloud artifacts repositories create "${REPO}" \
--repository-format=docker --location="${REGION}" \
--description="Hackbot container images"

echo "==> Building & pushing image with Cloud Build: ${IMAGE}"
gcloud builds submit "${ROOT_DIR}" \
--config <(printf 'steps:\n- name: gcr.io/cloud-builders/docker\n args: ["build","-t","%s","-f","services/%s/Dockerfile","."]\nimages: ["%s"]\n' "${IMAGE}" "${SERVICE}" "${IMAGE}")

echo "==> Deploying worker pool"
ENV_VARS="HACKBOT_API_URL=${HACKBOT_API_URL}"
ENV_VARS="${ENV_VARS},PULSE_USER=${PULSE_USER},WATCHED_REPOS=${WATCHED_REPOS}"

gcloud run worker-pools deploy "${SERVICE}" \
--image "${IMAGE}" \
--region "${REGION}" \
--min-instances 1 \
--max-instances 1 \
--service-account "${SA_EMAIL}" \
--set-env-vars "${ENV_VARS}" \
--set-secrets "PULSE_PASSWORD=${PULSE_SECRET}:latest,HACKBOT_API_KEY=${API_KEY_SECRET}:latest"

echo "==> Deployed worker pool '${SERVICE}'"
Loading