diff --git a/pydantic_ai_harness/code_mode/README.md b/pydantic_ai_harness/code_mode/README.md index 62fdda9..963bd58 100644 --- a/pydantic_ai_harness/code_mode/README.md +++ b/pydantic_ai_harness/code_mode/README.md @@ -140,22 +140,92 @@ for msg in result.all_messages(): tool_returns = part.metadata['tool_returns'] # dict[str, ToolReturnPart] ``` +## Filesystem and OS access + +Sandboxed code runs with no access to the host's files, environment, or clock. Two parameters grant +it -- reach for them when the agent's task genuinely needs the host. + +**`mount` -- share host directories.** Reach for this when the agent works with real files: analyzing +a dataset you've dropped in a folder and writing a report back, editing a checkout, or processing a +batch of documents. Sandboxed `pathlib` code reads and writes under the mounted path. (For +environment variables or the clock, use `os_access` instead.) + +```python +from pydantic_monty import MountDir + +from pydantic_ai_harness import CodeMode + +# The agent can read /work/data.csv and write /work/summary.md back to the host: +CodeMode(mount=MountDir('/work', '/tmp/agent-workspace', mode='read-write')) +``` + +**`os_access` -- answer the sandbox's OS calls yourself.** Reach for this when the agent needs +environment variables, the current date and time, or filesystem behavior you control. Hand it a +ready-made OS implementation, or a callback that decides each call -- so you can inject just the +secrets it needs, pin "now" for reproducible runs, or route file access to your own store. 
+ +```python +from pydantic_monty import NOT_HANDLED, OSAccess + +from pydantic_ai_harness import CodeMode + +# Give the agent a fixed set of environment values: +CodeMode(os_access=OSAccess(environ={'API_BASE': 'https://api.example.com'})) + + +# ...or intercept each call to decide what the agent may see: +allowed_env = {'API_KEY': 'sk-...'} + + +def my_os(fn, args, kwargs): + if fn == 'os.getenv': + # Answer the call: allow-listed keys resolve, every other key reads back + # as None -- absent, exactly like a real unset variable. + return allowed_env.get(args[0]) + # Refuse everything else: NOT_HANDLED makes the call fail in the sandbox. + return NOT_HANDLED + + +CodeMode(os_access=my_os) +``` + +Your callback's return value decides the call's fate, and the two outcomes are easy to confuse: + +- **Return any value** -- including `None`, `''`, or `0` -- and that becomes the result the sandbox + sees. `os.getenv` returning `None` looks exactly like a normal unset variable, so the agent's code + keeps running. This is how you *hide* something: answer with an empty value. +- **Return `NOT_HANDLED`** and the call is treated as unsupported: it raises inside the sandbox and + the model gets a retry. This *refuses* a capability outright -- use it to block, not to say "no + value". Returning `NOT_HANDLED` for a key the agent reasonably expects will burn retries. + +Both `mount` and `os_access` expose the real host to model-written code, so grant only what the task needs. Access is fixed +when the capability is built, so construct `CodeMode` per request to scope it. + +A `MountDir` defaults to copy-on-write `mode='overlay'`: the sandbox reads host files and sees its +own writes, but those writes do **not** reach the host. Pass `mode='read-write'` to persist them, or +`mode='read-only'` to forbid writes. + +> Monty-specific: these hooks use Monty's `AbstractOS`/`MountDir` types. + ## Sandbox restrictions Code runs inside [Monty](https://github.com/pydantic/monty), a sandboxed Python subset. 
Key restrictions: - No class definitions - No third-party imports (allowed stdlib: `sys`, `typing`, `asyncio`, `math`, `json`, `re`, `datetime`, `os`, `pathlib`) -- No wall-clock or timing primitives: `asyncio.sleep`, `datetime.datetime.now()`/`datetime.date.today()`, and the `time` module are unavailable +- No wall-clock or timing primitives by default (`asyncio.sleep`, `datetime.now()`, `date.today()`, `time`) -- `datetime.now()`/`date.today()` become available with an `os_access` handler (above); `asyncio.sleep`/`time` never do - No `import *` +- Filesystem I/O needs an `os_access` handler or a `mount`; `os.getenv`/`os.environ` need an `os_access` handler - Tools requiring approval or with deferred execution are excluded from the sandbox ## API ```python CodeMode( - tools: ToolSelector = 'all', # 'all', list[str], callable, or dict - max_retries: int = 3, # retries on sandbox execution errors + tools: ToolSelector = 'all', # 'all', list[str], callable, or dict + max_retries: int = 3, # retries on sandbox execution errors + os_access: CodeModeOS | None = None, # host handler for env vars, clock, and file I/O + mount: CodeModeMount | None = None, # host directories to share with the sandbox ) ``` diff --git a/pydantic_ai_harness/code_mode/__init__.py b/pydantic_ai_harness/code_mode/__init__.py index 42304fa..234438c 100644 --- a/pydantic_ai_harness/code_mode/__init__.py +++ b/pydantic_ai_harness/code_mode/__init__.py @@ -1,6 +1,6 @@ """Code mode capability: route tool calls through a sandboxed Python environment.""" from pydantic_ai_harness.code_mode._capability import CodeMode -from pydantic_ai_harness.code_mode._toolset import CodeModeToolset +from pydantic_ai_harness.code_mode._toolset import CodeModeMount, CodeModeOS, CodeModeOSCallback, CodeModeToolset -__all__ = ['CodeMode', 'CodeModeToolset'] +__all__ = ['CodeMode', 'CodeModeMount', 'CodeModeOS', 'CodeModeOSCallback', 'CodeModeToolset'] diff --git a/pydantic_ai_harness/code_mode/_capability.py 
b/pydantic_ai_harness/code_mode/_capability.py index 57eeaab..2dc8702 100644 --- a/pydantic_ai_harness/code_mode/_capability.py +++ b/pydantic_ai_harness/code_mode/_capability.py @@ -2,14 +2,14 @@ from __future__ import annotations -from dataclasses import dataclass, field +from dataclasses import KW_ONLY, dataclass, field from pydantic_ai import AbstractToolset from pydantic_ai.capabilities import AbstractCapability, CapabilityOrdering from pydantic_ai.capabilities._tool_search import ToolSearch as _ToolSearch from pydantic_ai.tools import AgentDepsT, ToolSelector -from pydantic_ai_harness.code_mode._toolset import CodeModeToolset +from pydantic_ai_harness.code_mode._toolset import CodeModeMount, CodeModeOS, CodeModeToolset @dataclass @@ -34,6 +34,23 @@ class CodeMode(AbstractCapability[AgentDepsT]): # Sandbox only specific tools agent = Agent('openai:gpt-5', capabilities=[CodeMode(tools=['search', 'fetch'])]) ``` + + By default, sandboxed code cannot touch the host -- no filesystem, environment + variables, or clock. Two parameters open it up: + + - `mount` shares specific host directories: reach for it when the agent reads or + writes real files. + - `os_access` routes the sandbox's OS calls to a handler you provide: reach for it + when the agent needs environment variables, the clock, or filesystem behavior you + control. + + Both expose the real host to model-written code, so grant only what the task needs. 
+ + ```python + from pydantic_monty import MountDir + + agent = Agent('openai:gpt-5', capabilities=[CodeMode(mount=MountDir('/work', '/tmp/agent-work'))]) + ``` """ tools: ToolSelector[AgentDepsT] = field(default='all') @@ -48,10 +65,24 @@ class CodeMode(AbstractCapability[AgentDepsT]): max_retries: int = 3 """Maximum number of retries for the `run_code` tool (syntax errors count as retries).""" + _: KW_ONLY + + os_access: CodeModeOS | None = None + """Give sandboxed code environment variables, the clock, and file I/O through a handler you provide; unset, they are unavailable.""" + + mount: CodeModeMount | None = None + """Host directories to expose to sandboxed `pathlib` code; each mount's `mode` controls whether writes reach the host.""" + def get_ordering(self) -> CapabilityOrdering: """CodeMode wraps around ToolSearch so that search_tools stays native.""" return CapabilityOrdering(position='outermost', wraps=[_ToolSearch]) def get_wrapper_toolset(self, toolset: AbstractToolset[AgentDepsT]) -> AbstractToolset[AgentDepsT] | None: """Wrap the agent's assembled toolset, splitting it into native + sandboxed subsets if needed.""" - return CodeModeToolset(wrapped=toolset, tool_selector=self.tools, max_retries=self.max_retries) + return CodeModeToolset( + wrapped=toolset, + tool_selector=self.tools, + max_retries=self.max_retries, + os_access=self.os_access, + mount=self.mount, + ) diff --git a/pydantic_ai_harness/code_mode/_toolset.py b/pydantic_ai_harness/code_mode/_toolset.py index da503a9..cc6f1bc 100644 --- a/pydantic_ai_harness/code_mode/_toolset.py +++ b/pydantic_ai_harness/code_mode/_toolset.py @@ -26,6 +26,7 @@ try: from pydantic_monty import ( + AbstractOS, ExternalException, ExternalResult, ExternalReturnValue, @@ -37,7 +38,9 @@ MontyRuntimeError, MontySyntaxError, MontyTypingError, + MountDir, NameLookupSnapshot, + OsFunction, ) except ImportError as _import_error: # pragma: no cover raise ImportError( @@ -48,6 +51,14 @@ # Type alias for the dispatch 
callback passed to _execution_loop. _DispatchFn = Callable[[str, dict[str, Any]], Coroutine[Any, Any, Any]] +# A raw OS callback. Any returned value (including `None`) answers the call; returning +# `pydantic_monty.NOT_HANDLED` refuses it, so the call raises inside the sandbox. +CodeModeOSCallback = Callable[[OsFunction, tuple[Any, ...], dict[str, Any]], Any] +# Accepted by `CodeMode.os_access`: a ready-made OS implementation or a raw callback. +CodeModeOS = AbstractOS | CodeModeOSCallback +# Accepted by `CodeMode.mount`: one or more host-directory mounts. +CodeModeMount = MountDir | list[MountDir] + class _RunCodeArguments(TypedDict): code: Annotated[str, Field(description='The Python code to execute in the sandbox.')] @@ -69,14 +80,35 @@ class _RunCodeArguments(TypedDict): # and to reconstruct multimodal types (e.g. BinaryContent) from Monty results (validate_python). _TOOL_RETURN_CONTENT_TA: TypeAdapter[Any] = TypeAdapter(ToolReturnContent) -_RUN_CODE_BASE_DESCRIPTION = """\ +_RUN_CODE_DESCRIPTION_HEAD = """\ Write and run Python code in a sandboxed environment. The sandbox uses Monty, a subset of Python. Key restrictions: - **No classes**: class definitions are not supported - **No third-party libraries**: only the standard library modules listed below can be used -- **Importable standard library modules**: `sys`, `typing`, `asyncio`, `math`, `json`, `re`, `datetime`, `os`, `pathlib`. These must be imported at the top of your snippet before use, just like in regular Python. For example: `import asyncio` then `results = await asyncio.gather(tool_one(...), tool_two(...))`. -- **No wall-clock or timing primitives**: `asyncio.sleep`, `datetime.datetime.now()`, `datetime.date.today()`, and the `time` module are unavailable. +- **Importable standard library modules**: `sys`, `typing`, `asyncio`, `math`, `json`, `re`, `datetime`, `os`, `pathlib`. These must be imported before use, just like in regular Python. 
For example: `import asyncio` then `results = await asyncio.gather(tool_one(...), tool_two(...))`.""" + +# Timing/OS restriction line, swapped depending on what host access the agent +# configured. Three states, because `mount` and `os` enable different things: +# a `mount` only exposes filesystem paths, while environment and clock calls +# require an `os` handler. +_NO_OS_RESTRICTION = ( + '- **No wall-clock or timing primitives**: `asyncio.sleep`, `datetime.datetime.now()`, ' + '`datetime.date.today()`, and the `time` module are unavailable.' +) +_MOUNT_ONLY_NOTE = ( + '- **Mounted filesystem access**: `pathlib.Path` operations under the configured mount ' + 'point(s) are routed to the host. `os.getenv`/`os.environ`, `datetime.datetime.now()`, ' + '`datetime.date.today()`, `asyncio.sleep`, and the `time` module remain unavailable.' +) +_OS_ENABLED_NOTE = ( + '- **Host-backed OS access**: `pathlib.Path` operations, `os.getenv`/`os.environ`, ' + '`datetime.datetime.now()`, and `datetime.date.today()` are routed to the OS handler ' + 'configured for this agent (availability depends on that configuration). `asyncio.sleep` and ' + 'the `time` module remain unavailable.' +) + +_RUN_CODE_DESCRIPTION_TAIL = """\ - **No `import *`**: wildcard imports are not supported State is preserved between calls (REPL-style). Set `restart: true` to reset state. @@ -90,6 +122,22 @@ class _RunCodeArguments(TypedDict): """ +def _base_description(*, has_os: bool, has_mount: bool) -> str: + """Assemble the `run_code` base description with the right OS-access restriction line. + + `os` routes environment, clock, and filesystem calls; a `mount` alone only + exposes filesystem paths, so a mount-only sandbox must not advertise env or + clock access (the model would generate calls that fail and burn retries). 
+ """ + if has_os: + restriction = _OS_ENABLED_NOTE + elif has_mount: + restriction = _MOUNT_ONLY_NOTE + else: + restriction = _NO_OS_RESTRICTION + return f'{_RUN_CODE_DESCRIPTION_HEAD}\n{restriction}\n{_RUN_CODE_DESCRIPTION_TAIL}' + + def _functions_header(*, has_sync: bool, has_async: bool) -> str: """Build the functions-header paragraph for the `run_code` tool description.""" base = ( @@ -184,6 +232,12 @@ class CodeModeToolset(WrapperToolset[AgentDepsT]): max_retries: int = 3 """Maximum number of retries for the `run_code` tool (syntax errors count as retries).""" + os_access: CodeModeOS | None = None + """Give sandboxed code environment variables, the clock, and file I/O through a handler you provide; unset, they are unavailable.""" + + mount: CodeModeMount | None = None + """Host directories to expose to sandboxed `pathlib` code; each mount's `mode` controls whether writes reach the host.""" + # init=False so `replace()` in `for_run` produces a fresh instance with _repl=None, # giving each agent run isolated REPL state. Lazy-initialized on first call_tool. 
_repl: MontyRepl | None = field(default=None, init=False, repr=False) @@ -236,7 +290,9 @@ async def get_tools(self, ctx: RunContext[AgentDepsT]) -> dict[str, ToolsetTool[ callable_defs, sanitized_to_original = self._partition_callable_tools(sandboxed_tools) - description = self._build_description(callable_defs) + description = self._build_description( + callable_defs, has_os=self.os_access is not None, has_mount=self.mount is not None + ) if _RUN_CODE_TOOL_NAME in native_tools: raise UserError( @@ -399,7 +455,7 @@ async def dispatch_tool_call(original_name: str, kwargs: dict[str, Any]) -> Any: capture = _PrintCapture() try: - monty_state = self._repl.feed_start(code, print_callback=capture) + monty_state = self._repl.feed_start(code, print_callback=capture, os=self.os_access, mount=self.mount) completed = await _execution_loop( monty_state, dispatch=dispatch_tool_call, @@ -407,6 +463,8 @@ async def dispatch_tool_call(original_name: str, kwargs: dict[str, Any]) -> Any: sanitized_to_original=sanitized_to_original, sequential_tools=sequential_tools, global_sequential=global_sequential, + os_access=self.os_access, + mount=self.mount, ) except MontySyntaxError as e: raise ModelRetry(f'Syntax error in code:\n{_prepend_prints(e.display(), capture)}') from e @@ -504,10 +562,11 @@ def _partition_callable_tools( return callable_defs, sanitized_to_original @staticmethod - def _build_description(callable_defs: dict[str, ToolDefinition]) -> str: + def _build_description(callable_defs: dict[str, ToolDefinition], *, has_os: bool, has_mount: bool) -> str: """Render the `run_code` description: base prose + TypedDicts + function signatures.""" + base = _base_description(has_os=has_os, has_mount=has_mount) if not callable_defs: - return _RUN_CODE_BASE_DESCRIPTION + return base sigs, conflicting = _get_sigs_and_conflicting(callable_defs) type_blocks = FunctionSignature.render_type_definitions(sigs, conflicting) @@ -520,7 +579,7 @@ def _build_description(callable_defs: dict[str, 
ToolDefinition]) -> str: has_async = any(not td.sequential for td in callable_defs.values()) header = _functions_header(has_sync=has_sync, has_async=has_async) - sections = [_RUN_CODE_BASE_DESCRIPTION, header] + sections = [base, header] if type_blocks: sections.append('```python\n' + '\n\n'.join(type_blocks) + '\n```') sections.append('```python\n' + '\n\n'.join(function_blocks) + '\n```') @@ -579,6 +638,8 @@ async def _execution_loop( sanitized_to_original: dict[str, str], sequential_tools: set[str], global_sequential: bool, + os_access: CodeModeOS | None, + mount: CodeModeMount | None, ) -> MontyComplete: """Drive the Monty REPL via the synchronous snapshot API until completion. @@ -597,6 +658,9 @@ async def _execution_loop( - **Global sequential mode** (DBOS/Temporal): all tools are deferred via `resume({'future': ...})` but stored as bare coroutines and awaited one-at-a-time at `FutureSnapshot` to prevent interleaving. + + `os`/`mount` must be passed to every `resume` call (not just `feed_start`): + Monty's auto-dispatch of OS calls stops the moment a resume omits them. 
""" pending: dict[int, asyncio.Task[Any] | Coroutine[Any, Any, Any]] = {} # Results from parallel tasks that were awaited early (at a sequential-tool @@ -605,7 +669,7 @@ async def _execution_loop( try: while not isinstance(monty_state, MontyComplete): if isinstance(monty_state, NameLookupSnapshot): - monty_state = monty_state.resume() + monty_state = monty_state.resume(os=os_access, mount=mount) elif isinstance(monty_state, FunctionSnapshot): monty_state = await _handle_function_snapshot( monty_state, @@ -616,6 +680,8 @@ async def _execution_loop( global_sequential=global_sequential, pending=pending, pre_resolved=pre_resolved, + os_access=os_access, + mount=mount, ) else: monty_state = await _resolve_future_snapshot( @@ -623,6 +689,8 @@ async def _execution_loop( pending=pending, pre_resolved=pre_resolved, global_sequential=global_sequential, + os_access=os_access, + mount=mount, ) finally: for item in pending.values(): # pragma: no cover @@ -644,16 +712,20 @@ async def _handle_function_snapshot( global_sequential: bool, pending: dict[int, asyncio.Task[Any] | Coroutine[Any, Any, Any]], pre_resolved: dict[int, ExternalResult], + os_access: CodeModeOS | None, + mount: CodeModeMount | None, ) -> FunctionSnapshot | FutureSnapshot | NameLookupSnapshot | MontyComplete: """Handle a single FunctionSnapshot from the Monty execution loop.""" fn_name = snapshot.function_name if fn_name not in callable_defs: - return snapshot.resume({'exception': NameError(f'Unknown function: {fn_name}')}) + return snapshot.resume({'exception': NameError(f'Unknown function: {fn_name}')}, os=os_access, mount=mount) if snapshot.args: return snapshot.resume( - {'exception': TypeError(f'{fn_name}() does not accept positional arguments; use keyword arguments')} + {'exception': TypeError(f'{fn_name}() does not accept positional arguments; use keyword arguments')}, + os=os_access, + mount=mount, ) original_name = sanitized_to_original.get(fn_name, fn_name) @@ -666,8 +738,8 @@ async def 
_handle_function_snapshot( pre_resolved[cid] = await _resolve_coro(pending.pop(cid)) outcome = await _resolve_coro(dispatch(original_name, snapshot.kwargs)) if 'return_value' in outcome: - return snapshot.resume({'return_value': outcome['return_value']}) - return snapshot.resume({'exception': outcome['exception']}) + return snapshot.resume({'return_value': outcome['return_value']}, os=os_access, mount=mount) + return snapshot.resume({'exception': outcome['exception']}, os=os_access, mount=mount) # Deferred execution — store for later resolution at FutureSnapshot. if global_sequential: @@ -676,7 +748,7 @@ async def _handle_function_snapshot( else: # Eagerly schedule as a Task for concurrent execution. pending[snapshot.call_id] = asyncio.ensure_future(dispatch(original_name, snapshot.kwargs)) - return snapshot.resume({'future': ...}) + return snapshot.resume({'future': ...}, os=os_access, mount=mount) async def _resolve_future_snapshot( @@ -685,11 +757,13 @@ async def _resolve_future_snapshot( pending: dict[int, asyncio.Task[Any] | Coroutine[Any, Any, Any]], pre_resolved: dict[int, ExternalResult], global_sequential: bool, + os_access: CodeModeOS | None, + mount: CodeModeMount | None, ) -> FunctionSnapshot | FutureSnapshot | NameLookupSnapshot | MontyComplete: """Resolve pending tool calls at a FutureSnapshot.""" pending_ids = snapshot.pending_call_ids if not pending_ids: # pragma: no cover - return snapshot.resume(results={}) + return snapshot.resume(results={}, os=os_access, mount=mount) results: dict[int, ExternalResult] = {} for cid in pending_ids: @@ -708,7 +782,7 @@ async def _resolve_future_snapshot( for cid, outcome in zip(gather_ids, settled): results[cid] = _settle_outcome(outcome) - return snapshot.resume(results=results) + return snapshot.resume(results=results, os=os_access, mount=mount) async def _resolve_coro( diff --git a/tests/code_mode/test_code_mode.py b/tests/code_mode/test_code_mode.py index 1ffb084..e6f3f1a 100644 --- 
a/tests/code_mode/test_code_mode.py +++ b/tests/code_mode/test_code_mode.py @@ -8,6 +8,7 @@ from __future__ import annotations +from pathlib import Path from typing import Any, TypeVar import pytest @@ -24,6 +25,7 @@ from pydantic_ai.toolsets.function import FunctionToolset from pydantic_ai.usage import RunUsage from pydantic_core import SchemaValidator, core_schema +from pydantic_monty import NOT_HANDLED, MountDir, OSAccess, OsFunction from typing_extensions import TypedDict from pydantic_ai_harness import CodeMode @@ -1854,6 +1856,196 @@ def test_code_mode_ordering(self) -> None: assert ToolSearch in ordering.wraps +def _unused_os_callback(fn: OsFunction, args: tuple[Any, ...], kwargs: dict[str, Any]) -> Any: + """An `os` callback for tests that only assert description/forwarding, never run code.""" + return NOT_HANDLED # pragma: no cover - never invoked by these tests + + +class TestCodeModeOSAccess: + """`CodeMode(os_access=...)` / `mount=...` give sandboxed code host-backed OS access.""" + + async def test_description_default_keeps_no_wallclock_restriction(self) -> None: + """Without `os`/`mount`, the description keeps the no-wall-clock restriction.""" + wrapper = CodeMode[None]().get_wrapper_toolset(_build_function_toolset(add)) + assert isinstance(wrapper, CodeModeToolset) + description = (await wrapper.get_tools(build_run_context(None)))['run_code'].tool_def.description + assert description is not None + assert 'No wall-clock or timing primitives' in description + + async def test_description_with_os_callback_notes_host_access(self) -> None: + """An `os` callback swaps the restriction line for the host-access note.""" + wrapper = CodeMode[None](os_access=_unused_os_callback).get_wrapper_toolset(_build_function_toolset(add)) + assert isinstance(wrapper, CodeModeToolset) + description = (await wrapper.get_tools(build_run_context(None)))['run_code'].tool_def.description + assert description is not None + assert 'Host-backed OS access' in description + + async 
def test_description_mount_only_advertises_filesystem_not_env_or_clock(self, tmp_path: Path) -> None: + """A `mount` without `os` advertises filesystem access only -- it must not tell the model + that env/clock are host-backed, since a mount cannot route `os.getenv`/`datetime.now()`.""" + wrapper = CodeMode[None](mount=MountDir('/work', str(tmp_path))).get_wrapper_toolset( + _build_function_toolset(add) + ) + assert isinstance(wrapper, CodeModeToolset) + description = (await wrapper.get_tools(build_run_context(None)))['run_code'].tool_def.description + assert description is not None + # The regression guard: a mount must select the filesystem note, not the OS note that would + # (wrongly) advertise env/clock as host-routed -- this assert fails if the OS note is picked. + assert 'Mounted filesystem access' in description + + async def test_description_host_access_note_shows_with_no_sandboxed_tools(self) -> None: + """The host-access note appears even when no tools are sandboxed (base description).""" + # `tools=[]` sandboxes nothing, so `run_code` renders the base description path. 
+ wrapper = CodeMode[None](os_access=_unused_os_callback, tools=[]).get_wrapper_toolset( + _build_function_toolset(add) + ) + assert isinstance(wrapper, CodeModeToolset) + description = (await wrapper.get_tools(build_run_context(None)))['run_code'].tool_def.description + assert description is not None + assert 'Host-backed OS access' in description + + async def test_os_callback_dispatches_inside_run_code(self) -> None: + """An `os` callback is threaded through `feed_start` and every `resume`, so OS calls + keep dispatching even after a tool call suspends and resumes the sandbox.""" + + def os_cb(fn: OsFunction, args: tuple[Any, ...], kwargs: dict[str, Any]) -> Any: + if fn == 'os.getenv': + return 'envval' + return NOT_HANDLED # pragma: no cover - sandbox only calls os.getenv here + + wrapper = CodeMode[None](os_access=os_cb).get_wrapper_toolset(_build_function_toolset(add)) + assert isinstance(wrapper, CodeModeToolset) + ctx = await build_ctx(None, wrapper) + tools = await wrapper.get_tools(ctx) + # The tool call forces a FunctionSnapshot -> FutureSnapshot round-trip; the os.getenv + # afterwards only resolves if `os` survived those resumes. 
+ code = "import os\nx = await add(a=2, b=3)\nhome = os.getenv('THING')\n{'sum': x, 'home': home}" + result = await wrapper.call_tool('run_code', {'code': code}, ctx, tools['run_code']) + assert result.return_value == {'sum': 5, 'home': 'envval'} + + async def test_os_access_persists_across_run_code_calls(self) -> None: + """`os` is supplied on every `feed_start`, so OS access still works on a later + `run_code` call that reuses the persisted (non-fresh) REPL.""" + + def os_cb(fn: OsFunction, args: tuple[Any, ...], kwargs: dict[str, Any]) -> Any: + if fn == 'os.getenv': + return 'persisted' + return NOT_HANDLED # pragma: no cover - sandbox only calls os.getenv here + + wrapper = CodeMode[None](os_access=os_cb).get_wrapper_toolset(_build_function_toolset(add)) + assert isinstance(wrapper, CodeModeToolset) + ctx = await build_ctx(None, wrapper) + tools = await wrapper.get_tools(ctx) + first = await wrapper.call_tool('run_code', {'code': "import os\nos.getenv('A')"}, ctx, tools['run_code']) + assert first.return_value == 'persisted' + # Second call reuses the REPL (so `import os` carries over) and must still dispatch. 
+ second = await wrapper.call_tool('run_code', {'code': "os.getenv('B')"}, ctx, tools['run_code']) + assert second.return_value == 'persisted' + + async def test_abstract_os_instance_dispatches_inside_run_code(self) -> None: + """An `AbstractOS` instance is accepted as the `os` value and dispatches OS calls.""" + wrapper = CodeMode[None](os_access=OSAccess(environ={'THING': 'fromabs'})).get_wrapper_toolset( + _build_function_toolset(add) + ) + assert isinstance(wrapper, CodeModeToolset) + ctx = await build_ctx(None, wrapper) + tools = await wrapper.get_tools(ctx) + result = await wrapper.call_tool('run_code', {'code': "import os\nos.getenv('THING')"}, ctx, tools['run_code']) + assert result.return_value == 'fromabs' + + async def test_os_callback_exception_becomes_model_retry(self) -> None: + """A raising `os` callback surfaces as a `ModelRetry`, like any other sandbox runtime + error -- it must not crash the agent loop.""" + + def os_cb(fn: OsFunction, args: tuple[Any, ...], kwargs: dict[str, Any]) -> Any: + raise ValueError('boom from os') + + wrapper = CodeMode[None](os_access=os_cb).get_wrapper_toolset(_build_function_toolset(add)) + assert isinstance(wrapper, CodeModeToolset) + ctx = await build_ctx(None, wrapper) + tools = await wrapper.get_tools(ctx) + with pytest.raises(ModelRetry, match='boom from os'): + await wrapper.call_tool('run_code', {'code': "import os\nos.getenv('X')"}, ctx, tools['run_code']) + + async def test_os_callback_returning_value_answers_call_including_none(self) -> None: + """Returning a value from the `os` callback -- even `None` -- *answers* the call. + + Allow-listed keys resolve; every other key reads back as `None`, exactly like a real + unset env var, so the sandbox keeps running with no retry. This is how a callback hides + a secret: by answering with an empty value, not by refusing the call. 
+ """ + allowed = {'API_KEY': 'sk-xxx'} + + def os_cb(fn: OsFunction, args: tuple[Any, ...], kwargs: dict[str, Any]) -> Any: + if fn == 'os.getenv': + return allowed.get(args[0]) + return NOT_HANDLED # pragma: no cover - sandbox only calls os.getenv here + + wrapper = CodeMode[None](os_access=os_cb).get_wrapper_toolset(_build_function_toolset(add)) + assert isinstance(wrapper, CodeModeToolset) + ctx = await build_ctx(None, wrapper) + tools = await wrapper.get_tools(ctx) + code = "import os\n{'allowed': os.getenv('API_KEY'), 'hidden': os.getenv('SECRET')}" + result = await wrapper.call_tool('run_code', {'code': code}, ctx, tools['run_code']) + assert result.return_value == {'allowed': 'sk-xxx', 'hidden': None} + + async def test_os_callback_not_handled_refuses_call_as_model_retry(self) -> None: + """Returning `NOT_HANDLED` *refuses* the call rather than answering it. + + The OS function is treated as unsupported, so it raises in the sandbox and surfaces as + `ModelRetry`. This is the counterpart to returning a value: refusing is not the same as + answering `None`, and using it for a key the model expects will burn retries. 
+ """ + + def os_cb(fn: OsFunction, args: tuple[Any, ...], kwargs: dict[str, Any]) -> Any: + return NOT_HANDLED + + wrapper = CodeMode[None](os_access=os_cb).get_wrapper_toolset(_build_function_toolset(add)) + assert isinstance(wrapper, CodeModeToolset) + ctx = await build_ctx(None, wrapper) + tools = await wrapper.get_tools(ctx) + with pytest.raises(ModelRetry, match='not supported in this environment'): + await wrapper.call_tool('run_code', {'code': "import os\nos.getenv('X')"}, ctx, tools['run_code']) + + async def test_mount_exposes_host_directory(self, tmp_path: Path) -> None: + """A `mount` exposes a host directory inside the sandbox, threaded through resumes.""" + (tmp_path / 'data.txt').write_text('hello-from-host') + wrapper = CodeMode[None](mount=MountDir('/work', str(tmp_path))).get_wrapper_toolset( + _build_function_toolset(add) + ) + assert isinstance(wrapper, CodeModeToolset) + ctx = await build_ctx(None, wrapper) + tools = await wrapper.get_tools(ctx) + code = "from pathlib import Path\nawait add(a=1, b=1)\nPath('/work/data.txt').read_text()" + result = await wrapper.call_tool('run_code', {'code': code}, ctx, tools['run_code']) + assert result.return_value == 'hello-from-host' + + async def test_mount_accepts_list_of_directories(self, tmp_path: Path) -> None: + """`mount` accepts a `list[MountDir]`; each directory is exposed at its virtual path.""" + (tmp_path / 'a').mkdir() + (tmp_path / 'b').mkdir() + (tmp_path / 'a' / 'f.txt').write_text('AA') + (tmp_path / 'b' / 'f.txt').write_text('BB') + mounts = [MountDir('/a', str(tmp_path / 'a')), MountDir('/b', str(tmp_path / 'b'))] + wrapper = CodeMode[None](mount=mounts).get_wrapper_toolset(_build_function_toolset(add)) + assert isinstance(wrapper, CodeModeToolset) + ctx = await build_ctx(None, wrapper) + tools = await wrapper.get_tools(ctx) + code = "from pathlib import Path\nPath('/a/f.txt').read_text() + Path('/b/f.txt').read_text()" + result = await wrapper.call_tool('run_code', {'code': code}, ctx, 
tools['run_code']) + assert result.return_value == 'AABB' + + def test_capability_forwards_os_and_mount_to_toolset(self, tmp_path: Path) -> None: + """`CodeMode` forwards `os_access`/`mount` onto the `CodeModeToolset` it builds.""" + mount = MountDir('/work', str(tmp_path)) + wrapper = CodeMode[None](os_access=_unused_os_callback, mount=mount).get_wrapper_toolset( + _build_function_toolset(add) + ) + assert isinstance(wrapper, CodeModeToolset) + assert wrapper.os_access is _unused_os_callback + assert wrapper.mount is mount + + def _search_tool_def(description: str = 'Search for tools.') -> ToolDefinition: """Create a ToolDefinition mimicking the search_tools tool from ToolSearchToolset.""" from pydantic_ai.toolsets._tool_search import _SEARCH_TOOLS_NAME