Skip to content
Merged
Show file tree
Hide file tree
Changes from 41 commits
Commits
Show all changes
46 commits
Select commit Hold shift + click to select a range
3951d3d
Add pending message queue and background tool execution
DouweM Apr 4, 2026
f530e2a
Address review feedback and update snapshots after main rebase
DouweM Apr 24, 2026
11addf5
Merge remote-tracking branch 'origin/main' into background-tools
DouweM Apr 24, 2026
b1217e5
Use list instead of deque for pending_messages to fix Temporal serial…
DouweM Apr 24, 2026
648ca4c
Fix coverage gaps: ContextVar-based per-run state + test simplifications
DouweM Apr 24, 2026
98207fb
Remove incorrect 'pragma: no cover' on background task cleanup paths
DouweM Apr 24, 2026
eed6298
Add test exercising background task cancellation on run abort
DouweM Apr 24, 2026
cb91f8c
Refocus PR on pending message queue; drop background tools to harness
DouweM Apr 25, 2026
034d2c6
Merge remote-tracking branch 'origin/main' into background-tools
DouweM Apr 25, 2026
ff9e760
Thread pending_messages through Agent.system_prompt_parts
DouweM Apr 28, 2026
f8ba158
Address auto-review feedback on PR #4980
DouweM Apr 29, 2026
97ebbd2
Stamp steering ModelRequest with timestamp + run_id at construction
DouweM Apr 29, 2026
0d8ab31
Address auto-review round 2
DouweM Apr 29, 2026
ebfa2d0
Switch agent_run.enqueue example to follow_up
DouweM Apr 29, 2026
1a546ef
Merge remote-tracking branch 'origin/main' into background-tools
DouweM Apr 29, 2026
c4e3250
Round 4: cover system-prompt enqueue path + warn about follow-up loops
DouweM Apr 29, 2026
9b4a747
Round 5: dedup enqueue validation, consolidate UsageLimits warning
DouweM Apr 29, 2026
04317a6
Merge main into background-tools
DouweM May 11, 2026
a8cdef0
Accept strings and UserContent in `enqueue`
DouweM May 11, 2026
f362019
Allow `enqueue` to accept a full `ModelRequest`
DouweM May 11, 2026
0cfcaac
Merge parts-style enqueues at drain, keep ModelRequest passthrough di…
DouweM May 11, 2026
c3e7341
Merge main into branch
DouweM May 13, 2026
2efb668
Rename priorities to `'asap'` / `'when_idle'`, drop `SystemPromptPart…
DouweM May 13, 2026
12089ff
Add snapshot tests for `'asap'` end-of-run drain + rich message_histo…
DouweM May 14, 2026
9a5f4ab
Fix coverage gaps in enqueue tests
DouweM May 14, 2026
62f57f1
Merge remote-tracking branch 'origin/main' into background-tools
DouweM May 14, 2026
ce543f4
Suppress reportPrivateUsage on `_clean_message_history` import in wir…
DouweM May 14, 2026
9503185
Fix lint D417 + apply ruff format / autofixes after merge
DouweM May 14, 2026
d226586
Address auto-review: fix merge bugs + inline `coerce_enqueue_item`
DouweM May 14, 2026
a18e4c6
Address auto-review borderline items
DouweM May 15, 2026
9b1c6c2
Stamp `conversation_id` on drain-created `ModelRequest`s (Devin review)
DouweM May 15, 2026
070bdcc
Split 'asap' and 'when_idle' into separate ModelRequests at end-of-ru…
DouweM May 15, 2026
6c89afe
Cover the producer-supplied `conversation_id` branch in `_flatten_dra…
DouweM May 15, 2026
6965079
Document thread-safety contract on `enqueue` (Devin review)
DouweM May 15, 2026
b59f305
Merge remote-tracking branch 'origin/main' into background-tools
DouweM May 18, 2026
9fcc3ea
Fix RunUsage import to use canonical source
DouweM May 18, 2026
b60ebb1
Move enqueue helpers to `_enqueue.py`, pre-package to `ModelRequest`
DouweM May 18, 2026
f1d57bb
Apply auto-review fixes: drop redundant comment + migrate wire-merge …
DouweM May 19, 2026
dc32827
Address PR review: reinject pending_messages, bare-iteration warning,…
DouweM May 21, 2026
9d97861
Merge remote-tracking branch 'origin/main' into background-tools
DouweM May 21, 2026
ce266a0
Expand enqueue to accept request parts and interleaved message sequences
DouweM May 21, 2026
328aa88
Drop `pending_messages` param from `system_prompt_parts`; raise on en…
DouweM May 21, 2026
3618943
test: drop unreachable line in enqueue-without-queue test
DouweM May 22, 2026
6cea37b
Address review: use `UserError`/`list` for enqueue, centralize run-me…
DouweM May 22, 2026
d9cf1a9
Make `enqueue` flat-variadic over `UserContent`; keep run-metadata he…
DouweM May 22, 2026
4e6a3fd
Raise `UndrainedPendingMessagesError` on undrained bare-iteration que…
DouweM May 22, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/capabilities.md
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,11 @@ agent = Agent('openai:gpt-5.2', name='my_agent', capabilities=[hooks])

