-
Notifications
You must be signed in to change notification settings - Fork 3.2k
feat(voice): surface inference quota errors instead of going silent #6012
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
df9d897
e7357c6
8411107
4b82451
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,89 @@ | ||
| import logging | ||
|
|
||
| from dotenv import load_dotenv | ||
|
|
||
| from livekit.agents import ( | ||
| Agent, | ||
| AgentServer, | ||
| AgentSession, | ||
| APIQuotaExceededError, | ||
| JobContext, | ||
| cli, | ||
| inference, | ||
| ) | ||
| from livekit.agents.voice.events import CloseEvent, ErrorEvent | ||
| from livekit.plugins import silero | ||
|
|
||
| logger = logging.getLogger("quota-exceeded") | ||
|
|
||
| load_dotenv() | ||
|
|
||
| # This example shows how to keep a voice agent from going *silently* unresponsive | ||
| # when the LLM endpoint returns `429 inference_quota_exceeded` (e.g. the project ran | ||
| # out of LiveKit Inference credits). | ||
| # | ||
| # Without this handling, such an error used to make the agent join the room, publish | ||
| # its track, and then never speak. The SDK now surfaces a terminal quota error on the | ||
| # FIRST occurrence (instead of after several dead turns) and, by default, speaks the | ||
| # gateway's own `hint` before the session closes. This example builds on that: | ||
| # | ||
| # 1. `error_message=...` replaces the default spoken line with your own message | ||
| # (omit it to keep speaking the quota `hint`; pass None to disable spoken errors). | ||
| # | ||
| # 2. The `@session.on("error")` handler shows how to read the typed | ||
| # `APIQuotaExceededError` (status_code, quota_type, hint, ...) so you can forward | ||
| # a structured "out of credits" state to your frontend. | ||
|
|
||
| server = AgentServer() | ||
|
|
||
|
|
||
| @server.rtc_session() | ||
| async def entrypoint(ctx: JobContext): | ||
| session = AgentSession( | ||
| stt=inference.STT("deepgram/nova-3"), | ||
| llm=inference.LLM("openai/gpt-4.1-mini"), | ||
| tts=inference.TTS("cartesia/sonic-3"), | ||
| vad=silero.VAD.load(), | ||
| # spoken just before the session closes on an unrecoverable error so the agent | ||
| # is never silent. Omit this argument entirely to keep the default behavior | ||
| # (speak the quota `hint`); pass None to disable spoken errors. | ||
| error_message="Sorry, the assistant is temporarily unavailable. Please try again later.", | ||
| ) | ||
|
|
||
| @session.on("error") | ||
| def on_error(ev: ErrorEvent) -> None: | ||
| # ErrorEvent.error is the LLMError/STTError/TTSError wrapper; the underlying | ||
| # API exception is at ev.error.error | ||
| err = ev.error.error | ||
| # this handler also sees transient errors (e.g. rate limits, including retry | ||
| # attempts); only a *terminal* quota error means the project is out of credits | ||
| # and will fail identically every turn until the quota resets | ||
| if isinstance(err, APIQuotaExceededError) and err.terminal: | ||
| logger.warning( | ||
| "inference quota exceeded", | ||
| extra={ | ||
| "quota_type": err.quota_type, # "llm" | "stt" | "tts" | ... | ||
| "category": err.category, # e.g. "MaxGatewayCredits" | ||
| "hint": err.hint, | ||
| "remaining_limit": err.remaining_limit, | ||
| }, | ||
| ) | ||
| # forward a structured signal so the frontend can render an | ||
| # "out of credits" state instead of dead air. `session.on` handlers are | ||
| # sync, so spawn a task for async work (add `import asyncio` above), e.g.: | ||
| # | ||
| # asyncio.create_task( | ||
| # ctx.room.local_participant.set_attributes( | ||
| # {"agent_error": "quota_exceeded", "quota_type": err.quota_type or ""} | ||
| # ) | ||
| # ) | ||
|
|
||
| @session.on("close") | ||
| def on_close(ev: CloseEvent) -> None: | ||
| logger.info("session closed", extra={"reason": ev.reason}) | ||
|
|
||
| await session.start(agent=Agent(instructions="You are a helpful assistant."), room=ctx.room) | ||
|
|
||
|
|
||
| if __name__ == "__main__": | ||
| cli.run_app(server) | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,5 +1,21 @@ | ||
| from __future__ import annotations | ||
|
|
||
| INFERENCE_QUOTA_EXCEEDED_TYPE = "inference_quota_exceeded" | ||
| """Value of the ``type`` field in a LiveKit Inference 429 quota response body.""" | ||
|
|
||
| # The gateway returns `inference_quota_exceeded` for two different classes of 429. | ||
| # These categories mean a billing quota is exhausted ("Wait for the next billing | ||
| # cycle …") — they will fail identically every turn until the quota resets, so they | ||
| # are terminal and non-retryable. Every other category (rate/concurrency limits like | ||
| # MaxConcurrentGatewayLLMRpm/Tpm) is transient: it recovers via backoff, so it stays | ||
| # retryable and non-terminal. See agent-gateway `pkg/quota/response.go::quotaHint`. | ||
| _TERMINAL_QUOTA_CATEGORIES = frozenset({"MaxGatewayCredits", "MaxBargeInRequests"}) | ||
|
|
||
|
|
||
| def _str_or_none(value: object) -> str | None: | ||
| """Coerce an untrusted JSON field to ``str``; non-str values become ``None``.""" | ||
| return value if isinstance(value, str) else None | ||
|
|
||
|
|
||
| class AssignmentTimeoutError(Exception): | ||
| """Raised when accepting a job but not receiving an assignment within the specified timeout. | ||
|
|
@@ -29,14 +45,32 @@ class APIError(Exception): | |
| """ | ||
|
|
||
| retryable: bool = False | ||
| """Whether the error can be retried.""" | ||
| """Whether the error can be retried (within the request's retry loop).""" | ||
|
|
||
| terminal: bool = False | ||
| """Whether the error is terminal — it will fail identically on every turn, so | ||
| callers should surface it immediately rather than absorbing it under a | ||
| transient-error tolerance (e.g. ``AgentSession``'s ``max_unrecoverable_errors``). | ||
|
|
||
| Independent of ``retryable``: ``retryable`` governs in-request retries, while | ||
| ``terminal`` governs whether higher-level loops should give up at once. A quota | ||
| error from depleted credits is both non-retryable and terminal; a transient | ||
| rate-limit is non-terminal (and may be retryable).""" | ||
|
|
||
| def __init__(self, message: str, *, body: object | None = None, retryable: bool = True) -> None: | ||
| def __init__( | ||
| self, | ||
| message: str, | ||
| *, | ||
| body: object | None = None, | ||
| retryable: bool = True, | ||
| terminal: bool = False, | ||
| ) -> None: | ||
| super().__init__(message) | ||
|
|
||
| self.message = message | ||
| self.body = body | ||
| self.retryable = retryable | ||
| self.terminal = terminal | ||
|
|
||
| def __str__(self) -> str: | ||
| return self.message | ||
|
|
@@ -62,6 +96,7 @@ def __init__( | |
| request_id: str | None = None, | ||
| body: object | None = None, | ||
| retryable: bool | None = None, | ||
| terminal: bool = False, | ||
| ) -> None: | ||
| if retryable is None: | ||
| retryable = True | ||
|
|
@@ -73,7 +108,7 @@ def __init__( | |
| if 400 <= status_code < 500 and status_code not in (408, 429, 499): | ||
| retryable = False | ||
|
|
||
| super().__init__(message, body=body, retryable=retryable) | ||
| super().__init__(message, body=body, retryable=retryable, terminal=terminal) | ||
|
|
||
| self.status_code = status_code | ||
| self.request_id = request_id | ||
|
|
@@ -100,6 +135,134 @@ def __repr__(self) -> str: | |
| ) | ||
|
|
||
|
|
||
| class APIQuotaExceededError(APIStatusError): | ||
| """Raised when the inference gateway rejects a request because a usage quota | ||
| or rate limit has been exhausted. | ||
|
|
||
| LiveKit Inference answers an exhausted project with ``HTTP 429`` and a | ||
| structured JSON body (``type == "inference_quota_exceeded"``). This error | ||
| surfaces the fields of that body directly so callers can render or speak a | ||
| precise, user-facing message (``hint``) instead of leaving the agent silent. | ||
|
|
||
| The gateway uses this single ``type`` for two different conditions, told apart by | ||
| ``category``: | ||
|
|
||
| * **Credit/quota exhaustion** (``MaxGatewayCredits``, ``MaxBargeInRequests``) — | ||
| recovers only at the next billing cycle, so it is :attr:`terminal` and | ||
| ``retryable=False``. | ||
| * **Rate / concurrency limits** (e.g. ``MaxConcurrentGatewayLLMRpm`` / ``…Tpm``) — | ||
| recover within ~a minute via backoff, so they stay ``retryable=True`` and | ||
| non-terminal (they fall through the usual transient-error handling). | ||
|
|
||
| ``retryable`` / ``terminal`` are derived from ``category`` automatically; pass them | ||
| explicitly to override. | ||
|
|
||
| On a terminal quota error, ``AgentSession`` by default speaks the ``hint`` and | ||
| closes on the first occurrence (see ``AgentSession(error_message=...)``); transient | ||
| variants go through the normal retry/tolerance path. Subscribe to ``error`` only | ||
| when you need the structured fields, e.g. to forward an "out of credits" state to | ||
| your frontend. ``ErrorEvent.error`` is the ``LLMError``/``STTError``/… wrapper, so | ||
| the underlying exception is at ``ev.error.error``: | ||
|
|
||
| Example: | ||
| ```python | ||
| from livekit.agents import APIQuotaExceededError, ErrorEvent | ||
|
|
||
|
|
||
| @session.on("error") | ||
| def _on_error(ev: ErrorEvent) -> None: | ||
| err = ev.error.error | ||
| if isinstance(err, APIQuotaExceededError): | ||
| logger.warning("inference quota exceeded: %s (%s)", err.hint, err.quota_type) | ||
| ``` | ||
| """ | ||
|
|
||
| quota_type: str | None | ||
| """Which resource ran out, e.g. ``"llm"``, ``"stt"``, ``"tts"`` or ``"bargein"``.""" | ||
|
|
||
| category: str | None | ||
| """Gateway category. Credit-exhaustion categories (``"MaxGatewayCredits"``, | ||
| ``"MaxBargeInRequests"``) are terminal; rate-limit variants such as | ||
| ``"MaxConcurrentGatewayLLMRpm"`` are transient.""" | ||
|
|
||
| hint: str | None | ||
| """Human-readable, user-appropriate explanation suitable to speak or display.""" | ||
|
|
||
| remaining_limit: str | None | ||
| """Remaining quota for ``quota_type`` as reported by the gateway; ``"0"`` when | ||
| fully exhausted. An opaque diagnostic string (not guaranteed numeric).""" | ||
|
|
||
| def __init__( | ||
| self, | ||
| message: str, | ||
| *, | ||
| status_code: int = 429, | ||
| request_id: str | None = None, | ||
| body: object | None = None, | ||
| retryable: bool | None = None, | ||
| terminal: bool | None = None, | ||
| quota_type: str | None = None, | ||
| category: str | None = None, | ||
| hint: str | None = None, | ||
| remaining_limit: str | None = None, | ||
| ) -> None: | ||
| # the response body carries the structured fields; read category early so we | ||
| # can derive retryable/terminal from it when not given explicitly. The body is | ||
| # wire data from a user-configurable endpoint, so non-str values are dropped — | ||
| # they'd violate the `str | None` fields and break the category check below. | ||
| if isinstance(body, dict): | ||
| if quota_type is None: | ||
| quota_type = _str_or_none(body.get("quota_type")) | ||
| if category is None: | ||
| category = _str_or_none(body.get("category")) | ||
| if hint is None: | ||
| hint = _str_or_none(body.get("hint")) | ||
| if remaining_limit is None: | ||
| remaining_limit = _str_or_none(body.get("remaining_limit")) | ||
|
|
||
| # credit exhaustion is terminal and won't recover on retry; everything else | ||
| # (rate/concurrency limits, or an unknown category) is treated as transient | ||
| is_credit_exhaustion = category in _TERMINAL_QUOTA_CATEGORIES | ||
| if terminal is None: | ||
| terminal = is_credit_exhaustion | ||
| if retryable is None: | ||
| retryable = not is_credit_exhaustion | ||
|
|
||
| super().__init__( | ||
| message, | ||
| status_code=status_code, | ||
| request_id=request_id, | ||
| body=body, | ||
| retryable=retryable, | ||
| terminal=terminal, | ||
| ) | ||
|
|
||
| self.quota_type = quota_type | ||
| self.category = category | ||
| self.hint = hint | ||
| self.remaining_limit = remaining_limit | ||
|
|
||
| @classmethod | ||
| def from_response( | ||
| cls, | ||
| message: str, | ||
| *, | ||
| status_code: int = 429, | ||
| request_id: str | None = None, | ||
| body: object | None = None, | ||
| ) -> APIQuotaExceededError | None: | ||
| """Build an :class:`APIQuotaExceededError` from a response body, or return | ||
| ``None`` if the body isn't a LiveKit Inference quota-exceeded payload. | ||
|
|
||
| Lets plugins centralize quota detection: pass the decoded JSON body and | ||
| raise the result when it isn't ``None``. | ||
| """ | ||
| if not (isinstance(body, dict) and body.get("type") == INFERENCE_QUOTA_EXCEEDED_TYPE): | ||
| return None | ||
|
|
||
| return cls(message, status_code=status_code, request_id=request_id, body=body) | ||
|
|
||
|
|
||
| class APIConnectionError(APIError): | ||
| """Raised when an API request failed due to a connection error.""" | ||
|
|
||
|
|
@@ -142,6 +305,12 @@ def create_api_error_from_http( | |
| else: | ||
| display = f"{reason} ({status})" | ||
|
|
||
| quota_error = APIQuotaExceededError.from_response( | ||
| display, status_code=status, request_id=request_id, body=body | ||
| ) | ||
| if quota_error is not None: | ||
| return quota_error | ||
|
Comment on lines
+308
to
+312
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🚩 Inference STT/TTS don't pass response body to create_api_error_from_http The inference STT and TTS plugins call Was this helpful? React with 👍 or 👎 to provide feedback.
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Confirmed and documented in 8411107. Traced the full path:
So it's a genuine limitation, but there's a sharper reason to leave STT/TTS as a plain (retryable) Added a code comment at both connect sites ( |
||
|
|
||
| return APIStatusError( | ||
| message=display, | ||
| status_code=status, | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am not sure if we need an example for gateway errors. Maybe we should put this in docs instead?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fair point. For now I've kept the example but fixed a real bug in it (it used
ev.errorwhere it needsev.error.error—ErrorEvent.erroris theLLMError/STTErrorwrapper, so theisinstanceguard was always False). The minimal@session.on("error")recipe also lives in theAPIQuotaExceededErrordocstring.Happy to delete the example and move it to the docs site instead if you'd prefer that — just say the word and I'll drop
examples/voice_agents/quota_exceeded.py+ its README entry.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
On a second thought, we can leave it here so we have some example for other error handling as well. This is hard to document for all other vendors.