Add pending message queue (ctx.enqueue / agent_run.enqueue)
#4980
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -391,6 +391,121 @@ print(result2.all_messages()) | |
| """ | ||
| ``` | ||
|
|
||
| ## Injecting messages mid-run | ||
|
|
||
| Tools, capability hooks, and external code driving an agent run can inject extra content | ||
| into the conversation mid-run with [`RunContext.enqueue`][pydantic_ai.tools.RunContext.enqueue] | ||
| (when a `RunContext` is in scope, e.g. inside a tool or capability hook) or | ||
| [`AgentRun.enqueue`][pydantic_ai.run.AgentRun.enqueue] (from external code driving | ||
| [`agent.iter()`][pydantic_ai.agent.AbstractAgent.iter]). Use this when something happens during a | ||
| run that the agent should know about — a tool wants to add follow-up context, an external event | ||
| needs to *steer* the agent's plan, or background work needs to reach the agent when it completes. | ||
|
|
||
| A `priority` controls when the enqueued content is delivered: | ||
|
|
||
| - `'asap'` (default): delivered at the earliest opportunity — added to the next [`ModelRequest`][pydantic_ai.messages.ModelRequest], or, if the agent would otherwise terminate before another request, used to redirect the run into one more request. Use when the new context should reach the model as soon as possible; this is what other frameworks often call **steering** an in-flight agent. | ||
| - `'when_idle'`: delivered only when the agent would otherwise terminate, after any `'asap'` messages. Use when the agent shouldn't be interrupted but should pick up the new work — a follow-up task — once it's done with what it's doing. | ||
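To make the two priorities concrete, here is a minimal toy model of the delivery order described above. It is not Pydantic AI's implementation: `ToyPendingQueue`, `drain_before_request`, and `drain_at_end` are hypothetical names, and the sketch assumes that an end-of-run drain hands over any remaining `'asap'` items first, followed by the `'when_idle'` items.

```python
from collections import deque
from typing import Literal

Priority = Literal['asap', 'when_idle']


class ToyPendingQueue:
    """Toy stand-in for a pending-message queue (hypothetical, not the library's class)."""

    def __init__(self) -> None:
        self._asap: deque[str] = deque()
        self._when_idle: deque[str] = deque()

    def enqueue(self, *items: str, priority: Priority = 'asap') -> None:
        # Variadic, like the documented `enqueue`: each positional argument is one item.
        target = self._asap if priority == 'asap' else self._when_idle
        target.extend(items)

    def drain_before_request(self) -> list[str]:
        # Before each model request, only 'asap' items are delivered.
        drained = list(self._asap)
        self._asap.clear()
        return drained

    def drain_at_end(self) -> list[str]:
        # When the run would otherwise end: leftover 'asap' items first, then 'when_idle'.
        drained = list(self._asap) + list(self._when_idle)
        self._asap.clear()
        self._when_idle.clear()
        return drained


queue = ToyPendingQueue()
queue.enqueue('follow-up task', priority='when_idle')
queue.enqueue('production is degraded')  # default priority is 'asap'

mid_run = queue.drain_before_request()
queue.enqueue('late alert')  # arrives during the final step
end_of_run = queue.drain_at_end()

print(mid_run)     # ['production is degraded']
print(end_of_run)  # ['late alert', 'follow-up task']
```

In this model the `'when_idle'` item waits through the mid-run drain and is only handed over at the end, behind the late `'asap'` item.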
|
|
||
| `enqueue` is variadic — each positional argument is one item, and can be: | ||
|
|
||
| - a piece of [`UserContent`][pydantic_ai.messages.UserContent] — a `str` or multi-modal content like an [`ImageUrl`][pydantic_ai.messages.ImageUrl]. Adjacent user content is gathered into a single [`UserPromptPart`][pydantic_ai.messages.UserPromptPart], so `enqueue('caption', image)` forms one user turn. To pass an existing list, spread it: `enqueue(*items)`; | ||
| - a [`ModelRequestPart`][pydantic_ai.messages.ModelRequestPart], such as a [`SystemPromptPart`][pydantic_ai.messages.SystemPromptPart]; | ||
| - a complete [`ModelRequest`][pydantic_ai.messages.ModelRequest] or [`ModelResponse`][pydantic_ai.messages.ModelResponse], to control request-level fields like `instructions`/`metadata` or to inject a synthetic prior turn. | ||
|
|
||
| Adjacent part-style items (user content and [`ModelRequestPart`][pydantic_ai.messages.ModelRequestPart]s) are coalesced into one [`ModelRequest`][pydantic_ai.messages.ModelRequest]; complete messages stay separate. This lets a single call inject an interleaved exchange — for example a synthetic tool call (a [`ModelResponse`][pydantic_ai.messages.ModelResponse]) followed by its result (a [`ModelRequest`][pydantic_ai.messages.ModelRequest]). The content must end in a request, so the agent has something to respond to. | ||
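The coalescing rule can be sketched with stand-in types. This is an illustration under stated assumptions rather than the library's code: `ToyPart`, `ToyRequest`, `ToyResponse`, and `coalesce` are hypothetical, and the sketch skips the step where adjacent user content is gathered into a single `UserPromptPart` (each string simply becomes its own part).

```python
from dataclasses import dataclass, field


@dataclass
class ToyPart:
    """Stand-in for a part-style item (user content or a request part)."""
    content: str


@dataclass
class ToyRequest:
    """Stand-in for a complete request message."""
    parts: list[ToyPart] = field(default_factory=list)


@dataclass
class ToyResponse:
    """Stand-in for a complete response message."""
    text: str


def coalesce(*items):
    """Group adjacent part-style items into one request; keep complete messages separate."""
    messages = []
    open_request = None  # the request currently collecting adjacent parts
    for item in items:
        if isinstance(item, (ToyRequest, ToyResponse)):
            messages.append(item)
            open_request = None  # a complete message ends the current run of parts
        else:
            part = ToyPart(item) if isinstance(item, str) else item
            if open_request is None:
                open_request = ToyRequest()
                messages.append(open_request)
            open_request.parts.append(part)
    # The content must end in a request so the agent has something to respond to.
    if not messages or not isinstance(messages[-1], ToyRequest):
        raise ValueError('enqueued content must end in a request')
    return messages


messages = coalesce(
    'caption',
    ToyPart('extra part'),
    ToyResponse('synthetic tool call'),
    'tool result',
)
kinds = [type(m).__name__ for m in messages]
print(kinds)  # ['ToyRequest', 'ToyResponse', 'ToyRequest']
```

The first two items share one request, the response stays a message of its own, and the trailing string opens a new request, which satisfies the "must end in a request" rule.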
|
|
||
| ### From inside a tool or hook | ||
|
|
||
| Use [`RunContext.enqueue`][pydantic_ai.tools.RunContext.enqueue] when you have a | ||
| `RunContext` in scope: | ||
|
|
||
| ```python {title="enqueue_from_tool.py"} | ||
| from pydantic_ai import Agent, RunContext | ||
| from pydantic_ai.messages import SystemPromptPart | ||
|
|
||
| agent = Agent('anthropic:claude-opus-4-7') | ||
|
|
||
|
|
||
| @agent.tool | ||
| def trigger_alert(ctx: RunContext[None]) -> str: | ||
|     ctx.enqueue('Alert: production is degraded, prioritize triage.') | ||
|
Collaborator
Author
Have an example with a |
||
|     return 'alert raised' | ||
|
|
||
|
|
||
| @agent.tool | ||
| def enter_incident_mode(ctx: RunContext[None]) -> str: | ||
|     # Enqueue a `SystemPromptPart` to adjust the agent's standing instructions mid-run. | ||
|     ctx.enqueue(SystemPromptPart(content='You are now in incident mode: be terse and action-oriented.')) | ||
|     return 'incident mode enabled' | ||
| ``` | ||
|
|
||
| The `'asap'` message is appended to the agent's message history and is visible to the | ||
| model on the next request, alongside any tool returns from the same step. A | ||
| [`SystemPromptPart`][pydantic_ai.messages.SystemPromptPart] is delivered the same way; on | ||
| providers that hoist system prompts (e.g. Anthropic, Google) a non-leading one is sent as a | ||
| `<system>`-tagged user-role message, so it keeps its mid-conversation position rather than being | ||
| lifted to the top. | ||
|
|
||
| ### From external code driving `agent.iter()` | ||
|
|
||
| Use [`AgentRun.enqueue`][pydantic_ai.run.AgentRun.enqueue] when you're driving a run | ||
| from outside (e.g. forwarding events from a webhook, chat platform, or job queue): | ||
|
|
||
| ```python {title="enqueue_from_agent_run.py"} | ||
| from pydantic_ai import Agent | ||
| from pydantic_graph import End | ||
|
|
||
| agent = Agent('anthropic:claude-opus-4-7') | ||
|
|
||
|
|
||
| async def main(): | ||
|     async with agent.iter('Summarize the latest deploy report') as agent_run: | ||
|         # An external system pushes a follow-up while the agent is working. | ||
|         # When the agent would otherwise finish, the message redirects it | ||
|         # into a fresh model request so it can incorporate the new context. | ||
|         agent_run.enqueue( | ||
|             'A new error was just reported — include it in the summary.', | ||
|             priority='when_idle', | ||
|         ) | ||
|         node = agent_run.next_node | ||
|         while not isinstance(node, End): | ||
|             node = await agent_run.next(node) | ||
| ``` | ||
|
Comment on lines +461 to +473
Contributor
This example uses Consider either:
|
||
|
|
||
| The example drives the run with [`agent.iter()`][pydantic_ai.agent.AbstractAgent.iter] + | ||
| [`AgentRun.next()`][pydantic_ai.run.AgentRun.next] because `'when_idle'` messages are only | ||
| drained when the agent would otherwise reach an `End` — that drain happens in `after_node_run`, | ||
| which doesn't fire inside a bare `async for node in agent_run:` loop. `'asap'` messages are | ||
| drained in `before_model_request` (which fires either way) and also at the same end-of-run point | ||
| if anything arrived during the final step. Reaching the end of a bare `async for` loop with | ||
| undrained pending messages raises [`UndrainedPendingMessagesError`][pydantic_ai.exceptions.UndrainedPendingMessagesError], | ||
| since those messages would otherwise be silently lost. | ||
|
|
||
| !!! info "Limitations" | ||
| - End-of-run redirects need [`Agent.run`][pydantic_ai.agent.AbstractAgent.run] or | ||
| explicit [`AgentRun.next()`][pydantic_ai.run.AgentRun.next] driving — they | ||
|
Collaborator
Author
" Can we detect the "misconfigured" usage, and raise an error when we had pending messages? |
||
| aren't drained inside a bare `async for node in agent_run:` loop (which raises | ||
| [`UndrainedPendingMessagesError`][pydantic_ai.exceptions.UndrainedPendingMessagesError] | ||
| if it ends with undrained messages). `'asap'` messages drained in | ||
| `before_model_request` are delivered in either case. | ||
| - Inside a [Temporal](durable_execution/temporal.md) workflow, tools run in | ||
| activities and don't share state with the workflow, so `ctx.enqueue` from a | ||
| tool doesn't currently propagate back to the run. Enqueue from the workflow | ||
| context (e.g. via `AgentRun.enqueue`) instead. | ||
| - Each end-of-run redirect opens a new model request. If something keeps | ||
| enqueueing on every step (e.g. a tool that always enqueues, or a | ||
| system-prompt callback that re-enqueues on each reinjection), the run will | ||
| loop indefinitely. Set [`UsageLimits`][pydantic_ai.usage.UsageLimits] on the | ||
| run as a safety net. | ||
| - `enqueue` is designed to be called from the same event loop that drives the | ||
| agent run. Inside the run that's automatic: async tools, sync tools (which | ||
| Pydantic AI auto-wraps in a thread executor), and capability hooks all | ||
| enqueue safely because the drain only iterates between graph nodes, never | ||
| concurrently with a tool body. If you're forwarding events from a *different* | ||
| thread or loop (e.g. a webhook handler), marshal the call onto the agent's | ||
| loop first — e.g. `loop.call_soon_threadsafe(agent_run.enqueue, msg)`. The | ||
| drain isn't atomic against concurrent cross-thread appends. | ||
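The cross-thread case in the last limitation can be sketched with plain `asyncio`. In this sketch `enqueue` is a hypothetical stand-in for `agent_run.enqueue` that appends to a list, and `webhook_handler` is an invented name for code running on another thread. Only `loop.call_soon_threadsafe` is taken from the text above.

```python
import asyncio
import threading

pending: list[str] = []


def enqueue(message: str) -> None:
    """Hypothetical stand-in for `agent_run.enqueue`; expected to run on the loop thread."""
    pending.append(message)


async def main() -> list[str]:
    loop = asyncio.get_running_loop()
    delivered = asyncio.Event()

    def on_loop(message: str) -> None:
        # Runs on the event-loop thread, so it cannot race a drain between nodes.
        enqueue(message)
        delivered.set()

    def webhook_handler() -> None:
        # Runs on a different thread: schedule the call instead of calling enqueue directly.
        loop.call_soon_threadsafe(on_loop, 'new error reported')

    worker = threading.Thread(target=webhook_handler)
    worker.start()
    await asyncio.wait_for(delivered.wait(), timeout=5)
    worker.join()
    return list(pending)


result = asyncio.run(main())
print(result)  # ['new error reported']
```

The worker thread never touches `pending` itself. It only asks the loop to run `on_loop`, which keeps every append on the thread that would also perform the drain.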
|
|
||
| ## Processing Message History | ||
|
|
||
| Sometimes you may want to modify the message history before it's sent to the model. This could be for privacy | ||
|
|
||
This deserves mention in at least one more place, like in the tool and hooks docs, where
`RunContext` is available, telling people they can `enqueue` from there.