diff --git a/python/ray/serve/_private/haproxy_templates.py b/python/ray/serve/_private/haproxy_templates.py index 62f3dc546c9..b5694792bf3 100644 --- a/python/ray/serve/_private/haproxy_templates.py +++ b/python/ray/serve/_private/haproxy_templates.py @@ -123,16 +123,31 @@ http-request wait-for-body time {{ ingress_request_router_timeout_s }}s if METH_POST has_ingress_request_router_app {%- endif %} http-request lua.route_via_ingress_request_router if METH_POST has_ingress_request_router_app - # Fail loudly when the Lua dispatch did not pick a replica. Must appear - # before the use_backend rules below so the request never falls back to - # the primary backend (which would be a silent bypass of the configured - # router policy). - http-request return status 503 content-type text/plain lf-string "Ingress request router failed: %[var(txn.ingress_request_router_failed)]" hdr X-Serve-Reason %[var(txn.ingress_request_router_failed)] if { var(txn.ingress_request_router_failed) -m found } + # Fail loudly when the Lua dispatch could not pin a replica, EXCEPT for + # `unknown_replica_id` (the router pinned a real replica HAProxy has not + # loaded yet, the brief membership gap after the app becomes RUNNING). That + # reason is recovered below by routing to the fallback Serve proxy. Must + # appear before the use_backend rules so other failures never fall through + # to the primary backend (a silent bypass of the configured router policy). 
+ http-request return status 503 content-type text/plain lf-string "Ingress request router failed: %[var(txn.ingress_request_router_failed)]" hdr X-Serve-Reason %[var(txn.ingress_request_router_failed)] if { var(txn.ingress_request_router_failed) -m found } !{ var(txn.ingress_request_router_failed) -m str "unknown_replica_id" } + {%- for backend in backends %} + {%- if backend.ingress_request_router_servers and not backend.fallback_server %} + # No fallback Serve proxy is configured for this app, so a pin-miss cannot + # be recovered: keep failing loud instead of letting `unknown_replica_id` + # fall through to the primary backend. + http-request return status 503 content-type text/plain lf-string "Ingress request router failed: %[var(txn.ingress_request_router_failed)]" hdr X-Serve-Reason %[var(txn.ingress_request_router_failed)] if is_{{ backend.name or 'unknown' }} { var(txn.ingress_request_router_failed) -m str "unknown_replica_id" } + {%- endif %} + {%- endfor %} {%- endif %} # Static routing based on path prefixes in decreasing length then alphabetical order {%- for backend in backends %} {%- if has_ingress_request_router and backend.ingress_request_router_servers %} use_backend {{ backend.name or 'unknown' }}-via-ingress-request-router if is_{{ backend.name or 'unknown' }} { var(txn.via_ingress_request_router) -m found } + {%- if backend.fallback_server %} + # Pin-miss recovery: route the unloaded-replica reason into the router + # backend too, where it selects the fallback Serve proxy. + use_backend {{ backend.name or 'unknown' }}-via-ingress-request-router if is_{{ backend.name or 'unknown' }} { var(txn.ingress_request_router_failed) -m str "unknown_replica_id" } + {%- endif %} {%- endif %} use_backend {{ backend.name or 'unknown' }} if is_{{ backend.name or 'unknown' }} {%- endfor %} @@ -209,6 +224,14 @@ {%- for server in backend.servers %} use-server {{ server.name }} if { var(txn.ingress_request_router_target) -m str "{{ server.name }}" } {%- endfor %} + {%- if backend.fallback_server %} + # Pin-miss fallback: the router named a replica not yet in our server map + # (brief membership gap after the app becomes RUNNING). Hand the request to + # the fallback Serve proxy instead of 503ing; it re-pins via the same + # consistent-hash router. Other router failures still fail loud (see the + # frontend 503 rule). + use-server {{ backend.fallback_server.name }} if { var(txn.ingress_request_router_failed) -m str "unknown_replica_id" } + {%- endif %} # `track` allows us to mirror primary-backend health and avoid double-checking. 
{%- for server in backend.servers %} server {{ server.name }} {{ server.host }}:{{ server.port }} track {{ backend.name or 'unknown' }}/{{ server.name }} diff --git a/python/ray/serve/_private/ingress_request_router.lua.tmpl b/python/ray/serve/_private/ingress_request_router.lua.tmpl index 19488ea2896..a6b57b30ce8 100644 --- a/python/ray/serve/_private/ingress_request_router.lua.tmpl +++ b/python/ray/serve/_private/ingress_request_router.lua.tmpl @@ -159,6 +159,12 @@ local function _route_via_ingress_request_router(txn, router, replica_map) local server_name = replica_map[replica_id] if not server_name then + -- Pin-miss: the router named a replica HAProxy has not loaded into its + -- server map yet (the brief membership gap after an app becomes + -- RUNNING, where the router's in-process view runs ahead of HAProxy's + -- config reload). Arm `failed` so the failures metric counts it by + -- reason; the frontend routes this specific reason to the fallback + -- Serve proxy instead of 503ing. txn:set_var("txn.ingress_request_router_failed", "unknown_replica_id") return end @@ -172,6 +178,10 @@ end -- rule fires instead of letting the request silently fall through to the -- primary backend. The product invariant is: requests to a router-bearing app -- are served via the router or fail; there is no quiet alternative path. +-- Exception: `unknown_replica_id` (the router pinned a real replica HAProxy +-- has not loaded yet) is recovered by the frontend through the fallback Serve +-- proxy rather than 503ed, because that path re-pins via the same router and +-- so does not bypass the routing policy. It is still counted as a failure. -- Two silent returns are correct: (1) the request didn't target a -- router-bearing app (no txn var set, no app entry in our maps), and -- (2) the controller hasn't pushed router/replica state for this app yet. 
diff --git a/python/ray/serve/tests/test_haproxy_api.py b/python/ray/serve/tests/test_haproxy_api.py index e640ffc31f8..6e58222cad9 100644 --- a/python/ray/serve/tests/test_haproxy_api.py +++ b/python/ray/serve/tests/test_haproxy_api.py @@ -1140,6 +1140,122 @@ async def test_router_failure_fails_loud_with_reason(haproxy_api_cleanup): pass +@pytest.mark.asyncio +async def test_pin_miss_falls_back_to_fallback_server(haproxy_api_cleanup): + """When the router pins a replica_id that is not in HAProxy's server map + (the brief membership gap right after an app becomes RUNNING, where the + router's in-process view runs ahead of HAProxy's config reload), HAProxy + must hand the request to the fallback Serve proxy instead of returning 503. + The primary backend must not be load-balanced into, since that would break + session affinity.""" + with tempfile.TemporaryDirectory() as temp_dir: + haproxy_port = find_free_port() + stats_port = find_free_port() + replica_port = find_free_port() + fallback_port = find_free_port() + router_port = find_free_port() + + actor_name = "SERVE_REPLICA::app#dep#aaa" + # The router names a replica that is NOT among the configured servers, + # simulating HAProxy lagging the router's freshly-updated view. 
+ unknown_actor_name = "SERVE_REPLICA::app#dep#not_loaded_yet" + + replica, replica_thread = _create_replica_server( + replica_port, replica_id_header="A" + ) + fallback, fallback_thread = _create_replica_server( + fallback_port, replica_id_header="FALLBACK" + ) + router, router_thread, _ = _create_router_server( + router_port, replica_id_to_return=unknown_actor_name + ) + + try: + config = HAProxyConfig( + http_options=HTTPOptions( + host="127.0.0.1", + port=haproxy_port, + keep_alive_timeout_s=58, + ), + stats_port=stats_port, + socket_path=os.path.join(temp_dir, "admin.sock"), + has_received_routes=True, + has_received_servers=True, + health_check_path="/-/healthz", + health_check_inter="500ms", + health_check_rise=1, + health_check_fall=2, + ) + + backend = BackendConfig( + name="llm", + path_prefix="/", + app_name="llm", + health_check_path="/-/healthz", + servers=[ + ServerConfig( + name="A", + host="127.0.0.1", + port=replica_port, + replica_id=actor_name, + ), + ], + ingress_request_router_servers=[ + ServerConfig(name="router", host="127.0.0.1", port=router_port), + ], + fallback_server=ServerConfig( + name="fallback", + host="127.0.0.1", + port=fallback_port, + ), + ) + + api = HAProxyApi( + cfg=config, + backend_configs={"llm": backend}, + config_file_path=os.path.join(temp_dir, "haproxy.cfg"), + ) + haproxy_api_cleanup(api) + await api.start() + + wait_for_condition(lambda: check_haproxy_ready(stats_port), timeout=10) + await async_wait_for_condition( + lambda: requests.get( + f"http://127.0.0.1:{haproxy_port}/-/healthz", timeout=2 + ).status_code + == 200, + timeout=10, + ) + + # The router pins an unknown replica, so every POST must land on the + # fallback proxy: 200, served by "FALLBACK", never 503. 
+ for _ in range(3): + resp = requests.post( + f"http://127.0.0.1:{haproxy_port}/predict", + json={"prompt": "hi"}, + timeout=5, + ) + assert resp.status_code == 200, resp.text + assert resp.headers.get("x-replica-id") == "FALLBACK", resp.headers + + # The affinity-breaking primary backend must never be selected. + stats_csv = requests.get( + f"http://127.0.0.1:{stats_port}/stats;csv", timeout=5 + ).text + assert _backend_stot(stats_csv, "llm") == 0, stats_csv + finally: + for srv in (replica, fallback, router): + try: + srv.should_exit = True + except Exception: + pass + for thr in (replica_thread, fallback_thread, router_thread): + try: + thr.join(timeout=5) + except Exception: + pass + + @pytest.mark.asyncio async def test_graceful_reload(haproxy_api_cleanup): """Test that graceful reload preserves long-running connections."""