-
Notifications
You must be signed in to change notification settings - Fork 2.2k
Add pending message queue (ctx.enqueue / agent_run.enqueue)
#4980
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 43 commits
3951d3d
f530e2a
11addf5
b1217e5
648ca4c
98207fb
eed6298
cb91f8c
034d2c6
ff9e760
f8ba158
97ebbd2
0d8ab31
ebfa2d0
1a546ef
c4e3250
9b4a747
04317a6
a8cdef0
f362019
0cfcaac
c3e7341
2efb668
12089ff
9a5f4ab
62f57f1
ce543f4
9503185
d226586
a18e4c6
9b1c6c2
070bdcc
6c89afe
6965079
b59f305
9fcc3ea
b60ebb1
f1d57bb
dc32827
9d97861
ce266a0
328aa88
3618943
6cea37b
d9cf1a9
4e6a3fd
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -391,6 +391,107 @@ print(result2.all_messages()) | |
| """ | ||
| ``` | ||
|
|
||
| ## Injecting messages mid-run | ||
|
|
||
| Tools, capability hooks, and external code driving an agent run can inject extra content | ||
| into the conversation mid-run with [`RunContext.enqueue`][pydantic_ai.tools.RunContext.enqueue] | ||
| (when a `RunContext` is in scope, e.g. inside a tool or capability hook) or | ||
| [`AgentRun.enqueue`][pydantic_ai.run.AgentRun.enqueue] (from external code driving | ||
| [`agent.iter()`][pydantic_ai.agent.AbstractAgent.iter]). Use this when something happens during a | ||
| run that the agent should know about — a tool wants to add follow-up context, an external event | ||
| needs to *steer* the agent's plan, or background work needs to reach the agent when it completes. | ||
|
|
||
| A `priority` controls when the enqueued content is delivered: | ||
|
|
||
| - `'asap'` (default): delivered at the earliest opportunity — added to the next [`ModelRequest`][pydantic_ai.messages.ModelRequest], or, if the agent would otherwise terminate before another request, used to redirect the run into one more request. Use when the new context should reach the model as soon as possible; this is what other frameworks often call **steering** an in-flight agent. | ||
| - `'when_idle'`: delivered only when the agent would otherwise terminate, after any `'asap'` messages. Use when the agent shouldn't be interrupted but should pick up the new work — a follow-up task — once it's done with what it's doing. | ||
|
|
||
| Each positional argument to `enqueue` is one item, and can be: | ||
|
|
||
| - a `str` or [`Sequence[UserContent]`][pydantic_ai.messages.UserContent] — wrapped in a [`UserPromptPart`][pydantic_ai.messages.UserPromptPart], the same shape [`Agent.run(user_prompt=...)`][pydantic_ai.agent.AbstractAgent.run] accepts; | ||
| - a [`ModelRequestPart`][pydantic_ai.messages.ModelRequestPart], such as a [`SystemPromptPart`][pydantic_ai.messages.SystemPromptPart]; | ||
| - a complete [`ModelRequest`][pydantic_ai.messages.ModelRequest] or [`ModelResponse`][pydantic_ai.messages.ModelResponse], to control request-level fields like `instructions`/`metadata` or to inject a synthetic prior turn. | ||
|
|
||
| Adjacent part-style items (`str` / `Sequence[UserContent]` / `ModelRequestPart`) are coalesced into one [`ModelRequest`][pydantic_ai.messages.ModelRequest]; complete messages stay separate. This lets a single call inject an interleaved exchange — for example a synthetic tool call (a [`ModelResponse`][pydantic_ai.messages.ModelResponse]) followed by its result (a [`ModelRequest`][pydantic_ai.messages.ModelRequest]). The content must end in a request, so the agent has something to respond to. | ||
|
|
||
| ### From inside a tool or hook | ||
|
|
||
| Use [`RunContext.enqueue`][pydantic_ai.tools.RunContext.enqueue] when you have a | ||
| `RunContext` in scope: | ||
|
|
||
| ```python {title="enqueue_from_tool.py"} | ||
| from pydantic_ai import Agent, RunContext | ||
|
|
||
| agent = Agent('openai:gpt-5.2') | ||
|
|
||
|
|
||
| @agent.tool | ||
| def trigger_alert(ctx: RunContext[None]) -> str: | ||
| ctx.enqueue('Alert: production is degraded, prioritize triage.') | ||
|
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Have an example with a |
||
| return 'alert raised' | ||
| ``` | ||
|
|
||
| The `'asap'` message is appended to the agent's message history and is visible to the | ||
| model on the next request, alongside any tool returns from the same step. | ||
|
|
||
| ### From external code driving `agent.iter()` | ||
|
|
||
| Use [`AgentRun.enqueue`][pydantic_ai.run.AgentRun.enqueue] when you're driving a run | ||
| from outside (e.g. forwarding events from a webhook, chat platform, or job queue): | ||
|
|
||
| ```python {title="enqueue_from_agent_run.py"} | ||
| from pydantic_ai import Agent | ||
| from pydantic_graph import End | ||
|
|
||
| agent = Agent('openai:gpt-5.2') | ||
|
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Use Opus 4.7 |
||
|
|
||
|
|
||
| async def main(): | ||
| async with agent.iter('Summarize the latest deploy report') as agent_run: | ||
| # An external system pushes a follow-up while the agent is working. | ||
| # When the agent would otherwise finish, the message redirects it | ||
| # into a fresh model request so it can incorporate the new context. | ||
| agent_run.enqueue( | ||
| 'A new error was just reported — include it in the summary.', | ||
| priority='when_idle', | ||
| ) | ||
| node = agent_run.next_node | ||
| while not isinstance(node, End): | ||
| node = await agent_run.next(node) | ||
| ``` | ||
|
Comment on lines
+461
to
+473
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This example uses Consider either:
|
||
|
|
||
| The example drives the run with [`agent.iter()`][pydantic_ai.agent.AbstractAgent.iter] + | ||
| [`AgentRun.next()`][pydantic_ai.run.AgentRun.next] because `'when_idle'` messages are only | ||
| drained when the agent would otherwise reach an `End` — that drain happens in `after_node_run`, | ||
| which doesn't fire inside a bare `async for node in agent_run:` loop. `'asap'` messages are | ||
| drained in `before_model_request` (which fires either way) and also at the same end-of-run point | ||
| if anything arrived during the final step. Reaching the end of a bare `async for` loop with | ||
| undrained pending messages emits a warning, since those messages would otherwise be silently lost. | ||
|
|
||
| !!! info "Limitations" | ||
| - End-of-run redirects need [`Agent.run`][pydantic_ai.agent.AbstractAgent.run] or | ||
| explicit [`AgentRun.next()`][pydantic_ai.run.AgentRun.next] driving — they | ||
|
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. " Can we detect the "misconfigured" usage, and raise an error when we had pending messages? |
||
| aren't drained inside a bare `async for node in agent_run:` loop (which emits a | ||
| warning if it ends with undrained messages). Messages delivered into a | ||
| `before_model_request` work in either case. | ||
| - Inside a [Temporal](durable_execution/temporal.md) workflow, tools run in | ||
| activities and don't share state with the workflow, so `ctx.enqueue` from a | ||
| tool doesn't currently propagate back to the run. Enqueue from the workflow | ||
| context (e.g. via `AgentRun.enqueue`) instead. | ||
| - Each end-of-run redirect opens a new model request. If something keeps | ||
| enqueueing on every step (e.g. a tool that always enqueues, or a | ||
| system-prompt callback that re-enqueues on each reinjection), the run will | ||
| loop indefinitely. Set [`UsageLimits`][pydantic_ai.usage.UsageLimits] on the | ||
| run as a safety net. | ||
| - `enqueue` is designed to be called from the same event loop that drives the | ||
| agent run. Inside the run that's automatic: async tools, sync tools (which | ||
| Pydantic AI auto-wraps in a thread executor), and capability hooks all | ||
| enqueue safely because the drain only iterates between graph nodes, never | ||
| concurrently with a tool body. If you're forwarding events from a *different* | ||
| thread or loop (e.g. a webhook handler), marshal the call onto the agent's | ||
| loop first — e.g. `loop.call_soon_threadsafe(agent_run.enqueue, msg)`. The | ||
| drain isn't atomic against concurrent cross-thread appends. | ||
|
|
||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. A note about infinite follow-up loops would be helpful: if something keeps enqueuing follow-up messages (e.g. a tool that always enqueues a follow-up and gets called on every iteration), the agent will loop indefinitely unless |
||
| ## Processing Message History | ||
|
|
||
| Sometimes you may want to modify the message history before it's sent to the model. This could be for privacy | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,120 @@ | ||
| """Internal helpers for the `RunContext.enqueue` / `AgentRun.enqueue` APIs. | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Per the project's requirements for all contributions: "update the relevant agent skills when introducing a new feature." The |
||
|
|
||
| These types live here (rather than in `messages.py`) because they're internal runtime | ||
| state for the pending message queue, not part of the wire-serializable message history. | ||
| """ | ||
|
|
||
| from __future__ import annotations | ||
|
|
||
| from collections.abc import Sequence | ||
| from dataclasses import dataclass | ||
| from typing import TYPE_CHECKING, Literal, TypeAlias | ||
|
|
||
| from .messages import ModelMessage, ModelRequest, ModelRequestPart, ModelResponse, UserPromptPart | ||
|
|
||
| if TYPE_CHECKING: | ||
| from .messages import UserContent | ||
|
|
||
|
|
||
| PendingMessagePriority: TypeAlias = Literal['asap', 'when_idle'] | ||
| """When to deliver a pending message. | ||
|
|
||
| - `'asap'`: Delivered at the earliest opportunity — either prepended to the next | ||
| [`ModelRequest`][pydantic_ai.messages.ModelRequest], or, if the agent would | ||
| otherwise terminate before another request, used to redirect the run into one | ||
| more request. | ||
| - `'when_idle'`: Delivered only when the agent would otherwise terminate, after | ||
| any `'asap'` messages. Doesn't interrupt in-flight work. | ||
| """ | ||
|
|
||
|
|
||
| EnqueueContent: TypeAlias = 'str | Sequence[UserContent] | ModelRequestPart | ModelMessage' | ||
| """A single item accepted by [`RunContext.enqueue`][pydantic_ai.tools.RunContext.enqueue] | ||
|
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why not |
||
| and [`AgentRun.enqueue`][pydantic_ai.run.AgentRun.enqueue]. | ||
|
|
||
| - `str` or `Sequence[UserContent]`: wrapped in a [`UserPromptPart`][pydantic_ai.messages.UserPromptPart] | ||
| (matching the shape of `Agent.run(user_prompt=...)`). | ||
| - [`ModelRequestPart`][pydantic_ai.messages.ModelRequestPart] (e.g. a | ||
| [`SystemPromptPart`][pydantic_ai.messages.SystemPromptPart]): included verbatim. | ||
| - [`ModelMessage`][pydantic_ai.messages.ModelMessage] (a complete | ||
| [`ModelRequest`][pydantic_ai.messages.ModelRequest] or | ||
| [`ModelResponse`][pydantic_ai.messages.ModelResponse]): emitted as its own message. | ||
|
|
||
| Consecutive part-style items (`str` / `Sequence[UserContent]` / `ModelRequestPart`) are coalesced | ||
| into a single `ModelRequest`; complete `ModelMessage`s stay separate. This lets one `enqueue` | ||
| call inject an interleaved exchange (e.g. a synthetic tool-search call + result — a `ModelResponse` | ||
| followed by a `ModelRequest`). The assembled sequence must end in a `ModelRequest` so the agent has | ||
| something to respond to. | ||
| """ | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nice expansion to support One question on the |
||
|
|
||
|
|
||
| def _build_enqueue_messages(items: Sequence[EnqueueContent]) -> list[ModelMessage]: | ||
| """Assemble enqueue items into a list of [`ModelMessage`][pydantic_ai.messages.ModelMessage]s. | ||
|
|
||
| Part-style items (`str` / `Sequence[UserContent]` / `ModelRequestPart`) are coalesced into a | ||
| single [`ModelRequest`][pydantic_ai.messages.ModelRequest]; complete `ModelMessage`s are emitted | ||
| as-is. Order is preserved, so a `ModelResponse` followed by part-style items produces the | ||
| response then a request built from those parts. | ||
| """ | ||
| messages: list[ModelMessage] = [] | ||
| loose_parts: list[ModelRequestPart] = [] | ||
|
|
||
| def flush() -> None: | ||
| if loose_parts: | ||
| messages.append(ModelRequest(parts=list(loose_parts))) | ||
| loose_parts.clear() | ||
|
|
||
| for item in items: | ||
| if isinstance(item, (ModelRequest, ModelResponse)): | ||
| flush() | ||
| messages.append(item) | ||
| elif isinstance(item, (str, Sequence)): | ||
| loose_parts.append(UserPromptPart(content=item)) | ||
| else: | ||
| loose_parts.append(item) | ||
| flush() | ||
| return messages | ||
|
|
||
|
|
||
| @dataclass | ||
| class PendingMessage: | ||
| """One or more [`ModelMessage`][pydantic_ai.messages.ModelMessage]s queued for injection into the agent conversation. | ||
|
|
||
| Enqueued via [`RunContext.enqueue`][pydantic_ai.tools.RunContext.enqueue] or | ||
| [`AgentRun.enqueue`][pydantic_ai.run.AgentRun.enqueue] and automatically drained | ||
| at the appropriate time during the agent run by | ||
| [`PendingMessageDrainCapability`][pydantic_ai.capabilities._pending_messages.PendingMessageDrainCapability]. | ||
| """ | ||
|
|
||
| messages: tuple[ModelMessage, ...] | ||
|
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Just use a list, please |
||
| """The message(s) to inject, in order. Always ends in a | ||
| [`ModelRequest`][pydantic_ai.messages.ModelRequest].""" | ||
|
|
||
| priority: PendingMessagePriority = 'asap' | ||
| """When to deliver these messages: | ||
|
|
||
| - `'asap'`: at the earliest opportunity (next model request, or redirect if the agent | ||
| would otherwise terminate). | ||
| - `'when_idle'`: only when the agent would otherwise terminate, after `'asap'` messages. | ||
| """ | ||
|
|
||
| @classmethod | ||
| def from_content(cls, *content: EnqueueContent, priority: PendingMessagePriority = 'asap') -> PendingMessage | None: | ||
| """Build a `PendingMessage` from `enqueue` arguments, or `None` when there's nothing to send. | ||
|
|
||
| Returns `None` for an empty call (enqueueing nothing is a no-op rather than an error). | ||
|
|
||
| Raises: | ||
| ValueError: If the assembled messages don't end in a | ||
| [`ModelRequest`][pydantic_ai.messages.ModelRequest] — e.g. a lone `ModelResponse` — | ||
| since the agent needs a request to respond to. | ||
| """ | ||
| messages = _build_enqueue_messages(content) | ||
| if not messages: | ||
| return None | ||
| if not isinstance(messages[-1], ModelRequest): | ||
| raise ValueError( | ||
|
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We use our own UserError for this stuff |
||
| 'Enqueued content must end with a `ModelRequest` (or `str` / `Sequence[UserContent]` / ' | ||
| '`ModelRequestPart` items that form one), so the agent has a request to respond to.' | ||
| ) | ||
| return cls(messages=tuple(messages), priority=priority) | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -13,13 +13,15 @@ | |
| from pydantic_ai._instrumentation import DEFAULT_INSTRUMENTATION_VERSION | ||
|
|
||
| from . import _utils, messages as _messages | ||
| from ._enqueue import EnqueueContent, PendingMessage, PendingMessagePriority | ||
| from .exceptions import UserError | ||
|
|
||
| if TYPE_CHECKING: | ||
| from .agent import Agent | ||
| from .models import Model | ||
| from .result import RunUsage | ||
| from .settings import ModelSettings | ||
| from .tool_manager import ToolManager | ||
| from .usage import RunUsage | ||
|
|
||
| # TODO (v2): Change the default for all typevars like this from `None` to `object` | ||
| AgentDepsT = TypeVar('AgentDepsT', default=None, contravariant=True) | ||
|
|
@@ -99,6 +101,14 @@ class RunContext(Generic[RunContextAgentDepsT]): | |
| `after_model_request`). Currently `None` in tool hooks, output validators, | ||
| and during agent construction. | ||
| """ | ||
| pending_messages: list[PendingMessage] | None = field(default=None, repr=False) | ||
| """Internal: queue read and mutated by [`PendingMessageDrainCapability`][pydantic_ai.capabilities._pending_messages.PendingMessageDrainCapability]. | ||
|
|
||
| Set to the run's live queue during an agent run; `None` in synthetic contexts that aren't | ||
| backed by a running agent (e.g. the `RunContext` built by `Agent.system_prompt_parts`), where | ||
| [`enqueue`][pydantic_ai.tools.RunContext.enqueue] would have nowhere to drain to and so raises. | ||
| Use [`enqueue`][pydantic_ai.tools.RunContext.enqueue] to add messages — don't append directly. | ||
| """ | ||
|
|
||
| tool_manager: ToolManager[RunContextAgentDepsT] | None = None | ||
| """The tool manager for the current run step. | ||
|
|
@@ -116,6 +126,49 @@ def last_attempt(self) -> bool: | |
| """Whether this is the last attempt at running this tool before an error is raised.""" | ||
| return self.retry == self.max_retries | ||
|
|
||
| def enqueue( | ||
| self, | ||
| *content: EnqueueContent, | ||
|
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do we need |
||
| priority: PendingMessagePriority = 'asap', | ||
|
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Rrequire a kwarg? |
||
| ) -> None: | ||
| """Enqueue content to be injected into the conversation. | ||
|
|
||
| Safe to call from anywhere a `RunContext` is available — async tools, | ||
| sync tools (auto-wrapped in a thread executor by Pydantic AI), and | ||
| capability hooks. The drain only iterates the queue between graph nodes | ||
| (in `before_model_request` and `after_node_run`), never concurrently | ||
| with the tool body, so `list.append` from a worker thread doesn't race | ||
| the drain. | ||
|
|
||
| Args: | ||
| *content: One or more [`EnqueueContent`][pydantic_ai._enqueue.EnqueueContent] items. | ||
| Each `str` or `Sequence[UserContent]` (same shape `Agent.run(user_prompt=...)` accepts) | ||
| and each [`ModelRequestPart`][pydantic_ai.messages.ModelRequestPart] (e.g. a | ||
| [`SystemPromptPart`][pydantic_ai.messages.SystemPromptPart]) is coalesced with adjacent | ||
| part-style items into one [`ModelRequest`][pydantic_ai.messages.ModelRequest]; a complete | ||
| [`ModelRequest`][pydantic_ai.messages.ModelRequest] or | ||
| [`ModelResponse`][pydantic_ai.messages.ModelResponse] is kept as its own message. The | ||
| assembled sequence must end in a request. Calling with no positional args is a no-op. | ||
| priority: When to deliver: | ||
| `'asap'` (default) — at the earliest opportunity (next model request, | ||
| or a redirect if the agent would otherwise end). | ||
| `'when_idle'` — only when the agent would otherwise end, after `'asap'` messages. | ||
|
|
||
| Raises: | ||
| UserError: If this `RunContext` isn't backed by a running agent's queue (e.g. the | ||
| synthetic context from `Agent.system_prompt_parts`), since there'd be nowhere | ||
| to deliver the message. | ||
| """ | ||
| if self.pending_messages is None: | ||
| raise UserError( | ||
| '`enqueue` is only available during an agent run (from tools, capability hooks, or ' | ||
| '`AgentRun.enqueue`). This `RunContext` has no pending-message queue to drain.' | ||
| ) | ||
| pending = PendingMessage.from_content(*content, priority=priority) | ||
| if pending is None: | ||
| return | ||
| self.pending_messages.append(pending) | ||
|
|
||
| __repr__ = _utils.dataclasses_no_defaults_repr | ||
|
|
||
|
|
||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This deserves mention in at least one more place, like in the tool and hooks docs, where
RunContextis available, telling people they canenqueuefrom there.