Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 28 additions & 0 deletions api/entrypoints/routers.py
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,9 @@
from oss.src.core.triggers.registry import TriggersGatewayRegistry
from oss.src.core.triggers.service import TriggersService
from oss.src.apis.fastapi.triggers.router import TriggersRouter
from oss.src.tasks.asyncio.triggers.dispatcher import TriggersDispatcher
from oss.src.tasks.taskiq.triggers.worker import TriggersWorker
from taskiq_redis import RedisStreamBroker
from oss.src.apis.fastapi.shared.utils import SupportHeadersMiddleware


Expand Down Expand Up @@ -214,8 +217,12 @@ async def lifespan(*args, **kwargs):
warn_deprecated_env_vars()
validate_required_env_vars()

await _triggers_broker.startup()

yield

await _triggers_broker.shutdown()

for adapter in _composio_adapters.values():
await adapter.close()

Expand Down Expand Up @@ -651,6 +658,26 @@ async def lifespan(*args, **kwargs):
connections_service=connections_service,
)

# Producer side of the inbound dispatch pipeline: the ingress route enqueues
# `triggers.dispatch` tasks here; entrypoints/worker_triggers.py consumes them.
_triggers_broker = RedisStreamBroker(
url=env.redis.uri_durable,
queue_name="queues:triggers",
consumer_group_name="api-triggers-producer",
maxlen=100_000,
approximate=True,
)

_triggers_dispatcher = TriggersDispatcher(
triggers_dao=triggers_dao,
workflows_service=workflows_service,
)

_triggers_worker = TriggersWorker(
broker=_triggers_broker,
dispatcher=_triggers_dispatcher,
)

_t_services_done = time.perf_counter() - _t_services
print(f"[STARTUP] Service initialization completed (+{_t_services_done:.3f}s)")
_t_routers = time.perf_counter()
Expand Down Expand Up @@ -767,6 +794,7 @@ async def lifespan(*args, **kwargs):

triggers = TriggersRouter(
triggers_service=triggers_service,
dispatch_task=_triggers_worker.dispatch_trigger,
)

