Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions examples/voice_agents/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ session = AgentSession(

- [`langfuse_trace.py`](./langfuse_trace.py) - LangFuse integration for conversation tracing
- [`error_callback.py`](./error_callback.py) - Error handling callback
- [`quota_exceeded.py`](./quota_exceeded.py) - Surface inference quota/credit errors instead of going silent
- [`session_close_callback.py`](./session_close_callback.py) - Session lifecycle management

## 📖 Additional Resources
Expand Down
80 changes: 80 additions & 0 deletions examples/voice_agents/quota_exceeded.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
import logging

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not sure if we need an example for gateway errors. Maybe we should put this in docs instead?

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fair point. For now I've kept the example but fixed a real bug in it: it used `ev.error` where it needs `ev.error.error` — `ErrorEvent.error` is the LLMError/STTError wrapper, so the isinstance guard was always False. The minimal @session.on("error") recipe also lives in the APIQuotaExceededError docstring.

Happy to delete the example and move it to the docs site instead if you'd prefer that — just say the word and I'll drop examples/voice_agents/quota_exceeded.py + its README entry.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

On a second thought, we can leave it here so we have some example for other error handling as well. This is hard to document for all other vendors.


from dotenv import load_dotenv

from livekit.agents import (
Agent,
AgentServer,
AgentSession,
APIQuotaExceededError,
JobContext,
cli,
inference,
)
from livekit.agents.voice.events import CloseEvent, ErrorEvent
from livekit.plugins import silero

logger = logging.getLogger("quota-exceeded")

load_dotenv()

# This example shows how to keep a voice agent from going *silently* unresponsive
# when the LLM endpoint returns `429 inference_quota_exceeded` (e.g. the project ran
# out of LiveKit Inference credits).
#
# By default such an error makes the agent join the room, publish its track, and then
# never speak. With the changes below the user always gets a perceptible signal:
#
# 1. `error_message=...` speaks a fallback line before the session closes. When left
# at its default, a quota error speaks the gateway's own `hint`. The session also
# surfaces the error on the FIRST occurrence instead of after several dead turns.
#
# 2. The `@session.on("error")` handler shows how to read the typed
# `APIQuotaExceededError` (status_code, quota_type, hint, ...) so you can forward
# a structured "out of credits" state to your frontend.

server = AgentServer()


@server.rtc_session()
async def entrypoint(ctx: JobContext) -> None:
    """Run a voice session that reports inference quota errors instead of going silent.

    Builds an ``AgentSession`` with a spoken ``error_message`` fallback, registers
    an ``error`` handler that logs the structured fields of an
    ``APIQuotaExceededError``, registers a ``close`` handler that logs the close
    reason, and then starts the session in the job's room.

    Args:
        ctx: Job context; only ``ctx.room`` is used here.
    """
    session = AgentSession(
        stt=inference.STT("deepgram/nova-3"),
        llm=inference.LLM("openai/gpt-4.1-mini"),
        tts=inference.TTS("cartesia/sonic-3"),
        vad=silero.VAD.load(),
        # spoken just before the session closes on an unrecoverable error so the agent
        # is never silent. Omit this argument entirely to keep the default behavior
        # (speak the quota `hint`); pass None to disable spoken errors.
        error_message="Sorry, the assistant is temporarily unavailable. Please try again later.",
    )

    @session.on("error")
    def on_error(ev: ErrorEvent) -> None:
        # `ev.error` is the component wrapper (LLMError / STTError / ...); the
        # exception raised by the provider lives on the wrapper's `.error`
        # attribute. Checking the wrapper itself never matches, so unwrap first
        # and fall back to the payload in case a bare exception is delivered.
        err = getattr(ev.error, "error", ev.error)

        # quota errors are non-retryable; they will fail identically every turn
        if not isinstance(err, APIQuotaExceededError):
            return

        logger.warning(
            "inference quota exceeded",
            extra={
                "quota_type": err.quota_type,  # "llm" | "stt" | "tts" | ...
                "category": err.category,  # e.g. "MaxGatewayCredits"
                "hint": err.hint,
                "remaining_limit": err.remaining_limit,
            },
        )
        # forward a structured signal so the frontend can render an
        # "out of credits" state instead of dead air. This handler is a plain
        # (non-async) function, so schedule the coroutine rather than awaiting it,
        # e.g.:
        #
        #   asyncio.create_task(
        #       ctx.room.local_participant.set_attributes(
        #           {"agent_error": "quota_exceeded", "quota_type": err.quota_type or ""}
        #       )
        #   )

    @session.on("close")
    def on_close(ev: CloseEvent) -> None:
        logger.info("session closed", extra={"reason": ev.reason})

    await session.start(agent=Agent(instructions="You are a helpful assistant."), room=ctx.room)


if __name__ == "__main__":
cli.run_app(server)
2 changes: 2 additions & 0 deletions livekit-agents/livekit/agents/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
from ._exceptions import (
APIConnectionError,
APIError,
APIQuotaExceededError,
APIStatusError,
APITimeoutError,
AssignmentTimeoutError,
Expand Down Expand Up @@ -194,6 +195,7 @@ def __getattr__(name: str) -> typing.Any:
"AssignmentTimeoutError",
"APIConnectionError",
"APIError",
"APIQuotaExceededError",
"APIStatusError",
"APITimeoutError",
"create_api_error_from_http",
Expand Down
108 changes: 108 additions & 0 deletions livekit-agents/livekit/agents/_exceptions.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
from __future__ import annotations

INFERENCE_QUOTA_EXCEEDED_TYPE = "inference_quota_exceeded"
"""Value of the ``type`` field in a LiveKit Inference 429 quota response body."""


class AssignmentTimeoutError(Exception):
"""Raised when accepting a job but not receiving an assignment within the specified timeout.
Expand Down Expand Up @@ -100,6 +103,105 @@ def __repr__(self) -> str:
)


class APIQuotaExceededError(APIStatusError):
"""Raised when the inference gateway rejects a request because a usage quota
or rate limit has been exhausted.

LiveKit Inference answers an exhausted project with ``HTTP 429`` and a
structured JSON body (``type == "inference_quota_exceeded"``). This error
surfaces the fields of that body directly so callers can render or speak a
precise, user-facing message (``hint``) instead of leaving the agent silent.

Unlike a transient ``429`` (rate limit), quota exhaustion will not recover on
an immediate retry, so the error defaults to ``retryable=False``.

Example:
```python
from livekit.agents import APIQuotaExceededError, ErrorEvent


@session.on("error")
def _on_error(ev: ErrorEvent) -> None:
if isinstance(ev.error, APIQuotaExceededError):
session.say(ev.error.hint or "The assistant is temporarily unavailable.")
```
"""

quota_type: str | None
"""Which resource ran out, e.g. ``"llm"``, ``"stt"``, ``"tts"`` or ``"bargein"``."""

category: str | None
"""Gateway category, e.g. ``"MaxGatewayCredits"`` (credits exhausted) or a
rate-limit variant such as ``"MaxConcurrentGatewayLLMRpm"``."""

hint: str | None
"""Human-readable, user-appropriate explanation suitable to speak or display."""

remaining_limit: str | None
"""Remaining quota for ``quota_type``; ``"0"`` when fully exhausted."""

def __init__(
self,
message: str,
*,
status_code: int = 429,
request_id: str | None = None,
body: object | None = None,
retryable: bool | None = None,
quota_type: str | None = None,
category: str | None = None,
hint: str | None = None,
remaining_limit: str | None = None,
) -> None:
# quota exhaustion won't recover on an immediate retry
if retryable is None:

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we differentiate retry behaviour based on concurrency/RPM quotas vs credits difference?

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done in e7357c6 — `APIQuotaExceededError` now derives retryable/terminal from category (verified against agent-gateway/pkg/quota/response.go::quotaHint):

  • Terminal + non-retryable: credit exhaustion — MaxGatewayCredits, MaxBargeInRequests ("Wait for the next billing cycle…").
  • Retryable + non-terminal: rate/concurrency limits — MaxConcurrentGatewayLLMRpm/Tpm, MaxConcurrentGatewaySTT/TTS, MaxBargeInRPM, etc. (and any unknown/missing category, to avoid regressions).

So a transient rate-limit 429 is now retried with backoff by the stream and falls through max_unrecoverable_errors exactly as before this PR; only true credit exhaustion closes on the first turn. Added tests for both classes (retried vs terminal).

retryable = False

super().__init__(
message,
status_code=status_code,
request_id=request_id,
body=body,
retryable=retryable,
)

# backfill the structured fields from the response body when not given explicitly
if isinstance(body, dict):
if quota_type is None:
quota_type = body.get("quota_type")
if category is None:
category = body.get("category")
if hint is None:
hint = body.get("hint")
if remaining_limit is None:
remaining_limit = body.get("remaining_limit")

self.quota_type = quota_type
self.category = category
self.hint = hint
self.remaining_limit = remaining_limit

@classmethod
def from_response(
    cls,
    message: str,
    *,
    status_code: int = 429,
    request_id: str | None = None,
    body: object | None = None,
) -> APIQuotaExceededError | None:
    """Create the error from a decoded response body if it is a quota payload.

    A body counts as a quota payload only when it is a ``dict`` whose ``type``
    entry equals ``INFERENCE_QUOTA_EXCEEDED_TYPE``. This gives plugins a single
    place for quota detection: hand over the decoded JSON body and raise
    whatever comes back when it is not ``None``.

    Args:
        message: Error message forwarded to the constructor.
        status_code: HTTP status of the response; defaults to 429.
        request_id: Request identifier, when one is available.
        body: Decoded response body to inspect.

    Returns:
        A new instance built from ``body``, or ``None`` for any other body.
    """
    is_quota_payload = (
        isinstance(body, dict) and body.get("type") == INFERENCE_QUOTA_EXCEEDED_TYPE
    )
    if is_quota_payload:
        # structured fields (quota_type, hint, ...) are backfilled from `body`
        # by the constructor, so only the envelope values are passed through
        return cls(message, status_code=status_code, request_id=request_id, body=body)

    return None


class APIConnectionError(APIError):
"""Raised when an API request failed due to a connection error."""

Expand Down Expand Up @@ -142,6 +244,12 @@ def create_api_error_from_http(
else:
display = f"{reason} ({status})"

quota_error = APIQuotaExceededError.from_response(
display, status_code=status, request_id=request_id, body=body
)
if quota_error is not None:
return quota_error
Comment on lines +308 to +312

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🚩 Inference STT/TTS don't pass response body to create_api_error_from_http

The inference STT and TTS plugins call create_api_error_from_http(e.message, status=e.status) without a body= parameter (e.g. livekit-agents/livekit/agents/inference/tts.py:494, livekit-agents/livekit/agents/inference/stt.py:890). Since the quota detection in create_api_error_from_http (livekit-agents/livekit/agents/_exceptions.py:300-304) relies on body being a dict with type == "inference_quota_exceeded", quota errors from the STT/TTS websocket connection path will never produce a typed APIQuotaExceededError — they'll remain plain APIStatusError. The LLM path works because it catches openai.APIStatusError which provides e.body as a parsed dict. This means that if STT or TTS hits a quota exhaustion, the agent won't get the terminal/immediate-close behavior or the spoken hint. This is likely a known limitation — aiohttp.ClientResponseError doesn't expose a parsed JSON body — but it's worth noting for future work.

Open in Devin Review

Was this helpful? React with 👍 or 👎 to provide feedback.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Confirmed and documented in 8411107. Traced the full path:

  • The gateway's limitsMiddleware rejects a quota-exhausted STT/TTS connection pre-upgrade (agent-gateway/pkg/middleware/limits.go → HandleJSONError) with a JSON 429 body, setting only Content-Type (no useful headers).
  • On a failed handshake aiohttp raises WSServerHandshakeError, which on 3.14 exposes only status/message/headers — the response body is discarded. So there's no body= to pass at the connect site.

So it's a genuine limitation, but there's a sharper reason to leave STT/TTS as a plain (retryable) APIStatusError rather than guess: without the body's category the SDK can't tell terminal credit exhaustion from a transient rate limit. Forcing terminal/non-retryable on every STT/TTS 429 would reintroduce exactly the rate-limit regression fixed earlier in this PR; leaving it untyped keeps the safe (retryable → existing tolerance) behavior.

Added a code comment at both connect sites (inference/tts.py, inference/stt.py) explaining this so it's discoverable. A real fix would need the gateway to surface the category another way (e.g. a response header aiohttp keeps, or a post-upgrade close frame) — tracked as future work.


return APIStatusError(
message=display,
status_code=status,
Expand Down
18 changes: 17 additions & 1 deletion livekit-agents/livekit/agents/inference/llm.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,12 @@
from typing_extensions import TypedDict

from .. import llm
from .._exceptions import APIConnectionError, APIStatusError, APITimeoutError
from .._exceptions import (
APIConnectionError,
APIQuotaExceededError,
APIStatusError,
APITimeoutError,
)
from ..llm import ToolChoice, utils as llm_utils
from ..llm.chat_context import ChatContext
from ..llm.tool_context import Tool
Expand Down Expand Up @@ -454,6 +459,17 @@ async def _run(self) -> None:
except openai.APITimeoutError:
raise APITimeoutError(retryable=retryable) from None
except openai.APIStatusError as e:
# a depleted project answers 429 with a structured `inference_quota_exceeded`
# body; surface it as a typed, non-retryable error carrying the gateway hint
quota_error = APIQuotaExceededError.from_response(
e.message,
status_code=e.status_code,
request_id=e.request_id,
body=e.body,
)
if quota_error is not None:
raise quota_error from None

raise APIStatusError(
e.message,
status_code=e.status_code,
Expand Down
Loading
Loading