-
Notifications
You must be signed in to change notification settings - Fork 2.2k
Add pending message queue (ctx.enqueue / agent_run.enqueue)
#4980
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 8 commits
3951d3d
f530e2a
11addf5
b1217e5
648ca4c
98207fb
eed6298
cb91f8c
034d2c6
ff9e760
f8ba158
97ebbd2
0d8ab31
ebfa2d0
1a546ef
c4e3250
9b4a747
04317a6
a8cdef0
f362019
0cfcaac
c3e7341
2efb668
12089ff
9a5f4ab
62f57f1
ce543f4
9503185
d226586
a18e4c6
9b1c6c2
070bdcc
6c89afe
6965079
b59f305
9fcc3ea
b60ebb1
f1d57bb
dc32827
9d97861
ce266a0
328aa88
3618943
6cea37b
d9cf1a9
4e6a3fd
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -334,6 +334,75 @@ print(result2.all_messages()) | |
| """ | ||
| ``` | ||
|
|
||
| ## Injecting messages mid-run | ||
|
|
||
| Tools, capability hooks, and external code driving an agent run can inject extra | ||
| [`ModelRequestPart`][pydantic_ai.messages.ModelRequestPart]s into the conversation | ||
| mid-run via a pending message queue. Use this when something happens during a run | ||
| that the agent should know about — a tool wants to add follow-up context, an external | ||
| event needs to redirect the agent's plan, or background work needs to reach the agent | ||
| when it completes. | ||
|
|
||
| Enqueued parts are bundled into a [`PendingMessage`][pydantic_ai.messages.PendingMessage] | ||
| and drained automatically based on a `priority`: | ||
|
|
||
| - `'steering'` (default): drained into the next [`ModelRequest`][pydantic_ai.messages.ModelRequest] before the model call. Use when the new context should influence the agent's *next* step. | ||
| - `'follow_up'`: drained only when the agent would otherwise end. The agent run continues with a new model request that includes the follow-up parts. Use when the agent shouldn't stop while there's still pending work. | ||
|
|
||
| ### From inside a tool or hook | ||
|
|
||
| Use [`RunContext.enqueue`][pydantic_ai.tools.RunContext.enqueue] when you have a | ||
| `RunContext` in scope: | ||
|
|
||
| ```python {title="enqueue_from_tool.py"} | ||
| from pydantic_ai import Agent, RunContext, SystemPromptPart | ||
|
|
||
| agent = Agent('test') | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. All other examples in this file use
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Both new examples use (Already flagged in an earlier comment but still unaddressed.) |
||
|
|
||
|
|
||
| @agent.tool | ||
| def trigger_alert(ctx: RunContext[None]) -> str: | ||
| ctx.enqueue(SystemPromptPart('Alert: production is degraded, prioritize triage.')) | ||
| return 'alert raised' | ||
| ``` | ||
|
|
||
| The steering message is appended to the agent's message history and is visible to the | ||
| model on the next request, alongside any tool returns from the same step. | ||
|
|
||
| ### From external code driving `agent.iter()` | ||
|
|
||
| Use [`AgentRun.enqueue`][pydantic_ai.run.AgentRun.enqueue] when you're driving a run | ||
| from outside (e.g. forwarding events from a webhook, chat platform, or job queue): | ||
|
|
||
| ```python {title="enqueue_from_agent_run.py"} | ||
| from pydantic_ai import Agent, UserPromptPart | ||
|
|
||
| agent = Agent('test') | ||
|
|
||
|
|
||
| async def main(): | ||
| async with agent.iter('Start drafting the report') as agent_run: | ||
| agent_run.enqueue( | ||
| UserPromptPart('Change of plan: focus on Q3 revenue first.'), | ||
| priority='steering', | ||
| ) | ||
| async for _ in agent_run: | ||
| ... | ||
| ``` | ||
|
Comment on lines
+461
to
+473
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This example uses Consider either:
|
||
|
|
||
| [`AgentRun.pending_messages`][pydantic_ai.run.AgentRun.pending_messages] exposes the | ||
| current queue for inspection. | ||
|
|
||
| !!! info "Limitations" | ||
| - Follow-up messages need [`Agent.run`][pydantic_ai.agent.AbstractAgent.run] or | ||
| explicit [`AgentRun.next()`][pydantic_ai.run.AgentRun.next] driving — they | ||
|
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. " Can we detect the "misconfigured" usage, and raise an error when we had pending messages? |
||
| aren't drained inside a bare `async for node in agent_run:` loop. Steering | ||
| messages work in either case. | ||
| - Inside a [Temporal](durable_execution/temporal.md) workflow, tools run in | ||
| activities and don't share state with the workflow, so `ctx.enqueue` from a | ||
| tool doesn't currently propagate back to the run. Enqueue from the workflow | ||
| context (e.g. via `AgentRun.enqueue`) instead. | ||
|
|
||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. A note about infinite follow-up loops would be helpful: if something keeps enqueuing follow-up messages (e.g. a tool that always enqueues a follow-up and gets called on every iteration), the agent will loop indefinitely unless
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The Limitations box should warn about infinite follow-up loops. If a tool always enqueues a follow-up and gets called on every iteration, the agent will loop indefinitely. A brief mention of
|
||
| ## Processing Message History | ||
|
|
||
| Sometimes you may want to modify the message history before it's sent to the model. This could be for privacy | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -13,6 +13,7 @@ | |
| from pydantic_ai._instrumentation import DEFAULT_INSTRUMENTATION_VERSION | ||
|
|
||
| from . import _utils, messages as _messages | ||
| from .messages import PendingMessage, PendingMessagePriority | ||
|
|
||
| if TYPE_CHECKING: | ||
| from .agent.abstract import AbstractAgent | ||
|
|
@@ -92,6 +93,14 @@ class RunContext(Generic[RunContextAgentDepsT]): | |
| `after_model_request`). Currently `None` in tool hooks, output validators, | ||
| and during agent construction. | ||
| """ | ||
| pending_messages: list[PendingMessage] = field(default_factory=list[PendingMessage]) | ||
|
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ah now I see where this is public. It does make sense here if users want to inspect it, but do they really need to be able to? Is that required for the harness features we've worked on? I prefer starting with things private, and making them public only if we have a use case. So I'm ok dropping this |
||
| """Queue of messages waiting to be injected into the conversation. | ||
|
|
||
| Messages are drained automatically: `'steering'` messages before the next model | ||
| request, `'follow_up'` messages when the agent would otherwise end. | ||
|
|
||
| Use [`enqueue`][pydantic_ai.tools.RunContext.enqueue] to add messages. | ||
| """ | ||
|
|
||
| tool_manager: ToolManager[RunContextAgentDepsT] | None = None | ||
| """The tool manager for the current run step. | ||
|
|
@@ -109,6 +118,21 @@ def last_attempt(self) -> bool: | |
| """Whether this is the last attempt at running this tool before an error is raised.""" | ||
| return self.retry == self.max_retries | ||
|
|
||
| def enqueue( | ||
| self, | ||
| *parts: _messages.ModelRequestPart, | ||
| priority: PendingMessagePriority = 'steering', | ||
| ) -> None: | ||
| """Enqueue message parts to be injected into the conversation. | ||
|
|
||
| Args: | ||
| *parts: One or more message parts (e.g. `SystemPromptPart`, `UserPromptPart`). | ||
| priority: When to inject: | ||
| `'steering'` (default) — before the next model request. | ||
| `'follow_up'` — when the agent would otherwise end. | ||
| """ | ||
| self.pending_messages.append(PendingMessage(parts=parts, priority=priority)) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Consider validating that at least one part is passed. Currently A simple guard would prevent confusing behavior: if not parts:
raise ValueError('enqueue() requires at least one ModelRequestPart')Same applies to |
||
|
|
||
| __repr__ = _utils.dataclasses_no_defaults_repr | ||
|
|
||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,5 +1,6 @@ | ||
| from typing import Any | ||
|
|
||
| from ._pending_messages import PendingMessageDrainCapability | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
| from .abstract import ( | ||
| AbstractCapability, | ||
| AgentNode, | ||
|
|
@@ -64,6 +65,7 @@ | |
|
|
||
| __all__ = [ | ||
| 'AbstractCapability', | ||
| 'PendingMessageDrainCapability', | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The other auto-injected internal capability ( If users need to reference it for |
||
| 'AgentNode', | ||
| 'CapabilityOrdering', | ||
| 'CapabilityPosition', | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,92 @@ | ||
| """Auto-injected capability that drains the pending message queue at appropriate times.""" | ||
|
|
||
| from __future__ import annotations | ||
|
|
||
| from typing import TYPE_CHECKING, Any | ||
|
|
||
| from pydantic_ai.capabilities.abstract import AbstractCapability, CapabilityOrdering | ||
| from pydantic_ai.messages import ModelRequest, PendingMessage, PendingMessagePriority | ||
| from pydantic_ai.tools import RunContext | ||
|
|
||
| if TYPE_CHECKING: | ||
| from pydantic_ai import _agent_graph | ||
| from pydantic_ai.models import ModelRequestContext | ||
| from pydantic_ai.result import FinalResult | ||
| from pydantic_graph import End | ||
|
|
||
|
|
||
| def _drain_by_priority( | ||
| queue: list[PendingMessage], | ||
| priority: PendingMessagePriority, | ||
| ) -> list[PendingMessage]: | ||
| """Remove and return all messages with the given priority from the queue.""" | ||
| drained: list[PendingMessage] = [] | ||
| remaining: list[PendingMessage] = [] | ||
| for msg in queue: | ||
| if msg.priority == priority: | ||
| drained.append(msg) | ||
| else: | ||
| remaining.append(msg) | ||
| queue[:] = remaining | ||
| return drained | ||
|
Comment on lines
+22
to
+35
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 📝 Info: Thread safety relies on GIL atomicity of The docstring on Was this helpful? React with 👍 or 👎 to provide feedback. |
||
|
|
||
|
|
||
| class PendingMessageDrainCapability(AbstractCapability[Any]): | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This needs a (Already flagged in earlier comments but still unaddressed — wanted to make sure it doesn't get lost.) |
||
| """Drains the pending message queue at appropriate times. | ||
|
|
||
| - Steering messages are injected before each model request. | ||
| - Follow-up messages are injected when the agent would otherwise end, | ||
| redirecting to a new ModelRequestNode to continue the conversation. | ||
|
|
||
| This capability is always auto-injected and placed outermost via | ||
| [`CapabilityOrdering`][pydantic_ai.capabilities.abstract.CapabilityOrdering] | ||
| so it wraps around other capabilities. This ensures steering messages are | ||
| drained into the model request before user capabilities see it, and follow-up | ||
| redirection runs after all other `after_node_run` hooks (which run in reverse). | ||
| """ | ||
|
|
||
| def get_ordering(self) -> CapabilityOrdering: | ||
| # Outermost so steering messages are drained into the request before other | ||
| # capabilities see it, and follow-up redirection runs after all other | ||
| # after_node_run hooks (which run in reverse order). | ||
| return CapabilityOrdering(position='outermost') | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
@classmethod
def get_serialization_name(cls) -> str | None:
return None # not spec-constructible (auto-injected)Without this, if
devin-ai-integration[bot] marked this conversation as resolved.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This comment repeats what the class docstring already explains at lines 74-79. Per project style, comments should only be added when the WHY is non-obvious — the class-level documentation already covers why
Comment on lines
+78
to
+79
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This comment repeats the explanation already in the class docstring at lines 74–79. Per the project's no-comments-unless-WHY-is-non-obvious convention, it can be dropped — the class docstring is the canonical place for this.
Comment on lines
+78
to
+79
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🚩 Two Both Was this helpful? React with 👍 or 👎 to provide feedback. |
||
|
|
||
| async def before_model_request( | ||
| self, | ||
| ctx: RunContext[Any], | ||
| request_context: ModelRequestContext, | ||
| ) -> ModelRequestContext: | ||
| """Drain steering messages into the model request. | ||
|
|
||
| Appends to both `request_context.messages` (so the model sees them in this | ||
| request) and `ctx.messages` (so they persist in the agent's message history). | ||
| """ | ||
| drained = _drain_by_priority(ctx.pending_messages, 'steering') | ||
| if drained: | ||
| parts = [part for msg in drained for part in msg.parts] | ||
| steering_request = ModelRequest(parts=parts) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
This means the steering Suggest setting these explicitly: steering_request = ModelRequest(parts=parts, timestamp=_utils.now_utc(), run_id=ctx.run_id)(with |
||
| request_context.messages.append(steering_request) | ||
| ctx.messages.append(steering_request) | ||
|
devin-ai-integration[bot] marked this conversation as resolved.
Outdated
devin-ai-integration[bot] marked this conversation as resolved.
Outdated
|
||
| return request_context | ||
|
|
||
| async def after_node_run( | ||
| self, | ||
| ctx: RunContext[Any], | ||
| *, | ||
| node: _agent_graph.AgentNode[Any, Any], | ||
| result: _agent_graph.AgentNode[Any, Any] | End[FinalResult[Any]], | ||
| ) -> _agent_graph.AgentNode[Any, Any] | End[FinalResult[Any]]: | ||
| """Drain follow-up messages when the agent would otherwise end.""" | ||
| from pydantic_ai._agent_graph import ModelRequestNode | ||
| from pydantic_graph import End | ||
|
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do we have to import these here inline? |
||
|
|
||
| if not isinstance(result, End): | ||
| return result | ||
|
|
||
| follow_ups = _drain_by_priority(ctx.pending_messages, 'follow_up') | ||
| if not follow_ups: | ||
| return result | ||
|
devin-ai-integration[bot] marked this conversation as resolved.
Outdated
|
||
|
|
||
| parts = [part for msg in follow_ups for part in msg.parts] | ||
| request = ModelRequest(parts=parts) | ||
| return ModelRequestNode(request=request) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🚩 Follow-up drain creates ModelRequestNode without run_step increment or usage limit check When Was this helpful? React with 👍 or 👎 to provide feedback.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🚩 No usage limit / max iteration guard for follow-up message loops The follow-up message drain mechanism ( Was this helpful? React with 👍 or 👎 to provide feedback.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The steering path (line 77) has a detailed comment explaining why explicit Without this, someone reading both paths might think the follow-up case is a bug and "fix" it to match the steering case. Something like: # No explicit timestamp/run_id needed: ModelRequestNode.run() stamps
# self.request during the graph lifecycle (_agent_graph.py:758-760).
request = ModelRequest(parts=parts) |
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -2036,6 +2036,38 @@ def provider_request_id(self) -> str | None: | |
| ModelMessage = Annotated[ModelRequest | ModelResponse, pydantic.Discriminator('kind')] | ||
| """Any message sent to or returned by a model.""" | ||
|
|
||
|
|
||
| PendingMessagePriority = Literal['steering', 'follow_up'] | ||
| """Priority level for a pending message. | ||
|
|
||
| - `'steering'`: Drained into the next model request (before the model call). | ||
| - `'follow_up'`: Drained only when the agent would otherwise end, preventing | ||
| premature termination while follow-up work is pending. | ||
| """ | ||
|
|
||
|
|
||
| @dataclass | ||
| class PendingMessage: | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Minor:
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Does this need to be a public type? Do we expose |
||
| """A message queued for injection into the agent conversation. | ||
|
|
||
| Pending messages are enqueued via [`RunContext.enqueue`][pydantic_ai.tools.RunContext.enqueue] | ||
| or [`AgentRun.enqueue`][pydantic_ai.run.AgentRun.enqueue] and are | ||
| automatically drained at the appropriate time during the agent run. | ||
| """ | ||
|
|
||
| parts: Sequence[ModelRequestPart] | ||
| """The message parts to inject.""" | ||
|
|
||
| _: KW_ONLY | ||
|
|
||
| priority: PendingMessagePriority = 'steering' | ||
| """When to drain this message: | ||
|
|
||
| - `'steering'`: injected before the next model request. | ||
| - `'follow_up'`: injected only when the agent would otherwise finish. | ||
| """ | ||
|
|
||
|
|
||
| ModelMessagesTypeAdapter = pydantic.TypeAdapter( | ||
| list[ModelMessage], config=pydantic.ConfigDict(defer_build=True, ser_json_bytes='base64', val_json_bytes='base64') | ||
| ) | ||
|
|
||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 📝 Info: The Was this helpful? React with 👍 or 👎 to provide feedback. |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -394,6 +394,30 @@ def run_id(self) -> str: | |
| """The unique identifier for the agent run.""" | ||
| return self._graph_run.state.run_id | ||
|
|
||
| @property | ||
| def pending_messages(self) -> list[_messages.PendingMessage]: | ||
| """Queue of messages waiting to be injected into the conversation. | ||
|
|
||
| Messages are drained automatically: `'steering'` messages before the next model | ||
| request, `'follow_up'` messages when the agent would otherwise end. | ||
| """ | ||
| return self._graph_run.state.pending_messages | ||
|
|
||
| def enqueue( | ||
|
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @adtyavrdhn Here's a reason to bring the |
||
| self, | ||
| *parts: _messages.ModelRequestPart, | ||
| priority: _messages.PendingMessagePriority = 'steering', | ||
| ) -> None: | ||
| """Enqueue message parts to be injected into the conversation. | ||
|
|
||
| Args: | ||
| *parts: One or more message parts (e.g. `SystemPromptPart`, `UserPromptPart`). | ||
| priority: When to inject: | ||
| `'steering'` (default) — before the next model request. | ||
| `'follow_up'` — when the agent would otherwise end. | ||
| """ | ||
| self._graph_run.state.pending_messages.append(_messages.PendingMessage(parts=parts, priority=priority)) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The validation + append logic here is duplicated verbatim from |
||
|
|
||
| def __repr__(self) -> str: # pragma: no cover | ||
| result = self._graph_run.output | ||
| result_repr = '<run not finished>' if result is None else repr(result.output) | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This deserves mention in at least one more place, like in the tool and hooks docs, where
RunContextis available, telling people they canenqueuefrom there.