Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions frontend/src/client/schemas.gen.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18918,6 +18918,7 @@ export const $Role = {
"tracecat-cli",
"tracecat-executor",
"tracecat-agent-executor",
"tracecat-case-duration-sync",
"tracecat-case-triggers",
"tracecat-llm-gateway",
"tracecat-mcp",
Expand Down
2 changes: 1 addition & 1 deletion frontend/src/client/services.gen.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10735,7 +10735,7 @@ export const caseDurationsDeleteCaseDurationDefinition = (

/**
* List Case Durations
* Sync and list case durations for the provided case.
* List materialized case durations for the provided case.
* @param data The data for the request.
* @param data.caseId
* @param data.workspaceId
Expand Down
2 changes: 2 additions & 0 deletions frontend/src/client/types.gen.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5780,6 +5780,7 @@ export type Role = {
| "tracecat-cli"
| "tracecat-executor"
| "tracecat-agent-executor"
| "tracecat-case-duration-sync"
| "tracecat-case-triggers"
| "tracecat-llm-gateway"
| "tracecat-mcp"
Expand All @@ -5800,6 +5801,7 @@ export type service_id =
| "tracecat-cli"
| "tracecat-executor"
| "tracecat-agent-executor"
| "tracecat-case-duration-sync"
| "tracecat-case-triggers"
| "tracecat-llm-gateway"
| "tracecat-mcp"
Expand Down
241 changes: 241 additions & 0 deletions tests/integration/test_case_duration_benchmarks.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,18 @@
TRACECAT_CASE_DURATION_BENCHMARK_UPDATES_PER_CASE
TRACECAT_CASE_DURATION_BENCHMARK_HEALTH_INTERVAL_MS
TRACECAT_CASE_DURATION_BENCHMARK_HEALTH_TIMEOUT_MS
TRACECAT_CASE_DURATION_BENCHMARK_HOT_CASE_MUTATORS
TRACECAT_CASE_DURATION_BENCHMARK_HOT_CASE_MUTATIONS
TRACECAT_CASE_DURATION_BENCHMARK_HOT_CASE_LOADS
TRACECAT_CASE_DURATION_BENCHMARK_HOT_CASE_BASELINE_LOADS
TRACECAT_CASE_DURATION_BENCHMARK_HOT_CASE_LOAD_INTERVAL_MS
TRACECAT_CASE_DURATION_BENCHMARK_OUTPUT
"""

from __future__ import annotations

import asyncio
import contextlib
import json
import math
import os
Expand All @@ -39,6 +45,8 @@
from tracecat.api.app import app
from tracecat.auth.types import Role
from tracecat.authz.scopes import ADMIN_SCOPES
from tracecat.cases.durations import consumer as duration_sync_consumer
from tracecat.cases.durations.consumer import CaseDurationSyncConsumer
from tracecat.cases.durations.schemas import (
CaseDurationAnchorSelection,
CaseDurationDefinitionCreate,
Expand All @@ -53,6 +61,7 @@
from tracecat.cases.schemas import CaseCreate, CaseUpdate
from tracecat.cases.service import CasesService
from tracecat.db.models import CaseEvent, Organization, Workspace
from tracecat.redis.client import get_redis_client

RUN_BENCHMARKS = os.environ.get("TRACECAT_RUN_CASE_DURATION_BENCHMARKS") == "1"
BENCHMARK_OUTPUT_PATH = os.environ.get("TRACECAT_CASE_DURATION_BENCHMARK_OUTPUT")
Expand Down Expand Up @@ -94,6 +103,25 @@ class CaseDurationBurstBenchmarkConfig:
)
/ 1000
)
hot_case_mutators: int = int(
os.environ.get("TRACECAT_CASE_DURATION_BENCHMARK_HOT_CASE_MUTATORS") or 4
)
hot_case_mutations: int = int(
os.environ.get("TRACECAT_CASE_DURATION_BENCHMARK_HOT_CASE_MUTATIONS") or 8
)
hot_case_loads: int = int(
os.environ.get("TRACECAT_CASE_DURATION_BENCHMARK_HOT_CASE_LOADS") or 12
)
hot_case_baseline_loads: int = int(
os.environ.get("TRACECAT_CASE_DURATION_BENCHMARK_HOT_CASE_BASELINE_LOADS") or 3
)
hot_case_load_interval_s: float = (
int(
os.environ.get("TRACECAT_CASE_DURATION_BENCHMARK_HOT_CASE_LOAD_INTERVAL_MS")
or 10
)
/ 1000
)


def _percentile(values: list[float], percentile: float) -> float | None:
Expand Down Expand Up @@ -297,6 +325,77 @@ async def update_one_case(
)


async def _sync_initial_case_durations(*, async_engine, role: Role, case_id: uuid.UUID):
async with AsyncSession(async_engine, expire_on_commit=False) as session:
await CaseDurationService(session=session, role=role).sync_case_durations(
case_id
)
await session.commit()


async def _load_case_page_once(
*,
async_engine,
role: Role,
case_id: uuid.UUID,
) -> float:
async def load_case_detail() -> None:
async with AsyncSession(async_engine, expire_on_commit=False) as session:
case = await CasesService(session=session, role=role).get_case(
case_id,
track_view=True,
)
if case is None:
raise AssertionError(f"Case {case_id} not found during benchmark")

async def load_case_durations() -> None:
async with AsyncSession(async_engine, expire_on_commit=False) as session:
await CaseDurationService(session=session, role=role).list_durations(
case_id
)

started = time.perf_counter()
await asyncio.gather(load_case_detail(), load_case_durations())
return time.perf_counter() - started


async def _load_case_page_repeatedly(
*,
async_engine,
role: Role,
case_id: uuid.UUID,
load_count: int,
interval_s: float,
) -> list[float]:
latencies: list[float] = []
for _ in range(load_count):
latencies.append(
await _load_case_page_once(
async_engine=async_engine,
role=role,
case_id=case_id,
)
)
await asyncio.sleep(interval_s)
return latencies


async def _run_hot_case_update_burst(
*,
async_engine,
role: Role,
case_id: uuid.UUID,
mutators: int,
mutations_per_mutator: int,
) -> tuple[list[float], int]:
return await _run_case_update_burst(
async_engine=async_engine,
role=role,
case_ids=[case_id for _ in range(mutators)],
updates_per_case=mutations_per_mutator,
)


@pytest.mark.anyio
async def test_case_duration_update_burst_health_latency(
monkeypatch: pytest.MonkeyPatch,
Expand Down Expand Up @@ -429,3 +528,145 @@ async def probe_health() -> None:
assert health_latencies["burst"]
finally:
await async_engine.dispose()


@pytest.mark.anyio
async def test_hot_case_load_latency_during_async_duration_mutation_burst(
monkeypatch: pytest.MonkeyPatch,
) -> None:
"""Measure case-load latency while same-case mutations enqueue async sync."""

cfg = CaseDurationBurstBenchmarkConfig(case_count=1)
stream_suffix = uuid.uuid4().hex[:8]
monkeypatch.setattr(config, "TRACECAT__CASE_TRIGGERS_ENABLED", False)
monkeypatch.setattr(config, "TRACECAT__CASE_DURATION_SYNC_ENABLED", False)
monkeypatch.setattr(
config,
"TRACECAT__CASE_DURATION_SYNC_STREAM_KEY",
f"case-duration-sync-benchmark-{stream_suffix}",
)
monkeypatch.setattr(
config,
"TRACECAT__CASE_DURATION_SYNC_GROUP",
f"case-duration-sync-benchmark-{stream_suffix}",
)

async_engine = create_async_engine(
TEST_DB_CONFIG.test_url,
poolclass=NullPool,
)
consumer_task: asyncio.Task[None] | None = None

@contextlib.asynccontextmanager
async def benchmark_bypass_session():
async with AsyncSession(async_engine, expire_on_commit=False) as session:
yield session

try:
with (
patch.object(
CaseDurationDefinitionService,
"has_entitlement",
new=AsyncMock(return_value=True),
),
patch.object(
CaseDurationService,
"has_entitlement",
new=AsyncMock(return_value=True),
),
patch.object(
CasesService,
"has_entitlement",
new=AsyncMock(return_value=False),
),
patch.object(
duration_sync_consumer,
"get_async_session_bypass_rls_context_manager",
benchmark_bypass_session,
),
):
role = await _seed_benchmark_role(async_engine)
case_ids = await _seed_cases_definitions_and_history(
async_engine=async_engine,
role=role,
cfg=cfg,
)
case_id = case_ids[0]
await _sync_initial_case_durations(
async_engine=async_engine,
role=role,
case_id=case_id,
)
monkeypatch.setattr(config, "TRACECAT__CASE_DURATION_SYNC_ENABLED", True)
consumer = CaseDurationSyncConsumer(
await get_redis_client(),
consumer_name=f"duration-benchmark-{uuid.uuid4().hex[:8]}",
)
consumer_task = asyncio.create_task(consumer.run())
await asyncio.sleep(0.1)

baseline_loads = await _load_case_page_repeatedly(
async_engine=async_engine,
role=role,
case_id=case_id,
load_count=cfg.hot_case_baseline_loads,
interval_s=cfg.hot_case_load_interval_s,
)

load_task = asyncio.create_task(
_load_case_page_repeatedly(
async_engine=async_engine,
role=role,
case_id=case_id,
load_count=cfg.hot_case_loads,
interval_s=cfg.hot_case_load_interval_s,
)
)
mutation_task = asyncio.create_task(
_run_hot_case_update_burst(
async_engine=async_engine,
role=role,
case_id=case_id,
mutators=cfg.hot_case_mutators,
mutations_per_mutator=cfg.hot_case_mutations,
)
)
burst_loads, (mutation_latencies, mutation_errors) = await asyncio.gather(
load_task,
mutation_task,
)
await asyncio.sleep(0.5)

summary: dict[str, object] = {
"config": {
"cases": cfg.case_count,
"definitions": cfg.definition_count,
"history_events_per_case": cfg.history_events_per_case,
"hot_case_mutators": cfg.hot_case_mutators,
"hot_case_mutations": cfg.hot_case_mutations,
"hot_case_loads": cfg.hot_case_loads,
"hot_case_baseline_loads": cfg.hot_case_baseline_loads,
"hot_case_load_interval_ms": round(cfg.hot_case_load_interval_s * 1000),
},
"case_load_baseline": _latency_stats(baseline_loads),
"case_load_burst": _latency_stats(burst_loads),
"mutation_latencies": _latency_stats(mutation_latencies),
"mutation_errors": mutation_errors,
}
_write_summary_to_file(summary)

print("\nHot case async duration sync benchmark:")
print(summary)

assert baseline_loads
assert burst_loads
assert mutation_latencies
assert mutation_errors == 0
finally:
if consumer_task is not None:
consumer_task.cancel()
try:
await consumer_task
except asyncio.CancelledError:
pass
await async_engine.dispose()
37 changes: 37 additions & 0 deletions tests/unit/test_case_duration_router.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
import uuid
from unittest.mock import AsyncMock

import pytest
from sqlalchemy.ext.asyncio import AsyncSession

from tracecat.cases.durations.router import list_case_durations
from tracecat.cases.durations.service import CaseDurationService

pytestmark = pytest.mark.usefixtures("db")


@pytest.mark.anyio
async def test_list_case_durations_is_read_only(
session: AsyncSession,
svc_role,
monkeypatch: pytest.MonkeyPatch,
) -> None:
sync_mock = AsyncMock()
list_mock = AsyncMock(return_value=[])
commit_mock = AsyncMock()

monkeypatch.setattr(CaseDurationService, "sync_case_durations", sync_mock)
monkeypatch.setattr(CaseDurationService, "list_durations", list_mock)
monkeypatch.setattr(session, "commit", commit_mock)

case_id = uuid.uuid4()
result = await list_case_durations(
role=svc_role,
session=session,
case_id=case_id,
)

assert result == []
sync_mock.assert_not_awaited()
commit_mock.assert_not_awaited()
list_mock.assert_awaited_once_with(case_id)
Loading
Loading