Skip to content
Draft
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,20 @@
Shared helpers for direct-streaming session-affinity tests.
"""

import itertools

import httpx
import pytest

from ray import serve
from ray._common.test_utils import wait_for_condition
from ray.llm._internal.serve.constants import RAY_SERVE_LLM_ENABLE_DIRECT_STREAMING
from ray.serve._private.constants import RAY_SERVE_ENABLE_HA_PROXY, SERVE_SESSION_ID
from ray.serve._private.test_utils import check_running, get_application_url
from ray.serve._private.test_utils import (
check_running,
get_application_url,
get_application_urls,
)
from ray.serve.config import RequestRouterConfig

CONSISTENT_HASH_ROUTER = (
Expand Down Expand Up @@ -39,21 +45,51 @@ def consistent_hash_deployment_config() -> dict:


def run_app_through_haproxy(app, timeout_s: int = 60) -> str:
    """Run ``app`` and return its (HAProxy) URL once HAProxy can route to every
    data-plane replica.

    ``ApplicationStatus.RUNNING`` only means the controller sees every replica
    running. HAProxy reloads its data-plane server map asynchronously after
    that, and the ingress request router learns the running replicas through a
    separate long poll that usually runs ahead of the reload. In that window the
    router can pin a replica HAProxy has not loaded yet, which HAProxy rejects
    with a 503 ``unknown_replica_id``. Warm up until a request has reached every
    replica so the test body never races the reload.

    Args:
        app: Serve application handed to ``serve.run``.
        timeout_s: Seconds allowed for each wait: first for the application to
            report running, then for every replica to become routable.

    Returns:
        The application's URL as reported by
        ``get_application_url(use_localhost=True)``.
    """
    serve.run(app)
    wait_for_condition(check_running, timeout=timeout_s)
    base_url = get_application_url(use_localhost=True)
    # The URL must not be handed back before the warm-up finishes; returning
    # right after ``check_running`` is exactly the race described above.
    _wait_until_all_replicas_routable(base_url, timeout_s=timeout_s)
    return base_url


def session_chat_response(base_url: str, session_id: str, model: str = "test-model"):
"""POST a one-token chat request carrying ``session_id`` through HAProxy.
def _wait_until_all_replicas_routable(base_url: str, timeout_s: int = 60) -> None:
"""Block until HAProxy routes successfully to every data-plane replica.

Asserts the request succeeded and the session id survived the HAProxy hop to
the serving replica. Returns the response so callers can read the serving
replica from the ``x-replica-id`` header (and, for P/D, the prefill replica
from ``kv_transfer_params.remote_engine_id``).
Probes distinct sessions so consistent hashing eventually covers every
replica. A 503 means HAProxy has not loaded that replica yet, so the probe
is retried under a fresh session until coverage is complete.
"""
resp = httpx.post(
# One HTTP data-plane target per running replica of the ingress deployment.
expected_replicas = len(get_application_urls(use_localhost=True))
reached = set()
sessions = itertools.count()

def _all_replicas_reached() -> bool:
for _ in range(max(expected_replicas * 6, 40)):
resp = _chat_request(base_url, f"warmup-session-{next(sessions)}")
if resp.status_code == 200:
reached.add(resp.headers["x-replica-id"])

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

During HAProxy startup or reload, _chat_request might raise transient connection or timeout errors (e.g., httpx.ConnectError or httpx.ReadTimeout, both subclasses of httpx.HTTPError). If an exception is raised, it aborts the loop of probes immediately, which can cause the wait_for_condition check to fail or run much more slowly. Wrapping the request in a try-except block that catches httpx.HTTPError, and checking that the x-replica-id header is present before reading it, makes the warmup process significantly more robust.

Suggested change
resp = _chat_request(base_url, f"warmup-session-{next(sessions)}")
if resp.status_code == 200:
reached.add(resp.headers["x-replica-id"])
try:
resp = _chat_request(base_url, f"warmup-session-{next(sessions)}")
if resp.status_code == 200 and "x-replica-id" in resp.headers:
reached.add(resp.headers["x-replica-id"])
except httpx.HTTPError:
pass

if len(reached) >= expected_replicas:
return True
return False

wait_for_condition(_all_replicas_reached, timeout=timeout_s)


def _chat_request(base_url: str, session_id: str, model: str = "test-model"):
"""POST a one-token chat request carrying ``session_id`` through HAProxy."""
return httpx.post(
f"{base_url}/v1/chat/completions",
json={
"model": model,
Expand All @@ -63,6 +99,17 @@ def session_chat_response(base_url: str, session_id: str, model: str = "test-mod
headers={SERVE_SESSION_ID: session_id},
timeout=30,
)


def session_chat_response(base_url: str, session_id: str, model: str = "test-model"):
    """Send one chat request for ``session_id`` through HAProxy and validate it.

    The request is issued via ``_chat_request``. Two things are asserted on the
    reply: the status code is 200 (the response text is the failure message),
    and the ``x-serve-session-id`` response header equals ``session_id``, i.e.
    the session id survived the HAProxy hop to the serving replica.

    Returns:
        The response object, so callers can read the serving replica from the
        ``x-replica-id`` header (and, for P/D, the prefill replica from
        ``kv_transfer_params.remote_engine_id``).
    """
    response = _chat_request(base_url, session_id, model)

    # Fail with the response text so a rejected request is easy to diagnose.
    assert response.status_code == 200, response.text

    echoed_session_id = response.headers["x-serve-session-id"]
    assert echoed_session_id == session_id

    return response
Loading