Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 43 additions & 1 deletion docs/mcp/client.md
Original file line number Diff line number Diff line change
Expand Up @@ -352,7 +352,49 @@ agent = Agent('openai:gpt-5.2', toolsets=[server])

## Tool metadata

MCP tools can include metadata that provides additional information about the tool's characteristics, which can be useful when [filtering tools][pydantic_ai.toolsets.FilteredToolset]. The `meta`, `annotations`, and `output_schema` fields can be found on the `metadata` dict on the [`ToolDefinition`][pydantic_ai.tools.ToolDefinition] object that's passed to filter functions.
MCP tools can include metadata that provides additional information about the tool's characteristics, which can be useful when [filtering tools][pydantic_ai.toolsets.FilteredToolset]. The `meta` and `annotations` fields can be found on the `metadata` dict on the [`ToolDefinition`][pydantic_ai.tools.ToolDefinition] object that's passed to filter functions, and the tool's output schema (if any) is available as the `return_schema` field.

[`MCPToolset`][pydantic_ai.mcp.MCPToolset] additionally exposes a `task: bool` flag indicating whether the server declares support for [task-augmented execution](#background-tasks) on the tool.

## Background tasks

[`MCPToolset`][pydantic_ai.mcp.MCPToolset] supports MCP [task-augmented execution](https://modelcontextprotocol.io/specification/2025-11-25/basic/utilities/tasks) (SEP-1686). Servers can declare per-tool task support via `execution.taskSupport`, and `MCPToolset` routes calls accordingly:

| `execution.taskSupport` | Behavior |
| --- | --- |
| `"required"` | Always calls with `task=True`. The server creates a task and the client awaits the final result via `tasks/result`. |
| `"optional"` | Always calls with `task=True` to opt in to durability, cancellation, and progress notifications. |
| `"forbidden"` or absent | Calls normally. |

For [FastMCP](https://gofastmcp.com/) servers, declare task support per tool with `task=TaskConfig(mode=...)`:

```python {title="background_task_server.py" dunder_name="not_main"}
from fastmcp import FastMCP
from fastmcp.server.tasks import TaskConfig

mcp = FastMCP('long_running_server')


@mcp.tool(task=TaskConfig(mode='required'))
async def deep_research(topic: str) -> str:
import asyncio
await asyncio.sleep(0)
return f'Researched {topic}'


if __name__ == '__main__':
mcp.run(transport='streamable-http')
```

The client side needs no extra configuration — `MCPToolset` sends `task=True` automatically based on the server's declaration:

```python {title="background_task_client.py"}
from pydantic_ai import Agent
from pydantic_ai.mcp import MCPToolset

toolset = MCPToolset('http://localhost:8000/mcp')
agent = Agent('openai:gpt-5.2', toolsets=[toolset])
```

## Resources

Expand Down
33 changes: 26 additions & 7 deletions pydantic_ai_slim/pydantic_ai/mcp.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
from fastmcp.client.progress import ProgressHandler
from fastmcp.client.roots import RootsHandler, RootsList
from fastmcp.client.sampling import SamplingHandler
from fastmcp.client.tasks import ToolTask
from fastmcp.client.transports import (
ClientTransport,
SSETransport,
Expand Down Expand Up @@ -2083,8 +2084,10 @@ async def list_tools(self) -> list[mcp_types.Tool]:

async def get_tools(self, ctx: RunContext[AgentDepsT]) -> dict[str, ToolsetTool[AgentDepsT]]:
max_retries = self.max_retries if self.max_retries is not None else ctx.max_retries
return {
mcp_tool.name: ToolsetTool[AgentDepsT](
tools: dict[str, ToolsetTool[AgentDepsT]] = {}
for mcp_tool in await self.list_tools():
task_support = mcp_tool.execution.taskSupport if mcp_tool.execution else None
tools[mcp_tool.name] = ToolsetTool[AgentDepsT](
toolset=self,
tool_def=ToolDefinition(
name=mcp_tool.name,
Expand All @@ -2093,15 +2096,15 @@ async def get_tools(self, ctx: RunContext[AgentDepsT]) -> dict[str, ToolsetTool[
metadata={
'meta': mcp_tool.meta,
'annotations': mcp_tool.annotations.model_dump() if mcp_tool.annotations else None,
'task': task_support in ('required', 'optional'),
},
return_schema=mcp_tool.outputSchema or None,
include_return_schema=self.include_return_schema,
),
max_retries=max_retries,
args_validator=TOOL_SCHEMA_VALIDATOR,
)
for mcp_tool in await self.list_tools()
}
return tools

def tool_for_tool_def(self, tool_def: ToolDefinition) -> ToolsetTool[AgentDepsT]:
return ToolsetTool[AgentDepsT](
Expand All @@ -2117,21 +2120,32 @@ async def direct_call_tool(
args: dict[str, Any],
*,
metadata: dict[str, Any] | None = None,
use_task: bool = False,
) -> Any:
"""Call a tool on the server directly.

Args:
name: The name of the tool to call.
args: The arguments to pass to the tool.
metadata: Optional request-level `_meta` payload sent alongside the call.
use_task: When `True`, send the call with `task=True` per MCP
[SEP-1686](https://modelcontextprotocol.io/specification/2025-11-25/basic/utilities/tasks) so
the server wraps execution in a durable, cancelable, pollable task; the result is awaited via
`tasks/result`. Only valid for tools whose `execution.taskSupport` is `'required'` or `'optional'`.

Raises:
ModelRetry: If the tool errors and `tool_error_behavior='retry'` (the default).
fastmcp.exceptions.ToolError: If the tool errors and `tool_error_behavior='error'`.
"""
async with self:
try:
result: CallToolResult = await self.client.call_tool(name=name, arguments=args, meta=metadata)
if use_task:
tool_task: ToolTask = await self.client.call_tool(
name=name, arguments=args, task=True, meta=metadata
)
result: CallToolResult = await tool_task.result()
else:
result = await self.client.call_tool(name=name, arguments=args, meta=metadata)
except ToolError as e:
if self.tool_error_behavior == 'retry':
raise exceptions.ModelRetry(message=str(e)) from e
Expand All @@ -2158,9 +2172,14 @@ async def call_tool(
ctx: RunContext[Any],
tool: ToolsetTool[Any],
) -> Any:
# Server-side task-augmented execution per MCP SEP-1686 is governed entirely by the tool's
# `execution.taskSupport`: 'required'/'optional' → task path; 'forbidden' or absent → regular path.
use_task = bool((tool.tool_def.metadata or {}).get('task'))
if self.process_tool_call is not None:
return await self.process_tool_call(ctx, self.direct_call_tool, name, tool_args)
return await self.direct_call_tool(name, tool_args)
return await self.process_tool_call(
ctx, functools.partial(self.direct_call_tool, use_task=use_task), name, tool_args
)
return await self.direct_call_tool(name, tool_args, use_task=use_task)

async def list_resources(self) -> list[Resource]:
"""Retrieve the resources currently exposed by the server.
Expand Down
2 changes: 1 addition & 1 deletion pydantic_ai_slim/pydantic_ai/tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -734,7 +734,7 @@ class ToolDefinition:
metadata: dict[str, Any] | None = None
"""Tool metadata that can be set by the toolset this tool came from. It is not sent to the model, but can be used for filtering and tool behavior customization.

For MCP tools, this contains the `meta`, `annotations`, and `output_schema` fields from the tool definition.
For MCP tools, this contains the `meta` and `annotations` fields from the tool definition, as well as a `task` flag indicating whether the server declares support for task-augmented execution.
"""

timeout: float | None = None
Expand Down
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ dev = [
"coverage[toml]>=7.10.7",
"dirty-equals>=0.9.0",
"duckduckgo-search>=7.0.0",
"pydocket>=0.20.2",
"exa-py>=2.0.0",
"tavily-python>=0.5.0",
"markdownify>=1.2",
Expand Down
103 changes: 101 additions & 2 deletions tests/test_mcp_toolset.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ class is validated against the same surface area as the legacy `FastMCPToolset`.

from __future__ import annotations

import asyncio
import base64
import importlib
import json
Expand All @@ -27,6 +28,7 @@ class is validated against the same surface area as the legacy `FastMCPToolset`.
from pydantic_ai._run_context import RunContext
from pydantic_ai.exceptions import ModelRetry
from pydantic_ai.models.test import TestModel
from pydantic_ai.tools import ToolDefinition
from pydantic_ai.usage import RunUsage

from .conftest import try_import
Expand All @@ -39,6 +41,7 @@ class is validated against the same surface area as the legacy `FastMCPToolset`.
)
from fastmcp.exceptions import ToolError
from fastmcp.server import FastMCP
from fastmcp.server.tasks import TaskConfig
from mcp import types as mcp_types
from mcp.types import (
BlobResourceContents,
Expand Down Expand Up @@ -570,8 +573,6 @@ async def test_label_falls_back_to_repr(self):
assert 'MCPToolset' in toolset.label

async def test_tool_for_tool_def_uses_default_retries_when_unset(self):
from pydantic_ai.tools import ToolDefinition

toolset = MCPToolset('https://example.com/mcp')
tool = toolset.tool_for_tool_def(
ToolDefinition(name='foo', description='', parameters_json_schema={'type': 'object'})
Expand Down Expand Up @@ -732,6 +733,104 @@ async def test_load_mcp_toolsets_http_entry(self):
assert wrapped.client.transport.headers == {'X-Key': 'foo'}


class TestMCPToolsetBackgroundTasks:
"""SEP-1686 task-augmented execution. `MCPToolset` reads each tool's server-declared
`execution.taskSupport` and routes the call accordingly:
`'required'` and `'optional'` go through `client.call_tool(task=True)` → `tool_task.result()`,
while `'forbidden'`/absent stay on the regular sync path."""

@pytest.fixture
async def task_server(self) -> FastMCP[None]:
server: FastMCP[None] = FastMCP('task_server')

@server.tool(task=TaskConfig(mode='required'))
async def task_required_tool() -> str:
"""A tool that requires task-augmented execution."""
await asyncio.sleep(0)
return 'task_required_completed'

@server.tool(task=TaskConfig(mode='optional'))
async def task_optional_tool() -> str:
"""A tool that may run either as a task or synchronously."""
await asyncio.sleep(0)
return 'task_optional_completed'

@server.tool()
async def plain_tool() -> str:
"""A tool with no task support — `execution` is `None`."""
return 'plain_completed'

return server

async def test_get_tools_exposes_task_metadata(
self, task_server: FastMCP[None], run_context: RunContext[None]
) -> None:
"""`get_tools` exposes `task: bool` so downstream capabilities can target task-augmented tools."""
toolset = MCPToolset(task_server)
async with toolset:
tools = await toolset.get_tools(run_context)

assert (tools['task_required_tool'].tool_def.metadata or {}).get('task') is True
assert (tools['task_optional_tool'].tool_def.metadata or {}).get('task') is True
assert (tools['plain_tool'].tool_def.metadata or {}).get('task') is False

async def test_required_tool_routes_through_task_path(
self, task_server: FastMCP[None], run_context: RunContext[None]
) -> None:
"""`mode='required'` succeeds — getting the real result proves `task=True` was sent (the server
would otherwise return `-32601: requires task-augmented execution`)."""
toolset = MCPToolset(task_server)
async with toolset:
tools = await toolset.get_tools(run_context)
result = await toolset.call_tool('task_required_tool', {}, run_context, tools['task_required_tool'])
assert result == 'task_required_completed'

async def test_optional_tool_routes_through_task_path(
self, task_server: FastMCP[None], run_context: RunContext[None]
) -> None:
"""`mode='optional'` also goes through the task path by default — the SEP allows either, and the
task path delivers durability/cancellation/progress benefits with no functional downside."""
toolset = MCPToolset(task_server)
async with toolset:
tools = await toolset.get_tools(run_context)
result = await toolset.call_tool('task_optional_tool', {}, run_context, tools['task_optional_tool'])
assert result == 'task_optional_completed'

async def test_plain_tool_stays_on_sync_path(
self, task_server: FastMCP[None], run_context: RunContext[None]
) -> None:
"""A tool with no `execution.taskSupport` stays on the regular sync `tools/call`. Sending
`task=True` to such a server would violate the SEP."""
toolset = MCPToolset(task_server)
async with toolset:
tools = await toolset.get_tools(run_context)
result = await toolset.call_tool('plain_tool', {}, run_context, tools['plain_tool'])
assert result == 'plain_completed'

async def test_direct_call_tool_with_use_task(self, task_server: FastMCP[None]) -> None:
"""`direct_call_tool(..., use_task=True)` is the low-level escape hatch for users calling without
a `ToolDefinition` — `mode='required'` works directly."""
toolset = MCPToolset(task_server)
async with toolset:
result = await toolset.direct_call_tool('task_required_tool', {}, use_task=True)
assert result == 'task_required_completed'

async def test_process_tool_call_receives_use_task_partial(
self, task_server: FastMCP[None], run_context: RunContext[None]
) -> None:
"""`process_tool_call` gets a `CallToolFunc` that already has `use_task` baked in via `partial`,
so a custom wrapper doesn't need to know about the task path to preserve it."""

async def passthrough(ctx: RunContext[Any], call_tool: Any, name: str, args: dict[str, Any]) -> Any:
return await call_tool(name, args)

toolset = MCPToolset(task_server, process_tool_call=passthrough)
async with toolset:
tools = await toolset.get_tools(run_context)
result = await toolset.call_tool('task_required_tool', {}, run_context, tools['task_required_tool'])
assert result == 'task_required_completed'


def test_construction_does_not_emit_warnings(recwarn: Any) -> None:
"""Building an `MCPToolset` from a URL must not emit `FastMCPDeprecationWarning` for the
`sse_read_timeout` parameter — the StreamableHttp path migrated off it (the FastMCP `Client`
Expand Down
Loading
Loading