All hooks receive [`RunContext`][pydantic_ai.tools.RunContext], which provides access to the running agent via [`ctx.agent`][pydantic_ai.tools.RunContext.agent] — useful for logging, metrics, and other cross-cutting concerns that need to identify which agent is running.

Hooks can also push follow-up messages into the conversation via
[`RunContext.enqueue`][pydantic_ai.tools.RunContext.enqueue] — useful for capability
authors that need to surface an event to the model mid-run without rebuilding the
cached system prompt. See [Injecting messages mid-run](message-history.md#injecting-messages-mid-run).

See the dedicated [Hooks](hooks.md) page for the full API: decorator and constructor registration, timeouts, tool filtering, wrap hooks, per-event hooks, and more.

### Provider-adaptive tools
Expand Down
101 changes: 101 additions & 0 deletions docs/message-history.md
Original file line number Diff line number Diff line change
Expand Up @@ -391,6 +391,107 @@ print(result2.all_messages())
"""
```

## Injecting messages mid-run
Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This deserves mention in at least one more place, like in the tool and hooks docs, where RunContext is available, telling people they can enqueue from there.


Tools, capability hooks, and external code driving an agent run can inject extra content
into the conversation mid-run with [`RunContext.enqueue`][pydantic_ai.tools.RunContext.enqueue]
(when a `RunContext` is in scope, e.g. inside a tool or capability hook) or
[`AgentRun.enqueue`][pydantic_ai.run.AgentRun.enqueue] (from external code driving
[`agent.iter()`][pydantic_ai.agent.AbstractAgent.iter]). Use this when something happens during a
run that the agent should know about — a tool wants to add follow-up context, an external event
needs to *steer* the agent's plan, or background work needs to reach the agent when it completes.

A `priority` controls when the enqueued content is delivered:

- `'asap'` (default): delivered at the earliest opportunity — added to the next [`ModelRequest`][pydantic_ai.messages.ModelRequest], or, if the agent would otherwise terminate before another request, used to redirect the run into one more request. Use when the new context should reach the model as soon as possible; this is what other frameworks often call **steering** an in-flight agent.
- `'when_idle'`: delivered only when the agent would otherwise terminate, after any `'asap'` messages. Use when the agent shouldn't be interrupted but should pick up the new work — a follow-up task — once it's done with what it's doing.

Each positional argument to `enqueue` is one item, and can be:

- a `str` or [`Sequence[UserContent]`][pydantic_ai.messages.UserContent] — wrapped in a [`UserPromptPart`][pydantic_ai.messages.UserPromptPart], the same shape [`Agent.run(user_prompt=...)`][pydantic_ai.agent.AbstractAgent.run] accepts;
- a [`ModelRequestPart`][pydantic_ai.messages.ModelRequestPart], such as a [`SystemPromptPart`][pydantic_ai.messages.SystemPromptPart];
- a complete [`ModelRequest`][pydantic_ai.messages.ModelRequest] or [`ModelResponse`][pydantic_ai.messages.ModelResponse], to control request-level fields like `instructions`/`metadata` or to inject a synthetic prior turn.

Adjacent part-style items (`str` / `Sequence[UserContent]` / `ModelRequestPart`) are coalesced into one [`ModelRequest`][pydantic_ai.messages.ModelRequest]; complete messages stay separate. This lets a single call inject an interleaved exchange — for example a synthetic tool call (a [`ModelResponse`][pydantic_ai.messages.ModelResponse]) followed by its result (a [`ModelRequest`][pydantic_ai.messages.ModelRequest]). The content must end in a request, so the agent has something to respond to.

### From inside a tool or hook

Use [`RunContext.enqueue`][pydantic_ai.tools.RunContext.enqueue] when you have a
`RunContext` in scope:

```python {title="enqueue_from_tool.py"}
from pydantic_ai import Agent, RunContext

agent = Agent('openai:gpt-5.2')


@agent.tool
def trigger_alert(ctx: RunContext[None]) -> str:
ctx.enqueue('Alert: production is degraded, prioritize triage.')
Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Have an example with a SystemPromptPart as well

return 'alert raised'
```

The `'asap'` message is appended to the agent's message history and is visible to the
model on the next request, alongside any tool returns from the same step.

### From external code driving `agent.iter()`

Use [`AgentRun.enqueue`][pydantic_ai.run.AgentRun.enqueue] when you're driving a run
from outside (e.g. forwarding events from a webhook, chat platform, or job queue):

```python {title="enqueue_from_agent_run.py"}
from pydantic_ai import Agent
from pydantic_graph import End

agent = Agent('openai:gpt-5.2')
Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use Opus 4.7



async def main():
async with agent.iter('Summarize the latest deploy report') as agent_run:
# An external system pushes a follow-up while the agent is working.
# When the agent would otherwise finish, the message redirects it
# into a fresh model request so it can incorporate the new context.
agent_run.enqueue(
'A new error was just reported — include it in the summary.',
priority='when_idle',
)
node = agent_run.next_node
while not isinstance(node, End):
node = await agent_run.next(node)
```
Comment on lines +461 to +473
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This example uses async for _ in agent_run: to iterate, but the Limitations box immediately below warns that follow-up messages aren't drained inside bare async for loops. While this example uses priority='steering' (which does work), showing async for right before calling out its limitation with follow-ups could mislead users into thinking async for works for all priorities.

Consider either:

  • Using agent_run.next() in this example (showing the recommended pattern that works for both priorities), or
  • Adding a brief inline note that this works because steering messages are drained before model requests regardless of iteration style


The example drives the run with [`agent.iter()`][pydantic_ai.agent.AbstractAgent.iter] +
[`AgentRun.next()`][pydantic_ai.run.AgentRun.next] because `'when_idle'` messages are only
drained when the agent would otherwise reach an `End` — that drain happens in `after_node_run`,
which doesn't fire inside a bare `async for node in agent_run:` loop. `'asap'` messages are
drained in `before_model_request` (which fires either way) and also at the same end-of-run point
if anything arrived during the final step. Reaching the end of a bare `async for` loop with
undrained pending messages emits a warning, since those messages would otherwise be silently lost.

!!! info "Limitations"
- End-of-run redirects need [`Agent.run`][pydantic_ai.agent.AbstractAgent.run] or
explicit [`AgentRun.next()`][pydantic_ai.run.AgentRun.next] driving — they
Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"Agent.iter + AgentRun.next driving"

Can we detect the "misconfigured" usage, and raise an error when we had pending messages?

aren't drained inside a bare `async for node in agent_run:` loop (which emits a
warning if it ends with undrained messages). Messages delivered into a
`before_model_request` work in either case.
- Inside a [Temporal](durable_execution/temporal.md) workflow, tools run in
activities and don't share state with the workflow, so `ctx.enqueue` from a
tool doesn't currently propagate back to the run. Enqueue from the workflow
context (e.g. via `AgentRun.enqueue`) instead.
- Each end-of-run redirect opens a new model request. If something keeps
enqueueing on every step (e.g. a tool that always enqueues, or a
system-prompt callback that re-enqueues on each reinjection), the run will
loop indefinitely. Set [`UsageLimits`][pydantic_ai.usage.UsageLimits] on the
run as a safety net.
- `enqueue` is designed to be called from the same event loop that drives the
agent run. Inside the run that's automatic: async tools, sync tools (which
Pydantic AI auto-wraps in a thread executor), and capability hooks all
enqueue safely because the drain only iterates between graph nodes, never
concurrently with a tool body. If you're forwarding events from a *different*
thread or loop (e.g. a webhook handler), marshal the call onto the agent's
loop first — e.g. `loop.call_soon_threadsafe(agent_run.enqueue, msg)`. The
drain isn't atomic against concurrent cross-thread appends.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A note about infinite follow-up loops would be helpful: if something keeps enqueuing follow-up messages (e.g. a tool that always enqueues a follow-up and gets called on every iteration), the agent will loop indefinitely unless usage_limits are configured. A brief mention of UsageLimits as the safety net would help users avoid this pitfall — something like "Set usage_limits to guard against unbounded follow-up cycles."

## Processing Message History

Sometimes you may want to modify the message history before it's sent to the model. This could be for privacy
Expand Down
8 changes: 8 additions & 0 deletions docs/tools.md
Original file line number Diff line number Diff line change
Expand Up @@ -375,6 +375,14 @@ _(This example is complete, it can be run "as is")_

This visibility helps you understand why an agent made specific decisions and identify issues in tool implementations.

## Injecting Follow-up Messages from a Tool

A tool can push extra messages into the conversation via
[`RunContext.enqueue`][pydantic_ai.tools.RunContext.enqueue] — useful when a tool wants
to add follow-up context, redirect the agent's plan, or surface an event the model
should react to. See [Injecting messages mid-run](message-history.md#injecting-messages-mid-run)
for the full pattern.

## See Also

For more tool features and integrations, see:
Expand Down
6 changes: 5 additions & 1 deletion pydantic_ai_slim/pydantic_ai/_agent_graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
from pydantic_graph.basenode import End, NodeRunEndT
from pydantic_graph.graph_builder import Graph

from . import _output, _system_prompt, exceptions, messages as _messages, models, result, usage as _usage
from . import _enqueue, _output, _system_prompt, exceptions, messages as _messages, models, result, usage as _usage
from ._run_context import set_current_run_context
from .exceptions import ToolRetryError
from .output import OutputDataT, OutputSpec
Expand Down Expand Up @@ -135,6 +135,9 @@ class GraphAgentState:
"""Last-resolved `max_tokens` from model settings, used only in error messages."""
last_model_request_parameters: models.ModelRequestParameters | None = None
"""Last-resolved model request parameters, used for OTel span attributes."""
pending_messages: list[_enqueue.PendingMessage] = dataclasses.field(default_factory=list[_enqueue.PendingMessage])
"""Internal: queue used by [`PendingMessageDrainCapability`][pydantic_ai.capabilities._pending_messages.PendingMessageDrainCapability]
for messages enqueued via [`enqueue`][pydantic_ai.tools.RunContext.enqueue] or [`AgentRun.enqueue`][pydantic_ai.run.AgentRun.enqueue]."""

def check_incomplete_tool_call(self) -> None:
"""Raise `IncompleteToolCall` if the last model response was truncated mid-tool-call."""
Expand Down Expand Up @@ -1410,6 +1413,7 @@ def build_run_context(ctx: GraphRunContext[GraphAgentState, GraphAgentDeps[DepsT
conversation_id=ctx.state.conversation_id,
metadata=ctx.state.metadata,
tool_manager=ctx.deps.tool_manager,
pending_messages=ctx.state.pending_messages,
)
validation_context = build_validation_context(ctx.deps.validation_context, run_context)
run_context = replace(run_context, validation_context=validation_context)
Expand Down
120 changes: 120 additions & 0 deletions pydantic_ai_slim/pydantic_ai/_enqueue.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
"""Internal helpers for the `RunContext.enqueue` / `AgentRun.enqueue` APIs.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Per the project's requirements for all contributions: "update the relevant agent skills when introducing a new feature." The building-pydantic-ai-agents skill (pydantic_ai_slim/pydantic_ai/.agents/skills/building-pydantic-ai-agents/SKILL.md) doesn't mention enqueue, pending_messages, or mid-run message injection. Since this is a new public API on RunContext and AgentRun, the skill should document the feature so that agents building with Pydantic AI know how to use it — even just a bullet point under the tools or capabilities section pointing at RunContext.enqueue and when to use it.


These types live here (rather than in `messages.py`) because they're internal runtime
state for the pending message queue, not part of the wire-serializable message history.
"""

from __future__ import annotations

from collections.abc import Sequence
from dataclasses import dataclass
from typing import TYPE_CHECKING, Literal, TypeAlias

from .messages import ModelMessage, ModelRequest, ModelRequestPart, ModelResponse, UserPromptPart

if TYPE_CHECKING:
from .messages import UserContent


PendingMessagePriority: TypeAlias = Literal['asap', 'when_idle']
"""When to deliver a pending message.

- `'asap'`: Delivered at the earliest opportunity — either prepended to the next
[`ModelRequest`][pydantic_ai.messages.ModelRequest], or, if the agent would
otherwise terminate before another request, used to redirect the run into one
more request.
- `'when_idle'`: Delivered only when the agent would otherwise terminate, after
any `'asap'` messages. Doesn't interrupt in-flight work.
"""


EnqueueContent: TypeAlias = 'str | Sequence[UserContent] | ModelRequestPart | ModelMessage'
"""A single item accepted by [`RunContext.enqueue`][pydantic_ai.tools.RunContext.enqueue]
Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not UserContent by itself? So you can pass multiple images for example?

and [`AgentRun.enqueue`][pydantic_ai.run.AgentRun.enqueue].

- `str` or `Sequence[UserContent]`: wrapped in a [`UserPromptPart`][pydantic_ai.messages.UserPromptPart]
(matching the shape of `Agent.run(user_prompt=...)`).
- [`ModelRequestPart`][pydantic_ai.messages.ModelRequestPart] (e.g. a
[`SystemPromptPart`][pydantic_ai.messages.SystemPromptPart]): included verbatim.
- [`ModelMessage`][pydantic_ai.messages.ModelMessage] (a complete
[`ModelRequest`][pydantic_ai.messages.ModelRequest] or
[`ModelResponse`][pydantic_ai.messages.ModelResponse]): emitted as its own message.

Consecutive part-style items (`str` / `Sequence[UserContent]` / `ModelRequestPart`) are coalesced
into a single `ModelRequest`; complete `ModelMessage`s stay separate. This lets one `enqueue`
call inject an interleaved exchange (e.g. a synthetic tool-search call + result — a `ModelResponse`
followed by a `ModelRequest`). The assembled sequence must end in a `ModelRequest` so the agent has
something to respond to.
"""
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice expansion to support ModelRequestPart and ModelResponse (via ModelMessage) — the interleaved-exchange pattern for synthetic tool-search pairs is a good concrete use case and the docstring explains it well.

One question on the _build_enqueue_messages implementation below (line 71): the isinstance(item, (str, Sequence)) check is quite broad — Sequence from collections.abc matches bytes, tuple, range, etc., not just list[UserContent]. This doesn't cause runtime bugs because Pydantic validation on UserPromptPart.content would catch truly invalid types downstream, and typing enforces the contract at the call site. But if this ever needs runtime validation, narrowing the check (e.g. excluding str since it's already handled, or checking for list specifically) would make the boundary tighter.



def _build_enqueue_messages(items: Sequence[EnqueueContent]) -> list[ModelMessage]:
"""Assemble enqueue items into a list of [`ModelMessage`][pydantic_ai.messages.ModelMessage]s.

Part-style items (`str` / `Sequence[UserContent]` / `ModelRequestPart`) are coalesced into a
single [`ModelRequest`][pydantic_ai.messages.ModelRequest]; complete `ModelMessage`s are emitted
as-is. Order is preserved, so a `ModelResponse` followed by part-style items produces the
response then a request built from those parts.
"""
messages: list[ModelMessage] = []
loose_parts: list[ModelRequestPart] = []

def flush() -> None:
if loose_parts:
messages.append(ModelRequest(parts=list(loose_parts)))
loose_parts.clear()

for item in items:
if isinstance(item, (ModelRequest, ModelResponse)):
flush()
messages.append(item)
elif isinstance(item, (str, Sequence)):
loose_parts.append(UserPromptPart(content=item))
else:
loose_parts.append(item)
flush()
return messages


@dataclass
class PendingMessage:
"""One or more [`ModelMessage`][pydantic_ai.messages.ModelMessage]s queued for injection into the agent conversation.

Enqueued via [`RunContext.enqueue`][pydantic_ai.tools.RunContext.enqueue] or
[`AgentRun.enqueue`][pydantic_ai.run.AgentRun.enqueue] and automatically drained
at the appropriate time during the agent run by
[`PendingMessageDrainCapability`][pydantic_ai.capabilities._pending_messages.PendingMessageDrainCapability].
"""

messages: tuple[ModelMessage, ...]
Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just use a list, please

"""The message(s) to inject, in order. Always ends in a
[`ModelRequest`][pydantic_ai.messages.ModelRequest]."""

priority: PendingMessagePriority = 'asap'
"""When to deliver these messages:

- `'asap'`: at the earliest opportunity (next model request, or redirect if the agent
would otherwise terminate).
- `'when_idle'`: only when the agent would otherwise terminate, after `'asap'` messages.
"""

@classmethod
def from_content(cls, *content: EnqueueContent, priority: PendingMessagePriority = 'asap') -> PendingMessage | None:
"""Build a `PendingMessage` from `enqueue` arguments, or `None` when there's nothing to send.

Returns `None` for an empty call (enqueueing nothing is a no-op rather than an error).

Raises:
ValueError: If the assembled messages don't end in a
[`ModelRequest`][pydantic_ai.messages.ModelRequest] — e.g. a lone `ModelResponse` —
since the agent needs a request to respond to.
"""
messages = _build_enqueue_messages(content)
if not messages:
return None
if not isinstance(messages[-1], ModelRequest):
raise ValueError(
Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We use our own UserError for this stuff

'Enqueued content must end with a `ModelRequest` (or `str` / `Sequence[UserContent]` / '
'`ModelRequestPart` items that form one), so the agent has a request to respond to.'
)
return cls(messages=tuple(messages), priority=priority)
41 changes: 40 additions & 1 deletion pydantic_ai_slim/pydantic_ai/_run_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,14 @@
from pydantic_ai._instrumentation import DEFAULT_INSTRUMENTATION_VERSION

from . import _utils, messages as _messages
from ._enqueue import EnqueueContent, PendingMessage, PendingMessagePriority

if TYPE_CHECKING:
from .agent import Agent
from .models import Model
from .result import RunUsage
from .settings import ModelSettings
from .tool_manager import ToolManager
from .usage import RunUsage

# TODO (v2): Change the default for all typevars like this from `None` to `object`
AgentDepsT = TypeVar('AgentDepsT', default=None, contravariant=True)
Expand Down Expand Up @@ -99,6 +100,11 @@ class RunContext(Generic[RunContextAgentDepsT]):
`after_model_request`). Currently `None` in tool hooks, output validators,
and during agent construction.
"""
pending_messages: list[PendingMessage] = field(default_factory=list[PendingMessage], repr=False)
"""Internal: queue read and mutated by [`PendingMessageDrainCapability`][pydantic_ai.capabilities._pending_messages.PendingMessageDrainCapability].

Use [`enqueue`][pydantic_ai.tools.RunContext.enqueue] to add messages — don't append directly.
"""

tool_manager: ToolManager[RunContextAgentDepsT] | None = None
"""The tool manager for the current run step.
Expand All @@ -116,6 +122,39 @@ def last_attempt(self) -> bool:
"""Whether this is the last attempt at running this tool before an error is raised."""
return self.retry == self.max_retries

def enqueue(
self,
*content: EnqueueContent,
Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need Sequence in EnqueueContent if we use splat here?

priority: PendingMessagePriority = 'asap',
Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rrequire a kwarg?

) -> None:
"""Enqueue content to be injected into the conversation.

Safe to call from anywhere a `RunContext` is available — async tools,
sync tools (auto-wrapped in a thread executor by Pydantic AI), and
capability hooks. The drain only iterates the queue between graph nodes
(in `before_model_request` and `after_node_run`), never concurrently
with the tool body, so `list.append` from a worker thread doesn't race
the drain.

Args:
*content: One or more [`EnqueueContent`][pydantic_ai._enqueue.EnqueueContent] items.
Each `str` or `Sequence[UserContent]` (same shape `Agent.run(user_prompt=...)` accepts)
and each [`ModelRequestPart`][pydantic_ai.messages.ModelRequestPart] (e.g. a
[`SystemPromptPart`][pydantic_ai.messages.SystemPromptPart]) is coalesced with adjacent
part-style items into one [`ModelRequest`][pydantic_ai.messages.ModelRequest]; a complete
[`ModelRequest`][pydantic_ai.messages.ModelRequest] or
[`ModelResponse`][pydantic_ai.messages.ModelResponse] is kept as its own message. The
assembled sequence must end in a request. Calling with no positional args is a no-op.
priority: When to deliver:
`'asap'` (default) — at the earliest opportunity (next model request,
or a redirect if the agent would otherwise end).
`'when_idle'` — only when the agent would otherwise end, after `'asap'` messages.
"""
pending = PendingMessage.from_content(*content, priority=priority)
if pending is None:
return
self.pending_messages.append(pending)

__repr__ = _utils.dataclasses_no_defaults_repr


Expand Down
Loading
Loading