Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 11 additions & 3 deletions pydantic_ai_harness/code_mode/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -90,13 +90,21 @@ Tools that match the selector are wrapped inside `run_code`. Non-matching tools

When you mark tools or whole toolsets `defer_loading=True` ([Tool Search](https://ai.pydantic.dev/tools-advanced/#tool-search)), `CodeMode` keeps them out of `run_code` while they're undiscovered — they pass straight through, so Tool Search drives them as usual (sent on the wire with `defer_loading` on providers with native tool search; otherwise dropped until discovered, with a `search_tools` tool alongside `run_code`). Once the model discovers a tool it comes back with `defer_loading=False`, and from then on `CodeMode` folds it into `run_code` like any other tool, so it's callable from generated code.

That fold-in grows `run_code`'s description, which invalidates the prompt-cache prefix once at the moment of discovery (turns with no discovery stay cache-warm). To instead keep a Tool Search corpus fully native — never folded into `run_code`, fully cache-stable, but not callable from inside it — exclude it with a `tools` selector; corpus members carry `with_native` set to the managing native tool:
That fold-in grows `run_code`'s description, which invalidates the prompt-cache prefix once at the moment of discovery (turns with no discovery stay cache-warm). Two ways to avoid the bust:

- Pass `dynamic_catalog=True` to keep `run_code.description` static across discoveries — the catalog of sandboxed-tool signatures moves into agent instructions (as a dynamic [`InstructionPart`](https://ai.pydantic.dev/api/messages/#pydantic_ai.messages.InstructionPart)) and newly-discovered tools are announced via [`ctx.enqueue`](https://ai.pydantic.dev/api/tools/#pydantic_ai.tools.RunContext.enqueue) instead of by rebuilding the description:

```python
CodeMode(tools=lambda ctx, td: td.with_native is None)
CodeMode(dynamic_catalog=True)
```

A future Pydantic AI change will let `run_code`'s description stay static — newly discovered tools announced separately — so the fold-in costs nothing; until then, the selector above is the escape hatch.
This pays off when paired with Tool Search: the tool-definitions block stays byte-stable so the prefix cache survives discoveries, at the cost of a larger (but cache-friendly) system prompt. With a fixed toolset and no Tool Search, the default keeps the system prompt shorter and is the better choice.

- To instead keep a Tool Search corpus fully native — never folded into `run_code`, but not callable from inside it — exclude it with a `tools` selector; corpus members carry `with_native` set to the managing native tool:

```python
CodeMode(tools=lambda ctx, td: td.with_native is None)
```

### Metadata-based selection

Expand Down
135 changes: 132 additions & 3 deletions pydantic_ai_harness/code_mode/_capability.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,29 @@

from __future__ import annotations

from dataclasses import dataclass, field
from collections.abc import Sequence
from dataclasses import dataclass, field, replace
from typing import TYPE_CHECKING, Any, cast

from pydantic_ai import AbstractToolset
from pydantic_ai.capabilities import AbstractCapability, CapabilityOrdering
from pydantic_ai.capabilities._tool_search import ToolSearch as _ToolSearch
from pydantic_ai.tools import AgentDepsT, ToolSelector
from pydantic_ai.messages import ModelResponse, NativeToolSearchReturnPart, SystemPromptPart
from pydantic_ai.tools import AgentDepsT, RunContext, ToolDefinition, ToolSelector

from pydantic_ai_harness.code_mode._toolset import CodeModeToolset

if TYPE_CHECKING:
from pydantic_ai.capabilities.abstract import ValidatedToolArgs
from pydantic_ai.messages import ToolCallPart
from pydantic_ai.models import ModelRequestContext


_DISCOVERY_ANNOUNCEMENT_PREFIX = (
'New functions are now available inside `run_code`. Their signatures have been '
'added to the available-functions catalog in the system prompt'
)


@dataclass
class CodeMode(AbstractCapability[AgentDepsT]):
Expand Down Expand Up @@ -48,10 +62,125 @@ class CodeMode(AbstractCapability[AgentDepsT]):
max_retries: int = 3
"""Maximum number of retries for the `run_code` tool (syntax errors count as retries)."""

dynamic_catalog: bool = False
"""Keep the `run_code` tool definition cache-stable as the sandboxed toolset grows.

By default the signatures of all sandboxed tools are rendered into `run_code`'s
description, which lives in the prompt-cache-keyed tool-definitions block. When the
toolset changes mid-run -- e.g. [`ToolSearch`][pydantic_ai.capabilities.ToolSearch]
reveals a new tool that then gets folded into `run_code` -- the description changes and
busts the prefix cache from that point on.

Set `dynamic_catalog=True` to instead:

- keep only the static base prose (sandbox restrictions, return-value contract) in
`run_code.description`, so the tool-definitions block stays byte-stable across
discoveries;
- move the "available functions" catalog (TypedDict definitions + signatures) into
agent instructions as a dynamic
[`InstructionPart`][pydantic_ai.messages.InstructionPart], which providers with
static/dynamic instruction splitting (Anthropic, Bedrock) place after the cache
breakpoint;
- announce newly-discovered tools via a short
[`SystemPromptPart`][pydantic_ai.messages.SystemPromptPart] enqueued through
[`RunContext.enqueue`][pydantic_ai.tools.RunContext.enqueue], so the model knows the
new functions are callable without rewriting the cached description.

This pays off when paired with [`ToolSearch`][pydantic_ai.capabilities.ToolSearch]: the
tool-definitions cache survives discoveries at the cost of a larger (but
cache-friendly) system prompt. With a fixed toolset and no `ToolSearch`, the default
keeps the system prompt shorter and is the better choice.
"""

_announced_tools: set[str] = field(default_factory=set[str], init=False, repr=False)

def get_ordering(self) -> CapabilityOrdering:
"""CodeMode wraps around ToolSearch so that search_tools stays native."""
return CapabilityOrdering(position='outermost', wraps=[_ToolSearch])

async def for_run(self, ctx: RunContext[AgentDepsT]) -> CodeMode[AgentDepsT]:
"""Return a fresh instance so concurrent runs don't share `_announced_tools`."""
if not self.dynamic_catalog:
return self
return replace(self)

def get_wrapper_toolset(self, toolset: AbstractToolset[AgentDepsT]) -> AbstractToolset[AgentDepsT] | None:
"""Wrap the agent's assembled toolset, splitting it into native + sandboxed subsets if needed."""
return CodeModeToolset(wrapped=toolset, tool_selector=self.tools, max_retries=self.max_retries)
return CodeModeToolset(
wrapped=toolset,
tool_selector=self.tools,
max_retries=self.max_retries,
dynamic_catalog=self.dynamic_catalog,
)

async def after_tool_execute(
self,
ctx: RunContext[AgentDepsT],
*,
call: ToolCallPart,
tool_def: ToolDefinition,
args: ValidatedToolArgs,
result: Any,
) -> Any:
"""Announce newly-discovered tools from a local `search_tools` return.

Only active with `dynamic_catalog=True`. The native-search path is handled by
[`after_model_request`][pydantic_ai_harness.CodeMode.after_model_request] instead
(server-side search emits a `NativeToolSearchReturnPart` rather than a regular tool
execute result).
"""
if self.dynamic_catalog and tool_def.tool_kind == 'tool-search':
self._announce_newly_discovered(ctx, _extract_discovered_names(result))
return result

async def after_model_request(
self,
ctx: RunContext[AgentDepsT],
*,
request_context: ModelRequestContext,
response: ModelResponse,
) -> ModelResponse:
"""Announce newly-discovered tools from a native (server-side) tool-search return.

Only active with `dynamic_catalog=True`.
"""
if self.dynamic_catalog:
for part in response.parts:
if isinstance(part, NativeToolSearchReturnPart):
self._announce_newly_discovered(ctx, _extract_discovered_names(part.content))
return response

def _announce_newly_discovered(self, ctx: RunContext[AgentDepsT], names: Sequence[str]) -> None:
"""Enqueue a system-prompt announcement for any names we haven't already announced."""
fresh = [n for n in names if n not in self._announced_tools]
if not fresh:
return
self._announced_tools.update(fresh)
listing = ', '.join(f'`{name}`' for name in fresh)
# Enqueue a `SystemPromptPart` so the announcement is framed as system-level context.
# Mid-conversation `SystemPromptPart`s are rendered inline (not hoisted to the top-level
# system prompt) on all providers since pydantic/pydantic-ai#5509, so this is cache-safe.
ctx.enqueue(SystemPromptPart(content=f'{_DISCOVERY_ANNOUNCEMENT_PREFIX}: {listing}.'))


def _extract_discovered_names(content: Any) -> list[str]:
"""Read newly-discovered tool names from a tool-search return content.

Accepts both the local `ToolSearchReturnContent` (TypedDict shape) and the same shape
on a `NativeToolSearchReturnPart`. Returns `[]` for any malformed/unexpected input --
the announcement is a courtesy nudge, not load-bearing logic.
"""
if not isinstance(content, dict):
return []
typed = cast(dict[str, Any], content)
raw = typed.get('discovered_tools')
if not isinstance(raw, list):
return []
raw_list = cast(list[Any], raw)
names: list[str] = []
for match in raw_list:
if isinstance(match, dict):
name = cast(dict[str, Any], match).get('name')
if isinstance(name, str):
names.append(name)
return names
78 changes: 71 additions & 7 deletions pydantic_ai_harness/code_mode/_toolset.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,22 @@
import keyword
import re
import warnings
from collections.abc import Callable, Coroutine
from collections.abc import Callable, Coroutine, Sequence
from dataclasses import dataclass, field, replace
from typing import Annotated, Any

from pydantic import Field, TypeAdapter
from pydantic_ai import AbstractToolset, RunContext, ToolDefinition, WrapperToolset
from pydantic_ai.exceptions import ApprovalRequired, CallDeferred, ModelRetry, UserError
from pydantic_ai.function_signature import FunctionSignature
from pydantic_ai.messages import ToolCallPart, ToolReturn, ToolReturnContent, ToolReturnPart, is_multi_modal_content
from pydantic_ai.messages import (
InstructionPart,
ToolCallPart,
ToolReturn,
ToolReturnContent,
ToolReturnPart,
is_multi_modal_content,
)
from pydantic_ai.tool_manager import ToolManager
from pydantic_ai.tools import AgentDepsT, ToolDenied, ToolSelector, matches_tool_selector
from pydantic_ai.toolsets.abstract import SchemaValidatorProt, ToolsetTool
Expand Down Expand Up @@ -184,10 +191,25 @@ class CodeModeToolset(WrapperToolset[AgentDepsT]):
max_retries: int = 3
"""Maximum number of retries for the `run_code` tool (syntax errors count as retries)."""

dynamic_catalog: bool = False
"""Move the sandboxed-tool catalog out of `run_code.description` and into instructions.

When `False` (default), every sandboxed tool's signature is rendered into the
`run_code` description, which lives in the prompt-cache-keyed tool-definitions block.
When `True`, the description keeps only the static base prose and the catalog is
surfaced as a dynamic [`InstructionPart`][pydantic_ai.messages.InstructionPart] via
[`get_instructions`][pydantic_ai_harness.code_mode.CodeModeToolset.get_instructions],
so Tool Search discoveries don't bust the tool-definitions cache prefix.
"""

# init=False so `replace()` in `for_run` produces a fresh instance with _repl=None,
# giving each agent run isolated REPL state. Lazy-initialized on first call_tool.
_repl: MontyRepl | None = field(default=None, init=False, repr=False)

# Catalog string stashed during `get_tools` (when `dynamic_catalog`) and read back by
# `get_instructions` in the same step. Empty when there's nothing to surface.
_last_catalog: str = field(default='', init=False, repr=False)

# Tracks deferred-tool names we've already warned about so we don't spam the
# logs every step. Reset on `for_run` because each run gets a fresh instance.
_warned_deferred: set[str] = field(default_factory=set[str], init=False, repr=False)
Expand All @@ -205,8 +227,30 @@ async def for_run_step(self, ctx: RunContext[AgentDepsT]) -> AbstractToolset[Age
new_self = replace(self, wrapped=new_wrapped)
new_self._repl = self._repl
new_self._warned_deferred = self._warned_deferred
new_self._last_catalog = self._last_catalog
return new_self

async def get_instructions(
self, ctx: RunContext[AgentDepsT]
) -> str | InstructionPart | Sequence[str | InstructionPart] | None:
"""Surface the tool catalog as a dynamic instruction when `dynamic_catalog` is set.

The catalog is stashed by `get_tools` earlier in the same step. `dynamic=True` so
providers that split static/dynamic instructions (Anthropic, Bedrock) place a cache
breakpoint *before* the catalog — discoveries change it but leave the static prefix
cache intact. When `dynamic_catalog` is off (or there are no sandboxed tools) the
stash is empty and we defer entirely to the wrapped toolset.
"""
upstream = await self.wrapped.get_instructions(ctx)
if not self._last_catalog:
return upstream
catalog_part = InstructionPart(content=self._last_catalog, dynamic=True)
if upstream is None:
return catalog_part
if isinstance(upstream, (str, InstructionPart)):
return [upstream, catalog_part]
return [*upstream, catalog_part]

async def get_tools(self, ctx: RunContext[AgentDepsT]) -> dict[str, ToolsetTool[AgentDepsT]]:
"""Return the `run_code` tool plus any native (non-sandboxed) tools."""
wrapped_tools = await self.wrapped.get_tools(ctx)
Expand Down Expand Up @@ -236,7 +280,15 @@ async def get_tools(self, ctx: RunContext[AgentDepsT]) -> dict[str, ToolsetTool[

callable_defs, sanitized_to_original = self._partition_callable_tools(sandboxed_tools)

description = self._build_description(callable_defs)
# `dynamic_catalog` keeps the catalog out of `run_code.description` (cache-stable
# tool-defs block) and surfaces it via `get_instructions` instead. Stash it for the
# `get_instructions` call later this step; empty string means "nothing to surface".
if self.dynamic_catalog:
description = _RUN_CODE_BASE_DESCRIPTION
self._last_catalog = self._render_catalog(callable_defs)
else:
description = self._build_description(callable_defs)
self._last_catalog = ''

if _RUN_CODE_TOOL_NAME in native_tools:
raise UserError(
Expand Down Expand Up @@ -506,8 +558,22 @@ def _partition_callable_tools(
@staticmethod
def _build_description(callable_defs: dict[str, ToolDefinition]) -> str:
"""Render the `run_code` description: base prose + TypedDicts + function signatures."""
if not callable_defs:
catalog = CodeModeToolset._render_catalog(callable_defs)
if not catalog:
return _RUN_CODE_BASE_DESCRIPTION
return _RUN_CODE_BASE_DESCRIPTION + '\n\n' + catalog

@staticmethod
def _render_catalog(callable_defs: dict[str, ToolDefinition]) -> str:
"""Render the functions-header + TypedDict + function-signature blocks, or `''` if no defs.

Excludes the `run_code` base prose; the catalog is the discovery-driven portion that's
cache-hostile when carried in `run_code.description`. Used by `_build_description`
(default static-description path) and by `get_instructions` (the `dynamic_catalog`
path, which moves it into instructions instead).
"""
if not callable_defs:
return ''

sigs, conflicting = _get_sigs_and_conflicting(callable_defs)
type_blocks = FunctionSignature.render_type_definitions(sigs, conflicting)
Expand All @@ -518,9 +584,7 @@ def _build_description(callable_defs: dict[str, ToolDefinition]) -> str:

has_sync = any(td.sequential for td in callable_defs.values())
has_async = any(not td.sequential for td in callable_defs.values())
header = _functions_header(has_sync=has_sync, has_async=has_async)

sections = [_RUN_CODE_BASE_DESCRIPTION, header]
sections = [_functions_header(has_sync=has_sync, has_async=has_async)]
if type_blocks:
sections.append('```python\n' + '\n\n'.join(type_blocks) + '\n```')
sections.append('```python\n' + '\n\n'.join(function_blocks) + '\n```')
Expand Down
6 changes: 6 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,12 @@ lint = [
'pyright>=1.1.408',
]

[tool.uv.sources]
# Tier 2 depends on `RunContext.enqueue` and the pending message queue, which live on
# pydantic-ai's `background-tools` branch (PR #4980). Switch back to a published version
# once that lands.
pydantic-ai-slim = { git = 'https://github.com/pydantic/pydantic-ai.git', branch = 'background-tools', subdirectory = 'pydantic_ai_slim' }

[tool.hatch.version]
source = 'uv-dynamic-versioning'

Expand Down
Loading
Loading