Add pending message queue (ctx.enqueue / agent_run.enqueue) #4980
📝 Info: when_idle drain only works with agent.run() or agent_run.next(), not bare async for
The when_idle drain fires in after_node_run, which is a capability hook invoked by _run_node_with_hooks (used by AgentRun.next() and Agent.run()). The bare async for node in agent_run: path uses __anext__ which calls the graph runner directly without firing capability hooks. This means when_idle messages are never drained in bare iteration mode. The asap drain still works because it fires in before_model_request which runs inside ModelRequestNode.run() regardless of the driving mode. This limitation is clearly documented in the PR at docs/message-history.md:465-469, including the recommendation to use AgentRun.next() for when_idle messages.
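The difference between the two driving modes can be illustrated with a toy model. This is a minimal sketch and not the pydantic_ai implementation: `ToyRun`, its string node names, and the `when_idle` / `delivered` attributes are all hypothetical stand-ins. It only shows why a drain that lives in an after-node hook is skipped when iteration calls the runner directly.

```python
import asyncio


class ToyRun:
    """Toy stand-in for an agent run (hypothetical, not the library class)."""

    def __init__(self, nodes):
        self._nodes = list(nodes)
        self.when_idle = []  # messages waiting for the run to go idle
        self.delivered = []  # messages that were actually drained

    async def _advance(self):
        # The "graph runner": produces the next node, or 'End' when exhausted.
        return self._nodes.pop(0) if self._nodes else 'End'

    async def _after_node_run(self, node):
        # Capability-style hook: drain when the run would otherwise end.
        if node == 'End' and self.when_idle:
            self.delivered.extend(self.when_idle)
            self.when_idle.clear()
            return 'ModelRequest'  # redirect instead of ending
        return node

    async def next(self):
        # Hook-aware driving: every node passes through the hook.
        return await self._after_node_run(await self._advance())

    def __aiter__(self):
        return self

    async def __anext__(self):
        # Bare iteration: calls the runner directly, so the hook never fires.
        node = await self._advance()
        if node == 'End':
            raise StopAsyncIteration
        return node


async def drive_with_next():
    run = ToyRun(['ModelRequest', 'CallTools'])
    run.when_idle.append('late context')
    node = await run.next()
    while node != 'End':
        node = await run.next()
    return run.delivered


async def drive_bare():
    run = ToyRun(['ModelRequest', 'CallTools'])
    run.when_idle.append('late context')
    async for _ in run:
        pass
    return run.delivered, run.when_idle
```

In this sketch `drive_with_next()` delivers the queued message through one extra request, while `drive_bare()` finishes with the message still sitting in the queue.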
| follow_ups = _drain_by_priority(ctx.pending_messages, 'follow_up') | ||
| if not follow_ups: | ||
| return result | ||
|
|
||
| parts = [part for msg in follow_ups for part in msg.parts] | ||
| request = ModelRequest(parts=parts) | ||
| return ModelRequestNode(request=request) |
🚩 Follow-up drain creates ModelRequestNode without run_step increment or usage limit check
When PendingMessageDrainCapability.after_node_run redirects End to a ModelRequestNode (_pending_messages.py:83-84), this creates a new model request that bypasses the normal UserPromptNode flow. The new ModelRequestNode will trigger a model call, consuming tokens and incrementing usage. However, there's no explicit usage_limits check before this redirect — the check happens inside ModelRequestNode.run(). If the agent is near its usage limit, this redirect could cause UsageLimitExceeded to be raised during the follow-up model call, which would lose the follow-up context. This is arguably correct behavior (limits should be respected), but users may find it surprising that background tool results are silently lost when usage limits are reached.
d12f52e to 192ce3f
Introduces two framework-level features, both powered by auto-injected capabilities:

**Pending Message Queue:**
- `PendingMessage` type with `'steering'` and `'follow_up'` priorities
- `enqueue_message()` on `RunContext` (from tools/hooks) and `AgentRun` (external code)
- `PendingMessageDrainCapability` drains steering messages before model requests and follow-up messages when the agent would otherwise end

**Background Tools:**
- `ToolDefinition.background` field, `Tool(background=True)`, `@agent.tool(background=True)`
- `BackgroundToolCapability` spawns asyncio tasks, returns immediate ack, delivers results as follow-up messages, waits before End, cleans up on run end

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
- Persist steering messages to `ctx.messages` (state.message_history) in addition to the request context copy, so they appear in `result.all_messages()` and subsequent runs
- Handle `asyncio.CancelledError` explicitly in background task `_run()` so cancellation during run cleanup propagates cleanly without enqueueing a spurious follow-up
- Use `CapabilityOrdering(position='outermost')` instead of manual list prepending for auto-injected capabilities (matches main's `_inject_auto_capabilities` pattern)
- Update snapshots for new `background` field on `ToolDefinition`
2b8bf30 to f530e2a
…ization

The Pydantic payload converter used by Temporal workflows cannot serialize `collections.deque`, causing `PydanticSerializationError` when the agent state is sent across workflow boundaries. Switching to `list` fixes this; the drain semantics are identical for our usage (iterate, replace contents).

This was the cause of consecutive CI timeouts on 3.10-3.13: the Temporal hitl test failed with an RPCError, which left the Temporal test server in a bad state that caused subsequent tests to hang until the 20-minute job timeout.
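The "iterate, replace contents" drain mentioned above can be sketched on a plain `list`. The real `_drain_by_priority` body is not shown in this conversation, so the `Pending` dataclass and the function below are assumptions that only illustrate the pattern, using the `'steering'` / `'follow_up'` priority names from this stage of the PR.

```python
from dataclasses import dataclass


@dataclass
class Pending:
    """Hypothetical stand-in for a queued message."""

    content: str
    priority: str  # 'steering' or 'follow_up'


def drain_by_priority(queue: list, priority: str) -> list:
    """Remove and return the entries with `priority`, preserving arrival order."""
    drained = [m for m in queue if m.priority == priority]
    # Slice assignment mutates the same list object, so every holder of the
    # queue reference sees the remaining entries.
    queue[:] = [m for m in queue if m.priority != priority]
    return drained


queue = [
    Pending('stop using tool X', 'steering'),
    Pending('background result', 'follow_up'),
    Pending('prefer short answers', 'steering'),
]
steering = drain_by_priority(queue, 'steering')
```

Because the list is mutated in place rather than rebound, nothing here depends on `deque`-specific operations, which is consistent with the claim that the swap does not change drain behavior.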
- Move `BackgroundToolCapability` per-run state (tasks dict, completion event) out of instance fields into a `ContextVar`. `wrap_run` now installs and tears down the state for each run; the capability itself is stateless. This removes the `for_run` override (which always returned a fresh instance), unblocking coverage on the `run_capability is effective_capability` branches in `agent/__init__.py`
- Drop vestigial `get_serialization_name` overrides (our caps aren't spec-constructible and aren't registered in `CAPABILITY_TYPES`, so the default `cls.__name__` is fine and unused)
- Simplify test `model_fn` fallback branches that were dead code into explicit asserts
- Exercise `AgentRun.pending_messages` property in an existing test
- Add `pragma: no cover` to the `CancelledError` branch and `wrap_run` task-cancel cleanup (only reachable when a run aborts with live background tasks; hard to test reliably without adding flakiness)
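A rough sketch of the `ContextVar` pattern described in the first bullet follows. The `wrap_run(handler)` signature, `RunState`, and `StatelessCapability` are simplified assumptions for illustration and do not reproduce the actual capability API; the point is only that one shared capability instance can serve concurrent runs when per-run state is installed and torn down around each run.

```python
import asyncio
from contextvars import ContextVar
from dataclasses import dataclass, field


@dataclass
class RunState:
    """Hypothetical per-run state (the PR keeps a tasks dict and an event)."""

    labels: list = field(default_factory=list)


_run_state: ContextVar[RunState] = ContextVar('run_state')


class StatelessCapability:
    """Holds no per-run fields; state lives in the ContextVar instead."""

    async def wrap_run(self, handler):
        token = _run_state.set(RunState())  # install fresh state for this run
        try:
            return await handler()
        finally:
            _run_state.reset(token)  # tear the state down again


async def one_run(cap, label):
    async def handler():
        _run_state.get().labels.append(label)
        await asyncio.sleep(0)  # let the other run interleave
        return list(_run_state.get().labels)

    return await cap.wrap_run(handler)


async def main():
    cap = StatelessCapability()  # a single instance shared by both runs
    results = await asyncio.gather(one_run(cap, 'a'), one_run(cap, 'b'))
    leaked = _run_state.get(None)  # nothing is set outside the runs
    return results, leaked
```

Each coroutine passed to `asyncio.gather` runs in its own task with a copied context, so the two runs never see each other's state even though they share `cap`.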
| follow_ups = _drain_by_priority(ctx.pending_messages, 'follow_up') | ||
| if not follow_ups: | ||
| return result | ||
|
|
||
| parts = [part for msg in follow_ups for part in msg.parts] | ||
| request = ModelRequest(parts=parts) | ||
| return ModelRequestNode(request=request) |
🚩 No usage limit / max iteration guard for follow-up message loops
The follow-up message drain mechanism (_pending_messages.py:86-92) converts an End to a ModelRequestNode whenever follow-up messages exist. If a tool continuously enqueues follow-up messages (e.g., a background tool that spawns more background work on completion), this creates an unbounded loop. The existing usage_limits mechanism would eventually catch this if token limits are set, but there's no direct guard against infinite follow-up cycling. The standard run_step limit in the graph may also help, but it's worth verifying this is bounded.
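One way to reason about the concern is a toy loop with an explicit redirect cap. This is a hypothetical sketch only: `FollowUpLimitExceeded`, `max_redirects`, and `drive` are not part of the PR, and the conversation does not say whether such a guard was adopted.

```python
class FollowUpLimitExceeded(RuntimeError):
    """Hypothetical error type; not defined by the PR."""


def drive(on_response, max_redirects=3):
    """Toy run loop: each would-be End with pending follow-ups adds one request."""
    queue = []
    requests = 1  # the initial model request
    redirects = 0
    on_response(queue)  # tools may enqueue follow-ups while handling a response
    while queue:
        if redirects >= max_redirects:
            raise FollowUpLimitExceeded(f'gave up after {redirects} follow-up redirects')
        queue.clear()  # drain the follow-ups into one more request
        redirects += 1
        requests += 1
        on_response(queue)
    return requests


def enqueue_once():
    """Producer that enqueues a single follow-up, then stays quiet."""
    calls = 0

    def on_response(queue):
        nonlocal calls
        calls += 1
        if calls == 1:
            queue.append('background result')

    return on_response


def enqueue_forever(queue):
    """Producer that enqueues on every response, i.e. the unbounded case."""
    queue.append('more work')
```

Here `drive(enqueue_once())` settles after two requests, while `drive(enqueue_forever)` is stopped by the cap instead of cycling indefinitely.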
| # ===== Pending Message Queue Tests ===== | ||
|
|
||
|
|
||
| async def test_enqueue_steering_message_from_tool(): | ||
| """Steering messages enqueued from a tool are injected before the next model request.""" | ||
| call_count = 0 | ||
|
|
||
| def model_fn(messages: list[ModelMessage], info: AgentInfo) -> ModelResponse: | ||
| nonlocal call_count | ||
| call_count += 1 | ||
| if call_count == 1: | ||
| return ModelResponse( | ||
| parts=[ToolCallPart(tool_name='inject_msg', args='{}')], | ||
| usage=RequestUsage(input_tokens=10, output_tokens=5), | ||
| ) | ||
| else: | ||
| return ModelResponse( | ||
| parts=[TextPart(content='done')], | ||
| usage=RequestUsage(input_tokens=10, output_tokens=5), | ||
| ) | ||
|
|
||
| agent = Agent(FunctionModel(model_fn)) | ||
|
|
||
| @agent.tool | ||
| def inject_msg(ctx: RunContext[None]) -> str: | ||
| ctx.enqueue_message(SystemPromptPart('Injected steering message')) | ||
| return 'ok' | ||
|
|
||
| result = await agent.run('Hello') | ||
| assert result.output == 'done' | ||
|
|
||
| # Verify the steering message appears before the second model request | ||
| messages = result.all_messages() | ||
| # Find a ModelRequest that contains a SystemPromptPart with our injected content | ||
| found = False | ||
| for msg in messages: | ||
| if isinstance(msg, ModelRequest): | ||
| for part in msg.parts: | ||
| if isinstance(part, SystemPromptPart) and 'Injected steering message' in part.content: | ||
| found = True | ||
| break | ||
| assert found, 'Steering message was not found in message history' | ||
|
|
||
|
|
||
| async def test_enqueue_follow_up_message_prevents_end(): | ||
| """Follow-up messages prevent the agent from ending and are drained into a new ModelRequest.""" | ||
| call_count = 0 | ||
|
|
||
| def model_fn(messages: list[ModelMessage], info: AgentInfo) -> ModelResponse: | ||
| nonlocal call_count | ||
| call_count += 1 | ||
| if call_count == 1: | ||
| return ModelResponse( | ||
| parts=[ToolCallPart(tool_name='inject_follow_up', args='{}')], | ||
| usage=RequestUsage(input_tokens=10, output_tokens=5), | ||
| ) | ||
| elif call_count == 2: | ||
| # Agent produces final result, but follow-up is pending | ||
| return ModelResponse( | ||
| parts=[TextPart(content='premature end')], | ||
| usage=RequestUsage(input_tokens=10, output_tokens=5), | ||
| ) | ||
| else: | ||
| # After follow-up is drained, agent produces real final result | ||
| return ModelResponse( | ||
| parts=[TextPart(content='final answer after follow-up')], | ||
| usage=RequestUsage(input_tokens=10, output_tokens=5), | ||
| ) | ||
|
|
||
| agent = Agent(FunctionModel(model_fn)) | ||
|
|
||
| @agent.tool | ||
| def inject_follow_up(ctx: RunContext[None]) -> str: | ||
| ctx.enqueue_message(UserPromptPart('Follow-up context'), priority='follow_up') | ||
| return 'ok' | ||
|
|
||
| result = await agent.run('Hello') | ||
| assert result.output == 'final answer after follow-up' | ||
| assert call_count == 3 | ||
|
|
||
|
|
||
| async def test_enqueue_message_from_agent_run(): | ||
| """Messages can be enqueued from external code via AgentRun.enqueue_message.""" | ||
| call_count = 0 | ||
|
|
||
| def model_fn(messages: list[ModelMessage], info: AgentInfo) -> ModelResponse: | ||
| nonlocal call_count | ||
| call_count += 1 | ||
| return ModelResponse( | ||
| parts=[TextPart(content=f'response {call_count}')], | ||
| usage=RequestUsage(input_tokens=10, output_tokens=5), | ||
| ) | ||
|
|
||
| agent = Agent(FunctionModel(model_fn)) | ||
|
|
||
| async with agent.iter('Hello') as agent_run: | ||
| assert agent_run.pending_messages == [] | ||
| # Enqueue a follow-up message from external code before iteration | ||
| agent_run.enqueue_message(UserPromptPart('External follow-up'), priority='follow_up') | ||
| assert len(agent_run.pending_messages) == 1 | ||
| # Use next() to drive iteration so after_node_run fires | ||
| node = agent_run.next_node | ||
| while not isinstance(node, End): | ||
| node = await agent_run.next(node) | ||
|
|
||
| assert agent_run.result is not None | ||
| assert call_count == 2 # First response triggers End, follow-up prevents it, second response is final | ||
|
|
||
|
|
||
| # ===== Background Tools Tests ===== | ||
|
|
||
|
|
||
| async def test_background_tool_basic(): | ||
| """Background tools run asynchronously and deliver results as follow-up messages.""" | ||
| call_count = 0 | ||
|
|
||
| def model_fn(messages: list[ModelMessage], info: AgentInfo) -> ModelResponse: | ||
| nonlocal call_count | ||
| call_count += 1 | ||
| if call_count == 1: | ||
| # Call the background tool | ||
| return ModelResponse( | ||
| parts=[ToolCallPart(tool_name='slow_research', args='{"query": "test"}')], | ||
| usage=RequestUsage(input_tokens=10, output_tokens=5), | ||
| ) | ||
| elif call_count == 2: | ||
| # Agent continues after getting ack — model produces a "waiting" response | ||
| return ModelResponse( | ||
| parts=[TextPart(content='waiting for background task')], | ||
| usage=RequestUsage(input_tokens=10, output_tokens=5), | ||
| ) | ||
| else: | ||
| # After background result is delivered as follow-up, the result is present in the messages. | ||
| assert any( | ||
| isinstance(part, SystemPromptPart) and 'completed' in part.content | ||
| for msg in messages | ||
| if isinstance(msg, ModelRequest) | ||
| for part in msg.parts | ||
| ) | ||
| return ModelResponse( | ||
| parts=[TextPart(content='got the background result')], | ||
| usage=RequestUsage(input_tokens=10, output_tokens=5), | ||
| ) | ||
|
|
||
| agent = Agent(FunctionModel(model_fn)) | ||
|
|
||
| @agent.tool(background=True) | ||
| async def slow_research(ctx: RunContext[None], query: str) -> str: | ||
| await asyncio.sleep(0.01) # Simulate slow work | ||
| return f'Research result for {query}' | ||
|
|
||
| result = await agent.run('Do some research') | ||
| assert result.output == 'got the background result' | ||
|
|
||
|
|
||
| async def test_background_tool_error_handling(): | ||
| """Background tools that fail deliver error messages as follow-ups.""" | ||
| call_count = 0 | ||
|
|
||
| def model_fn(messages: list[ModelMessage], info: AgentInfo) -> ModelResponse: | ||
| nonlocal call_count | ||
| call_count += 1 | ||
| if call_count == 1: | ||
| return ModelResponse( | ||
| parts=[ToolCallPart(tool_name='failing_tool', args='{}')], | ||
| usage=RequestUsage(input_tokens=10, output_tokens=5), | ||
| ) | ||
| elif call_count == 2: | ||
| return ModelResponse( | ||
| parts=[TextPart(content='waiting')], | ||
| usage=RequestUsage(input_tokens=10, output_tokens=5), | ||
| ) | ||
| else: | ||
| assert any( | ||
| isinstance(part, SystemPromptPart) and 'failed' in part.content | ||
| for msg in messages | ||
| if isinstance(msg, ModelRequest) | ||
| for part in msg.parts | ||
| ) | ||
| return ModelResponse( | ||
| parts=[TextPart(content='handled the failure')], | ||
| usage=RequestUsage(input_tokens=10, output_tokens=5), | ||
| ) | ||
|
|
||
| agent = Agent(FunctionModel(model_fn)) | ||
|
|
||
| @agent.tool_plain(background=True) | ||
| async def failing_tool() -> str: | ||
| await asyncio.sleep(0.01) | ||
| raise RuntimeError('Tool execution failed') | ||
|
|
||
| result = await agent.run('Do something') | ||
| assert result.output == 'handled the failure' | ||
|
|
||
|
|
||
| async def test_background_tool_ack_message(): | ||
| """Background tools return an immediate acknowledgment to the agent.""" | ||
| call_count = 0 | ||
| ack_content: str | None = None | ||
|
|
||
| def model_fn(messages: list[ModelMessage], info: AgentInfo) -> ModelResponse: | ||
| nonlocal call_count, ack_content | ||
| call_count += 1 | ||
| if call_count == 1: | ||
| return ModelResponse( | ||
| parts=[ToolCallPart(tool_name='bg_tool', args='{}')], | ||
| usage=RequestUsage(input_tokens=10, output_tokens=5), | ||
| ) | ||
| else: | ||
| # Check the tool return for the ack | ||
| for msg in messages: | ||
| if isinstance(msg, ModelRequest): | ||
| for part in msg.parts: | ||
| if isinstance(part, ToolReturnPart) and 'running in background' in str(part.content): | ||
| ack_content = str(part.content) | ||
| return ModelResponse( | ||
| parts=[TextPart(content='done')], | ||
| usage=RequestUsage(input_tokens=10, output_tokens=5), | ||
| ) | ||
|
|
||
| agent = Agent(FunctionModel(model_fn)) | ||
|
|
||
| @agent.tool_plain(background=True) | ||
| async def bg_tool() -> str: | ||
| await asyncio.sleep(0.5) # Long-running, won't complete before agent finishes | ||
| return 'result' | ||
|
|
||
| result = await agent.run('Test') | ||
| assert result.output == 'done' | ||
| assert ack_content is not None | ||
| assert 'running in background' in ack_content | ||
|
|
||
|
|
||
| async def test_non_background_tool_unaffected(): | ||
| """Non-background tools are executed normally, not spawned as background tasks.""" | ||
|
|
||
| def model_fn(messages: list[ModelMessage], info: AgentInfo) -> ModelResponse: | ||
| for msg in messages: | ||
| if isinstance(msg, ModelRequest): | ||
| for part in msg.parts: | ||
| if isinstance(part, ToolReturnPart) and part.content == 'sync result': | ||
| return ModelResponse( | ||
| parts=[TextPart(content='got sync result')], | ||
| usage=RequestUsage(input_tokens=10, output_tokens=5), | ||
| ) | ||
| return ModelResponse( | ||
| parts=[ToolCallPart(tool_name='normal_tool', args='{}')], | ||
| usage=RequestUsage(input_tokens=10, output_tokens=5), | ||
| ) | ||
|
|
||
| agent = Agent(FunctionModel(model_fn)) | ||
|
|
||
| @agent.tool_plain | ||
| def normal_tool() -> str: | ||
| return 'sync result' | ||
|
|
||
| result = await agent.run('Test') | ||
| assert result.output == 'got sync result' | ||
|
|
||
|
|
||
| async def test_pending_messages_accessible_on_run_context(): | ||
| """RunContext.pending_messages is accessible and initially empty.""" | ||
| queue_observed = False | ||
| call_count = 0 | ||
|
|
||
| def model_fn(messages: list[ModelMessage], info: AgentInfo) -> ModelResponse: | ||
| nonlocal call_count | ||
| call_count += 1 | ||
| if call_count == 1: | ||
| return ModelResponse( | ||
| parts=[ToolCallPart(tool_name='check_queue', args='{}')], | ||
| usage=RequestUsage(input_tokens=10, output_tokens=5), | ||
| ) | ||
| return ModelResponse( | ||
| parts=[TextPart(content='done')], | ||
| usage=RequestUsage(input_tokens=10, output_tokens=5), | ||
| ) | ||
|
|
||
| agent = Agent(FunctionModel(model_fn)) | ||
|
|
||
| @agent.tool | ||
| def check_queue(ctx: RunContext[None]) -> str: | ||
| nonlocal queue_observed | ||
| assert len(ctx.pending_messages) == 0 | ||
| ctx.enqueue_message(SystemPromptPart('test'), priority='steering') | ||
| assert len(ctx.pending_messages) == 1 | ||
| queue_observed = True | ||
| return 'done' | ||
|
|
||
| await agent.run('Test') | ||
| assert queue_observed |
🚩 Tests use FunctionModel only — no provider integration coverage
Per tests/AGENTS.md rule:11, tests should be parametrized across providers (at minimum OpenAI, Anthropic, Google). All new tests in test_capabilities.py use only FunctionModel. While this is understandable for unit-level validation of the capability mechanism, there are no integration tests verifying that background tool acknowledgments and follow-up SystemPromptParts work correctly with real provider APIs. This is especially relevant given ANALYSIS-0004 about SystemPromptPart placement.
CI's `strict-no-cover` flagged that these lines are actually covered when the full test suite runs (including durable_exec/temporal tests). The cancellation cleanup paths in `wrap_run` finally and the `CancelledError` branch in `_run()` get exercised — likely by some Temporal workflow test that aborts mid-run with live background tasks.
`test_background_tool_cancelled_on_run_abort` triggers `asyncio.TimeoutError` on the agent run while a background task is sleeping in `after_node_run`'s wait. This exercises the cancellation paths in `wrap_run.finally` (`task.cancel()`, `asyncio.gather(...)`) and `_run`'s `CancelledError` branch, all of which are now genuinely tested.
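The cleanup shape that this test exercises can be sketched with plain asyncio. The function names below are illustrative assumptions rather than the PR's code; only `task.cancel()`, `asyncio.gather(..., return_exceptions=True)`, and the re-raised `CancelledError` correspond to what the commit message describes.

```python
import asyncio


async def run_with_background(timeout: float) -> list:
    events = []
    done = asyncio.Event()

    async def background() -> None:
        try:
            await asyncio.sleep(10)  # simulated slow tool work
            events.append('finished')
            done.set()
        except asyncio.CancelledError:
            # Re-raise without delivering a result, so cancellation propagates
            # cleanly and no spurious follow-up is produced.
            events.append('cancelled')
            raise

    async def agent_run() -> None:
        task = asyncio.create_task(background())
        try:
            await done.wait()  # the run waits for background work before ending
        finally:
            if not task.done():
                task.cancel()
            # Wait for the task to finish unwinding; swallow its CancelledError.
            await asyncio.gather(task, return_exceptions=True)

    try:
        await asyncio.wait_for(agent_run(), timeout)
    except asyncio.TimeoutError:
        events.append('run timed out')
    return events
```

With a short timeout the run is aborted while the background task is still sleeping, so the `finally` block is what cancels and awaits it.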
Splits this PR into the parts that should land in core (the pending message queue primitive) and the parts that move to pydantic-ai-harness (the background tools capability).

Removed from core:
- ToolDefinition.background field
- background= on Tool, @agent.tool / @agent.tool_plain, FunctionToolset
- BackgroundToolCapability (and its auto-injection)

The harness capability will read tool_def.metadata['background'] instead -- already plumbed through every tool registration path, including SetToolMetadata for marking entire MCP servers as background.

Producer API rename: ctx.enqueue_message / agent_run.enqueue_message -> ctx.enqueue / agent_run.enqueue. Type stays PendingMessage; the queue is of messages, but enqueueing parts directly reads better and matches the broader 'inject mid-run' use case (steering events, follow-ups, etc.).

Adds docs/message-history.md section on injecting messages mid-run.
| # Outermost so steering messages are drained into the request before other | ||
| # capabilities see it, and follow-up redirection runs after all other | ||
| # after_node_run hooks (which run in reverse order). | ||
| return CapabilityOrdering(position='outermost') |
PendingMessageDrainCapability inherits the default get_serialization_name() which returns the class name ('PendingMessageDrainCapability'). Since this is an auto-injected internal capability (like ToolSearch, which returns None), it should opt out of spec-based construction:

    @classmethod
    def get_serialization_name(cls) -> str | None:
        return None  # not spec-constructible (auto-injected)

Without this, if get_serialization_name is ever called on this class (e.g. during spec serialization), it would incorrectly appear as a user-configurable capability.
|
|
||
| __all__ = [ | ||
| 'AbstractCapability', | ||
| 'PendingMessageDrainCapability', |
The other auto-injected internal capability (ToolSearch in _tool_search.py) is not imported or exported here. Since PendingMessageDrainCapability is similarly internal and auto-injected (living in _pending_messages.py with the underscore-prefixed module), it probably shouldn't be in __all__ either.
If users need to reference it for CapabilityOrdering constraints (e.g. wrapped_by=[PendingMessageDrainCapability]), they can import it directly from the private module — but that seems unlikely to be a common need.
| ```python {title="enqueue_from_tool.py"} | ||
| from pydantic_ai import Agent, RunContext, SystemPromptPart | ||
|
|
||
| agent = Agent('test') |
All other examples in this file use Agent('openai:gpt-5.2') — these two new examples should follow the same pattern instead of using the 'test' model. Real model names help users understand how to apply the feature in their own code and are consistent with the documentation guidelines.
| if isinstance(part, SystemPromptPart) and 'Injected steering message' in part.content: | ||
| found = True | ||
| break | ||
| assert found, 'Steering message was not found in message history' |
Per the testing guidelines, tests should "assert on final output AND snapshot result.all_messages()" to validate the complete execution trace. The manual loop searching for a SystemPromptPart is fragile and doesn't catch regressions in message ordering, extra/missing messages, or metadata.
Consider replacing the manual search with a snapshot() assertion on result.all_messages() — this also serves as documentation of the exact message sequence produced by the pending message queue. Same applies to the other new tests (test_enqueue_follow_up_message_prevents_end, test_enqueue_from_agent_run, etc.) which don't assert on messages at all.
…test to public path

Auto-review pass on b60ebb1:
- `_pending_messages.py`: dropped the `get_ordering()` inline comment that duplicated the class docstring at lines 74-79 verbatim.
- `tests/test_messages.py`: removed `test_clean_message_history_merges_adjacent_requests_tool_returns_first`, which imported `_clean_message_history` from a private module (flagged across multiple review rounds).
- `tests/test_capabilities.py`: extended `test_enqueue_asap_with_rich_message_history_tail` to capture `FunctionModel`'s `messages` arg and snapshot the wire-merged view through the public path. This covers the same invariant (adjacent ModelRequests merge, ToolReturnParts sort first, non-tool parts keep arrival order so enqueued content lands at the end) without reaching into framework internals.
- `tests/test_capabilities.py`: converted `test_enqueue_asap_drains_at_end_if_arrived_during_final_step`'s `any(...)` assertion to `snapshot(result.all_messages())` to match the per-test convention used everywhere else in the file.
| """ | ||
|
|
||
|
|
||
| EnqueueContent: TypeAlias = 'str | Sequence[UserContent] | ModelRequest' |
Would it theoretically be possible to support a Sequence[ModelMessage], i.e. interleaved requests and responses? I'm thinking of the synthetic "tool-search call + tool-search result" that makes automatic tool discovery work in #5230, but requires a model response + request. Maybe we just need to require it ends in a request?
I'm also thinking that I may fix #5437 before I merge this PR, so that we can support SystemPromptPart (ModelRequestPart) and sequences again.
| def enqueue( | ||
| self, | ||
| *content: EnqueueContent, | ||
| priority: PendingMessagePriority = 'asap', |
|
|
||
| def enqueue( | ||
| self, | ||
| *content: EnqueueContent, |
Do we need Sequence in EnqueueContent if we use splat here?
| or a redirect if the agent would otherwise end). | ||
| `'when_idle'` — only when the agent would otherwise end, after `'asap'` messages. | ||
| """ | ||
| request = build_enqueue_request(content) |
How about PendingMessage.from_content(*content, **, priority=...)?
| """ | ||
| return self._graph_run.state.pending_messages | ||
|
|
||
| def enqueue( |
@adtyavrdhn Here's a reason to bring the AgentEventStream class back: we're adding an AgentRun.enqueue method here that's useful to be able to call in the middle of a streaming agent run, but this one only works for iter, not run_stream_events. As a follow-up, consider bringing that class back (even though we just removed it from v2-main) with the enqueue method, and possibly cancel as well at some point, right?
| add follow-up context, an external event needs to redirect the agent's plan, or | ||
| background work needs to reach the agent when it completes. | ||
|
|
||
| Each `enqueue` call is packaged into a single [`ModelRequest`][pydantic_ai.messages.ModelRequest] |
Before mentioning an impl detail like "packaged into a single request" introduce the enqueue method, and say on what types it exists, with links to API docs. This "single MR" thing is not even that interesting to mention.
| at enqueue time and drained automatically based on a `priority`: | ||
|
|
||
| - `'asap'` (default): delivered at the earliest opportunity — prepended to the next [`ModelRequest`][pydantic_ai.messages.ModelRequest], or, if the agent would otherwise terminate before another request, used to redirect the run into one more request. Use when the new context should reach the model as soon as possible. | ||
| - `'when_idle'`: delivered only when the agent would otherwise terminate, after any `'asap'` messages. Use when the agent shouldn't be interrupted but should pick up the new work once it's done with what it's doing. |
Ensure we have tests on how this behaves vis-a-vis output tools (check output doc) and other output modes.
| - `'when_idle'`: delivered only when the agent would otherwise terminate, after any `'asap'` messages. Use when the agent shouldn't be interrupted but should pick up the new work once it's done with what it's doing. | ||
|
|
||
| Each positional argument to `enqueue` accepts the same shape as `Agent.run(user_prompt=...)` — | ||
| a `str` or `Sequence[UserContent]` is wrapped in a [`UserPromptPart`][pydantic_ai.messages.UserPromptPart] |
Link to UserContent
|
|
||
| !!! info "Limitations" | ||
| - End-of-run redirects need [`Agent.run`][pydantic_ai.agent.AbstractAgent.run] or | ||
| explicit [`AgentRun.next()`][pydantic_ai.run.AgentRun.next] driving — they |
"Agent.iter + AgentRun.next driving"
Can we detect the "misconfigured" usage, and raise an error when we had pending messages?
| [pydantic-ai#5437](https://github.com/pydantic/pydantic-ai/issues/5437) tracks | ||
| the framework-level fix (rendering mid-conversation `SystemPromptPart`s as | ||
| XML-wrapped `UserPromptPart`s on the affected providers); once it lands, | ||
| direct `SystemPromptPart` enqueue support will come back. |
As mentioned above, this may be fixed before this is released.
… output-tool test

- Drop `pending_messages` from `ReinjectSystemPrompt`'s `system_prompt_parts` call: a system-prompt callback that enqueues would otherwise re-fire on every reinjection. Remove the test that asserted the now-unwanted behavior.
- Warn when a bare `async for node in agent_run` loop reaches `End` with undrained pending messages, since `when_idle` / end-of-run redirects only drain via `after_node_run` (which bare iteration skips).
- Add coverage for a `when_idle` redirect when the run would end via an output tool.
Now that mid-conversation `SystemPromptPart`s render inline on all providers (#5509), `EnqueueContent` accepts `str | Sequence[UserContent] | ModelRequestPart | ModelMessage`. Adjacent part-style items coalesce into one `ModelRequest`; complete messages stay separate, so one `enqueue` call can inject an interleaved exchange (e.g. a synthetic tool-search call + result). The assembled content must end in a `ModelRequest`.

- Replace `build_enqueue_request` with `PendingMessage.from_content(*content, priority=...)`; `PendingMessage` now holds a tuple of messages.
- Drain flattens each entry's messages, stamping metadata across all of them; the redirect uses the final request.
- Docs: lead with the `enqueue` methods + accepted types, introduce "steering" for `'asap'`, drop the (now-fixed) `SystemPromptPart` limitation.
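The coalescing rule can be sketched with toy message types. `Request`, `Response`, and `assemble` below are hypothetical stand-ins, not `ModelRequest`, `ModelResponse`, or `PendingMessage.from_content`; rejecting an empty call is also an assumption, since the commit message only states that the content must end in a request.

```python
from dataclasses import dataclass


@dataclass
class Request:
    """Toy stand-in for a complete request message."""

    parts: list


@dataclass
class Response:
    """Toy stand-in for a complete response message."""

    parts: list


def assemble(*content):
    """Coalesce adjacent part-style items; keep complete messages separate."""
    messages = []
    open_request = None  # request currently collecting part-style items
    for item in content:
        if isinstance(item, (Request, Response)):
            messages.append(item)  # complete message: emitted as-is
            open_request = None  # and it closes any open run of parts
        else:
            if open_request is None:
                open_request = Request(parts=[])
                messages.append(open_request)
            open_request.parts.append(item)
    # Assumption: an empty call is rejected by the same check.
    if not messages or not isinstance(messages[-1], Request):
        raise ValueError('enqueued content must end in a request')
    return messages
```

For example, `assemble('a', Response(['call']), 'b', 'c')` yields three messages in this sketch: a request holding `'a'`, the response, and a request holding `'b'` and `'c'`.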
| pending_messages: Optional live pending-message list to expose as | ||
| `RunContext.pending_messages` so `ctx.enqueue` calls from inside system | ||
| prompt callbacks reach the actual queue (e.g. when `ReinjectSystemPrompt` | ||
| re-resolves callbacks via a synthetic `RunContext`). |
This docstring specifically names ReinjectSystemPrompt as the motivating use case, but @DouweM's review explicitly said not to wire pending_messages through in reinject_system_prompt.py. Since ReinjectSystemPrompt.before_model_request doesn't pass this parameter, any ctx.enqueue() from a reinjected system prompt callback currently silently drops the message.
The docstring should be updated to reflect the actual state — either remove the ReinjectSystemPrompt mention, or clarify that it's for the initial system_prompt_parts call in Agent.iter(), not reinjection.
| def get_ordering(self) -> CapabilityOrdering: | ||
| return CapabilityOrdering(position='outermost') |
This comment repeats the explanation already in the class docstring at lines 74–79. Per the project's no-comments-unless-WHY-is-non-obvious convention, it can be dropped — the class docstring is the canonical place for this.
| assert agent_run.pending_messages == [] | ||
| async for _ in agent_run: | ||
| pass | ||
|
|
The PR description's test plan checks off "pending_messages threaded through Agent.system_prompt_parts so ctx.enqueue from inside an @agent.system_prompt callback (when used with ReinjectSystemPrompt) reaches the live queue" — but I don't see this test anywhere in the file.
Now that ReinjectSystemPrompt doesn't pass pending_messages (per @DouweM's direction), the initial system_prompt_parts call in Agent.iter() is the main path where the parameter matters. A test verifying that ctx.enqueue from a @agent.system_prompt callback during the initial resolution actually reaches the live queue would cover this. (This has been flagged in prior reviews as well.)
| follow_up_idx = next( | ||
| i | ||
| for i, msg in enumerate(messages) | ||
| if isinstance(msg, ModelRequest) | ||
| and any(isinstance(p, UserPromptPart) and p.content == 'Follow-up context' for p in msg.parts) | ||
| ) | ||
| first_output_idx = next( | ||
| i | ||
| for i, msg in enumerate(messages) | ||
| if isinstance(msg, ModelResponse) | ||
| and any(isinstance(p, ToolCallPart) and p.args == '{"value": 1}' for p in msg.parts) | ||
| ) | ||
| assert first_output_idx < follow_up_idx |
This test uses manual index searching to verify ordering rather than snapshotting result.all_messages(). The other tests in this section (e.g. test_enqueue_asap_message_from_tool, test_enqueue_when_idle_message_prevents_end, test_enqueue_from_agent_run) all use snapshots for the full trace — this one should too, for consistency and to catch structural drift. The ordering assertion can be layered on top of the snapshot.
| EnqueueContent: TypeAlias = 'str | Sequence[UserContent] | ModelRequestPart | ModelMessage' | ||
| """A single item accepted by [`RunContext.enqueue`][pydantic_ai.tools.RunContext.enqueue] | ||
| and [`AgentRun.enqueue`][pydantic_ai.run.AgentRun.enqueue]. | ||
|
|
||
| - `str` or `Sequence[UserContent]`: wrapped in a [`UserPromptPart`][pydantic_ai.messages.UserPromptPart] | ||
| (matching the shape of `Agent.run(user_prompt=...)`). | ||
| - [`ModelRequestPart`][pydantic_ai.messages.ModelRequestPart] (e.g. a | ||
| [`SystemPromptPart`][pydantic_ai.messages.SystemPromptPart]): included verbatim. | ||
| - [`ModelMessage`][pydantic_ai.messages.ModelMessage] (a complete | ||
| [`ModelRequest`][pydantic_ai.messages.ModelRequest] or | ||
| [`ModelResponse`][pydantic_ai.messages.ModelResponse]): emitted as its own message. | ||
|
|
||
| Consecutive part-style items (`str` / `Sequence[UserContent]` / `ModelRequestPart`) are coalesced | ||
| into a single `ModelRequest`; complete `ModelMessage`s stay separate. This lets one `enqueue` | ||
| call inject an interleaved exchange (e.g. a synthetic tool-search call + result — a `ModelResponse` | ||
| followed by a `ModelRequest`). The assembled sequence must end in a `ModelRequest` so the agent has | ||
| something to respond to. | ||
| """ |
Nice expansion to support ModelRequestPart and ModelResponse (via ModelMessage) — the interleaved-exchange pattern for synthetic tool-search pairs is a good concrete use case and the docstring explains it well.
One question on the _build_enqueue_messages implementation below (line 71): the isinstance(item, (str, Sequence)) check is quite broad — Sequence from collections.abc matches bytes, tuple, range, etc., not just list[UserContent]. This doesn't cause runtime bugs because Pydantic validation on UserPromptPart.content would catch truly invalid types downstream, and typing enforces the contract at the call site. But if this ever needs runtime validation, narrowing the check (e.g. excluding str since it's already handled, or checking for list specifically) would make the boundary tighter.
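The breadth of that check is easy to see in isolation. The following is a minimal sketch using only the standard library; `is_user_content_list` is a hypothetical name for a narrower check and is not part of Pydantic AI.

```python
from collections.abc import Sequence

# Each of these passes a bare `isinstance(x, Sequence)` check,
# even though only the last one resembles `list[UserContent]`.
candidates = ['text', b'raw', ('a', 'b'), range(3), ['a', 'b']]
broad = [isinstance(c, Sequence) for c in candidates]


def is_user_content_list(item: object) -> bool:
    """Hypothetical narrower check: accept only real lists or tuples."""
    return isinstance(item, (list, tuple))


narrow = [is_user_content_list(c) for c in candidates]
```

Whether tuples should count is a design choice; the point is only that the accepted container types are named explicitly rather than inherited from the `Sequence` ABC.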
…queue without a live queue
Per review: enqueueing from a system-prompt callback is a weird, non-deterministic use case, and threading the live queue into `system_prompt_parts` only enabled it in a path where the message would be silently dropped. Remove the `pending_messages` param entirely (no caller passed it after the reinject wiring was removed) and make `RunContext.enqueue` raise a clear `UserError` when the context has no queue to drain; `RunContext.pending_messages` is now `None` outside a running agent. Real runs (tools, hooks, `AgentRun.enqueue`) are unaffected.
Also address review nits: convert the output-tool and interleaved enqueue tests to `snapshot(result.all_messages())` per the test convention.
`ctx.enqueue` raises before the system-prompt callback could return, leaving a dead line. Test the guard directly on a queue-less `RunContext` instead — same coverage, no dead line.
| def get_ordering(self) -> CapabilityOrdering: | ||
| return CapabilityOrdering(position='outermost') |
🚩 Two outermost capabilities may have non-deterministic relative ordering
Both PendingMessageDrainCapability (line 83) and Instrumentation (added separately via CombinedCapability in agent/__init__.py:1368-1370) request position='outermost' in their CapabilityOrdering. The relative order between two outermost capabilities depends on the ordering pass's tie-breaking logic. For before_model_request, the drain wants to fire first so subsequent hooks see drained messages, while Instrumentation wants to wrap everything for tracing. If the ordering pass doesn't guarantee a stable relative order between two outermost capabilities, the drain might fire after Instrumentation's before_model_request, meaning the Instrumentation span wouldn't capture the drained messages in its initial view. Tests pass, so the current ordering appears correct, but the implicit dependency on tie-breaking order is fragile.
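To illustrate the fragility being described, here is a toy sketch of tie-breaking between two entries that request the same position. It assumes nothing about the real ordering pass: `Cap`, `rank`, and the `order` mapping are hypothetical names invented for this example.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Cap:
    """Hypothetical capability record: a name, a position, and an explicit tie-break rank."""
    name: str
    position: str
    rank: int = 0


caps = [
    Cap('instrumentation', 'outermost', rank=1),
    Cap('tool_guard', 'inner'),
    Cap('drain', 'outermost', rank=0),
]
order = {'outermost': 0, 'inner': 1}

# Stable sort on position alone: ties keep registration order,
# so 'instrumentation' stays ahead of 'drain' only because it was listed first.
implicit = [c.name for c in sorted(caps, key=lambda c: order[c.position])]

# Adding the rank to the key makes the relative order independent of registration order.
explicit = [c.name for c in sorted(caps, key=lambda c: (order[c.position], c.rank))]
```

With an explicit secondary key, the drain-before-instrumentation relationship would no longer depend on the order in which the two were registered.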
| *extras, final = messages | ||
| assert isinstance(final, ModelRequest), 'enqueued content always ends in a ModelRequest' |
📝 Info: End-of-run redirect assertion relies on from_content invariant
The assertion at line 157 (assert isinstance(final, ModelRequest)) depends on the invariant that every PendingMessage.messages tuple ends in a ModelRequest, enforced by PendingMessage.from_content (_enqueue.py:115-119). Since PendingMessage is a plain @dataclass (not frozen), direct construction bypassing from_content could violate this invariant. Additionally, assert statements are stripped in optimized mode (python -O). In practice, PendingMessage is internal API and all public entry points (RunContext.enqueue, AgentRun.enqueue) go through from_content, so the risk is minimal. If the invariant matters for safety, a runtime check (raising ValueError) would be more robust than an assertion.
Yeah this could be a nicer error, since people could touch RunContext.pending_messages directly.
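A runtime check along the lines suggested here might look like the sketch below. It is self-contained and uses stand-ins: `UserError`, `Request`, and `Response` are defined locally to mimic the library's error and message types, and `split_final_request` is a hypothetical helper name.

```python
from dataclasses import dataclass


class UserError(RuntimeError):
    """Stand-in for the library's own UserError, defined here so the sketch runs alone."""


@dataclass
class Request:
    """Stand-in for ModelRequest."""
    text: str


@dataclass
class Response:
    """Stand-in for ModelResponse."""
    text: str


def split_final_request(messages: list[object]) -> tuple[list[object], Request]:
    """Split off the trailing request, raising a descriptive error if the invariant is broken."""
    if not messages or not isinstance(messages[-1], Request):
        # Unlike `assert`, this check still runs under `python -O`.
        raise UserError(
            'Enqueued content must end in a request so the agent has something to respond to.'
        )
    *extras, final = messages
    return extras, final
```

The explicit `raise` also gives a place to explain how the queue ended up malformed, which an assertion message reaches far fewer users with.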
|
|
||
|
|
||
| EnqueueContent: TypeAlias = 'str | Sequence[UserContent] | ModelRequestPart | ModelMessage' | ||
| """A single item accepted by [`RunContext.enqueue`][pydantic_ai.tools.RunContext.enqueue] |
Why not UserContent by itself? So you can pass multiple images for example?
| if not messages: | ||
| return None | ||
| if not isinstance(messages[-1], ModelRequest): | ||
| raise ValueError( |
We use our own UserError for this stuff
| [`PendingMessageDrainCapability`][pydantic_ai.capabilities._pending_messages.PendingMessageDrainCapability]. | ||
| """ | ||
|
|
||
| messages: tuple[ModelMessage, ...] |
Just use a list, please
| if message.run_id is None: | ||
| message.run_id = fallback_run_id | ||
| if message.conversation_id is None: | ||
| message.conversation_id = fallback_conversation_id |
I feel like we do this kind of copying in a few places, and i worry that when we add a new similar field, we'll forget to update one place. Any way to fix that?
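One way to address this concern is to drive the copying from a single declaration of the tracked fields. The sketch below is an assumption about how that could look, not the helper the PR ends up adding: `RunMetadata`, `Message`, and `fill_missing_metadata` are hypothetical names.

```python
from dataclasses import dataclass, fields
from typing import Optional


@dataclass
class RunMetadata:
    """Hypothetical bundle of the framework-tracked fields, declared in one place."""
    run_id: Optional[str] = None
    conversation_id: Optional[str] = None


@dataclass
class Message:
    """Stand-in for a message carrying the same optional fields."""
    text: str
    run_id: Optional[str] = None
    conversation_id: Optional[str] = None


def fill_missing_metadata(message: Message, fallback: RunMetadata) -> None:
    """Copy each fallback field onto the message only where the message has no value."""
    for f in fields(fallback):
        if getattr(message, f.name) is None:
            setattr(message, f.name, getattr(fallback, f.name))
```

Because the loop iterates over the fields of `RunMetadata`, a newly tracked field only has to be added to that dataclass (and to the message type) for every call site to pick it up.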
| from pydantic_ai import Agent | ||
| from pydantic_graph import End | ||
|
|
||
| agent = Agent('openai:gpt-5.2') |
|
|
||
| @agent.tool | ||
| def trigger_alert(ctx: RunContext[None]) -> str: | ||
| ctx.enqueue('Alert: production is degraded, prioritize triage.') |
Have an example with a SystemPromptPart as well
…tadata stamping
- `PendingMessage.from_content` raises `UserError` (not `ValueError`) and the end-of-run drain raises a helpful `UserError` instead of asserting, since `RunContext.pending_messages` is public and can hold a malformed message.
- `PendingMessage.messages` is now a `list` rather than a `tuple`.
- Add `messages.fill_run_metadata` and reuse it across `_agent_graph` and the pending-message drain so framework-tracked fields live in one place.
- Docs: use Opus 4.7 in the examples and show enqueuing a `SystemPromptPart`.
…lper private
- `EnqueueContent` is now `UserContent | ModelRequestPart | ModelMessage`: each
positional is one atomic item, so a single image no longer needs a list
(`enqueue(img)`) and text + media mix in one call (`enqueue('caption', img)`).
Adjacent user content gathers into one `UserPromptPart`; pass an existing list
by spreading it. Drops the `Sequence[UserContent]` shape.
- Move `fill_run_metadata` from the public `messages` module to the private
`_utils` module so it isn't part of the public API surface.
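The gathering rule described in the first bullet can be sketched with `itertools.groupby`. This is a simplified illustration rather than the PR's code: `UserPrompt` and `Message` are stand-ins for `UserPromptPart` and a complete `ModelMessage`, plain strings stand in for all user content, and the intermediate `ModelRequestPart` layer is left out.

```python
from dataclasses import dataclass, field
from itertools import groupby


@dataclass
class UserPrompt:
    """Stand-in for UserPromptPart: holds gathered user content."""
    content: list[str] = field(default_factory=list)


@dataclass
class Message:
    """Stand-in for a complete message that must stay separate."""
    label: str


def gather(items: list[object]) -> list[object]:
    """Gather runs of adjacent plain strings into one UserPrompt and pass messages through."""
    out: list[object] = []
    for is_text, run in groupby(items, key=lambda item: isinstance(item, str)):
        if is_text:
            out.append(UserPrompt(content=list(run)))
        else:
            out.extend(run)
    return out
```

For example, `gather(['caption', 'more', Message('m'), 'tail'])` produces one prompt holding the first two strings, then the message, then a second prompt holding `'tail'`, so a complete message acts as a boundary between runs of user content.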
| if isinstance(node, End) and self._graph_run.state.pending_messages: | ||
| # `asap` messages drain in `before_model_request` (which fires either way), but | ||
| # `when_idle` messages and end-of-run redirects drain in `after_node_run`, which | ||
| # bare iteration skips. Reaching `End` with a non-empty queue means those were | ||
| # silently stranded — flag the misconfigured driving. | ||
| warnings.warn( | ||
| 'The agent run ended with undrained pending messages enqueued via `enqueue`. ' | ||
| 'Bare `async for node in agent_run` does not drain `when_idle` messages or ' | ||
| 'end-of-run redirects, because they fire in `after_node_run`, which bare iteration ' | ||
| 'skips. Use `agent_run.next(node)` to advance the run, or `agent.run()` which drives ' | ||
| 'via `next()` automatically.', | ||
| UserWarning, | ||
| stacklevel=2, | ||
| ) |
@DouweM asked at docs/message-history.md:484 whether we can detect this "misconfigured" driving and raise an error rather than a warning. The current code emits a UserWarning, which is easy to miss in production (warnings can be silently filtered). Since undrained messages are effectively lost data, an error would be safer — but it would also break iteration at the End boundary, which is awkward.
A middle ground: raise a dedicated exception (e.g. UndrainedPendingMessagesError) after yielding the End node, so the user still sees the run completed but gets a clear signal that messages were lost. Or, if a warning is the intended design, the maintainer should confirm that on the original comment so the thread can be resolved.
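The "raise after yielding `End`" idea can be sketched with a plain async generator. Everything here is a toy under stated assumptions: `UndrainedError` is a hypothetical exception, and nodes and pending messages are represented as strings rather than real graph nodes.

```python
import asyncio
from collections.abc import AsyncIterator


class UndrainedError(RuntimeError):
    """Hypothetical error for messages still queued when the run ends."""


async def iterate(nodes: list[str], pending: list[str]) -> AsyncIterator[str]:
    """Yield every node, then fail loudly if anything is still queued."""
    for node in nodes:
        yield node
    # Runs only when the consumer asks for the item after the final node.
    if pending:
        raise UndrainedError(f'{len(pending)} pending message(s) were never delivered')


async def consume(pending: list[str]) -> list[str]:
    """Drive the iterator the way a bare `async for` loop would, recording what was seen."""
    seen: list[str] = []
    try:
        async for node in iterate(['request', 'End'], pending):
            seen.append(node)
    except UndrainedError:
        seen.append('error')
    return seen
```

One trade-off of this shape is that a consumer that breaks out of the loop as soon as it sees the final node never triggers the check, so the signal is only as reliable as the caller's habit of exhausting the iterator.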
| model on the next request, alongside any tool returns from the same step. A | ||
| [`SystemPromptPart`][pydantic_ai.messages.SystemPromptPart] is delivered the same way; on | ||
| providers that hoist system prompts (e.g. Anthropic, Google) it's rendered inline as a regular | ||
| user-role message so it keeps its mid-conversation position rather than being lifted to the top. |
This sentence references <system-reminder>-style inline rendering ("it's rendered inline as a regular user-role message"), but the underlying _wrap_non_leading_system_prompts implementation actually uses <system> tags, not <system-reminder>. Worth aligning the docs phrasing with the implementation, or at least not implying a specific tag name. Something like "it's wrapped and rendered as a user-role message with system-context tagging" would be more accurate without over-specifying.
| @@ -0,0 +1,150 @@ | |||
| """Internal helpers for the `RunContext.enqueue` / `AgentRun.enqueue` APIs. | |||
Per the project's requirements for all contributions: "update the relevant agent skills when introducing a new feature." The building-pydantic-ai-agents skill (pydantic_ai_slim/pydantic_ai/.agents/skills/building-pydantic-ai-agents/SKILL.md) doesn't mention enqueue, pending_messages, or mid-run message injection. Since this is a new public API on RunContext and AgentRun, the skill should document the feature so that agents building with Pydantic AI know how to use it — even just a bullet point under the tools or capabilities section pointing at RunContext.enqueue and when to use it.
…ue; document `enqueue` in skill
- Bare `async for` reaching `End` with undrained `when_idle`/redirect messages now raises `UndrainedPendingMessagesError` instead of warning, so stranded messages fail loudly rather than being silently lost.
- Document mid-run `enqueue` in the `building-pydantic-ai-agents` skill.
- Clarify in the docs that a non-leading enqueued `SystemPromptPart` is sent as a `<system>`-tagged user-role message.
Closes #700.
Summary
Adds a pending message queue primitive for injecting content into a run mid-flight from tools, capability hooks, or external code driving `agent.iter()`. Two delivery priorities:
- `'asap'` (default): delivered at the earliest opportunity, either prepended to the next `ModelRequest` via `before_model_request`, or, if the agent would otherwise terminate, used to redirect the run into one more request (matches pi-mono's drain-on-end behavior).
- `'when_idle'`: delivered only when the agent would otherwise terminate, after any `'asap'` messages. Doesn't interrupt in-flight work.
Each `enqueue` accepts:
- `str` or `Sequence[UserContent]` (same shape as `Agent.run(user_prompt=...)`): wrapped in a `UserPromptPart`
- `ModelRequest`: emitted verbatim as its own message, preserving `instructions`/`metadata`/etc.
Adjacent parts-style enqueues of the same priority are merged into one synthesized `ModelRequest` at drain time, matching what the model sees on the wire.
The queue is exposed in two places:
- [`RunContext.enqueue(*content, priority='asap')`][pydantic_ai.tools.RunContext.enqueue]: for tools, hooks, and capabilities
- [`AgentRun.enqueue(*content, priority='asap')`][pydantic_ai.run.AgentRun.enqueue]: for external code driving `agent.iter()`
Drain logic lives in an auto-injected `PendingMessageDrainCapability`; nothing extra to import or configure.
Design decisions
- /queue, Claude Code proposal #30492. Only pi-mono has a named two-mode distinction (`steer`/`followUp`). Those names emerged independently in Claude Code's `/steer` proposal, but they specifically fit user-driven "I'm guiding the agent" scenarios. For background-tool delivery (a major use case), "steering" doesn't fit semantically: `'asap'`/`'when_idle'` describe timing without implying intent. No shipping system uses those plain-English names, but no shipping system has Pydantic AI's part-type richness either, so we're setting precedent rather than matching one.
- `'asap'` drains at end-of-run too. Pi-mono confirmed: `agent-loop.ts:253` unconditionally polls steering messages even when the agent would terminate. If a background task completes during the final step's `after_model_request` (after `before_model_request` already drained), the message would otherwise be lost. End-of-run drain catches it and redirects.
- `SystemPromptPart` in `EnqueueContent`. Anthropic and Google hoist any mid-conversation `SystemPromptPart` to their top-level `system=`/`system_instruction=` parameter regardless of position, which invalidates prefix cache and loses positional intent. To inject system-style content mid-run, wrap a `SystemPromptPart` in a `ModelRequest` passthrough; that makes the cross-provider quirk visible at the call site. Follow-up #5437 ("Render mid-conversation `SystemPromptPart`s as XML-wrapped `UserPromptPart`s for Anthropic/Google") proposes rendering non-leading `SystemPromptPart`s as XML-wrapped `UserPromptPart`s on Anthropic/Google to make this work cleanly.
Use cases
The underlying primitive for several patterns the team has been talking about — steering messages (#89), follow-up delivery for long-running work (also #89), context injection (#700), channels for external events (harness #37), and background tools (covered separately, see below).
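As a rough mental model for how steering-style and follow-up-style delivery differ, the two priorities can be sketched as a toy queue. This is an illustration under stated assumptions, not the library's implementation: `ToyQueue` and its method names are hypothetical, and messages are plain strings.

```python
from dataclasses import dataclass, field
from typing import Literal

Priority = Literal['asap', 'when_idle']


@dataclass
class ToyQueue:
    """Toy stand-in for the pending message queue."""
    items: list[tuple[Priority, str]] = field(default_factory=list)

    def enqueue(self, text: str, priority: Priority = 'asap') -> None:
        self.items.append((priority, text))

    def drain(self, priority: Priority) -> list[str]:
        """Remove and return the messages of one priority, preserving enqueue order."""
        taken = [text for p, text in self.items if p == priority]
        self.items = [(p, text) for p, text in self.items if p != priority]
        return taken

    def drain_before_request(self) -> list[str]:
        # Before a model request, only 'asap' messages are delivered.
        return self.drain('asap')

    def drain_at_end(self) -> list[str]:
        # When the run would otherwise end: late 'asap' messages first, then 'when_idle'.
        return self.drain('asap') + self.drain('when_idle')
```

In this model a `'when_idle'` message enqueued early simply waits in the queue while requests come and go, and is only handed over once the run has nothing else left to do.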
What's NOT in this PR
The earlier draft also added `@agent.tool(background=True)` plus a `BackgroundToolCapability`. That's been removed and now lives in `pydantic-ai-harness` (pydantic/pydantic-ai-harness#222). Splitting it out keeps this PR scoped to the general-purpose primitive and lets the background tools experiment iterate at the harness's faster cadence.
Known limitation: Temporal / DBOS
Inside a Temporal (or DBOS) workflow, tools run in activities that receive a serialized snapshot of the run state. Mutations there don't round-trip back to the workflow, so `ctx.enqueue` from inside a tool running in a Temporal activity is dropped on the floor. Even adding `pending_messages` to the serialize list wouldn't fix it; the activity would need to return enqueued parts as part of its result and the workflow-side wrapper would need to merge them back in. `AgentRun.enqueue` from the workflow context still works fine. Documented in `docs/message-history.md`; worth a follow-up issue once this lands.
Test plan
- `'asap'` enqueue; external `'when_idle'` enqueue via `AgentRun`; rich `message_history`-tail + `'asap'` enqueue; late-`'asap'` (background-tools pattern) drained at end-of-run via capability `after_model_request`; `'asap'` + `'when_idle'` ordering at end-of-run
- `pending_messages` threaded through `Agent.system_prompt_parts` so `ctx.enqueue` from inside an `@agent.system_prompt` callback (when used with `ReinjectSystemPrompt`) reaches the live queue instead of being silently dropped
- `tests/test_messages.py` verifies `_clean_message_history` merges adjacent `ModelRequest`s with `ToolReturnPart`/`RetryPromptPart` first; integration test in `test_capabilities.py` snapshots the un-merged `all_messages()` view
- `enqueue()` with no items rejected; `ModelRequest` mixed with strings/parts rejected; `PendingMessage` directly constructed with empty payload rejected
- `docs/message-history.md` covering both producer paths, both priorities, the Temporal limitation, and why `SystemPromptPart` isn't accepted directly
- `_pending_messages.py` and the new `messages.py` additions
🤖 Generated with Claude Code