simple_traces = SimpleTracesRouter(
Expand Down
142 changes: 142 additions & 0 deletions api/entrypoints/worker_triggers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
import sys

from taskiq.cli.worker.run import run_worker
from taskiq.cli.worker.args import WorkerArgs
from taskiq_redis import RedisStreamBroker

from oss.src.utils.logging import get_module_logger
from oss.src.utils.helpers import warn_deprecated_env_vars, validate_required_env_vars
from oss.src.utils.env import env

from oss.src.utils.common import is_ee
from oss.src.dbs.postgres.git.dao import GitDAO
from oss.src.dbs.postgres.triggers.dao import TriggersDAO
from oss.src.dbs.postgres.workflows.dbes import (
WorkflowArtifactDBE,
WorkflowVariantDBE,
WorkflowRevisionDBE,
)
from oss.src.dbs.postgres.environments.dbes import (
EnvironmentArtifactDBE,
EnvironmentVariantDBE,
EnvironmentRevisionDBE,
)
from oss.src.core.workflows.service import WorkflowsService
from oss.src.core.environments.service import EnvironmentsService
from oss.src.core.embeds.service import EmbedsService
from oss.src.tasks.asyncio.triggers.dispatcher import TriggersDispatcher
from oss.src.tasks.taskiq.triggers.worker import TriggersWorker

# Guard EE imports — see worker_tracing.py for the rationale.
if is_ee():
from ee.src.core.access.entitlements.service import bootstrap_entitlements_services


import agenta as ag

log = get_module_logger(__name__)

# Initialize Agenta SDK
ag.init(
api_url=env.agenta.api_url,
)

# Bound the stream so acked entries are trimmed; without this it grows unbounded.
MAXLEN_QUEUES_TRIGGERS = 100_000

# BROKER -------------------------------------------------------------------
broker = RedisStreamBroker(
url=env.redis.uri_durable,
queue_name="queues:triggers",
consumer_group_name="worker-triggers",
maxlen=MAXLEN_QUEUES_TRIGGERS,
approximate=True,
)


# WORKERS ------------------------------------------------------------------
triggers_dao = TriggersDAO()

workflows_dao = GitDAO(
ArtifactDBE=WorkflowArtifactDBE,
VariantDBE=WorkflowVariantDBE,
RevisionDBE=WorkflowRevisionDBE,
)

environments_dao = GitDAO(
ArtifactDBE=EnvironmentArtifactDBE,
VariantDBE=EnvironmentVariantDBE,
RevisionDBE=EnvironmentRevisionDBE,
)

workflows_service = WorkflowsService(
workflows_dao=workflows_dao,
)

environments_service = EnvironmentsService(
environments_dao=environments_dao,
)

embeds_service = EmbedsService(
workflows_service=workflows_service,
environments_service=environments_service,
)

workflows_service.environments_service = environments_service
workflows_service.embeds_service = embeds_service
environments_service.embeds_service = embeds_service

triggers_dispatcher = TriggersDispatcher(
triggers_dao=triggers_dao,
workflows_service=workflows_service,
)

triggers_worker = TriggersWorker(
broker=broker,
dispatcher=triggers_dispatcher,
)


def main() -> int:
"""
Main entry point for the worker.

Returns:
Exit code (0 for success, non-zero for failure)
"""
try:
log.info("[TRIGGERS] Initializing Taskiq worker")

# Validate environment
warn_deprecated_env_vars()
validate_required_env_vars()

# Wire EE entitlement services so `check_entitlements` works in
# this worker process. Gated on `is_ee()` to match the import above.
if is_ee():
bootstrap_entitlements_services()

log.info("[TRIGGERS] Starting Taskiq worker with Redis Streams")

# Run Taskiq worker
args = WorkerArgs(
broker="entrypoints.worker_triggers:broker", # Reference broker from this module
modules=[],
fs_discover=False,
workers=1,
max_async_tasks=50,
)

result = run_worker(args)
return result if result is not None else 0

except KeyboardInterrupt:
log.info("[TRIGGERS] Shutdown requested")
return 0
except Exception as e:
log.error("[TRIGGERS] Fatal error", error=str(e))
return 1


if __name__ == "__main__":
sys.exit(main())
25 changes: 24 additions & 1 deletion api/oss/src/apis/fastapi/triggers/models.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import List, Optional
from typing import Any, Dict, List, Optional

from pydantic import BaseModel, Field

Expand Down Expand Up @@ -91,3 +91,26 @@ class TriggerDeliveryResponse(BaseModel):
class TriggerDeliveriesResponse(BaseModel):
count: int = 0
deliveries: List[TriggerDelivery] = Field(default_factory=list)


# ---------------------------------------------------------------------------
# Trigger Ingress (inbound provider events)
# ---------------------------------------------------------------------------


class TriggerEventAck(BaseModel):
status: str = "accepted"
detail: Optional[str] = None


class ComposioEventEnvelope(BaseModel):
"""Loose view of a Composio trigger webhook envelope (`{data, type, ...}`).

Demultiplexing keys live under ``metadata`` (``trigger_id``, ``id``); the rest
is passed through to the resolver as the inbound event.
"""

type: Optional[str] = None
timestamp: Optional[str] = None
data: Optional[Dict[str, Any]] = None
metadata: Optional[Dict[str, Any]] = None
98 changes: 97 additions & 1 deletion api/oss/src/apis/fastapi/triggers/router.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
import hashlib
import hmac
from functools import wraps
from typing import Optional
from json import JSONDecodeError, loads
from typing import Any, Optional
from uuid import UUID

import httpx
Expand All @@ -10,6 +13,7 @@
from oss.src.utils.logging import get_module_logger
from oss.src.utils.caching import get_cache, set_cache
from oss.src.utils.common import is_ee
from oss.src.utils.env import env

from oss.src.apis.fastapi.triggers.models import (
TriggerCatalogEventResponse,
Expand All @@ -19,6 +23,7 @@
TriggerDeliveriesResponse,
TriggerDeliveryQueryRequest,
TriggerDeliveryResponse,
TriggerEventAck,
TriggerSubscriptionCreateRequest,
TriggerSubscriptionEditRequest,
TriggerSubscriptionQueryRequest,
Expand Down Expand Up @@ -76,16 +81,58 @@ async def wrapper(*args, **kwargs):
return decorator


def _verify_composio_signature(
*,
body: bytes,
headers: Any,
) -> bool:
"""HMAC-SHA256 verify over ``{id}.{ts}.{body}`` with ``COMPOSIO_WEBHOOK_SECRET``.

Returns True when the secret is unset (no-op) or the signature matches.
"""
secret = env.composio.webhook_secret
if not secret:
return True

signature = headers.get("webhook-signature") or headers.get("x-composio-signature")
webhook_id = headers.get("webhook-id") or ""
timestamp = headers.get("webhook-timestamp") or ""
if not signature:
return False

signed = f"{webhook_id}.{timestamp}.{body.decode('utf-8', errors='replace')}"
expected = hmac.new(
secret.encode("utf-8"),
signed.encode("utf-8"),
hashlib.sha256,
).hexdigest()

provided = signature.split(",")[-1].strip()
return hmac.compare_digest(expected, provided)


class TriggersRouter:
def __init__(
self,
*,
triggers_service: TriggersService,
dispatch_task: Optional[Any] = None,
):
self.triggers_service = triggers_service
self.dispatch_task = dispatch_task

self.router = APIRouter()

# --- Trigger Ingress (inbound provider events) ---
self.router.add_api_route(
"/composio/events",
self.ingest_composio_event,
methods=["POST"],
operation_id="ingest_composio_event",
response_model=TriggerEventAck,
status_code=status.HTTP_202_ACCEPTED,
)

# --- Trigger Catalog ---
self.router.add_api_route(
"/catalog/providers/",
Expand Down Expand Up @@ -711,3 +758,52 @@ async def fetch_delivery(
count=1,
delivery=delivery,
)

# -----------------------------------------------------------------------
# Trigger Ingress (inbound provider events)
# -----------------------------------------------------------------------

@intercept_exceptions()
async def ingest_composio_event(
self,
request: Request,
) -> Any:
"""Receive a Composio provider event; verify, demux, ack-fast, enqueue.

Public (no Agenta auth) — mirrors the Stripe events receiver. Scope and
attribution are recovered downstream from the resolved subscription row.
"""
body = await request.body()

if not _verify_composio_signature(body=body, headers=request.headers):
return JSONResponse(
status_code=status.HTTP_401_UNAUTHORIZED,
content={"status": "error", "detail": "Signature verification failed"},
)

try:
envelope = loads(body) if body else {}
except JSONDecodeError:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Invalid payload",
)

metadata = envelope.get("metadata") or {}
trigger_id = metadata.get("trigger_id") or metadata.get("nano_id")
event_id = metadata.get("id")

if not trigger_id or not event_id:
# Nothing to route — accept (no-op) so the provider does not retry.
return TriggerEventAck(
status="accepted", detail="No trigger_id/id to route"
)

if self.dispatch_task is not None:
await self.dispatch_task.kiq(
trigger_id=str(trigger_id),
event_id=str(event_id),
event=envelope,
)

return TriggerEventAck(status="accepted")
7 changes: 7 additions & 0 deletions api/oss/src/core/triggers/dtos.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,13 @@
)


# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------

TRIGGER_MAX_RETRIES = 5


# ---------------------------------------------------------------------------
# Trigger Enums
# ---------------------------------------------------------------------------
Expand Down
Loading
Loading