diff --git a/docs/src/content.config.ts b/docs/src/content.config.ts index 5dbc874545d..5cba9253694 100644 --- a/docs/src/content.config.ts +++ b/docs/src/content.config.ts @@ -16,6 +16,12 @@ export const collections = { owner: 'invoke-ai', repo: 'InvokeAI', pagefind: false, + // Authenticate GitHub API requests so the release changelog loader uses + // the 5000 req/hr authenticated rate limit instead of the 60 req/hr + // unauthenticated limit (shared per CI runner IP), which causes + // intermittent "403 - rate limit exceeded" build failures. The token is + // optional, so local builds without it fall back to unauthenticated. + token: process.env.GITHUB_TOKEN, } ]), }) diff --git a/docs/src/content/docs/features/Multi-User Mode/admin-guide.mdx b/docs/src/content/docs/features/Multi-User Mode/admin-guide.mdx index bcef946fb3c..38c654f7d7c 100644 --- a/docs/src/content/docs/features/Multi-User Mode/admin-guide.mdx +++ b/docs/src/content/docs/features/Multi-User Mode/admin-guide.mdx @@ -42,6 +42,34 @@ Then restart the InvokeAI server backend from the command line or using the laun If at any time you wish to revert to single-user mode, simply comment out the `multiuser` line, or change "true" to "false". Then restart the server. Because of the way that browsers cache pages, users with open InvokeAI sessions may need to force-refresh their browsers. ::: +### Queue Scheduling + +The image generation queue is processed by a single worker, so jobs run one at a time. The `session_queue_mode` option controls the order in which pending jobs are selected: + +| Mode | Behavior | +|------|----------| +| `round_robin` (default) | Interleaves jobs across users — each user is served one job before any user is served a second. A single user enqueuing a large batch can no longer monopolize the queue. | +| `FIFO` | Strict first-come, first-serve. Jobs run in the order they were enqueued (respecting priority), so a large batch drains completely before the next user's jobs start. | + +Set it in `invokeai.yaml`: + +```yaml +multiuser: true +session_queue_mode: round_robin # or FIFO +``` + +Or via the `INVOKEAI_SESSION_QUEUE_MODE` environment variable: + +```bash +INVOKEAI_SESSION_QUEUE_MODE=FIFO +``` + +:::note +`session_queue_mode` only applies in multiuser mode. In single-user mode the queue is always FIFO regardless of this setting, since all jobs belong to the same account. +::: + +Round-robin fairness is determined from when each user's jobs were last started. Retained terminal queue history (see [`max_queue_history`](#configuration-reference)) does not slow dequeue scheduling — each dequeue's cost scales with the number of users who currently have pending jobs, not with the size of the history. + ### First Administrator Account When InvokeAI starts for the first time in multi-user mode, you'll see the **Administrator Setup** dialog. @@ -594,7 +622,7 @@ hashing_algorithm: blake3_multi ### How many users can InvokeAI support? -The backend will support dozens of concurrent users. However, because the image generation queue is single-threaded, image generation tasks are processed on a first-come, first-serve basis. This means that a user may have to wait for all the other users' image generation jobs to complete before their generation job starts to execute. +The backend will support dozens of concurrent users. However, because the image generation queue is single-threaded, only one job runs at a time. By default jobs are scheduled **round-robin** across users, so each user is served one job per turn and no single user can monopolize the queue with a large batch. You can switch to strict first-come, first-serve ordering with `session_queue_mode: FIFO` — see [Queue Scheduling](#queue-scheduling). A future version of InvokeAI may support concurrent execution on systems with multiple GPUs/graphics cards. diff --git a/docs/src/content/docs/features/Multi-User Mode/user-guide.mdx b/docs/src/content/docs/features/Multi-User Mode/user-guide.mdx index d595668a65e..f333be851b8 100644 --- a/docs/src/content/docs/features/Multi-User Mode/user-guide.mdx +++ b/docs/src/content/docs/features/Multi-User Mode/user-guide.mdx @@ -130,9 +130,9 @@ You cannot: - Cancel other users' generation jobs :::tip[The generation queue] -When two or more users are accessing InvokeAI at the same time, their image generation jobs will be placed on the session queue on a first-come, first-serve basis. This means that you will have to wait for other users' image rendering jobs to complete before yours will start. +When two or more users are accessing InvokeAI at the same time, their image generation jobs share a single session queue. By default the server schedules jobs **round-robin** across users: each user gets one turn before anyone gets a second turn. This means a user who enqueues a large batch can no longer monopolize the queue — your jobs are interleaved with theirs rather than waiting for their entire batch to drain first. (An administrator can switch the server back to strict first-come, first-serve ordering; see the admin guide.) -While other users' jobs are running you will see the shared image generation progress bar, and the queue badge will show a single number — the count of your own jobs that are pending or in progress. It does not show other users' counts. +While other users' jobs are running you will see the shared image generation progress bar, and the queue badge shows **`your jobs / all jobs`** — for example `2/5` means 2 of the 5 pending-or-in-progress jobs are yours. (In single-user mode the badge shows just a single total.) Open the Queue tab to see where your job sits in relation to the other queued tasks. ::: diff --git a/docs/src/generated/settings.json b/docs/src/generated/settings.json index fcb47dbfb23..f2c305fdd21 100644 --- a/docs/src/generated/settings.json +++ b/docs/src/generated/settings.json @@ -590,6 +590,20 @@ "type": "", "validation": {} }, + { + "category": "GENERATION", + "default": "round_robin", + "description": "Session queue mode. Use 'FIFO' for traditional first-in-first-out, or 'round_robin' to serve each user's jobs in turn. In single-user mode, FIFO is always used regardless of this setting.", + "env_var": "INVOKEAI_SESSION_QUEUE_MODE", + "literal_values": [ + "FIFO", + "round_robin" + ], + "name": "session_queue_mode", + "required": false, + "type": "typing.Literal['FIFO', 'round_robin']", + "validation": {} + }, { "category": "GENERATION", "default": false, diff --git a/invokeai/app/api/routers/session_queue.py b/invokeai/app/api/routers/session_queue.py index cd2260d2271..2be3e2d44b8 100644 --- a/invokeai/app/api/routers/session_queue.py +++ b/invokeai/app/api/routers/session_queue.py @@ -442,7 +442,9 @@ async def get_queue_status( current_user: CurrentUserOrDefault, queue_id: str = Path(description="The queue id to perform this operation on"), ) -> SessionQueueAndProcessorStatus: - """Gets the status of the session queue. Non-admin users see only their own counts and cannot see current item details unless they own it.""" + """Gets the status of the session queue. Returns global counts; non-admin users additionally + get their own pending/in_progress counts (so the UI can show an X/Y badge) and cannot see the + current item's identifiers unless they own it.""" try: user_id = None if current_user.is_admin else current_user.user_id queue = ApiDependencies.invoker.services.session_queue.get_queue_status(queue_id, user_id=user_id) diff --git a/invokeai/app/api/sockets.py b/invokeai/app/api/sockets.py index 7e93c332d64..8e48b40233c 100644 --- a/invokeai/app/api/sockets.py +++ b/invokeai/app/api/sockets.py @@ -260,20 +260,37 @@ async def _handle_sub_bulk_download(self, sid: str, data: Any) -> None: async def _handle_unsub_bulk_download(self, sid: str, data: Any) -> None: await self._sio.leave_room(sid, BulkDownloadSubscriptionEvent(**data).bulk_download_id) + def _owner_and_admin_sids(self, owner_user_id: str) -> list[str]: + """Sids belonging to the event's owner or to any admin. + + Used as `skip_sid` when broadcasting a sanitized companion event to the queue room, + so the owner and admins (who already received the full event) don't get a second + copy that would clobber their cache with redacted values. + """ + return [ + sid + for sid, info in self._socket_users.items() + if info.get("user_id") == owner_user_id or info.get("is_admin") + ] + async def _handle_queue_event(self, event: FastAPIEvent[QueueEventBase]): """Handle queue events with user isolation. - All queue item events (invocation events AND QueueItemStatusChangedEvent) are - private to the owning user and admins. They carry unsanitized user_id, batch_id, - session_id, origin, destination and error metadata, and must never be broadcast - to the whole queue room — otherwise any other authenticated subscriber could - observe cross-user queue activity. + Queue events split into two routing paths: - RecallParametersUpdatedEvent is also private to the owner + admins. + 1. The owner and admins receive the full unsanitized event in their `user:{id}` / + `admin` rooms. The full payload may include batch_id, session_id, origin, + destination, error metadata, etc. - BatchEnqueuedEvent carries the enqueuing user's batch_id/origin/counts and - is also routed privately. QueueClearedEvent is the only queue event that - is still broadcast to the whole queue room. + 2. For events that other authenticated users need to know about so their queue list + and badge counts stay in sync (QueueItemStatusChangedEvent and BatchEnqueuedEvent), + a sanitized companion event is also emitted to the full queue room with the + owner's and admins' sids in `skip_sid`. The companion uses `user_id="redacted"` + as a sentinel so the frontend handler knows to do tag invalidation only and skip + per-session side effects. + + InvocationEventBase events stay private (owner + admins only). RecallParametersUpdatedEvent + is also private. QueueClearedEvent has no user identity and is broadcast to the queue room. IMPORTANT: Check InvocationEventBase BEFORE QueueItemEventBase since InvocationEventBase inherits from QueueItemEventBase. The order of isinstance checks matters! @@ -302,10 +319,51 @@ async def _handle_queue_event(self, event: FastAPIEvent[QueueEventBase]): logger.debug(f"Emitted private invocation event {event_name} to user room {user_room} and admin room") - # Other queue item events (QueueItemStatusChangedEvent) carry unsanitized - # user_id, batch_id, session_id, origin, destination and error metadata. - # They are private to the owning user + admins — never broadcast to the - # full queue room. + # QueueItemStatusChangedEvent: full to owner+admin, sanitized to everyone else in + # the queue room so their queue list, badge, and item caches refresh. + elif isinstance(event_data, QueueItemStatusChangedEvent): + user_room = f"user:{event_data.user_id}" + await self._sio.emit(event=event_name, data=event_data.model_dump(mode="json"), room=user_room) + await self._sio.emit(event=event_name, data=event_data.model_dump(mode="json"), room="admin") + + sanitized = event_data.model_copy( + update={ + "user_id": "redacted", + "batch_id": "redacted", + "session_id": "redacted", + "origin": None, + "destination": None, + "error_type": None, + "error_message": None, + "error_traceback": None, + } + ) + # Strip identifying fields out of the embedded batch_status / queue_status too. + sanitized.batch_status = sanitized.batch_status.model_copy( + update={"batch_id": "redacted", "origin": None, "destination": None} + ) + sanitized.queue_status = sanitized.queue_status.model_copy( + update={ + "item_id": None, + "session_id": None, + "batch_id": None, + "user_pending": None, + "user_in_progress": None, + } + ) + await self._sio.emit( + event=event_name, + data=sanitized.model_dump(mode="json"), + room=event_data.queue_id, + skip_sid=self._owner_and_admin_sids(event_data.user_id), + ) + + logger.debug( + f"Emitted queue_item_status_changed: full to {user_room}+admin, sanitized to queue {event_data.queue_id}" + ) + + # Other queue item events (currently none beyond QueueItemStatusChangedEvent that + # carry user_id) stay private to owner + admins. elif isinstance(event_data, QueueItemEventBase) and hasattr(event_data, "user_id"): user_room = f"user:{event_data.user_id}" await self._sio.emit(event=event_name, data=event_data.model_dump(mode="json"), room=user_room) @@ -331,14 +389,25 @@ async def _handle_queue_event(self, event: FastAPIEvent[QueueEventBase]): ) logger.debug(f"Emitted private recall_parameters_updated event to user room {user_room} and admin room") - # BatchEnqueuedEvent carries the enqueuing user's batch_id, origin, and - # enqueued counts. Route it privately to the owner + admins so other - # users do not observe cross-user batch activity. + # BatchEnqueuedEvent: full to owner+admin, sanitized to everyone else in the queue + # room so their badge total and queue list pick up the new items. elif isinstance(event_data, BatchEnqueuedEvent): user_room = f"user:{event_data.user_id}" await self._sio.emit(event=event_name, data=event_data.model_dump(mode="json"), room=user_room) await self._sio.emit(event=event_name, data=event_data.model_dump(mode="json"), room="admin") - logger.debug(f"Emitted private batch_enqueued event to user room {user_room} and admin room") + + sanitized = event_data.model_copy( + update={"user_id": "redacted", "batch_id": "redacted", "origin": None} + ) + await self._sio.emit( + event=event_name, + data=sanitized.model_dump(mode="json"), + room=event_data.queue_id, + skip_sid=self._owner_and_admin_sids(event_data.user_id), + ) + logger.debug( + f"Emitted batch_enqueued: full to {user_room}+admin, sanitized to queue {event_data.queue_id}" + ) else: # For remaining queue events (e.g. QueueClearedEvent) that do not diff --git a/invokeai/app/services/config/config_default.py b/invokeai/app/services/config/config_default.py index e6cc7c2798c..b25b72f1405 100644 --- a/invokeai/app/services/config/config_default.py +++ b/invokeai/app/services/config/config_default.py @@ -30,6 +30,7 @@ ATTENTION_SLICE_SIZE = Literal["auto", "balanced", "max", 1, 2, 3, 4, 5, 6, 7, 8] LOG_FORMAT = Literal["plain", "color", "syslog", "legacy"] LOG_LEVEL = Literal["debug", "info", "warning", "error", "critical"] +SESSION_QUEUE_MODE = Literal["FIFO", "round_robin"] IMAGE_SUBFOLDER_STRATEGY = Literal["flat", "date", "type", "hash"] CONFIG_SCHEMA_VERSION = "4.0.3" EXTERNAL_PROVIDER_CONFIG_FIELDS = ( @@ -114,6 +115,7 @@ class InvokeAIAppConfig(BaseSettings): force_tiled_decode: Whether to enable tiled VAE decode (reduces memory consumption with some performance penalty). pil_compress_level: The compress_level setting of PIL.Image.save(), used for PNG encoding. All settings are lossless. 0 = no compression, 1 = fastest with slightly larger filesize, 9 = slowest with smallest filesize. 1 is typically the best setting. max_queue_size: Maximum number of items in the session queue. + session_queue_mode: Session queue mode. Use 'FIFO' for traditional first-in-first-out, or 'round_robin' to serve each user's jobs in turn. In single-user mode, FIFO is always used regardless of this setting.
Valid values: `FIFO`, `round_robin` clear_queue_on_startup: Empties session queue on startup. If true, disables `max_queue_history`. max_queue_history: Keep the last N completed, failed, and canceled queue items. Older items are deleted on startup. Set to 0 to prune all terminal items. Ignored if `clear_queue_on_startup` is true. allow_nodes: List of nodes to allow. Omit to allow all. @@ -214,6 +216,7 @@ class InvokeAIAppConfig(BaseSettings): force_tiled_decode: bool = Field(default=False, description="Whether to enable tiled VAE decode (reduces memory consumption with some performance penalty).") pil_compress_level: int = Field(default=1, description="The compress_level setting of PIL.Image.save(), used for PNG encoding. All settings are lossless. 0 = no compression, 1 = fastest with slightly larger filesize, 9 = slowest with smallest filesize. 1 is typically the best setting.") max_queue_size: int = Field(default=10000, gt=0, description="Maximum number of items in the session queue.") + session_queue_mode: SESSION_QUEUE_MODE = Field(default="round_robin", description="Session queue mode. Use 'FIFO' for traditional first-in-first-out, or 'round_robin' to serve each user's jobs in turn. In single-user mode, FIFO is always used regardless of this setting.") clear_queue_on_startup: bool = Field(default=False, description="Empties session queue on startup. If true, disables `max_queue_history`.") max_queue_history: Optional[int] = Field(default=None, ge=0, description="Keep the last N completed, failed, and canceled queue items. Older items are deleted on startup. Set to 0 to prune all terminal items. Ignored if `clear_queue_on_startup` is true.") diff --git a/invokeai/app/services/session_queue/session_queue_sqlite.py b/invokeai/app/services/session_queue/session_queue_sqlite.py index c29ed9b0038..ca9c1ca607b 100644 --- a/invokeai/app/services/session_queue/session_queue_sqlite.py +++ b/invokeai/app/services/session_queue/session_queue_sqlite.py @@ -38,6 +38,67 @@ from invokeai.app.services.shared.sqlite.sqlite_common import SQLiteDirection from invokeai.app.services.shared.sqlite.sqlite_database import SqliteDatabase +# Round-robin dequeue (multiuser fairness): pick the next pending item from the user who was +# least-recently served. +# +# The "whose turn is it" ordering key is each candidate user's most recent started_at. We compute +# it with a *correlated* MAX subquery rather than a GROUP BY over all started rows: the candidate +# set is one row per user with pending work, and each MAX(started_at) WHERE user_id = ? is +# satisfied by an indexed seek (idx_session_queue_user_started_at) instead of scanning the full +# retained queue history. This keeps dequeue cost proportional to the number of active users, not +# to total history (which is unbounded by default). MAX() ignores NULL started_at values, so users +# with only pending items fall back to the epoch via COALESCE and are served first. +# +# Kept as a module constant so the scaling test can EXPLAIN QUERY PLAN the exact production SQL. +ROUND_ROBIN_DEQUEUE_QUERY = """--sql + WITH user_next_item AS ( + -- For each user, select their single best pending item (highest priority, then oldest). + SELECT + user_id, + item_id, + ROW_NUMBER() OVER ( + PARTITION BY user_id + ORDER BY priority DESC, item_id ASC + ) AS rn + FROM session_queue + WHERE status = 'pending' + ) + SELECT + sq.*, + u.display_name AS user_display_name, + u.email AS user_email + FROM session_queue sq + LEFT JOIN users u ON sq.user_id = u.user_id + JOIN user_next_item uni ON sq.item_id = uni.item_id AND uni.rn = 1 + ORDER BY + COALESCE( + ( + SELECT MAX(served.started_at) + FROM session_queue served + WHERE served.user_id = sq.user_id + ), + '1970-01-01' + ) ASC, + sq.item_id ASC + LIMIT 1 + """ + +# FIFO dequeue (single-user mode, or round_robin explicitly disabled): strict priority then +# insertion order. +FIFO_DEQUEUE_QUERY = """--sql + SELECT + sq.*, + u.display_name as user_display_name, + u.email as user_email + FROM session_queue sq + LEFT JOIN users u ON sq.user_id = u.user_id + WHERE sq.status = 'pending' + ORDER BY + sq.priority DESC, + sq.item_id ASC + LIMIT 1 + """ + class SqliteSessionQueue(SessionQueueBase): __invoker: Invoker @@ -210,22 +271,13 @@ async def enqueue_batch( return enqueue_result def dequeue(self) -> Optional[SessionQueueItem]: + config = self.__invoker.services.configuration + use_round_robin = config.multiuser and config.session_queue_mode == "round_robin" + + query = ROUND_ROBIN_DEQUEUE_QUERY if use_round_robin else FIFO_DEQUEUE_QUERY + with self._db.transaction() as cursor: - cursor.execute( - """--sql - SELECT - sq.*, - u.display_name as user_display_name, - u.email as user_email - FROM session_queue sq - LEFT JOIN users u ON sq.user_id = u.user_id - WHERE sq.status = 'pending' - ORDER BY - sq.priority DESC, - sq.item_id ASC - LIMIT 1 - """ - ) + cursor.execute(query) result = cast(Union[sqlite3.Row, None], cursor.fetchone()) if result is None: return None diff --git a/invokeai/app/services/shared/sqlite/sqlite_util.py b/invokeai/app/services/shared/sqlite/sqlite_util.py index 3e1d5c53f3e..14b6c61a85a 100644 --- a/invokeai/app/services/shared/sqlite/sqlite_util.py +++ b/invokeai/app/services/shared/sqlite/sqlite_util.py @@ -35,6 +35,7 @@ from invokeai.app.services.shared.sqlite_migrator.migrations.migration_30 import build_migration_30 from invokeai.app.services.shared.sqlite_migrator.migrations.migration_31 import build_migration_31 from invokeai.app.services.shared.sqlite_migrator.migrations.migration_32 import build_migration_32 +from invokeai.app.services.shared.sqlite_migrator.migrations.migration_33 import build_migration_33 from invokeai.app.services.shared.sqlite_migrator.sqlite_migrator_impl import SqliteMigrator @@ -87,6 +88,7 @@ def init_db(config: InvokeAIAppConfig, logger: Logger, image_files: ImageFileSto migrator.register_migration(build_migration_30()) migrator.register_migration(build_migration_31()) migrator.register_migration(build_migration_32()) + migrator.register_migration(build_migration_33()) migrator.run_migrations() return db diff --git a/invokeai/app/services/shared/sqlite_migrator/migrations/migration_33.py b/invokeai/app/services/shared/sqlite_migrator/migrations/migration_33.py new file mode 100644 index 00000000000..bf9e149e37d --- /dev/null +++ b/invokeai/app/services/shared/sqlite_migrator/migrations/migration_33.py @@ -0,0 +1,59 @@ +"""Migration 33: Add indexes supporting round-robin dequeue. + +The round-robin dequeue in multiuser mode relies on two access shapes: + +1. A per-user "best pending item" selection (``status = 'pending'`` partitioned by + ``user_id`` and ordered by ``priority DESC, item_id ASC``). +2. A per-candidate "last served" lookup (``MAX(started_at)`` for a single ``user_id``), + evaluated as a correlated subquery once per user that has pending work. + +With only the pre-existing single-column indexes on ``status``, ``priority``, and +``user_id``, the pending selection falls back to scanning the table and the last-served +lookup scans all retained history. Because completed/failed/canceled history is retained +(and ``max_queue_history`` defaults to unbounded), that cost would grow with total queue +history rather than with the number of pending items / active users. + +``idx_session_queue_round_robin_pending`` lets the planner satisfy the pending selection +without touching historical rows. ``idx_session_queue_user_started_at`` turns each +``MAX(started_at) WHERE user_id = ?`` into an indexed seek (the planner's min/max +optimization reads the tail of the user's index range) rather than a scan, so the +last-served lookup costs ``O(log n)`` per active user instead of scanning history. +""" + +import sqlite3 + +from invokeai.app.services.shared.sqlite_migrator.sqlite_migrator_common import Migration + + +class Migration33Callback: + """Add composite indexes matching the round-robin dequeue query shapes.""" + + def __call__(self, cursor: sqlite3.Cursor) -> None: + cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='session_queue';") + if cursor.fetchone() is None: + return + + # Pending-item selection: WHERE status = 'pending', PARTITION BY user_id, + # ORDER BY priority DESC, item_id ASC. + cursor.execute( + """--sql + CREATE INDEX IF NOT EXISTS idx_session_queue_round_robin_pending + ON session_queue (status, user_id, priority DESC, item_id ASC); + """ + ) + + # Last-served lookup: MAX(started_at) WHERE user_id = ?) + cursor.execute( + """--sql + CREATE INDEX IF NOT EXISTS idx_session_queue_user_started_at + ON session_queue (user_id, started_at); + """ + ) + + +def build_migration_33() -> Migration: + return Migration( + from_version=32, + to_version=33, + callback=Migration33Callback(), + ) diff --git a/invokeai/frontend/web/openapi.json b/invokeai/frontend/web/openapi.json index 7033408b197..9b00a4ec45a 100644 --- a/invokeai/frontend/web/openapi.json +++ b/invokeai/frontend/web/openapi.json @@ -7648,7 +7648,7 @@ "get": { "tags": ["queue"], "summary": "Get Queue Status", - "description": "Gets the status of the session queue. Non-admin users see only their own counts and cannot see current item details unless they own it.", + "description": "Gets the status of the session queue. Returns global counts; non-admin users additionally\nget their own pending/in_progress counts (so the UI can show an X/Y badge) and cannot see the\ncurrent item's identifiers unless they own it.", "operationId": "get_queue_status", "security": [ { @@ -41198,6 +41198,13 @@ "description": "Maximum number of items in the session queue.", "default": 10000 }, + "session_queue_mode": { + "type": "string", + "enum": ["FIFO", "round_robin"], + "title": "Session Queue Mode", + "description": "Session queue mode. Use 'FIFO' for traditional first-in-first-out, or 'round_robin' to serve each user's jobs in turn. In single-user mode, FIFO is always used regardless of this setting.", + "default": "round_robin" + }, "clear_queue_on_startup": { "type": "boolean", "title": "Clear Queue On Startup", @@ -41423,7 +41430,7 @@ "additionalProperties": false, "type": "object", "title": "InvokeAIAppConfig", - "description": "Invoke's global app configuration.\n\nTypically, you won't need to interact with this class directly. Instead, use the `get_config` function from `invokeai.app.services.config` to get a singleton config object.\n\nAttributes:\n host: IP address to bind to. Use `0.0.0.0` to serve to your local network.\n port: Port to bind to.\n allow_origins: Allowed CORS origins.\n allow_credentials: Allow CORS credentials.\n allow_methods: Methods allowed for CORS.\n allow_headers: Headers allowed for CORS.\n ssl_certfile: SSL certificate file for HTTPS. See https://www.uvicorn.dev/settings/#https.\n ssl_keyfile: SSL key file for HTTPS. See https://www.uvicorn.dev/settings/#https.\n log_tokenization: Enable logging of parsed prompt tokens.\n patchmatch: Enable patchmatch inpaint code.\n models_dir: Path to the models directory.\n convert_cache_dir: Path to the converted models cache directory (DEPRECATED, but do not delete because it is needed for migration from previous versions).\n download_cache_dir: Path to the directory that contains dynamically downloaded models.\n legacy_conf_dir: Path to directory of legacy checkpoint config files.\n db_dir: Path to InvokeAI databases directory.\n outputs_dir: Path to directory for outputs.\n image_subfolder_strategy: Strategy for organizing images into subfolders. 'flat' stores all images in a single folder. 'date' organizes by YYYY/MM/DD. 'type' organizes by image category. 'hash' uses first 2 characters of UUID for filesystem performance.
Valid values: `flat`, `date`, `type`, `hash`\n custom_nodes_dir: Path to directory for custom nodes.\n style_presets_dir: Path to directory for style presets.\n workflow_thumbnails_dir: Path to directory for workflow thumbnails.\n log_handlers: Log handler. Valid options are \"console\", \"file=\", \"syslog=path|address:host:port\", \"http=\".\n log_format: Log format. Use \"plain\" for text-only, \"color\" for colorized output, \"legacy\" for 2.3-style logging and \"syslog\" for syslog-style.
Valid values: `plain`, `color`, `syslog`, `legacy`\n log_level: Emit logging messages at this level or higher.
Valid values: `debug`, `info`, `warning`, `error`, `critical`\n log_sql: Log SQL queries. `log_level` must be `debug` for this to do anything. Extremely verbose.\n log_level_network: Log level for network-related messages. 'info' and 'debug' are very verbose.
Valid values: `debug`, `info`, `warning`, `error`, `critical`\n use_memory_db: Use in-memory database. Useful for development.\n dev_reload: Automatically reload when Python sources are changed. Does not reload node definitions.\n profile_graphs: Enable graph profiling using `cProfile`.\n profile_prefix: An optional prefix for profile output files.\n profiles_dir: Path to profiles output directory.\n max_cache_ram_gb: The maximum amount of CPU RAM to use for model caching in GB. If unset, the limit will be configured based on the available RAM. In most cases, it is recommended to leave this unset.\n max_cache_vram_gb: The amount of VRAM to use for model caching in GB. If unset, the limit will be configured based on the available VRAM and the device_working_mem_gb. In most cases, it is recommended to leave this unset.\n log_memory_usage: If True, a memory snapshot will be captured before and after every model cache operation, and the result will be logged (at debug level). There is a time cost to capturing the memory snapshots, so it is recommended to only enable this feature if you are actively inspecting the model cache's behaviour.\n model_cache_keep_alive_min: How long to keep models in cache after last use, in minutes. A value of 0 (the default) means models are kept in cache indefinitely. If no model generations occur within the timeout period, the model cache is cleared using the same logic as the 'Clear Model Cache' button.\n device_working_mem_gb: The amount of working memory to keep available on the compute device (in GB). Has no effect if running on CPU. If you are experiencing OOM errors, try increasing this value.\n enable_partial_loading: Enable partial loading of models. This enables models to run with reduced VRAM requirements (at the cost of slower speed) by streaming the model from RAM to VRAM as its used. In some edge cases, partial loading can cause models to run more slowly if they were previously being fully loaded into VRAM.\n keep_ram_copy_of_weights: Whether to keep a full RAM copy of a model's weights when the model is loaded in VRAM. Keeping a RAM copy increases average RAM usage, but speeds up model switching and LoRA patching (assuming there is sufficient RAM). Set this to False if RAM pressure is consistently high.\n ram: DEPRECATED: This setting is no longer used. It has been replaced by `max_cache_ram_gb`, but most users will not need to use this config since automatic cache size limits should work well in most cases. This config setting will be removed once the new model cache behavior is stable.\n vram: DEPRECATED: This setting is no longer used. It has been replaced by `max_cache_vram_gb`, but most users will not need to use this config since automatic cache size limits should work well in most cases. This config setting will be removed once the new model cache behavior is stable.\n lazy_offload: DEPRECATED: This setting is no longer used. Lazy-offloading is enabled by default. This config setting will be removed once the new model cache behavior is stable.\n pytorch_cuda_alloc_conf: Configure the Torch CUDA memory allocator. This will impact peak reserved VRAM usage and performance. Setting to \"backend:cudaMallocAsync\" works well on many systems. The optimal configuration is highly dependent on the system configuration (device type, VRAM, CUDA driver version, etc.), so must be tuned experimentally.\n device: Preferred execution device. `auto` will choose the device depending on the hardware platform and the installed torch capabilities.
Valid values: `auto`, `cpu`, `cuda`, `mps`, `cuda:N` (where N is a device number)\n precision: Floating point precision. `float16` will consume half the memory of `float32` but produce slightly lower-quality images. The `auto` setting will guess the proper precision based on your video card and operating system.
Valid values: `auto`, `float16`, `bfloat16`, `float32`\n sequential_guidance: Whether to calculate guidance in serial instead of in parallel, lowering memory requirements.\n attention_type: Attention type.
Valid values: `auto`, `normal`, `xformers`, `sliced`, `torch-sdp`\n attention_slice_size: Slice size, valid when attention_type==\"sliced\".
Valid values: `auto`, `balanced`, `max`, `1`, `2`, `3`, `4`, `5`, `6`, `7`, `8`\n force_tiled_decode: Whether to enable tiled VAE decode (reduces memory consumption with some performance penalty).\n pil_compress_level: The compress_level setting of PIL.Image.save(), used for PNG encoding. All settings are lossless. 0 = no compression, 1 = fastest with slightly larger filesize, 9 = slowest with smallest filesize. 1 is typically the best setting.\n max_queue_size: Maximum number of items in the session queue.\n clear_queue_on_startup: Empties session queue on startup. If true, disables `max_queue_history`.\n max_queue_history: Keep the last N completed, failed, and canceled queue items. Older items are deleted on startup. Set to 0 to prune all terminal items. Ignored if `clear_queue_on_startup` is true.\n allow_nodes: List of nodes to allow. Omit to allow all.\n deny_nodes: List of nodes to deny. Omit to deny none.\n node_cache_size: How many cached nodes to keep in memory.\n hashing_algorithm: Model hashing algorthim for model installs. 'blake3_multi' is best for SSDs. 'blake3_single' is best for spinning disk HDDs. 'random' disables hashing, instead assigning a UUID to models. Useful when using a memory db to reduce model installation time, or if you don't care about storing stable hashes for models. Alternatively, any other hashlib algorithm is accepted, though these are not nearly as performant as blake3.
Valid values: `blake3_multi`, `blake3_single`, `random`, `md5`, `sha1`, `sha224`, `sha256`, `sha384`, `sha512`, `blake2b`, `blake2s`, `sha3_224`, `sha3_256`, `sha3_384`, `sha3_512`, `shake_128`, `shake_256`\n remote_api_tokens: List of regular expression and token pairs used when downloading models from URLs. The download URL is tested against the regex, and if it matches, the token is provided in as a Bearer token.\n scan_models_on_startup: Scan the models directory on startup, registering orphaned models. This is typically only used in conjunction with `use_memory_db` for testing purposes.\n unsafe_disable_picklescan: UNSAFE. Disable the picklescan security check during model installation. Recommended only for development and testing purposes. This will allow arbitrary code execution during model installation, so should never be used in production.\n allow_unknown_models: Allow installation of models that we are unable to identify. If enabled, models will be marked as `unknown` in the database, and will not have any metadata associated with them. If disabled, unknown models will be rejected during installation.\n multiuser: Enable multiuser support. When disabled, the application runs in single-user mode using a default system account with administrator privileges. When enabled, requires user authentication and authorization.\n strict_password_checking: Enforce strict password requirements. When True, passwords must contain uppercase, lowercase, and numbers. When False (default), any password is accepted but its strength (weak/moderate/strong) is reported to the user.\n external_alibabacloud_api_key: API key for Alibaba Cloud DashScope image generation.\n external_alibabacloud_base_url: Base URL override for Alibaba Cloud DashScope image generation.\n external_gemini_api_key: API key for Gemini image generation.\n external_openai_api_key: API key for OpenAI image generation.\n external_gemini_base_url: Base URL override for Gemini image generation.\n external_openai_base_url: Base URL override for OpenAI image generation.\n external_seedream_api_key: API key for Seedream image generation.\n external_seedream_base_url: Base URL override for Seedream image generation." + "description": "Invoke's global app configuration.\n\nTypically, you won't need to interact with this class directly. Instead, use the `get_config` function from `invokeai.app.services.config` to get a singleton config object.\n\nAttributes:\n host: IP address to bind to. Use `0.0.0.0` to serve to your local network.\n port: Port to bind to.\n allow_origins: Allowed CORS origins.\n allow_credentials: Allow CORS credentials.\n allow_methods: Methods allowed for CORS.\n allow_headers: Headers allowed for CORS.\n ssl_certfile: SSL certificate file for HTTPS. See https://www.uvicorn.dev/settings/#https.\n ssl_keyfile: SSL key file for HTTPS. See https://www.uvicorn.dev/settings/#https.\n log_tokenization: Enable logging of parsed prompt tokens.\n patchmatch: Enable patchmatch inpaint code.\n models_dir: Path to the models directory.\n convert_cache_dir: Path to the converted models cache directory (DEPRECATED, but do not delete because it is needed for migration from previous versions).\n download_cache_dir: Path to the directory that contains dynamically downloaded models.\n legacy_conf_dir: Path to directory of legacy checkpoint config files.\n db_dir: Path to InvokeAI databases directory.\n outputs_dir: Path to directory for outputs.\n image_subfolder_strategy: Strategy for organizing images into subfolders. 'flat' stores all images in a single folder. 'date' organizes by YYYY/MM/DD. 'type' organizes by image category. 'hash' uses first 2 characters of UUID for filesystem performance.
Valid values: `flat`, `date`, `type`, `hash`\n custom_nodes_dir: Path to directory for custom nodes.\n style_presets_dir: Path to directory for style presets.\n workflow_thumbnails_dir: Path to directory for workflow thumbnails.\n log_handlers: Log handler. Valid options are \"console\", \"file=\", \"syslog=path|address:host:port\", \"http=\".\n log_format: Log format. Use \"plain\" for text-only, \"color\" for colorized output, \"legacy\" for 2.3-style logging and \"syslog\" for syslog-style.
Valid values: `plain`, `color`, `syslog`, `legacy`\n log_level: Emit logging messages at this level or higher.
Valid values: `debug`, `info`, `warning`, `error`, `critical`\n log_sql: Log SQL queries. `log_level` must be `debug` for this to do anything. Extremely verbose.\n log_level_network: Log level for network-related messages. 'info' and 'debug' are very verbose.
Valid values: `debug`, `info`, `warning`, `error`, `critical`\n use_memory_db: Use in-memory database. Useful for development.\n dev_reload: Automatically reload when Python sources are changed. Does not reload node definitions.\n profile_graphs: Enable graph profiling using `cProfile`.\n profile_prefix: An optional prefix for profile output files.\n profiles_dir: Path to profiles output directory.\n max_cache_ram_gb: The maximum amount of CPU RAM to use for model caching in GB. If unset, the limit will be configured based on the available RAM. In most cases, it is recommended to leave this unset.\n max_cache_vram_gb: The amount of VRAM to use for model caching in GB. If unset, the limit will be configured based on the available VRAM and the device_working_mem_gb. In most cases, it is recommended to leave this unset.\n log_memory_usage: If True, a memory snapshot will be captured before and after every model cache operation, and the result will be logged (at debug level). There is a time cost to capturing the memory snapshots, so it is recommended to only enable this feature if you are actively inspecting the model cache's behaviour.\n model_cache_keep_alive_min: How long to keep models in cache after last use, in minutes. A value of 0 (the default) means models are kept in cache indefinitely. If no model generations occur within the timeout period, the model cache is cleared using the same logic as the 'Clear Model Cache' button.\n device_working_mem_gb: The amount of working memory to keep available on the compute device (in GB). Has no effect if running on CPU. If you are experiencing OOM errors, try increasing this value.\n enable_partial_loading: Enable partial loading of models. This enables models to run with reduced VRAM requirements (at the cost of slower speed) by streaming the model from RAM to VRAM as its used. In some edge cases, partial loading can cause models to run more slowly if they were previously being fully loaded into VRAM.\n keep_ram_copy_of_weights: Whether to keep a full RAM copy of a model's weights when the model is loaded in VRAM. Keeping a RAM copy increases average RAM usage, but speeds up model switching and LoRA patching (assuming there is sufficient RAM). Set this to False if RAM pressure is consistently high.\n ram: DEPRECATED: This setting is no longer used. It has been replaced by `max_cache_ram_gb`, but most users will not need to use this config since automatic cache size limits should work well in most cases. This config setting will be removed once the new model cache behavior is stable.\n vram: DEPRECATED: This setting is no longer used. It has been replaced by `max_cache_vram_gb`, but most users will not need to use this config since automatic cache size limits should work well in most cases. This config setting will be removed once the new model cache behavior is stable.\n lazy_offload: DEPRECATED: This setting is no longer used. Lazy-offloading is enabled by default. This config setting will be removed once the new model cache behavior is stable.\n pytorch_cuda_alloc_conf: Configure the Torch CUDA memory allocator. This will impact peak reserved VRAM usage and performance. Setting to \"backend:cudaMallocAsync\" works well on many systems. The optimal configuration is highly dependent on the system configuration (device type, VRAM, CUDA driver version, etc.), so must be tuned experimentally.\n device: Preferred execution device. `auto` will choose the device depending on the hardware platform and the installed torch capabilities.
Valid values: `auto`, `cpu`, `cuda`, `mps`, `cuda:N` (where N is a device number)\n precision: Floating point precision. `float16` will consume half the memory of `float32` but produce slightly lower-quality images. The `auto` setting will guess the proper precision based on your video card and operating system.
Valid values: `auto`, `float16`, `bfloat16`, `float32`\n sequential_guidance: Whether to calculate guidance in serial instead of in parallel, lowering memory requirements.\n attention_type: Attention type.
Valid values: `auto`, `normal`, `xformers`, `sliced`, `torch-sdp`\n attention_slice_size: Slice size, valid when attention_type==\"sliced\".
Valid values: `auto`, `balanced`, `max`, `1`, `2`, `3`, `4`, `5`, `6`, `7`, `8`\n force_tiled_decode: Whether to enable tiled VAE decode (reduces memory consumption with some performance penalty).\n pil_compress_level: The compress_level setting of PIL.Image.save(), used for PNG encoding. All settings are lossless. 0 = no compression, 1 = fastest with slightly larger filesize, 9 = slowest with smallest filesize. 1 is typically the best setting.\n max_queue_size: Maximum number of items in the session queue.\n session_queue_mode: Session queue mode. Use 'FIFO' for traditional first-in-first-out, or 'round_robin' to serve each user's jobs in turn. In single-user mode, FIFO is always used regardless of this setting.
Valid values: `FIFO`, `round_robin`\n clear_queue_on_startup: Empties session queue on startup. If true, disables `max_queue_history`.\n max_queue_history: Keep the last N completed, failed, and canceled queue items. Older items are deleted on startup. Set to 0 to prune all terminal items. Ignored if `clear_queue_on_startup` is true.\n allow_nodes: List of nodes to allow. Omit to allow all.\n deny_nodes: List of nodes to deny. Omit to deny none.\n node_cache_size: How many cached nodes to keep in memory.\n hashing_algorithm: Model hashing algorthim for model installs. 'blake3_multi' is best for SSDs. 'blake3_single' is best for spinning disk HDDs. 'random' disables hashing, instead assigning a UUID to models. Useful when using a memory db to reduce model installation time, or if you don't care about storing stable hashes for models. Alternatively, any other hashlib algorithm is accepted, though these are not nearly as performant as blake3.
Valid values: `blake3_multi`, `blake3_single`, `random`, `md5`, `sha1`, `sha224`, `sha256`, `sha384`, `sha512`, `blake2b`, `blake2s`, `sha3_224`, `sha3_256`, `sha3_384`, `sha3_512`, `shake_128`, `shake_256`\n remote_api_tokens: List of regular expression and token pairs used when downloading models from URLs. The download URL is tested against the regex, and if it matches, the token is provided in as a Bearer token.\n scan_models_on_startup: Scan the models directory on startup, registering orphaned models. This is typically only used in conjunction with `use_memory_db` for testing purposes.\n unsafe_disable_picklescan: UNSAFE. Disable the picklescan security check during model installation. Recommended only for development and testing purposes. This will allow arbitrary code execution during model installation, so should never be used in production.\n allow_unknown_models: Allow installation of models that we are unable to identify. If enabled, models will be marked as `unknown` in the database, and will not have any metadata associated with them. If disabled, unknown models will be rejected during installation.\n multiuser: Enable multiuser support. When disabled, the application runs in single-user mode using a default system account with administrator privileges. When enabled, requires user authentication and authorization.\n strict_password_checking: Enforce strict password requirements. When True, passwords must contain uppercase, lowercase, and numbers. When False (default), any password is accepted but its strength (weak/moderate/strong) is reported to the user.\n external_alibabacloud_api_key: API key for Alibaba Cloud DashScope image generation.\n external_alibabacloud_base_url: Base URL override for Alibaba Cloud DashScope image generation.\n external_gemini_api_key: API key for Gemini image generation.\n external_openai_api_key: API key for OpenAI image generation.\n external_gemini_base_url: Base URL override for Gemini image generation.\n external_openai_base_url: Base URL override for OpenAI image generation.\n external_seedream_api_key: API key for Seedream image generation.\n external_seedream_base_url: Base URL override for Seedream image generation." }, "InvokeAIAppConfigWithSetFields": { "properties": { diff --git a/invokeai/frontend/web/src/services/api/schema.ts b/invokeai/frontend/web/src/services/api/schema.ts index 7864579706a..1992ac0d1e5 100644 --- a/invokeai/frontend/web/src/services/api/schema.ts +++ b/invokeai/frontend/web/src/services/api/schema.ts @@ -2143,7 +2143,9 @@ export type paths = { }; /** * Get Queue Status - * @description Gets the status of the session queue. Non-admin users see only their own counts and cannot see current item details unless they own it. + * @description Gets the status of the session queue. Returns global counts; non-admin users additionally + * get their own pending/in_progress counts (so the UI can show an X/Y badge) and cannot see the + * current item's identifiers unless they own it. */ get: operations["get_queue_status"]; put?: never; @@ -16211,6 +16213,7 @@ export type components = { * force_tiled_decode: Whether to enable tiled VAE decode (reduces memory consumption with some performance penalty). * pil_compress_level: The compress_level setting of PIL.Image.save(), used for PNG encoding. All settings are lossless. 0 = no compression, 1 = fastest with slightly larger filesize, 9 = slowest with smallest filesize. 1 is typically the best setting. * max_queue_size: Maximum number of items in the session queue. + * session_queue_mode: Session queue mode. Use 'FIFO' for traditional first-in-first-out, or 'round_robin' to serve each user's jobs in turn. In single-user mode, FIFO is always used regardless of this setting.
Valid values: `FIFO`, `round_robin` * clear_queue_on_startup: Empties session queue on startup. If true, disables `max_queue_history`. * max_queue_history: Keep the last N completed, failed, and canceled queue items. Older items are deleted on startup. Set to 0 to prune all terminal items. Ignored if `clear_queue_on_startup` is true. * allow_nodes: List of nodes to allow. Omit to allow all. @@ -16553,6 +16556,13 @@ export type components = { * @default 10000 */ max_queue_size?: number; + /** + * Session Queue Mode + * @description Session queue mode. Use 'FIFO' for traditional first-in-first-out, or 'round_robin' to serve each user's jobs in turn. In single-user mode, FIFO is always used regardless of this setting. + * @default round_robin + * @enum {string} + */ + session_queue_mode?: "FIFO" | "round_robin"; /** * Clear Queue On Startup * @description Empties session queue on startup. If true, disables `max_queue_history`. diff --git a/invokeai/frontend/web/src/services/events/setEventListeners.tsx b/invokeai/frontend/web/src/services/events/setEventListeners.tsx index e6010ce4ca1..774d0aa9108 100644 --- a/invokeai/frontend/web/src/services/events/setEventListeners.tsx +++ b/invokeai/frontend/web/src/services/events/setEventListeners.tsx @@ -362,6 +362,24 @@ export const setEventListeners = ({ socket, store, setIsConnected }: SetEventLis }); socket.on('queue_item_status_changed', (data) => { + // Sanitized companion event sent to non-owner queue subscribers in multiuser mode. The + // backend sets user_id="redacted" and clears identifiers/error fields. We must not run + // payload-driven cache mutations or per-session side effects (node state reset, progress + // clear, completion bookkeeping) — those belong to the owner. Just invalidate queue tags + // so the non-owner's queue list and badge counts refetch with sanitized data. + if (data.user_id === 'redacted') { + log.trace({ data }, `Sanitized queue_item_status_changed for item ${data.item_id}`); + const tags: ApiTagDescription[] = [ + 'SessionQueueStatus', + 'SessionQueueItemIdList', + { type: 'SessionQueueItem', id: data.item_id }, + { type: 'SessionQueueItem', id: LIST_TAG }, + { type: 'SessionQueueItem', id: LIST_ALL_TAG }, + ]; + dispatch(queueApi.util.invalidateTags(tags)); + return; + } + if (!workflowExecutionCoordinator.onQueueItemStatusChanged(data)) { return; } diff --git a/tests/app/routers/test_multiuser_authorization.py b/tests/app/routers/test_multiuser_authorization.py index 8cca29dee69..95efbf6690c 100644 --- a/tests/app/routers/test_multiuser_authorization.py +++ b/tests/app/routers/test_multiuser_authorization.py @@ -223,6 +223,31 @@ def _create_workflow(client: TestClient, token: str) -> str: return r.json()["workflow_id"] +def _insert_pending_queue_item(session_queue: Any, user_id: str, queue_id: str = "default") -> int: + """Insert a pending queue item owned by ``user_id`` directly into the queue's database.""" + import uuid + + from invokeai.app.services.shared.graph import Graph, GraphExecutionState + from tests.test_nodes import PromptTestInvocation + + graph = Graph() + graph.add_node(PromptTestInvocation(id="prompt", prompt="test")) + session = GraphExecutionState(graph=graph) + session_json = session.model_dump_json(warnings=False, exclude_none=True) + with session_queue._db.transaction() as cursor: + cursor.execute( + """--sql + INSERT INTO session_queue ( + queue_id, session, session_id, batch_id, field_values, priority, + workflow, origin, destination, retried_from_item_id, user_id + ) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + """, + (queue_id, session_json, session.id, str(uuid.uuid4()), None, 0, None, None, None, None, user_id), + ) + return cursor.lastrowid + + # =========================================================================== # 1. Board-image mutation authorization # =========================================================================== @@ -1366,6 +1391,62 @@ def test_session_queue_status_has_user_fields(self): assert scoped.pending == 5 # global, unchanged assert scoped.user_pending == 2 # this user's share + def _setup_queue_router(self, mock_invoker: Invoker): + """Wire a real session queue and a stub processor into the invoker the router uses, + so GET /queue/{queue_id}/status exercises the real service contract.""" + from unittest.mock import MagicMock + + from invokeai.app.services.session_processor.session_processor_common import SessionProcessorStatus + from invokeai.app.services.session_queue.session_queue_sqlite import SqliteSessionQueue + + db = mock_invoker.services.board_records._db + queue = SqliteSessionQueue(db=db) + queue.start(mock_invoker) + mock_invoker.services.session_queue = queue + + processor = MagicMock() + processor.get_status.return_value = SessionProcessorStatus(is_started=True, is_processing=False) + mock_invoker.services.session_processor = processor + return queue + + def test_get_queue_status_route_returns_global_and_user_counts( + self, setup_jwt_secret: None, enable_multiuser: Any, mock_invoker: Invoker, client: TestClient + ): + """Regression test: GET /api/v1/queue/{queue_id}/status must return 200 (not 500) and the + expected global and per-user counts for both non-admin and admin callers. Previously the + router called get_queue_status() with a keyword the service did not accept, raising a + TypeError that the broad except turned into a 500 for every status request.""" + queue = self._setup_queue_router(mock_invoker) + + user1_id = _create_user(mock_invoker, "user1@test.com", "User One") + user2_id = _create_user(mock_invoker, "user2@test.com", "User Two") + _create_user(mock_invoker, "admin@test.com", "Admin", is_admin=True) + user1_tok = _login(client, "user1@test.com") + admin_tok = _login(client, "admin@test.com") + + # Three pending jobs globally: two owned by user1, one by user2; none by the admin. + _insert_pending_queue_item(queue, user_id=user1_id) + _insert_pending_queue_item(queue, user_id=user1_id) + _insert_pending_queue_item(queue, user_id=user2_id) + + # Non-admin caller sees the global total but only their own pending count. + r = client.get("/api/v1/queue/default/status", headers=_auth(user1_tok)) + assert r.status_code == 200 + queue_status = r.json()["queue"] + assert queue_status["pending"] == 3 + assert queue_status["total"] == 3 + assert queue_status["user_pending"] == 2 + assert queue_status["user_in_progress"] == 0 + + # Admin caller sees the same global total. Admins query with user_id=None, so no + # per-user counts are computed (the badge falls back to the global total for admins). + r = client.get("/api/v1/queue/default/status", headers=_auth(admin_tok)) + assert r.status_code == 200 + queue_status = r.json()["queue"] + assert queue_status["pending"] == 3 + assert queue_status["total"] == 3 + assert queue_status["user_pending"] is None + # =========================================================================== # 10b. Model install job authorization @@ -1732,8 +1813,11 @@ def test_batch_enqueued_event_carries_user_id(self) -> None: assert event.queue_id == "default" def test_queue_item_status_changed_routed_privately(self, socketio: Any) -> None: - """Verify that _handle_queue_event emits QueueItemStatusChangedEvent ONLY to - user:{user_id} and admin rooms, never to the queue_id room.""" + """_handle_queue_event must emit the FULL QueueItemStatusChangedEvent only to the + owner's user room and the admin room. A sanitized companion (user_id="redacted", + identifiers stripped) is also emitted to the queue_id room so other users' UIs can + refresh, with the owner's and admins' sids in skip_sid so they don't get a duplicate + that would clobber their cache.""" import asyncio from unittest.mock import AsyncMock @@ -1782,20 +1866,60 @@ def test_queue_item_status_changed_routed_privately(self, socketio: Any) -> None ), ) + # Track owner sid so we can verify skip_sid is honored + socketio._socket_users["sid-owner"] = {"user_id": "owner-xyz", "is_admin": False} + socketio._socket_users["sid-admin"] = {"user_id": "admin-1", "is_admin": True} + socketio._socket_users["sid-other"] = {"user_id": "other-user", "is_admin": False} + mock_emit = AsyncMock() socketio._sio.emit = mock_emit asyncio.run(socketio._handle_queue_event(("queue_item_status_changed", event))) - rooms_emitted_to = [call.kwargs.get("room") for call in mock_emit.call_args_list] - assert "user:owner-xyz" in rooms_emitted_to - assert "admin" in rooms_emitted_to - # CRITICAL: must NOT emit to the queue_id room — that would leak to other users - assert "default" not in rooms_emitted_to + # Collect (room, payload, skip_sid) for each emit call + emits = [ + (c.kwargs.get("room"), c.kwargs.get("data"), c.kwargs.get("skip_sid")) for c in mock_emit.call_args_list + ] + + # Full event must go to owner room and admin room with original sensitive fields + owner_emits = [(p, s) for r, p, s in emits if r == "user:owner-xyz"] + admin_emits = [(p, s) for r, p, s in emits if r == "admin"] + assert len(owner_emits) == 1 and len(admin_emits) == 1 + for payload, _ in owner_emits + admin_emits: + assert payload["user_id"] == "owner-xyz" + assert payload["batch_id"] == "batch-private" + assert payload["session_id"] == "sess-private" + assert payload["destination"] == "canvas" + + # A sanitized companion event must go to the queue_id room with sensitive fields cleared + queue_emits = [(p, s) for r, p, s in emits if r == "default"] + assert len(queue_emits) == 1, "expected exactly one sanitized emit to queue room" + sanitized_payload, skip_sid = queue_emits[0] + assert sanitized_payload["user_id"] == "redacted" + assert sanitized_payload["batch_id"] == "redacted" + assert sanitized_payload["session_id"] == "redacted" + assert sanitized_payload["origin"] is None + assert sanitized_payload["destination"] is None + assert sanitized_payload["error_type"] is None + assert sanitized_payload["batch_status"]["batch_id"] == "redacted" + assert sanitized_payload["batch_status"]["destination"] is None + assert sanitized_payload["queue_status"]["item_id"] is None + assert sanitized_payload["queue_status"]["batch_id"] is None + assert sanitized_payload["queue_status"]["user_pending"] is None + # Owner and admin sids must be skipped so they don't receive the duplicate + assert "sid-owner" in skip_sid + assert "sid-admin" in skip_sid + # Third-party user must NOT be skipped — they need the sanitized event + assert "sid-other" not in skip_sid + # Status (non-sensitive) is preserved so the non-owner UI knows what changed + assert sanitized_payload["status"] == "in_progress" + assert sanitized_payload["item_id"] == 1 def test_batch_enqueued_routed_privately(self, socketio: Any) -> None: - """Verify that _handle_queue_event emits BatchEnqueuedEvent ONLY to - user:{user_id} and admin rooms, never to the queue_id room.""" + """_handle_queue_event must emit the FULL BatchEnqueuedEvent only to the owner's + user room and the admin room. A sanitized companion (user_id="redacted", batch_id + and origin stripped) is also emitted to the queue_id room so other users' badge + totals refresh, with owner/admin sids in skip_sid.""" import asyncio from unittest.mock import AsyncMock @@ -1816,15 +1940,39 @@ def test_batch_enqueued_routed_privately(self, socketio: Any) -> None: ) event = BatchEnqueuedEvent.build(enqueue_result, user_id="owner-zzz") + socketio._socket_users["sid-owner"] = {"user_id": "owner-zzz", "is_admin": False} + socketio._socket_users["sid-admin"] = {"user_id": "admin-1", "is_admin": True} + socketio._socket_users["sid-other"] = {"user_id": "other-user", "is_admin": False} + mock_emit = AsyncMock() socketio._sio.emit = mock_emit asyncio.run(socketio._handle_queue_event(("batch_enqueued", event))) - rooms_emitted_to = [call.kwargs.get("room") for call in mock_emit.call_args_list] - assert "user:owner-zzz" in rooms_emitted_to - assert "admin" in rooms_emitted_to - assert "default" not in rooms_emitted_to + emits = [ + (c.kwargs.get("room"), c.kwargs.get("data"), c.kwargs.get("skip_sid")) for c in mock_emit.call_args_list + ] + + # Full event to owner + admin contains the real batch_id and origin + owner_emits = [(p, s) for r, p, s in emits if r == "user:owner-zzz"] + admin_emits = [(p, s) for r, p, s in emits if r == "admin"] + assert len(owner_emits) == 1 and len(admin_emits) == 1 + for payload, _ in owner_emits + admin_emits: + assert payload["user_id"] == "owner-zzz" + assert payload["batch_id"] == "batch-pvt" + assert payload["origin"] == "workflows" + + # Sanitized event to queue room: user/batch/origin redacted, owner+admin skipped + queue_emits = [(p, s) for r, p, s in emits if r == "default"] + assert len(queue_emits) == 1 + sanitized_payload, skip_sid = queue_emits[0] + assert sanitized_payload["user_id"] == "redacted" + assert sanitized_payload["batch_id"] == "redacted" + assert sanitized_payload["origin"] is None + assert sanitized_payload["enqueued"] == 5 # count is non-sensitive + assert "sid-owner" in skip_sid + assert "sid-admin" in skip_sid + assert "sid-other" not in skip_sid def test_queue_cleared_still_broadcast(self, socketio: Any) -> None: """QueueClearedEvent does not carry user identity and should still be broadcast diff --git a/tests/app/services/session_queue/test_session_queue_dequeue.py b/tests/app/services/session_queue/test_session_queue_dequeue.py new file mode 100644 index 00000000000..fcefad1834d --- /dev/null +++ b/tests/app/services/session_queue/test_session_queue_dequeue.py @@ -0,0 +1,293 @@ +"""Tests for session queue dequeue() ordering: FIFO and round-robin modes.""" + +import json +import uuid +from typing import Optional + +import pytest +from pydantic_core import to_jsonable_python + +from invokeai.app.services.config.config_default import InvokeAIAppConfig +from invokeai.app.services.invoker import Invoker +from invokeai.app.services.session_queue.session_queue_sqlite import ( + ROUND_ROBIN_DEQUEUE_QUERY, + SqliteSessionQueue, +) +from invokeai.app.services.shared.graph import Graph, GraphExecutionState + +_EMPTY_SESSION_JSON = json.dumps(to_jsonable_python(GraphExecutionState(graph=Graph()).model_dump())) + + +@pytest.fixture +def session_queue_fifo(mock_invoker: Invoker) -> SqliteSessionQueue: + """Queue backed by a single-user (FIFO) invoker.""" + # Default config has multiuser=False, so FIFO is always used. + db = mock_invoker.services.board_records._db + queue = SqliteSessionQueue(db=db) + queue.start(mock_invoker) + return queue + + +@pytest.fixture +def session_queue_round_robin(mock_invoker: Invoker) -> SqliteSessionQueue: + """Queue backed by a multiuser invoker with round_robin mode.""" + mock_invoker.services.configuration = InvokeAIAppConfig( + use_memory_db=True, + node_cache_size=0, + multiuser=True, + session_queue_mode="round_robin", + ) + db = mock_invoker.services.board_records._db + queue = SqliteSessionQueue(db=db) + queue.start(mock_invoker) + return queue + + +def _insert_queue_item( + session_queue: SqliteSessionQueue, + queue_id: str, + user_id: str, + priority: int = 0, +) -> int: + """Directly insert a minimal queue item and return its item_id.""" + session_id = str(uuid.uuid4()) + batch_id = str(uuid.uuid4()) + with session_queue._db.transaction() as cursor: + cursor.execute( + """--sql + INSERT INTO session_queue (queue_id, session, session_id, batch_id, field_values, priority, workflow, origin, destination, retried_from_item_id, user_id) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + """, + (queue_id, _EMPTY_SESSION_JSON, session_id, batch_id, None, priority, None, None, None, None, user_id), + ) + return cursor.lastrowid # type: ignore[return-value] + + +def _dequeue_user_ids(session_queue: SqliteSessionQueue, count: int) -> list[Optional[str]]: + """Dequeue `count` items and return the list of user_ids in dequeue order.""" + result = [] + for _ in range(count): + item = session_queue.dequeue() + result.append(item.user_id if item is not None else None) + return result + + +# --------------------------------------------------------------------------- +# FIFO tests +# --------------------------------------------------------------------------- + + +def test_fifo_single_user_order(session_queue_fifo: SqliteSessionQueue) -> None: + """FIFO: items from a single user are dequeued in insertion order.""" + queue_id = "default" + _insert_queue_item(session_queue_fifo, queue_id, "user_a") + _insert_queue_item(session_queue_fifo, queue_id, "user_a") + _insert_queue_item(session_queue_fifo, queue_id, "user_a") + + user_ids = _dequeue_user_ids(session_queue_fifo, 3) + assert user_ids == ["user_a", "user_a", "user_a"] + + +def test_fifo_multi_user_preserves_insertion_order(session_queue_fifo: SqliteSessionQueue) -> None: + """FIFO: jobs from multiple users are dequeued in strict insertion order, not interleaved.""" + queue_id = "default" + # Insert A1, A2, B1, C1, C2, A3 – FIFO should preserve this exact order. + _insert_queue_item(session_queue_fifo, queue_id, "user_a") + _insert_queue_item(session_queue_fifo, queue_id, "user_a") + _insert_queue_item(session_queue_fifo, queue_id, "user_b") + _insert_queue_item(session_queue_fifo, queue_id, "user_c") + _insert_queue_item(session_queue_fifo, queue_id, "user_c") + _insert_queue_item(session_queue_fifo, queue_id, "user_a") + + user_ids = _dequeue_user_ids(session_queue_fifo, 6) + assert user_ids == ["user_a", "user_a", "user_b", "user_c", "user_c", "user_a"] + + +def test_fifo_priority_respected(session_queue_fifo: SqliteSessionQueue) -> None: + """FIFO: higher-priority items are dequeued before lower-priority ones.""" + queue_id = "default" + _insert_queue_item(session_queue_fifo, queue_id, "user_a", priority=0) + _insert_queue_item(session_queue_fifo, queue_id, "user_a", priority=10) + + user_ids = _dequeue_user_ids(session_queue_fifo, 2) + # Both are user_a; second inserted item has higher priority and should come first. + assert user_ids == ["user_a", "user_a"] + + +def test_fifo_returns_none_when_empty(session_queue_fifo: SqliteSessionQueue) -> None: + """FIFO: dequeue returns None when the queue is empty.""" + assert session_queue_fifo.dequeue() is None + + +# --------------------------------------------------------------------------- +# Round-robin tests +# --------------------------------------------------------------------------- + + +def test_round_robin_interleaves_users(session_queue_round_robin: SqliteSessionQueue) -> None: + """Round-robin: jobs from multiple users are interleaved one per user per round. + + Queue insertion order (matching the issue example): + A job 1, A job 2, B job 1, C job 1, C job 2, A job 3 + + Expected dequeue order: + A job 1, B job 1, C job 1, A job 2, C job 2, A job 3 + """ + queue_id = "default" + _insert_queue_item(session_queue_round_robin, queue_id, "user_a") + _insert_queue_item(session_queue_round_robin, queue_id, "user_a") + _insert_queue_item(session_queue_round_robin, queue_id, "user_b") + _insert_queue_item(session_queue_round_robin, queue_id, "user_c") + _insert_queue_item(session_queue_round_robin, queue_id, "user_c") + _insert_queue_item(session_queue_round_robin, queue_id, "user_a") + + user_ids = _dequeue_user_ids(session_queue_round_robin, 6) + assert user_ids == ["user_a", "user_b", "user_c", "user_a", "user_c", "user_a"] + + +def test_round_robin_single_user_behaves_like_fifo(session_queue_round_robin: SqliteSessionQueue) -> None: + """Round-robin with only one user produces the same order as FIFO.""" + queue_id = "default" + _insert_queue_item(session_queue_round_robin, queue_id, "user_a") + _insert_queue_item(session_queue_round_robin, queue_id, "user_a") + _insert_queue_item(session_queue_round_robin, queue_id, "user_a") + + user_ids = _dequeue_user_ids(session_queue_round_robin, 3) + assert user_ids == ["user_a", "user_a", "user_a"] + + +def test_round_robin_handles_user_joining_mid_queue(session_queue_round_robin: SqliteSessionQueue) -> None: + """Round-robin: a user who joins later is correctly interleaved.""" + queue_id = "default" + _insert_queue_item(session_queue_round_robin, queue_id, "user_a") + _insert_queue_item(session_queue_round_robin, queue_id, "user_a") + _insert_queue_item(session_queue_round_robin, queue_id, "user_b") + + user_ids = _dequeue_user_ids(session_queue_round_robin, 3) + # Round 1: A (oldest rank-1 item), B (rank-1 item) + # Round 2: A (rank-2 item) + assert user_ids == ["user_a", "user_b", "user_a"] + + +def test_round_robin_returns_none_when_empty(session_queue_round_robin: SqliteSessionQueue) -> None: + """Round-robin: dequeue returns None when the queue is empty.""" + assert session_queue_round_robin.dequeue() is None + + +def test_round_robin_priority_within_user_respected(session_queue_round_robin: SqliteSessionQueue) -> None: + """Round-robin: within a single user's items, higher priority is dequeued first.""" + queue_id = "default" + # Insert low-priority item first, then high-priority for same user. + _insert_queue_item(session_queue_round_robin, queue_id, "user_a", priority=0) + _insert_queue_item(session_queue_round_robin, queue_id, "user_a", priority=10) + _insert_queue_item(session_queue_round_robin, queue_id, "user_b", priority=0) + + # Round 1: user_a's best item (priority 10), user_b's only item. + # Round 2: user_a's remaining item (priority 0). + items = [] + for _ in range(3): + item = session_queue_round_robin.dequeue() + assert item is not None + items.append((item.user_id, item.priority)) + + assert items[0] == ("user_a", 10) + assert items[1] == ("user_b", 0) + assert items[2] == ("user_a", 0) + + +def _seed_completed_history( + session_queue: SqliteSessionQueue, + queue_id: str, + user_id: str, + count: int, +) -> None: + """Insert `count` completed items (with started_at set) for a user, simulating retained history.""" + with session_queue._db.transaction() as cursor: + for i in range(count): + session_id = str(uuid.uuid4()) + batch_id = str(uuid.uuid4()) + cursor.execute( + """--sql + INSERT INTO session_queue + (queue_id, session, session_id, batch_id, field_values, priority, workflow, origin, destination, retried_from_item_id, user_id, status, started_at, completed_at) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, 'completed', ?, ?) + """, + ( + queue_id, + _EMPTY_SESSION_JSON, + session_id, + batch_id, + None, + 0, + None, + None, + None, + None, + user_id, + # Monotonically increasing timestamps so MAX(started_at) is well-defined per user. + f"2026-01-01 {i // 3600 % 24:02d}:{i // 60 % 60:02d}:{i % 60:02d}", + f"2026-01-01 {i // 3600 % 24:02d}:{i // 60 % 60:02d}:{i % 60:02d}", + ), + ) + + +def test_round_robin_dequeue_does_not_scan_full_history(session_queue_round_robin: SqliteSessionQueue) -> None: + """Round-robin dequeue cost must scale with active users, not retained queue history. + + Regression guard for the scaling concern: the per-user "last served" lookup must be an + indexed seek (MAX(started_at) WHERE user_id = ?) rather than a GROUP BY / scan over every + historical started row. `max_queue_history` is unbounded by default, so a plan that scans + the full history makes each dequeue O(total history) instead of O(active users). + + We seed a large completed history across several users plus a few pending items, then assert + the dequeue query plan never scans the `session_queue` base table and resolves the + last-served lookup via a seek on `idx_session_queue_user_started_at`. + """ + queue_id = "default" + for u in ("user_a", "user_b", "user_c"): + _seed_completed_history(session_queue_round_robin, queue_id, u, count=500) + _insert_queue_item(session_queue_round_robin, queue_id, u) + + with session_queue_round_robin._db.transaction() as cursor: + plan_rows = cursor.execute("EXPLAIN QUERY PLAN " + ROUND_ROBIN_DEQUEUE_QUERY).fetchall() + details = [row["detail"] for row in plan_rows] + + # No step may scan the session_queue base table — that is the full-history scan we are + # eliminating. (CTE result scans like "SCAN uni" / "SCAN (subquery-N)" are fine; those are + # one row per pending user.) + offending = [d for d in details if d.startswith("SCAN session_queue")] + assert not offending, f"dequeue plan scans full queue history: {offending}\nfull plan: {details}" + + # The last-served lookup must use the started_at index as a per-user seek. + assert any("idx_session_queue_user_started_at" in d and "user_id=?" in d for d in details), ( + f"last-served lookup is not an indexed seek; plan: {details}" + ) + + # And the dequeue must still return the least-recently-served user (correctness under history). + # user_a's history ends earliest only if seeded first; all three were seeded equal counts with + # identical timestamps, so item_id ASC tie-breaks to the first-inserted pending item (user_a). + item = session_queue_round_robin.dequeue() + assert item is not None + assert item.user_id == "user_a" + + +def test_round_robin_ignored_in_single_user_mode(mock_invoker: Invoker) -> None: + """When multiuser=False, round_robin config is ignored and FIFO is used.""" + mock_invoker.services.configuration = InvokeAIAppConfig( + use_memory_db=True, + node_cache_size=0, + multiuser=False, + session_queue_mode="round_robin", + ) + db = mock_invoker.services.board_records._db + queue = SqliteSessionQueue(db=db) + queue.start(mock_invoker) + + queue_id = "default" + _insert_queue_item(queue, queue_id, "user_a") + _insert_queue_item(queue, queue_id, "user_a") + _insert_queue_item(queue, queue_id, "user_b") + + # FIFO order: user_a, user_a, user_b + user_ids = _dequeue_user_ids(queue, 3) + assert user_ids == ["user_a", "user_a", "user_b"]