diff --git a/PLAN.md b/PLAN.md
new file mode 100644
index 0000000..a94daff
--- /dev/null
+++ b/PLAN.md
@@ -0,0 +1,63 @@
+# Compaction Capability — Implementation Plan
+
+Closes #21
+
+## Overview
+
+This PR adds three compaction-related capabilities to `pydantic-harness`:
+
+1. **`SlidingWindow`** — Zero-cost message trimming via a configurable sliding window.
+2. **`LimitWarner`** — Injects warning messages when the agent approaches iteration, context-window, or total-token limits.
+3. **`Compaction`** — LLM-powered summarization that replaces older messages with a compact summary.
+
+All three are `AbstractCapability` subclasses that operate via the `before_model_request` hook, modifying `request_context.messages` before each model call.
+
+## Design Decisions
+
+### Tool-call / tool-return pair safety
+
+The most critical invariant: trimming or compacting must **never** orphan a `ToolCallPart` without its corresponding `ToolReturnPart` (or vice versa). Doing so causes HTTP 400 errors from LLM providers.
+
+The implementation uses a `_is_safe_cutoff()` function that searches around a proposed cutoff point for tool-call pairs that would be split. If a cutoff is unsafe, it walks backward to find a safe one. This approach is adapted from [vstorm-co/summarization-pydantic-ai](https://github.com/vstorm-co/summarization-pydantic-ai)'s `_cutoff.py`.
+
+### Trigger and retention modes
+
+Both `SlidingWindow` and `Compaction` support two trigger modes:
+- `max_messages` — fire when message count exceeds threshold
+- `max_tokens` — fire when estimated token count exceeds threshold
+
+And two retention modes:
+- `keep_messages` — retain N tail messages
+- `keep_tokens` — retain messages fitting within a token budget
+
+### Token estimation
+
+A simple `estimate_token_count()` function approximates tokens at ~4 characters per token. This avoids requiring a tokenizer dependency while providing reasonable estimates for threshold detection.
+
+### LimitWarner design
+
+Warnings are injected as a trailing `ModelRequest` with a `UserPromptPart` (not a system message), because models tend to pay more attention to user messages. A `[LimitWarner]` marker enables stripping previous warnings before injecting new ones, preventing warning accumulation.
+
+### Compaction summarization
+
+The `Compaction` capability creates a temporary `pydantic_ai.Agent` with the configured summarization model. System prompts from the beginning of the conversation are preserved and prepended to the summary message.
+
+## Dependencies
+
+- Requires `pydantic-ai-slim` with the capabilities branch (not yet on PyPI).
+- For local development, add a `[tool.uv.sources]` override pointing to the capabilities branch checkout.
+
+## Files
+
+- `src/pydantic_harness/compaction.py` — All three capabilities plus helpers
+- `src/pydantic_harness/__init__.py` — Package exports
+- `tests/test_compaction.py` — 81 tests covering all code paths
+- `pyproject.toml` — Coverage threshold adjustment (98% due to branch coverage of elif chains)
+
+## References
+
+- [pydantic/pydantic-ai#4137](https://github.com/pydantic/pydantic-ai/issues/4137) — First-class Context Compaction API
+- [pydantic/pydantic-ai#4267](https://github.com/pydantic/pydantic-ai/issues/4267) — Anthropic Compactions
+- [pydantic/pydantic-ai#4013](https://github.com/pydantic/pydantic-ai/issues/4013) — OpenAI Compactions
+- [pydantic/pydantic-harness#35](https://github.com/pydantic/pydantic-harness/issues/35) — Expose context window size on ModelProfile
+- [vstorm-co/summarization-pydantic-ai](https://github.com/vstorm-co/summarization-pydantic-ai) — Prior art for cutoff logic
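
A minimal sketch of the tool-pair-safe cutoff described in PLAN.md, written against plain stand-in types rather than the real message classes. `Msg`, `is_safe_cutoff`, and `find_safe_cutoff` are hypothetical names for illustration; the PR's actual `_is_safe_cutoff()` may search differently. The idea shown is only the invariant: a cutoff is unsafe when the same tool-call id appears on both sides of it, and an unsafe cutoff is moved backward until no pair is split.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass(frozen=True)
class Msg:
    """Hypothetical stand-in for a model message; only tool-call ids are tracked."""

    calls: frozenset[str] = frozenset()  # ids of tool calls issued in this message
    returns: frozenset[str] = frozenset()  # ids of tool calls answered in this message


def is_safe_cutoff(messages: list[Msg], cutoff: int) -> bool:
    """Return True if dropping messages[:cutoff] splits no call/return pair."""
    dropped: set[str] = set()
    for msg in messages[:cutoff]:
        dropped |= msg.calls | msg.returns
    kept: set[str] = set()
    for msg in messages[cutoff:]:
        kept |= msg.calls | msg.returns
    # An id seen on both sides means its call and return would be separated.
    return not (dropped & kept)


def find_safe_cutoff(messages: list[Msg], proposed: int) -> int:
    """Walk backward from the proposed cutoff until no pair is split."""
    cutoff = max(0, min(proposed, len(messages)))
    while cutoff > 0 and not is_safe_cutoff(messages, cutoff):
        cutoff -= 1
    return cutoff  # a cutoff of 0 drops nothing, so it is always safe


history = [
    Msg(),
    Msg(calls=frozenset({'a'})),
    Msg(returns=frozenset({'a'})),
    Msg(),
]
print(find_safe_cutoff(history, 2))  # proposed cutoff would orphan the return for 'a'
print(find_safe_cutoff(history, 3))  # proposed cutoff drops the whole pair together
```

Walking backward keeps more history than requested instead of less, which errs on the side of a valid request over a tighter trim.
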
diff --git a/pydantic_ai_harness/experimental/__init__.py b/pydantic_ai_harness/experimental/__init__.py
new file mode 100644
index 0000000..b59be4b
--- /dev/null
+++ b/pydantic_ai_harness/experimental/__init__.py
@@ -0,0 +1,13 @@
+"""Experimental pydantic-ai-harness capabilities.
+
+Anything under `pydantic_ai_harness.experimental` may change or be removed in any release,
+without a deprecation period. Importing an experimental capability emits a
+`HarnessExperimentalWarning` that tells you how to silence the whole category at once.
+
+Importing this module on its own does **not** emit a warning, so you can pull in
+`HarnessExperimentalWarning` to silence the warnings before importing a capability.
+"""
+
+from pydantic_ai_harness.experimental._warn import HarnessExperimentalWarning
+
+__all__ = ['HarnessExperimentalWarning']
diff --git a/pydantic_ai_harness/experimental/_warn.py b/pydantic_ai_harness/experimental/_warn.py
new file mode 100644
index 0000000..3ddf1e7
--- /dev/null
+++ b/pydantic_ai_harness/experimental/_warn.py
@@ -0,0 +1,40 @@
+"""Experimental-feature warning machinery for pydantic-ai-harness."""
+
+from __future__ import annotations
+
+import warnings
+
+
+class HarnessExperimentalWarning(UserWarning):
+ """Signals that a pydantic-ai-harness feature is experimental.
+
+ Experimental features may change or be removed in any release, without a deprecation
+ period. Silence every experimental-harness warning at once with::
+
+ import warnings
+ from pydantic_ai_harness.experimental import HarnessExperimentalWarning
+
+ warnings.filterwarnings('ignore', category=HarnessExperimentalWarning)
+ """
+
+
+_SILENCE_HINT = (
+ ' import warnings\n'
+ ' from pydantic_ai_harness.experimental import HarnessExperimentalWarning\n'
+ " warnings.filterwarnings('ignore', category=HarnessExperimentalWarning)"
+)
+
+
+def warn_experimental(feature: str) -> None:
+ """Emit a `HarnessExperimentalWarning` for *feature*, including how to silence all of them.
+
+ One filter silences the whole category — every experimental capability — so users never
+ need a suppression line per capability.
+ """
+ warnings.warn(
+ f'`pydantic_ai_harness.experimental.{feature}` is experimental: its API may change or be '
+ f'removed in any release, without a deprecation period.\n\n'
+ f'Silence all pydantic-ai-harness experimental warnings with:\n\n{_SILENCE_HINT}\n',
+ category=HarnessExperimentalWarning,
+ stacklevel=2,
+ )
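
A small stdlib-only sketch of why one category filter is enough to silence every experimental warning. `DemoExperimentalWarning` and `warn_demo` are hypothetical stand-ins for `HarnessExperimentalWarning` and `warn_experimental`, so the snippet runs without the package installed; it only demonstrates standard `warnings` behaviour.

```python
import warnings


class DemoExperimentalWarning(UserWarning):
    """Stand-in for HarnessExperimentalWarning (hypothetical name)."""


def warn_demo(feature: str) -> None:
    """Stand-in for warn_experimental: one shared category for every feature."""
    warnings.warn(f'{feature} is experimental', category=DemoExperimentalWarning, stacklevel=2)


# Without a filter, each feature warns.
with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter('always')
    warn_demo('compaction')
    warn_demo('another_feature')
loud = len(caught)

# One category filter silences both, while unrelated warnings still surface.
with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter('always')
    warnings.filterwarnings('ignore', category=DemoExperimentalWarning)
    warn_demo('compaction')
    warn_demo('another_feature')
    warnings.warn('unrelated', UserWarning)
quiet = [str(w.message) for w in caught]

print(loud, quiet)
```

The filter matches by subclass, so it must name the dedicated warning class; filtering on `UserWarning` would also hide unrelated warnings.
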
diff --git a/pydantic_ai_harness/experimental/compaction/README.md b/pydantic_ai_harness/experimental/compaction/README.md
new file mode 100644
index 0000000..67dd838
--- /dev/null
+++ b/pydantic_ai_harness/experimental/compaction/README.md
@@ -0,0 +1,124 @@
+# Compaction capabilities
+
+> [!WARNING]
+> **Experimental.** These capabilities live under `pydantic_ai_harness.experimental` and may
+> change or be removed in any release, without a deprecation period. Import them from the
+> experimental path — there is no top-level export:
+>
+> ```python
+> from pydantic_ai_harness.experimental.compaction import TieredCompaction
+> ```
+>
+> Importing any experimental capability emits a `HarnessExperimentalWarning`. Silence **all**
+> harness experimental warnings with a single filter (no per-capability lines needed):
+>
+> ```python
+> import warnings
+> from pydantic_ai_harness.experimental import HarnessExperimentalWarning
+>
+> warnings.filterwarnings('ignore', category=HarnessExperimentalWarning)
+> ```
+
+A menu of strategies for keeping an agent's conversation history within a model's context
+window. Each is a Pydantic AI `Capability` that runs in the `before_model_request` hook; edits
+**persist** into the run's message history, so a trim/clear/summary carries forward to later
+steps (it is not recomputed from the full history every turn).
+
+All strategies preserve tool-call / tool-return **pairing**: Pydantic AI core does not validate it, and
+providers reject a request with an orphaned call or return. The zero-LLM strategies never call a model.
+
+## The menu
+
+| Capability | Cost | What it does | Reach for it when |
+|---|---|---|---|
+| `SlidingWindow` | zero-LLM | Drops the oldest whole messages down to a tail | You only need the recent turns and can discard old context entirely |
+| `ClearToolResults` | zero-LLM | Blanks the content of old tool *results* in place, keeping the last `keep_pairs` | Tool outputs dominate context and can be re-fetched on demand (the cheap first tier) |
+| `DeduplicateFileReads` | zero-LLM | Blanks every file read superseded by a newer read of the same file | The agent re-reads files and only the latest version matters |
+| `SummarizingCompaction` | one LLM call | Summarizes older messages into a structured summary, keeping the recent tail | Old context still matters but must be compressed; use behind the cheap tiers |
+| `TieredCompaction` | escalates | Runs cheap passes first, summarizes only if still over `target_tokens` | You want the state-of-the-art default: spend the expensive summary only when needed |
+| `LimitWarner` | zero-LLM | Injects an URGENT/CRITICAL warning as limits approach | You want the agent to wrap up rather than have its history rewritten |
+
+## Triggers
+
+Every size-based strategy triggers on `max_messages` and/or `max_tokens` (estimated). Token counts
+use a ~4-chars-per-token heuristic by default; pass a `tokenizer` callable (e.g. `tiktoken`) for
+accuracy. `DeduplicateFileReads` runs on every request when no trigger is set (it is cheap and
+near-lossless). `TieredCompaction` triggers and stops on a single `target_tokens` budget.
+
+## Cost: why summarization is the last resort
+
+Summarization turns input tokens into output tokens, which are billed at a premium and generated
+serially — so it is genuinely expensive. The zero-LLM strategies touch only the cheaper input side.
+The field consensus (Anthropic, OpenCode, Letta) is to clear/dedupe first and summarize only when
+that is not enough — which is exactly what `TieredCompaction` encodes:
+
+```python
+from pydantic_ai import Agent
+from pydantic_ai_harness.experimental.compaction import (
+    ClearToolResults,
+    DeduplicateFileReads,
+    SummarizingCompaction,
+    TieredCompaction,
+)
+
+agent = Agent(
+    'openai:gpt-4o',
+    capabilities=[
+        TieredCompaction(
+            tiers=[
+                DeduplicateFileReads(file_key=my_file_key),
+                ClearToolResults(max_tokens=1, keep_pairs=3),
+                SummarizingCompaction(max_messages=1, keep_messages=20),  # model=None: inherits the run's model
+            ],
+            target_tokens=120_000,
+        )
+    ],
+)
+```
+
+A tier inside `TieredCompaction` is driven directly by the orchestrator, which re-measures the history
+after each tier and stops once it is under `target_tokens`, so a tier's own `max_*` trigger is irrelevant
+there (set it to any valid value). Any object with `async def compact(messages, ctx) -> list[ModelMessage]`
+(the `CompactionStrategy` protocol) can be a tier, so you can plug in your own.
+
+## Cache tradeoff (read before using `ClearToolResults`)
+
+Clearing or deduplicating rewrites message content, which invalidates the provider's prompt cache
+from the edit point onward — the next request pays a cache-write. Use `ClearToolResults`'
+`min_clear_tokens` to skip clearing that reclaims too little to be worth busting the cache.
+
+## Model inheritance
+
+`SummarizingCompaction(model=...)` accepts a model name or `Model`; when left `None` it inherits the
+running agent's model. No token caps are imposed on the summary call.
+
+## Usage accounting
+
+The summary call is a real request to the model, so its full usage — tokens **and** the request
+itself — is folded into the run's `ctx.usage`. This is deliberate: it keeps cost honest, keeps the
+request count consistent (a model request that didn't count as one would be the surprise), and lets a
+`UsageLimits` request limit catch a runaway compaction. A run-request / iteration limiter will
+therefore see compaction calls among its requests.
+
+## `DeduplicateFileReads.file_key`
+
+There is no default `file_key`: identifying a file read is agent-specific, and a wrong guess would
+drop live data. Supply a callable mapping a `ToolCallPart` to a stable file key, or `None` when the
+call is not a file read:
+
+```python
+from pydantic_ai.messages import ToolCallPart
+
+
+def my_file_key(call: ToolCallPart) -> str | None:
+    if call.tool_name != 'read_file':
+        return None
+    # args_as_dict() also parses JSON-string args, which a bare isinstance(args, dict) check would skip.
+    return call.args_as_dict().get('path')
+```
+
+## Out of scope
+
+These strategies compress or drop context *inside* the window. Moving large tool outputs *out* of the
+window — overflowing them to a file the agent (or a subagent) can query on demand — is a separate
+capability, not lossy truncation. Prefer it over capping individual tool outputs.
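
A rough sketch of the escalation the README describes for `TieredCompaction`: measure, run the next tier, re-measure, and stop as soon as the history fits the budget. Everything here is a simplified assumption: messages are plain strings, `estimate_tokens` uses the ~4-characters-per-token heuristic, and `DropBlank`, `KeepTail`, and `run_tiers` are hypothetical names (`KeepTail` stands in for the expensive summarizing tier). Whether the real orchestrator stops at "under" or "at or under" the budget is not stated, so `<=` is a guess.

```python
from __future__ import annotations

import asyncio
from typing import Protocol

CHARS_PER_TOKEN = 4  # rough heuristic: about 4 characters per token


def estimate_tokens(messages: list[str]) -> int:
    """Estimate tokens from total character count."""
    return sum(len(m) for m in messages) // CHARS_PER_TOKEN


class Strategy(Protocol):
    """Shape of a tier: an async compact(messages, ctx) returning new messages."""

    async def compact(self, messages: list[str], ctx: object) -> list[str]: ...


class DropBlank:
    """Cheap tier: remove whitespace-only messages."""

    async def compact(self, messages: list[str], ctx: object) -> list[str]:
        return [m for m in messages if m.strip()]


class KeepTail:
    """Stand-in for the expensive tier: keep only the last `keep` messages."""

    def __init__(self, keep: int) -> None:
        self.keep = keep

    async def compact(self, messages: list[str], ctx: object) -> list[str]:
        return messages[-self.keep :]


async def run_tiers(
    messages: list[str], tiers: list[Strategy], target_tokens: int, ctx: object = None
) -> tuple[list[str], int]:
    """Run tiers in order, re-measuring before each, until the budget is met."""
    tiers_run = 0
    for tier in tiers:
        if estimate_tokens(messages) <= target_tokens:
            break  # already within budget: skip the remaining, costlier tiers
        messages = await tier.compact(messages, ctx)
        tiers_run += 1
    return messages, tiers_run


history = ['a' * 40, ' ' * 400, 'b' * 40]
compacted, tiers_run = asyncio.run(run_tiers(history, [DropBlank(), KeepTail(1)], target_tokens=25))
print(tiers_run, estimate_tokens(compacted))
```

In this run the cheap tier alone brings the history within budget, so the second tier is never invoked, which is the cost argument made in the section on summarization.
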
diff --git a/pydantic_ai_harness/experimental/compaction/__init__.py b/pydantic_ai_harness/experimental/compaction/__init__.py
new file mode 100644
index 0000000..715a3f7
--- /dev/null
+++ b/pydantic_ai_harness/experimental/compaction/__init__.py
@@ -0,0 +1,28 @@
+"""Compaction capabilities: keep an agent's conversation history within the context window.
+
+Each capability lives in its own module; shared utilities (token estimation, the
+`CompactionStrategy` protocol, tool-pair-safe cutoffs, in-place clearing) live in `_shared`.
+"""
+
+from pydantic_ai_harness.experimental._warn import warn_experimental
+from pydantic_ai_harness.experimental.compaction._clear_tool_results import ClearToolResults
+from pydantic_ai_harness.experimental.compaction._deduplicate_file_reads import DeduplicateFileReads
+from pydantic_ai_harness.experimental.compaction._limit_warner import LimitWarner, WarningKind
+from pydantic_ai_harness.experimental.compaction._shared import CompactionStrategy, estimate_token_count
+from pydantic_ai_harness.experimental.compaction._sliding_window import SlidingWindow
+from pydantic_ai_harness.experimental.compaction._summarizing_compaction import SummarizingCompaction
+from pydantic_ai_harness.experimental.compaction._tiered_compaction import TieredCompaction
+
+warn_experimental('compaction')
+
+__all__ = [
+ 'ClearToolResults',
+ 'CompactionStrategy',
+ 'DeduplicateFileReads',
+ 'LimitWarner',
+ 'SlidingWindow',
+ 'SummarizingCompaction',
+ 'TieredCompaction',
+ 'WarningKind',
+ 'estimate_token_count',
+]
diff --git a/pydantic_ai_harness/experimental/compaction/_clear_tool_results.py b/pydantic_ai_harness/experimental/compaction/_clear_tool_results.py
new file mode 100644
index 0000000..db88778
--- /dev/null
+++ b/pydantic_ai_harness/experimental/compaction/_clear_tool_results.py
@@ -0,0 +1,135 @@
+"""`ClearToolResults` — zero-cost in-place clearing of old tool results."""
+
+from __future__ import annotations
+
+from collections.abc import Callable
+from dataclasses import dataclass
+from typing import TYPE_CHECKING
+
+from pydantic_ai._run_context import AgentDepsT
+from pydantic_ai.capabilities import AbstractCapability
+from pydantic_ai.messages import ModelMessage
+from pydantic_ai.tools import RunContext
+
+from pydantic_ai_harness.experimental.compaction._shared import (
+ estimate_token_count,
+ exceeds,
+ iter_tool_pairs,
+ rebuild_with_cleared,
+)
+
+if TYPE_CHECKING:
+ from pydantic_ai.models import ModelRequestContext
+
+
+@dataclass
+class ClearToolResults(AbstractCapability[AgentDepsT]):
+ """Zero-cost in-place clearing of old tool results.
+
+ Replaces the content of the oldest tool *results* with a short placeholder while
+ keeping the most recent ``keep_pairs`` tool-call / tool-return pairs intact. Tool
+ calls remain paired with their (now-blanked) results, so the history stays valid.
+ No LLM calls are made.
+
+ This is the cheap first tier of compaction — tool results typically dominate
+ context, and the agent can re-run a tool if it needs the data again.
+
+ Cache tradeoff: clearing rewrites message content, which invalidates the provider's
+ prompt cache from the clear point onward (the next request pays a cache-write). Use
+ ``min_clear_tokens`` to skip clearing that reclaims too little to be worth busting the
+ cache.
+
+ Example:
+ ```python
+ from pydantic_ai import Agent
+ from pydantic_ai_harness.experimental.compaction import ClearToolResults
+
+ agent = Agent(
+ 'openai:gpt-4o',
+ capabilities=[ClearToolResults(max_tokens=100_000, keep_pairs=3)],
+ )
+ ```
+ """
+
+ max_messages: int | None = None
+ """Trigger clearing when message count reaches this value. ``None`` disables."""
+
+ max_tokens: int | None = None
+ """Trigger clearing when estimated token count reaches this value. ``None`` disables."""
+
+ keep_pairs: int = 3
+ """Number of most-recent tool-call / tool-return pairs left untouched."""
+
+ placeholder: str = '[tool result cleared]'
+ """Replacement content for a cleared tool result."""
+
+ exclude_tools: frozenset[str] = frozenset()
+ """Tool names whose results are never cleared."""
+
+ clear_tool_inputs: bool = False
+ """When ``True``, also blank the arguments of the cleared tool calls."""
+
+ min_clear_tokens: int | None = None
+ """Only clear if doing so reclaims at least this many estimated tokens.
+
+ Protects the prompt cache from being invalidated for a trivial gain. ``None`` always clears.
+ """
+
+ tokenizer: Callable[[str], int] | None = None
+ """Optional tokenizer for accurate token counting.
+
+ A callable that returns the token count for a given string.
+ When ``None``, uses a ~4 characters-per-token heuristic.
+ """
+
+ def __post_init__(self) -> None:
+ if self.max_messages is None and self.max_tokens is None:
+ raise ValueError('At least one of max_messages or max_tokens must be set.')
+ if self.max_messages is not None and self.max_messages < 1:
+ raise ValueError('max_messages must be positive.')
+ if self.max_tokens is not None and self.max_tokens < 1:
+ raise ValueError('max_tokens must be positive.')
+ if self.keep_pairs < 0:
+ raise ValueError('keep_pairs must be non-negative.')
+ if self.min_clear_tokens is not None and self.min_clear_tokens < 0:
+ raise ValueError('min_clear_tokens must be non-negative.')
+
+ async def compact(
+ self,
+ messages: list[ModelMessage],
+ ctx: RunContext[AgentDepsT],
+ ) -> list[ModelMessage]:
+ """Blank the oldest tool results beyond the most recent ``keep_pairs``."""
+ pairs = iter_tool_pairs(messages)
+ clearable = pairs[: max(0, len(pairs) - self.keep_pairs)]
+
+ clear_return_ids: set[str] = set()
+ clear_input_ids: set[str] = set()
+ for pair in clearable:
+ if pair.tool_name in self.exclude_tools:
+ continue
+ clear_return_ids.add(pair.tool_call_id)
+ if self.clear_tool_inputs:
+ clear_input_ids.add(pair.tool_call_id)
+
+ if not clear_return_ids:
+ return messages
+
+ cleared = rebuild_with_cleared(messages, clear_return_ids, clear_input_ids, self.placeholder)
+ if self.min_clear_tokens is not None:
+ reclaimed = estimate_token_count(messages, self.tokenizer) - estimate_token_count(cleared, self.tokenizer)
+ if reclaimed < self.min_clear_tokens:
+ return messages
+ return cleared
+
+ async def before_model_request(
+ self,
+ ctx: RunContext[AgentDepsT],
+ request_context: ModelRequestContext,
+ ) -> ModelRequestContext:
+ """Clear old tool results if the conversation exceeds the configured threshold."""
+ messages: list[ModelMessage] = list(request_context.messages)
+ if not exceeds(messages, self.max_messages, self.max_tokens, self.tokenizer):
+ return request_context
+ request_context.messages = await self.compact(messages, ctx)
+ return request_context
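
A stdlib-only sketch of the selection logic in `ClearToolResults.compact`, to make the interaction of `keep_pairs`, `exclude_tools`, and `min_clear_tokens` concrete. `Pair`, `estimate_tokens`, and `clear_old_results` are hypothetical stand-ins operating on flat records, not the real `iter_tool_pairs` / `rebuild_with_cleared` helpers, and the token estimate assumes the ~4-characters-per-token heuristic applied to result text only.

```python
from __future__ import annotations

from dataclasses import dataclass, replace

CHARS_PER_TOKEN = 4  # rough heuristic: about 4 characters per token


@dataclass(frozen=True)
class Pair:
    """Hypothetical stand-in for one tool-call / tool-return pair."""

    tool_call_id: str
    tool_name: str
    result: str


def estimate_tokens(pairs: list[Pair]) -> int:
    return sum(len(p.result) for p in pairs) // CHARS_PER_TOKEN


def clear_old_results(
    pairs: list[Pair],
    keep_pairs: int = 3,
    exclude_tools: frozenset[str] = frozenset(),
    placeholder: str = '[tool result cleared]',
    min_clear_tokens: int | None = None,
) -> list[Pair]:
    # Everything except the newest `keep_pairs` pairs is a candidate.
    clearable = pairs[: max(0, len(pairs) - keep_pairs)]
    ids = {p.tool_call_id for p in clearable if p.tool_name not in exclude_tools}
    if not ids:
        return pairs
    cleared = [replace(p, result=placeholder) if p.tool_call_id in ids else p for p in pairs]
    # Skip the rewrite when it reclaims too little to justify a cache miss.
    if min_clear_tokens is not None:
        reclaimed = estimate_tokens(pairs) - estimate_tokens(cleared)
        if reclaimed < min_clear_tokens:
            return pairs
    return cleared


pairs = [Pair(f'c{i}', 'search' if i == 1 else 'read_file', 'x' * 100) for i in range(5)]
result = clear_old_results(pairs, keep_pairs=2, exclude_tools=frozenset({'search'}))
print([p.result[:12] for p in result])
```

Returning the original list unchanged when the gate fails matters: an untouched history keeps the provider's prompt cache valid.
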
diff --git a/pydantic_ai_harness/experimental/compaction/_deduplicate_file_reads.py b/pydantic_ai_harness/experimental/compaction/_deduplicate_file_reads.py
new file mode 100644
index 0000000..63d436d
--- /dev/null
+++ b/pydantic_ai_harness/experimental/compaction/_deduplicate_file_reads.py
@@ -0,0 +1,111 @@
+"""`DeduplicateFileReads` — zero-cost in-place clearing of superseded file reads."""
+
+from __future__ import annotations
+
+from collections.abc import Callable
+from dataclasses import dataclass
+from typing import TYPE_CHECKING
+
+from pydantic_ai._run_context import AgentDepsT
+from pydantic_ai.capabilities import AbstractCapability
+from pydantic_ai.messages import ModelMessage, ToolCallPart
+from pydantic_ai.tools import RunContext
+
+from pydantic_ai_harness.experimental.compaction._shared import exceeds, iter_tool_pairs, rebuild_with_cleared
+
+if TYPE_CHECKING:
+ from pydantic_ai.models import ModelRequestContext
+
+
+@dataclass
+class DeduplicateFileReads(AbstractCapability[AgentDepsT]):
+ """Zero-cost in-place clearing of superseded file reads.
+
+ When the same file is read more than once, only the latest read keeps its content;
+ earlier reads are blanked with a placeholder. Tool-call pairing is preserved. No LLM
+ calls are made.
+
+ File identity is supplied by the ``file_key`` seam — given a ``ToolCallPart`` it returns
+ a stable key for the file being read, or ``None`` if the call is not a file read. There
+ is no default: file-read identification is agent-specific, and a wrong guess would drop
+ live data.
+
+ Example:
+ ```python
+ from pydantic_ai import Agent
+ from pydantic_ai.messages import ToolCallPart
+ from pydantic_ai_harness.experimental.compaction import DeduplicateFileReads
+
+
+ def file_key(call: ToolCallPart) -> str | None:
+ if call.tool_name != 'read_file':
+ return None
+ args = call.args_as_dict()
+ return args.get('path')
+
+
+ agent = Agent('openai:gpt-4o', capabilities=[DeduplicateFileReads(file_key=file_key)])
+ ```
+ """
+
+ file_key: Callable[[ToolCallPart], str | None]
+ """Map a tool call to a stable file key, or ``None`` if it is not a file read."""
+
+ placeholder: str = '[superseded file read]'
+ """Replacement content for a superseded file read."""
+
+ max_messages: int | None = None
+ """Optional message-count trigger. When both triggers are ``None``, runs whenever invoked."""
+
+ max_tokens: int | None = None
+ """Optional token-count trigger. When both triggers are ``None``, runs whenever invoked."""
+
+ tokenizer: Callable[[str], int] | None = None
+ """Optional tokenizer for accurate token counting.
+
+ A callable that returns the token count for a given string.
+ When ``None``, uses a ~4 characters-per-token heuristic.
+ """
+
+ def __post_init__(self) -> None:
+ if self.max_messages is not None and self.max_messages < 1:
+ raise ValueError('max_messages must be positive.')
+ if self.max_tokens is not None and self.max_tokens < 1:
+ raise ValueError('max_tokens must be positive.')
+
+ async def compact(
+ self,
+ messages: list[ModelMessage],
+ ctx: RunContext[AgentDepsT],
+ ) -> list[ModelMessage]:
+ """Blank every file read that is later superseded by a newer read of the same file."""
+ pairs = iter_tool_pairs(messages)
+ keys: list[str | None] = []
+ latest_order: dict[str, int] = {}
+ for pair in pairs:
+ key = self.file_key(pair.call_part)
+ keys.append(key)
+ if key is not None:
+ latest_order[key] = pair.order
+
+ clear_return_ids: set[str] = set()
+ for pair, key in zip(pairs, keys):
+ if key is not None and latest_order[key] != pair.order:
+ clear_return_ids.add(pair.tool_call_id)
+
+ if not clear_return_ids:
+ return messages
+ return rebuild_with_cleared(messages, clear_return_ids, set(), self.placeholder)
+
+ async def before_model_request(
+ self,
+ ctx: RunContext[AgentDepsT],
+ request_context: ModelRequestContext,
+ ) -> ModelRequestContext:
+ """Deduplicate file reads, optionally gated on a size threshold."""
+ messages: list[ModelMessage] = list(request_context.messages)
+ if self.max_messages is not None or self.max_tokens is not None:
+ if not exceeds(messages, self.max_messages, self.max_tokens, self.tokenizer):
+ return request_context
+ request_context.messages = await self.compact(messages, ctx)
+ return request_context
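
The two-pass deduplication in `DeduplicateFileReads.compact` can be sketched on plain tuples. This is an illustration under simplifying assumptions: each call is a `(tool_call_id, file_key)` tuple, with `None` marking a call that is not a file read, and `superseded_reads` is a hypothetical helper name. The first pass records the position of the latest read per file; the second collects every earlier read of the same file.

```python
from __future__ import annotations


def superseded_reads(calls: list[tuple[str, str | None]]) -> set[str]:
    """Return the ids of file reads that a later read of the same file replaces."""
    latest: dict[str, int] = {}
    for order, (_, key) in enumerate(calls):
        if key is not None:
            latest[key] = order  # later reads overwrite earlier positions
    return {
        call_id
        for order, (call_id, key) in enumerate(calls)
        if key is not None and latest[key] != order
    }


calls = [('c1', 'a.py'), ('c2', None), ('c3', 'b.py'), ('c4', 'a.py')]
stale = superseded_reads(calls)
print(stale)
```

Calls whose key is `None` are never cleared, which is why a `file_key` that wrongly returns a key for a non-read tool is the risky direction.
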
diff --git a/pydantic_ai_harness/experimental/compaction/_limit_warner.py b/pydantic_ai_harness/experimental/compaction/_limit_warner.py
new file mode 100644
index 0000000..bd2dc9e
--- /dev/null
+++ b/pydantic_ai_harness/experimental/compaction/_limit_warner.py
@@ -0,0 +1,218 @@
+"""`LimitWarner` — injects warnings as the run approaches configured limits."""
+
+from __future__ import annotations
+
+from dataclasses import dataclass, field
+from typing import TYPE_CHECKING, Any, Literal
+
+from pydantic_ai._run_context import AgentDepsT
+from pydantic_ai.capabilities import AbstractCapability
+from pydantic_ai.messages import ModelMessage, ModelRequest, SystemPromptPart, UserPromptPart
+from pydantic_ai.tools import RunContext
+
+from pydantic_ai_harness.experimental.compaction._shared import estimate_token_count
+
+if TYPE_CHECKING:
+ from pydantic_ai.models import ModelRequestContext
+
+WarningKind = Literal['iterations', 'context_window', 'total_tokens']
+"""Categories of limits that can trigger warnings."""
+
+_WARNING_ORDER: tuple[WarningKind, ...] = ('iterations', 'context_window', 'total_tokens')
+_MARKER = '[LimitWarner]'
+
+
+@dataclass(frozen=True)
+class _Warning:
+ kind: WarningKind
+ severity: Literal['URGENT', 'CRITICAL']
+ details: str
+
+
+@dataclass
+class LimitWarner(AbstractCapability[AgentDepsT]):
+ """Injects a warning message when the agent approaches configured limits.
+
+ The warning is appended as a trailing ``ModelRequest`` with a
+ ``UserPromptPart`` so that the model treats it as a distinct user turn
+ (models tend to pay more attention to user messages than system messages).
+
+ Previous warnings injected by this capability are stripped before deciding
+ whether to inject a new one.
+
+ Example:
+ ```python
+ from pydantic_ai import Agent
+ from pydantic_ai_harness.experimental.compaction import LimitWarner
+
+ agent = Agent(
+ 'openai:gpt-4o',
+ capabilities=[LimitWarner(
+ max_iterations=40,
+ max_context_tokens=100_000,
+ )],
+ )
+ ```
+ """
+
+ max_iterations: int | None = None
+ """Maximum allowed requests for the run."""
+
+ max_context_tokens: int | None = None
+ """Maximum context-window size to warn against."""
+
+ max_total_tokens: int | None = None
+ """Maximum cumulative run token budget to warn against."""
+
+ warn_on: list[WarningKind] | None = None
+ """Which limits should emit warnings. Defaults to all configured limits."""
+
+    warning_threshold: float = 0.7
+    """Fraction of a limit at which warnings begin, in the range (0, 1]."""
+
+ critical_remaining_iterations: int = 3
+ """Remaining request count at which iteration warnings become CRITICAL."""
+
+ _active_kinds: tuple[WarningKind, ...] = field(default=(), init=False, repr=False)
+
+ def __post_init__(self) -> None:
+ if self.max_iterations is not None and self.max_iterations <= 0:
+ raise ValueError('max_iterations must be positive.')
+ if self.max_context_tokens is not None and self.max_context_tokens <= 0:
+ raise ValueError('max_context_tokens must be positive.')
+ if self.max_total_tokens is not None and self.max_total_tokens <= 0:
+ raise ValueError('max_total_tokens must be positive.')
+ if not 0 < self.warning_threshold <= 1:
+ raise ValueError('warning_threshold must be between 0 (exclusive) and 1 (inclusive).')
+ if self.critical_remaining_iterations < 0:
+ raise ValueError('critical_remaining_iterations must be non-negative.')
+
+ configured: dict[WarningKind, int | None] = {
+ 'iterations': self.max_iterations,
+ 'context_window': self.max_context_tokens,
+ 'total_tokens': self.max_total_tokens,
+ }
+ if all(v is None for v in configured.values()):
+ raise ValueError('At least one of max_iterations, max_context_tokens, or max_total_tokens must be set.')
+
+ if self.warn_on is None:
+ self._active_kinds = tuple(k for k in _WARNING_ORDER if configured[k] is not None)
+ else:
+ if not self.warn_on:
+ raise ValueError('warn_on must not be empty.')
+ for kind in self.warn_on:
+ if configured[kind] is None:
+ raise ValueError(f'{kind!r} requires its corresponding max_* limit to be configured.')
+ self._active_kinds = tuple(dict.fromkeys(self.warn_on))
+
+ # -- internal helpers --
+
+ @staticmethod
+ def _is_marker_part(part: Any) -> bool:
+ if isinstance(part, SystemPromptPart):
+ return _MARKER in part.content
+ if isinstance(part, UserPromptPart) and isinstance(part.content, str):
+ return _MARKER in part.content
+ return False
+
+ def _strip_old_warnings(self, messages: list[ModelMessage]) -> list[ModelMessage]:
+ cleaned: list[ModelMessage] = []
+ for msg in messages:
+ if not isinstance(msg, ModelRequest):
+ cleaned.append(msg)
+ continue
+ parts = [p for p in msg.parts if not self._is_marker_part(p)]
+ if not parts:
+ continue
+ if len(parts) == len(msg.parts):
+ cleaned.append(msg)
+ else:
+ cleaned.append(ModelRequest(parts=parts))
+ return cleaned
+
+ def _build_iteration_warning(self, ctx: RunContext[AgentDepsT]) -> _Warning | None:
+ if self.max_iterations is None or 'iterations' not in self._active_kinds:
+ return None
+ usage_frac = ctx.usage.requests / self.max_iterations
+ if usage_frac < self.warning_threshold:
+ return None
+ remaining = max(0, self.max_iterations - ctx.usage.requests)
+ severity: Literal['URGENT', 'CRITICAL'] = (
+ 'CRITICAL' if remaining <= self.critical_remaining_iterations else 'URGENT'
+ )
+ details = f'Iterations: {ctx.usage.requests}/{self.max_iterations} requests used ({usage_frac:.0%}); {remaining} remaining.'
+ return _Warning(kind='iterations', severity=severity, details=details)
+
+ def _build_context_warning(self, context_tokens: int) -> _Warning | None:
+ if self.max_context_tokens is None or 'context_window' not in self._active_kinds:
+ return None # pragma: no cover
+ usage_frac = context_tokens / self.max_context_tokens
+ if usage_frac < self.warning_threshold:
+ return None
+ remaining = max(0, self.max_context_tokens - context_tokens)
+ severity: Literal['URGENT', 'CRITICAL'] = 'CRITICAL' if usage_frac >= 1 else 'URGENT'
+ details = f'Context window: {context_tokens}/{self.max_context_tokens} tokens used ({usage_frac:.0%}); {remaining} remaining.'
+ return _Warning(kind='context_window', severity=severity, details=details)
+
+ def _build_total_tokens_warning(self, ctx: RunContext[AgentDepsT]) -> _Warning | None:
+ if self.max_total_tokens is None or 'total_tokens' not in self._active_kinds:
+ return None
+ total = ctx.usage.total_tokens
+ usage_frac = total / self.max_total_tokens
+ if usage_frac < self.warning_threshold:
+ return None
+ remaining = max(0, self.max_total_tokens - total)
+ severity: Literal['URGENT', 'CRITICAL'] = 'CRITICAL' if usage_frac >= 1 else 'URGENT'
+ details = f'Total tokens: {total}/{self.max_total_tokens} used ({usage_frac:.0%}); {remaining} remaining.'
+ return _Warning(kind='total_tokens', severity=severity, details=details)
+
+ @staticmethod
+ def _format_warning(warnings: list[_Warning]) -> str:
+ severity: Literal['URGENT', 'CRITICAL'] = (
+ 'URGENT' if all(w.severity == 'URGENT' for w in warnings) else 'CRITICAL'
+ )
+ guidance = (
+ 'Complete the current task efficiently and avoid unnecessary tool calls.'
+ if severity == 'URGENT'
+ else 'Complete the current task immediately and avoid unnecessary tool calls.'
+ )
+ lines = [_MARKER, f'{severity}: Configured run limits are approaching.']
+ lines.extend(f'- {w.details}' for w in warnings)
+ lines.append(guidance)
+ return '\n'.join(lines)
+
+ async def before_model_request(
+ self,
+ ctx: RunContext[AgentDepsT],
+ request_context: ModelRequestContext,
+ ) -> ModelRequestContext:
+ """Strip old warnings, then inject a new one if thresholds are exceeded."""
+ messages = self._strip_old_warnings(list(request_context.messages))
+
+ active: list[_Warning] = []
+
+ w = self._build_iteration_warning(ctx)
+ if w is not None:
+ active.append(w)
+
+ if self.max_context_tokens is not None and 'context_window' in self._active_kinds:
+ context_tokens = estimate_token_count(messages)
+ w = self._build_context_warning(context_tokens)
+ if w is not None:
+ active.append(w)
+
+ w = self._build_total_tokens_warning(ctx)
+ if w is not None:
+ active.append(w)
+
+ if not active:
+ request_context.messages = messages
+ return request_context
+
+ order = {k: i for i, k in enumerate(_WARNING_ORDER)}
+ active.sort(key=lambda w: order[w.kind])
+ warning_text = self._format_warning(active)
+ messages.append(ModelRequest(parts=[UserPromptPart(content=warning_text)]))
+
+ request_context.messages = messages
+ return request_context
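
A compact sketch of how the severity rules in `LimitWarner` combine, reduced to pure functions. The names `iteration_severity`, `budget_severity`, and `overall_severity` are hypothetical; the defaults mirror the dataclass fields above. Iteration warnings turn CRITICAL based on how many requests remain, token warnings turn CRITICAL only once the budget is fully used, and the combined message is CRITICAL if any single warning is.

```python
from __future__ import annotations

from typing import Literal

Severity = Literal['URGENT', 'CRITICAL']


def iteration_severity(
    used: int, limit: int, warning_threshold: float = 0.7, critical_remaining: int = 3
) -> Severity | None:
    """Severity for the request-count limit, or None below the threshold."""
    if used / limit < warning_threshold:
        return None
    remaining = max(0, limit - used)
    return 'CRITICAL' if remaining <= critical_remaining else 'URGENT'


def budget_severity(used: int, limit: int, warning_threshold: float = 0.7) -> Severity | None:
    """Severity for a token budget (context window or cumulative total)."""
    fraction = used / limit
    if fraction < warning_threshold:
        return None
    return 'CRITICAL' if fraction >= 1 else 'URGENT'


def overall_severity(severities: list[Severity]) -> Severity:
    """One CRITICAL warning escalates the whole message."""
    return 'URGENT' if all(s == 'URGENT' for s in severities) else 'CRITICAL'


print(iteration_severity(30, 40), iteration_severity(37, 40), budget_severity(80_000, 100_000))
```
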
diff --git a/pydantic_ai_harness/experimental/compaction/_shared.py b/pydantic_ai_harness/experimental/compaction/_shared.py
new file mode 100644
index 0000000..2518f87
--- /dev/null
+++ b/pydantic_ai_harness/experimental/compaction/_shared.py
@@ -0,0 +1,343 @@
+"""Shared utilities for the compaction capabilities.
+
+Token estimation, the `CompactionStrategy` protocol, tool-pair-safe cutoff logic, first-user
+preservation, and in-place tool-result clearing — anything used by more than one capability.
+"""
+
+from __future__ import annotations
+
+from collections.abc import Callable, Sequence
+from dataclasses import dataclass, replace
+from typing import Protocol
+
+from pydantic_ai._run_context import AgentDepsT
+from pydantic_ai.messages import (
+ ModelMessage,
+ ModelRequest,
+ ModelRequestPart,
+ ModelResponse,
+ ModelResponsePart,
+ SystemPromptPart,
+ TextContent,
+ TextPart,
+ ToolCallPart,
+ ToolReturnPart,
+ UserPromptPart,
+)
+from pydantic_ai.tools import RunContext
+
+# ---------------------------------------------------------------------------
+# Token estimation
+# ---------------------------------------------------------------------------
+
+_CHARS_PER_TOKEN = 4
+"""Rough approximation: ~4 characters per token on average."""
+
+
+def _collect_text(messages: Sequence[ModelMessage]) -> list[str]:
+ """Collect all text segments from a sequence of messages."""
+ segments: list[str] = []
+ for msg in messages:
+ if isinstance(msg, ModelRequest):
+ for part in msg.parts:
+ if isinstance(part, UserPromptPart):
+ segments.append(_user_prompt_text_for_counting(part))
+ elif isinstance(part, SystemPromptPart):
+ segments.append(part.content)
+ elif isinstance(part, ToolReturnPart):
+ segments.append(str(part.content))
+ else:
+ for part in msg.parts:
+ if isinstance(part, TextPart):
+ segments.append(part.content)
+ elif isinstance(part, ToolCallPart):
+ segments.append(part.tool_name)
+ segments.append(str(part.args))
+ return segments
+
+
+def _user_prompt_text_for_counting(part: UserPromptPart) -> str:
+ """Extract text content from a user prompt part for counting."""
+ if isinstance(part.content, str):
+ return part.content
+ texts: list[str] = []
+ for item in part.content:
+ if isinstance(item, str):
+ texts.append(item)
+ elif isinstance(item, TextContent):
+ texts.append(item.content)
+ return ''.join(texts)
+
+
+def estimate_token_count(
+ messages: Sequence[ModelMessage],
+ tokenizer: Callable[[str], int] | None = None,
+) -> int:
+ """Approximate token count for a sequence of messages.
+
+ Args:
+ messages: Messages to count tokens for.
+ tokenizer: Optional callable that returns the token count for a string.
+ When ``None``, falls back to a ~4 characters-per-token heuristic.
+ """
+ segments = _collect_text(messages)
+ if tokenizer is not None:
+ return sum(tokenizer(s) for s in segments)
+ return sum(len(s) for s in segments) // _CHARS_PER_TOKEN
+
+
+def exceeds(
+ messages: Sequence[ModelMessage],
+ max_messages: int | None,
+ max_tokens: int | None,
+ tokenizer: Callable[[str], int] | None,
+) -> bool:
+ """Return True if *messages* exceeds either configured size threshold."""
+ if max_messages is not None and len(messages) > max_messages:
+ return True
+ if max_tokens is not None and estimate_token_count(messages, tokenizer) > max_tokens:
+ return True
+ return False
+
+
+# ---------------------------------------------------------------------------
+# Compaction strategy protocol
+# ---------------------------------------------------------------------------
+
+
+class CompactionStrategy(Protocol[AgentDepsT]):
+ """A history transform that can be used standalone or as a `TieredCompaction` tier.
+
+ ``compact`` applies the transform *unconditionally* (the trigger check lives in the
+ capability's ``before_model_request``). Implementations must preserve tool-call /
+ tool-return pairing.
+ """
+
+ async def compact(
+ self,
+ messages: list[ModelMessage],
+ ctx: RunContext[AgentDepsT],
+ ) -> list[ModelMessage]: ... # pragma: no cover
+
+
+# ---------------------------------------------------------------------------
+# Safe cutoff logic — preserves tool-call / tool-return pairs
+# ---------------------------------------------------------------------------
+
+_TOOL_PAIR_SEARCH_RANGE = 5
+"""Number of messages to search around a cutoff point for tool-call pairs."""
+
+
+def _is_safe_cutoff(
+ messages: list[ModelMessage],
+ cutoff: int,
+ search_range: int = _TOOL_PAIR_SEARCH_RANGE,
+) -> bool:
+ """Return True if cutting at *cutoff* does not orphan any tool-call pair.
+
+ A tool-call pair is a ``ToolCallPart`` in a ``ModelResponse`` together with
+ the corresponding ``ToolReturnPart`` in a subsequent ``ModelRequest``. Both
+ sides must end up on the same side of the cut.
+ """
+ if cutoff >= len(messages):
+ return True
+
+ start = max(0, cutoff - search_range)
+ end = min(len(messages), cutoff + search_range)
+
+ for i in range(start, end):
+ msg = messages[i]
+ if not isinstance(msg, ModelResponse):
+ continue
+
+ call_ids: set[str] = set()
+ for part in msg.parts:
+ if isinstance(part, ToolCallPart) and part.tool_call_id:
+ call_ids.add(part.tool_call_id)
+
+ if not call_ids:
+ continue
+
+ for j in range(i + 1, len(messages)):
+ later = messages[j]
+ if not isinstance(later, ModelRequest):
+ continue
+ for rpart in later.parts:
+ if isinstance(rpart, ToolReturnPart) and rpart.tool_call_id in call_ids:
+ call_before = i < cutoff
+ return_before = j < cutoff
+ if call_before != return_before:
+ return False
+
+ return True
+
+
+def find_safe_cutoff(messages: list[ModelMessage], keep: int) -> int:
+ """Find a cutoff index that keeps *keep* tail messages without splitting tool pairs.
+
+ Returns 0 if trimming is unnecessary (at most *keep* messages).
+ """
+ if keep == 0:
+ return len(messages)
+ if len(messages) <= keep:
+ return 0
+
+ target = len(messages) - keep
+ for idx in range(target, -1, -1):
+ if _is_safe_cutoff(messages, idx):
+ return idx
+ return 0 # pragma: no cover
+
+
+def find_token_cutoff(
+ messages: list[ModelMessage],
+ target_tokens: int,
+ tokenizer: Callable[[str], int] | None = None,
+) -> int:
+ """Binary-search for a cutoff such that ``messages[cutoff:]`` fits in *target_tokens*.
+
+ Walks the result backward so no tool-call pair is orphaned; the kept tail may then exceed *target_tokens*.
+ """
+ if not messages or estimate_token_count(messages, tokenizer) <= target_tokens:
+ return 0
+
+ lo, hi = 0, len(messages)
+ candidate = len(messages)
+
+ while lo < hi:
+ mid = (lo + hi) // 2
+ if estimate_token_count(messages[mid:], tokenizer) <= target_tokens:
+ candidate = mid
+ hi = mid
+ else:
+ lo = mid + 1
+
+ if candidate >= len(messages):
+ candidate = max(0, len(messages) - 1) # pragma: no cover
+
+ # Walk backward to a safe point.
+ for idx in range(candidate, -1, -1):
+ if _is_safe_cutoff(messages, idx):
+ return idx
+ return 0 # pragma: no cover
+
+
+# ---------------------------------------------------------------------------
+# First user message preservation
+# ---------------------------------------------------------------------------
+
+
+def find_first_user_message(messages: list[ModelMessage]) -> ModelRequest | None:
+ """Return the first ``ModelRequest`` that contains a ``UserPromptPart``, or ``None``."""
+ for msg in messages:
+ if isinstance(msg, ModelRequest) and any(isinstance(p, UserPromptPart) for p in msg.parts):
+ return msg
+ return None
+
+
+def prepend_first_user_message(
+ original: list[ModelMessage],
+ cutoff: int,
+ trimmed: list[ModelMessage],
+) -> list[ModelMessage]:
+ """Ensure the first user message from *original* appears in *trimmed*.
+
+ If the first ``ModelRequest`` containing a ``UserPromptPart`` in *original*
+ was discarded (its index is before *cutoff*) and is not already in *trimmed*,
+ prepend it.
+ """
+ first = find_first_user_message(original)
+ if first is None:
+ return trimmed
+ idx = original.index(first)
+ if idx < cutoff and first not in trimmed:
+ return [first, *trimmed]
+ return trimmed
+
+
+# ---------------------------------------------------------------------------
+# Tool-pair inspection and in-place clearing
+# ---------------------------------------------------------------------------
+
+
+_CLEARED_TOOL_ARGS = '{}'
+"""Replacement for cleared tool-call arguments.
+
+Kept JSON-valid: ``ToolCallPart.args_as_json_str()`` returns a ``str`` arg verbatim, so a
+non-JSON placeholder would reach the provider as malformed function arguments.
+"""
+
+
+@dataclass(frozen=True)
+class _ToolPair:
+ """A matched tool call and its return, with the order the return appeared."""
+
+ tool_call_id: str
+ tool_name: str
+ call_part: ToolCallPart
+ order: int
+
+
+def iter_tool_pairs(messages: Sequence[ModelMessage]) -> list[_ToolPair]:
+ """Return matched tool-call / tool-return pairs in return-appearance order."""
+ calls: dict[str, ToolCallPart] = {}
+ for msg in messages:
+ if isinstance(msg, ModelResponse):
+ for part in msg.parts:
+ if isinstance(part, ToolCallPart) and part.tool_call_id:
+ calls[part.tool_call_id] = part
+
+ pairs: list[_ToolPair] = []
+ order = 0
+ for msg in messages:
+ if isinstance(msg, ModelRequest):
+ for part in msg.parts:
+ if isinstance(part, ToolReturnPart) and part.tool_call_id in calls:
+ call = calls[part.tool_call_id]
+ pairs.append(_ToolPair(part.tool_call_id, call.tool_name, call, order))
+ order += 1
+ return pairs
+
+
+def rebuild_with_cleared(
+ messages: Sequence[ModelMessage],
+ clear_return_ids: set[str],
+ clear_input_ids: set[str],
+ placeholder: str,
+) -> list[ModelMessage]:
+ """Return *messages* with selected tool results (and optionally inputs) blanked.
+
+ The ``ToolReturnPart`` / ``ToolCallPart`` are kept in place with placeholder content,
+ so tool-call pairing is never broken. Already-blanked parts are left untouched.
+ """
+ out: list[ModelMessage] = []
+ for msg in messages:
+ if isinstance(msg, ModelRequest):
+ request_parts: list[ModelRequestPart] = []
+ changed = False
+ for part in msg.parts:
+ if (
+ isinstance(part, ToolReturnPart)
+ and part.tool_call_id in clear_return_ids
+ and str(part.content) != placeholder
+ ):
+ request_parts.append(replace(part, content=placeholder))
+ changed = True
+ else:
+ request_parts.append(part)
+ out.append(replace(msg, parts=request_parts) if changed else msg)
+ else:
+ response_parts: list[ModelResponsePart] = []
+ changed = False
+ for part in msg.parts:
+ if (
+ isinstance(part, ToolCallPart)
+ and part.tool_call_id in clear_input_ids
+ and part.args != _CLEARED_TOOL_ARGS
+ ):
+ response_parts.append(replace(part, args=_CLEARED_TOOL_ARGS))
+ changed = True
+ else:
+ response_parts.append(part)
+ out.append(replace(msg, parts=response_parts) if changed else msg)
+ return out
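The backward walk in `find_safe_cutoff` is easier to follow on a toy history. The sketch below mirrors that logic under simplifying assumptions: `Msg` is a hypothetical stand-in for the `pydantic_ai` message types, and the safety check scans the whole list instead of a bounded search window around the cutoff.

```python
from typing import NamedTuple, Optional


class Msg(NamedTuple):
    """Hypothetical stand-in for a message; not a pydantic_ai type."""

    kind: str  # 'user', 'assistant', 'call', or 'return'
    call_id: Optional[str] = None


def is_safe_cutoff(messages: list[Msg], cutoff: int) -> bool:
    """True if no call/return pair ends up on opposite sides of the cut."""
    if cutoff >= len(messages):
        return True
    call_index = {m.call_id: i for i, m in enumerate(messages) if m.kind == 'call'}
    for j, m in enumerate(messages):
        if m.kind == 'return' and m.call_id in call_index:
            i = call_index[m.call_id]
            if (i < cutoff) != (j < cutoff):
                return False
    return True


def find_safe_cutoff(messages: list[Msg], keep: int) -> int:
    """Walk backward from the ideal cutoff until no pair is split."""
    if keep == 0:
        return len(messages)
    if len(messages) <= keep:
        return 0
    for idx in range(len(messages) - keep, -1, -1):
        if is_safe_cutoff(messages, idx):
            return idx
    return 0


history = [
    Msg('user'),
    Msg('call', 'tc1'),
    Msg('return', 'tc1'),
    Msg('assistant'),
    Msg('user'),
]
```

With `keep=3` the ideal cutoff (index 2) would separate the call from its return, so the walk moves back to index 1 and four messages are retained instead of three.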
diff --git a/pydantic_ai_harness/experimental/compaction/_sliding_window.py b/pydantic_ai_harness/experimental/compaction/_sliding_window.py
new file mode 100644
index 0000000..be10078
--- /dev/null
+++ b/pydantic_ai_harness/experimental/compaction/_sliding_window.py
@@ -0,0 +1,116 @@
+"""`SlidingWindow` — zero-cost trimming of the oldest messages."""
+
+from __future__ import annotations
+
+from collections.abc import Callable
+from dataclasses import dataclass
+from typing import TYPE_CHECKING
+
+from pydantic_ai._run_context import AgentDepsT
+from pydantic_ai.capabilities import AbstractCapability
+from pydantic_ai.messages import ModelMessage
+from pydantic_ai.tools import RunContext
+
+from pydantic_ai_harness.experimental.compaction._shared import (
+ exceeds,
+ find_safe_cutoff,
+ find_token_cutoff,
+ prepend_first_user_message,
+)
+
+if TYPE_CHECKING:
+ from pydantic_ai.models import ModelRequestContext
+
+
+@dataclass
+class SlidingWindow(AbstractCapability[AgentDepsT]):
+ """Zero-cost sliding-window trimmer.
+
+ When the conversation exceeds a configurable threshold (message count or
+ estimated token count), the oldest messages are discarded while preserving
+ tool-call / tool-return pairs. No LLM calls are made.
+
+ Trimming happens in ``before_model_request`` so it is transparent to the
+ rest of the agent run.
+
+ Example:
+ ```python
+ from pydantic_ai import Agent
+ from pydantic_ai_harness.experimental.compaction import SlidingWindow
+
+ agent = Agent(
+ 'openai:gpt-4o',
+ capabilities=[SlidingWindow(max_messages=80, keep_messages=40)],
+ )
+ ```
+ """
+
+ max_messages: int | None = None
+ """Trigger trimming when message count exceeds this value. ``None`` disables."""
+
+ max_tokens: int | None = None
+ """Trigger trimming when estimated token count exceeds this value. ``None`` disables."""
+
+ keep_messages: int = 40
+ """Number of tail messages to retain after trimming; used whenever ``keep_tokens`` is ``None``."""
+
+ keep_tokens: int | None = None
+ """Target token budget after trimming (token-count trigger).
+
+ When ``None``, falls back to ``keep_messages``.
+ """
+
+ tokenizer: Callable[[str], int] | None = None
+ """Optional tokenizer for accurate token counting.
+
+ A callable that returns the token count for a given string.
+ When ``None``, uses a ~4 characters-per-token heuristic.
+ """
+
+ preserve_first_user_message: bool = True
+ """When ``True``, the first ``ModelRequest`` containing a ``UserPromptPart``
+ is always kept after trimming, in addition to system prompts.
+ """
+
+ def __post_init__(self) -> None:
+ if self.max_messages is None and self.max_tokens is None:
+ raise ValueError('At least one of max_messages or max_tokens must be set.')
+ if self.max_messages is not None and self.max_messages < 1:
+ raise ValueError('max_messages must be positive.')
+ if self.max_tokens is not None and self.max_tokens < 1:
+ raise ValueError('max_tokens must be positive.')
+ if self.keep_messages < 0:
+ raise ValueError('keep_messages must be non-negative.')
+ if self.keep_tokens is not None and self.keep_tokens < 0:
+ raise ValueError('keep_tokens must be non-negative.')
+
+ async def compact(
+ self,
+ messages: list[ModelMessage],
+ ctx: RunContext[AgentDepsT],
+ ) -> list[ModelMessage]:
+ """Drop the oldest messages down to the configured tail."""
+ if self.keep_tokens is not None:
+ cutoff = find_token_cutoff(messages, self.keep_tokens, self.tokenizer)
+ else:
+ cutoff = find_safe_cutoff(messages, self.keep_messages)
+
+ if cutoff <= 0:
+ return messages
+
+ trimmed = messages[cutoff:]
+ if self.preserve_first_user_message:
+ trimmed = prepend_first_user_message(messages, cutoff, trimmed)
+ return trimmed
+
+ async def before_model_request(
+ self,
+ ctx: RunContext[AgentDepsT],
+ request_context: ModelRequestContext,
+ ) -> ModelRequestContext:
+ """Trim the message list if it exceeds the configured threshold."""
+ messages: list[ModelMessage] = list(request_context.messages)
+ if not exceeds(messages, self.max_messages, self.max_tokens, self.tokenizer):
+ return request_context
+ request_context.messages = await self.compact(messages, ctx)
+ return request_context
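The trigger compares with a strict `>`, so a history that sits exactly on a threshold is left alone. The sketch below restates the characters-per-token heuristic and the threshold check over plain strings. `estimate_tokens` and this `exceeds` are simplified stand-ins that skip message parsing and the optional tokenizer.

```python
from typing import Optional

CHARS_PER_TOKEN = 4  # same rough ratio the shared helper uses


def estimate_tokens(texts: list[str]) -> int:
    """Approximate tokens as total characters divided by four, rounded down."""
    return sum(len(t) for t in texts) // CHARS_PER_TOKEN


def exceeds(texts: list[str], max_messages: Optional[int], max_tokens: Optional[int]) -> bool:
    """True only when a configured threshold is strictly exceeded."""
    if max_messages is not None and len(texts) > max_messages:
        return True
    if max_tokens is not None and estimate_tokens(texts) > max_tokens:
        return True
    return False
```

A practical consequence is that `max_messages=80` lets the history grow to 80 messages and trims only once the 81st is present.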
diff --git a/pydantic_ai_harness/experimental/compaction/_summarizing_compaction.py b/pydantic_ai_harness/experimental/compaction/_summarizing_compaction.py
new file mode 100644
index 0000000..7bc94e0
--- /dev/null
+++ b/pydantic_ai_harness/experimental/compaction/_summarizing_compaction.py
@@ -0,0 +1,287 @@
+"""`SummarizingCompaction` — LLM-powered summarization of older messages."""
+
+from __future__ import annotations
+
+from collections.abc import Callable, Sequence
+from dataclasses import dataclass
+from typing import TYPE_CHECKING
+
+from pydantic_ai._run_context import AgentDepsT
+from pydantic_ai.capabilities import AbstractCapability
+from pydantic_ai.messages import (
+ ModelMessage,
+ ModelRequest,
+ SystemPromptPart,
+ TextContent,
+ TextPart,
+ ToolCallPart,
+ ToolReturnPart,
+ UserPromptPart,
+)
+from pydantic_ai.tools import RunContext
+
+from pydantic_ai_harness.experimental.compaction._shared import (
+ exceeds,
+ find_first_user_message,
+ find_safe_cutoff,
+ find_token_cutoff,
+)
+
+if TYPE_CHECKING:
+ from pydantic_ai.models import Model, ModelRequestContext
+
+_DEFAULT_SUMMARY_PROMPT = """\
+You are a context summarization assistant. The conversation below will be replaced by \
+your summary, so it must carry everything needed to continue the task.
+
+Write the summary under these exact section headings, omitting a section only if it has \
+no content:
+
+## Intent
+The user's overall goal and any standing constraints or preferences.
+
+## Key decisions
+Choices made and the reasoning, so they are not relitigated.
+
+## Artifacts
+Files, paths, identifiers, commands, and APIs touched — quote exact names.
+
+## Current state
+What is done and what is in progress right now.
+
+## Next steps
+The immediate actions still required to finish the task.
+
+## Open questions
+Unresolved questions or blockers.
+
+Focus on results, not a replay of completed actions. Respond ONLY with the summary — no \
+preamble, no markdown fences.
+
+
+{messages}
+\
+"""
+
+_SUMMARY_PREFIX = 'Summary of previous conversation:\n\n'
+
+
+def _format_messages(messages: Sequence[ModelMessage]) -> str:
+ """Render messages into a human-readable string for summarization."""
+ lines: list[str] = []
+ for msg in messages:
+ if isinstance(msg, ModelRequest):
+ for part in msg.parts:
+ if isinstance(part, UserPromptPart):
+ lines.append(f'User: {_user_prompt_text(part)}')
+ elif isinstance(part, SystemPromptPart):
+ lines.append(f'System: {part.content}')
+ elif isinstance(part, ToolReturnPart):
+ content_str = str(part.content)[:500]
+ if len(str(part.content)) > 500:
+ content_str += '...'
+ lines.append(f'Tool [{part.tool_name}]: {content_str}')
+ else:
+ for part in msg.parts:
+ if isinstance(part, TextPart):
+ lines.append(f'Assistant: {part.content}')
+ elif isinstance(part, ToolCallPart):
+ lines.append(f'Tool Call [{part.tool_name}]: {part.args}')
+ return '\n'.join(lines)
+
+
+def _user_prompt_text(part: UserPromptPart) -> str:
+ """Extract text content from a user prompt part."""
+ if isinstance(part.content, str):
+ return part.content
+ texts: list[str] = []
+ for item in part.content:
+ if isinstance(item, str):
+ texts.append(item)
+ elif isinstance(item, TextContent):
+ texts.append(item.content)
+ return ' '.join(texts) if texts else ''
+
+
+def _extract_system_prompts(messages: list[ModelMessage]) -> list[SystemPromptPart]:
+ """Extract leading system-prompt parts from the conversation."""
+ parts: list[SystemPromptPart] = []
+ for msg in messages:
+ if not isinstance(msg, ModelRequest):
+ break
+ for part in msg.parts:
+ if isinstance(part, SystemPromptPart):
+ parts.append(part)
+ else:
+ return parts
+ return parts
+
+
+def _extract_previous_summary(messages: list[ModelMessage]) -> str | None:
+ """Extract the most recent compaction summary from the message history.
+
+ Looks for a ``SystemPromptPart`` whose content starts with the summary prefix,
+ which indicates it was produced by a prior compaction pass.
+ """
+ for msg in messages:
+ if not isinstance(msg, ModelRequest):
+ continue
+ for part in msg.parts:
+ if isinstance(part, SystemPromptPart) and part.content.startswith(_SUMMARY_PREFIX):
+ return part.content[len(_SUMMARY_PREFIX) :]
+ return None
+
+
+@dataclass
+class SummarizingCompaction(AbstractCapability[AgentDepsT]):
+ """LLM-powered conversation compaction.
+
+ When the conversation exceeds a configurable threshold, older messages are
+ summarized using a dedicated model call and replaced with a compact, structured
+ summary message, preserving recent context and tool-call integrity.
+
+ This is the expensive tier — summarization turns input tokens into (pricier) output
+ tokens — so it is best used behind cheaper passes (see `TieredCompaction`).
+
+ The summary call's usage is folded into the parent run's usage (it counts as a real
+ request), so cost accounting stays honest; note this also increments the run's request
+ count, which a request-count limiter would see.
+
+ Example:
+ ```python
+ from pydantic_ai import Agent
+ from pydantic_ai_harness.experimental.compaction import SummarizingCompaction
+
+ agent = Agent(
+ 'openai:gpt-4o',
+ capabilities=[SummarizingCompaction(
+ model='openai:gpt-4o-mini',
+ max_messages=60,
+ keep_messages=20,
+ )],
+ )
+ ```
+ """
+
+ model: str | Model | None = None
+ """Model used to generate summaries. When ``None``, inherits the running agent's model."""
+
+ max_messages: int | None = None
+ """Trigger compaction when message count exceeds this value."""
+
+ max_tokens: int | None = None
+ """Trigger compaction when estimated token count exceeds this value."""
+
+ keep_messages: int = 20
+ """Number of tail messages to preserve after compaction; used whenever ``keep_tokens`` is ``None``."""
+
+ keep_tokens: int | None = None
+ """Target token budget to preserve after compaction (token-count trigger).
+
+ When ``None``, falls back to ``keep_messages``.
+ """
+
+ summary_prompt: str = _DEFAULT_SUMMARY_PROMPT
+ """Prompt template for generating summaries.
+
+ Must contain a ``{messages}`` placeholder.
+ """
+
+ tokenizer: Callable[[str], int] | None = None
+ """Optional tokenizer for accurate token counting.
+
+ A callable that returns the token count for a given string.
+ When ``None``, uses a ~4 characters-per-token heuristic.
+ """
+
+ preserve_first_user_message: bool = True
+ """When ``True``, the first ``ModelRequest`` containing a ``UserPromptPart``
+ is always kept after compaction, in addition to system prompts.
+ """
+
+ incremental: bool = True
+ """When ``True``, include any existing summary from a prior compaction in the
+ summarization prompt so that it is extended rather than regenerated from scratch.
+ """
+
+ def __post_init__(self) -> None:
+ if self.max_messages is None and self.max_tokens is None:
+ raise ValueError('At least one of max_messages or max_tokens must be set.')
+ if self.max_messages is not None and self.max_messages < 1:
+ raise ValueError('max_messages must be positive.')
+ if self.max_tokens is not None and self.max_tokens < 1:
+ raise ValueError('max_tokens must be positive.')
+ if self.keep_messages < 0:
+ raise ValueError('keep_messages must be non-negative.')
+ if self.keep_tokens is not None and self.keep_tokens < 0:
+ raise ValueError('keep_tokens must be non-negative.')
+
+ async def compact(
+ self,
+ messages: list[ModelMessage],
+ ctx: RunContext[AgentDepsT],
+ ) -> list[ModelMessage]:
+ """Summarize older messages, replacing them with a single summary message."""
+ if self.keep_tokens is not None:
+ cutoff = find_token_cutoff(messages, self.keep_tokens, self.tokenizer)
+ else:
+ cutoff = find_safe_cutoff(messages, self.keep_messages)
+
+ if cutoff <= 0:
+ return messages
+
+ system_parts = _extract_system_prompts(messages)
+ to_summarize = messages[:cutoff]
+ preserved = messages[cutoff:]
+
+ previous_summary = _extract_previous_summary(messages) if self.incremental else None
+ summary = await self._summarize(to_summarize, ctx, previous_summary=previous_summary)
+
+ summary_part = SystemPromptPart(content=f'{_SUMMARY_PREFIX}{summary}')
+ summary_message = ModelRequest(parts=[*system_parts, summary_part])
+
+ first_user: list[ModelMessage] = []
+ if self.preserve_first_user_message:
+ first_user_msg = find_first_user_message(messages)
+ if first_user_msg is not None:
+ idx = messages.index(first_user_msg)
+ if idx < cutoff and first_user_msg not in preserved:
+ first_user = [first_user_msg]
+
+ return [summary_message, *first_user, *preserved]
+
+ async def before_model_request(
+ self,
+ ctx: RunContext[AgentDepsT],
+ request_context: ModelRequestContext,
+ ) -> ModelRequestContext:
+ """Summarize older messages when the threshold is exceeded."""
+ messages: list[ModelMessage] = list(request_context.messages)
+ if not exceeds(messages, self.max_messages, self.max_tokens, self.tokenizer):
+ return request_context
+ request_context.messages = await self.compact(messages, ctx)
+ return request_context
+
+ async def _summarize(
+ self,
+ messages: list[ModelMessage],
+ ctx: RunContext[AgentDepsT],
+ *,
+ previous_summary: str | None = None,
+ ) -> str:
+ """Generate a summary for the given messages using the configured model."""
+ from pydantic_ai import Agent
+
+ formatted = _format_messages(messages)
+ prompt = self.summary_prompt.format(messages=formatted)
+
+ if previous_summary is not None:
+ prompt = f'{prompt}\n\n\n{previous_summary}\n'
+
+ model = self.model if self.model is not None else ctx.model
+ agent: Agent[None, str] = Agent(
+ model,
+ instructions='You are a context summarization assistant. Extract the most important information from conversations.',
+ )
+ result = await agent.run(prompt, usage=ctx.usage)
+ return result.output.strip()
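The order in which `compact` reassembles the history (summary first, then the original first user message if it was cut, then the preserved tail) can be sketched without a model call. The version below uses `(role, text)` tuples as hypothetical stand-ins for messages, takes the summary text as an argument, and omits the carry-over of leading system prompts.

```python
SUMMARY_PREFIX = 'Summary of previous conversation:\n\n'

Message = tuple[str, str]  # (role, text); a stand-in, not a pydantic_ai type


def assemble(
    messages: list[Message],
    cutoff: int,
    summary: str,
    preserve_first_user: bool = True,
) -> list[Message]:
    """Replace messages[:cutoff] with a summary, optionally keeping the first user turn."""
    preserved = messages[cutoff:]
    head: list[Message] = [('system', SUMMARY_PREFIX + summary)]
    first_user = next((m for m in messages if m[0] == 'user'), None)
    if preserve_first_user and first_user is not None:
        # Re-add the first user turn only if the cut would otherwise discard it.
        if messages.index(first_user) < cutoff and first_user not in preserved:
            head.append(first_user)
    return head + preserved


history = [
    ('user', 'build a parser'),
    ('assistant', 'ok'),
    ('user', 'add tests'),
    ('assistant', 'done'),
]
compacted = assemble(history, cutoff=2, summary='Parser built.')
```

Because the summary carries a fixed prefix, a later pass can recognize it in the history and extend it, which is what the `incremental` option relies on.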
diff --git a/pydantic_ai_harness/experimental/compaction/_tiered_compaction.py b/pydantic_ai_harness/experimental/compaction/_tiered_compaction.py
new file mode 100644
index 0000000..c3e68f5
--- /dev/null
+++ b/pydantic_ai_harness/experimental/compaction/_tiered_compaction.py
@@ -0,0 +1,95 @@
+"""`TieredCompaction` — escalation orchestrator over a sequence of strategies."""
+
+from __future__ import annotations
+
+from collections.abc import Callable, Sequence
+from dataclasses import dataclass
+from typing import TYPE_CHECKING
+
+from pydantic_ai._run_context import AgentDepsT
+from pydantic_ai.capabilities import AbstractCapability
+from pydantic_ai.messages import ModelMessage
+from pydantic_ai.tools import RunContext
+
+from pydantic_ai_harness.experimental.compaction._shared import CompactionStrategy, estimate_token_count
+
+if TYPE_CHECKING:
+ from pydantic_ai.models import ModelRequestContext
+
+
+@dataclass
+class TieredCompaction(AbstractCapability[AgentDepsT]):
+ """Escalation orchestrator over a sequence of compaction strategies.
+
+ Runs each tier in order, re-measuring the token count after each, and stops as soon as
+ the conversation fits ``target_tokens``. Order tiers cheap-to-expensive (e.g. clear
+ tool results, deduplicate reads, then summarize) so the expensive summarization tier is
+ only reached when the cheap passes cannot reclaim enough.
+
+ Each tier's own trigger is bypassed — `TieredCompaction` drives the tiers directly via
+ their ``compact`` method and decides when to stop.
+
+ Example:
+ ```python
+ from pydantic_ai import Agent
+ from pydantic_ai_harness.experimental.compaction import (
+ ClearToolResults,
+ SummarizingCompaction,
+ TieredCompaction,
+ )
+
+ agent = Agent(
+ 'openai:gpt-4o',
+ capabilities=[TieredCompaction(
+ tiers=[
+ ClearToolResults(max_tokens=1),
+ SummarizingCompaction(model='openai:gpt-4o-mini', max_messages=1),
+ ],
+ target_tokens=100_000,
+ )],
+ )
+ ```
+ """
+
+ tiers: Sequence[CompactionStrategy[AgentDepsT]]
+ """Strategies to apply in order, cheap-to-expensive. The last is typically a summarizer."""
+
+ target_tokens: int
+ """Stop escalating once the estimated token count is at or below this value."""
+
+ tokenizer: Callable[[str], int] | None = None
+ """Optional tokenizer for accurate token counting.
+
+ A callable that returns the token count for a given string.
+ When ``None``, uses a ~4 characters-per-token heuristic.
+ """
+
+ def __post_init__(self) -> None:
+ if not self.tiers:
+ raise ValueError('tiers must not be empty.')
+ if self.target_tokens < 1:
+ raise ValueError('target_tokens must be positive.')
+
+ async def compact(
+ self,
+ messages: list[ModelMessage],
+ ctx: RunContext[AgentDepsT],
+ ) -> list[ModelMessage]:
+ """Apply tiers in order until the history fits ``target_tokens`` or tiers run out."""
+ for tier in self.tiers:
+ if estimate_token_count(messages, self.tokenizer) <= self.target_tokens:
+ break
+ messages = await tier.compact(messages, ctx)
+ return messages
+
+ async def before_model_request(
+ self,
+ ctx: RunContext[AgentDepsT],
+ request_context: ModelRequestContext,
+ ) -> ModelRequestContext:
+ """Escalate through the tiers when the conversation exceeds ``target_tokens``."""
+ messages: list[ModelMessage] = list(request_context.messages)
+ if estimate_token_count(messages, self.tokenizer) <= self.target_tokens:
+ return request_context
+ request_context.messages = await self.compact(messages, ctx)
+ return request_context
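The escalation loop in `TieredCompaction.compact` re-measures before each tier and stops as soon as the history fits. A minimal sketch follows, assuming tiers are plain async functions over strings. `clear_long` and `keep_last_two` are hypothetical tiers that stand in for a cheap clearing pass and an expensive summarizer.

```python
import asyncio
from typing import Awaitable, Callable

Tier = Callable[[list[str]], Awaitable[list[str]]]


def estimate_tokens(messages: list[str]) -> int:
    """Rough estimate: four characters per token."""
    return sum(len(m) for m in messages) // 4


async def clear_long(messages: list[str]) -> list[str]:
    """Cheap tier: blank out any message longer than 40 characters."""
    return [m if len(m) <= 40 else '[cleared]' for m in messages]


async def keep_last_two(messages: list[str]) -> list[str]:
    """Stand-in for the expensive tier: keep only the last two messages."""
    return messages[-2:]


async def run_tiers(
    messages: list[str], tiers: list[Tier], target_tokens: int
) -> tuple[list[str], int]:
    """Apply tiers in order until the history fits; report how many ran."""
    applied = 0
    for tier in tiers:
        if estimate_tokens(messages) <= target_tokens:
            break
        messages = await tier(messages)
        applied += 1
    return messages, applied


history = ['x' * 400, 'short question', 'short answer']
result, applied = asyncio.run(run_tiers(history, [clear_long, keep_last_two], target_tokens=20))
```

In this run the cheap tier alone brings the estimate under the target, so the second tier is never called.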
diff --git a/pyproject.toml b/pyproject.toml
index 7ca3b76..dcc4c38 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -131,6 +131,8 @@ filterwarnings = [
'error',
# DBOS's run_sync triggers this on Python 3.12+ — not our code.
'ignore:There is no current event loop:DeprecationWarning',
+ # Experimental capabilities warn on import by design; assert it explicitly where it matters.
+ 'ignore::pydantic_ai_harness.experimental.HarnessExperimentalWarning',
]
anyio_mode = 'auto'
diff --git a/tests/experimental/__init__.py b/tests/experimental/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/tests/experimental/compaction/__init__.py b/tests/experimental/compaction/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/tests/experimental/compaction/test_compaction.py b/tests/experimental/compaction/test_compaction.py
new file mode 100644
index 0000000..ff3bb6a
--- /dev/null
+++ b/tests/experimental/compaction/test_compaction.py
@@ -0,0 +1,1907 @@
+"""Tests for pydantic_ai_harness.experimental.compaction capabilities."""
+
+from __future__ import annotations
+
+import dataclasses
+from typing import Any
+from unittest.mock import AsyncMock, patch
+
+import pytest
+from pydantic_ai.messages import (
+ ModelMessage,
+ ModelRequest,
+ ModelResponse,
+ SystemPromptPart,
+ TextPart,
+ ToolCallPart,
+ ToolReturnPart,
+ UserPromptPart,
+)
+from pydantic_ai.models import ModelRequestContext, ModelRequestParameters
+from pydantic_ai.usage import RunUsage
+
+from pydantic_ai_harness.experimental.compaction import (
+ ClearToolResults,
+ DeduplicateFileReads,
+ LimitWarner,
+ SlidingWindow,
+ SummarizingCompaction,
+ TieredCompaction,
+ estimate_token_count,
+)
+from pydantic_ai_harness.experimental.compaction._shared import (
+ _is_safe_cutoff,
+ find_first_user_message,
+ find_safe_cutoff,
+ find_token_cutoff,
+ iter_tool_pairs,
+ prepend_first_user_message,
+)
+from pydantic_ai_harness.experimental.compaction._summarizing_compaction import (
+ _SUMMARY_PREFIX,
+ _extract_previous_summary,
+ _extract_system_prompts,
+ _format_messages,
+)
+
+# ---------------------------------------------------------------------------
+# Helpers
+# ---------------------------------------------------------------------------
+
+
+def _make_ctx(
+ *,
+ requests: int = 0,
+ input_tokens: int = 0,
+ output_tokens: int = 0,
+) -> Any:
+ """Build a minimal RunContext-like object for testing hooks."""
+
+ @dataclasses.dataclass
+ class _FakeModel:
+ model_id: str = 'test-model'
+
+ usage = RunUsage(requests=requests, input_tokens=input_tokens, output_tokens=output_tokens)
+
+ @dataclasses.dataclass
+ class _FakeCtx:
+ usage: RunUsage
+ model: Any = dataclasses.field(default_factory=_FakeModel)
+ deps: None = None
+
+ return _FakeCtx(usage=usage)
+
+
+def _make_request_context(messages: list[ModelMessage]) -> ModelRequestContext:
+ """Build a ModelRequestContext wrapping the given messages."""
+
+ @dataclasses.dataclass
+ class _FakeModel:
+ model_id: str = 'test-model'
+
+ return ModelRequestContext(
+ model=_FakeModel(), # type: ignore[arg-type]
+ messages=messages,
+ model_settings=None,
+ model_request_parameters=ModelRequestParameters(),
+ )
+
+
+def _user(text: str) -> ModelRequest:
+ return ModelRequest(parts=[UserPromptPart(content=text)])
+
+
+def _assistant(text: str) -> ModelResponse:
+ return ModelResponse(parts=[TextPart(content=text)])
+
+
+def _tool_call(tool_name: str, call_id: str) -> ModelResponse:
+ return ModelResponse(parts=[ToolCallPart(tool_name=tool_name, args='{}', tool_call_id=call_id)])
+
+
+def _tool_return(tool_name: str, call_id: str, content: str = 'ok') -> ModelRequest:
+ return ModelRequest(parts=[ToolReturnPart(tool_name=tool_name, content=content, tool_call_id=call_id)])
+
+
+# ---------------------------------------------------------------------------
+# estimate_token_count
+# ---------------------------------------------------------------------------
+
+
+class TestEstimateTokenCount:
+ def test_empty(self):
+ assert estimate_token_count([]) == 0
+
+ def test_user_message(self):
+ msgs: list[ModelMessage] = [_user('hello world')] # 11 chars => 2 tokens
+ assert estimate_token_count(msgs) == 11 // 4
+
+ def test_system_prompt(self):
+ msgs: list[ModelMessage] = [ModelRequest(parts=[SystemPromptPart(content='x' * 100)])]
+ assert estimate_token_count(msgs) == 25
+
+ def test_assistant_text(self):
+ msgs: list[ModelMessage] = [_assistant('y' * 80)]
+ assert estimate_token_count(msgs) == 20
+
+ def test_tool_call_and_return(self):
+ msgs: list[ModelMessage] = [
+ _tool_call('search', 'tc1'),
+ _tool_return('search', 'tc1', 'result text here'),
+ ]
+ assert estimate_token_count(msgs) > 0
+
+
+# ---------------------------------------------------------------------------
+# _is_safe_cutoff
+# ---------------------------------------------------------------------------
+
+
+class TestIsSafeCutoff:
+ def test_cutoff_beyond_end(self):
+ msgs: list[ModelMessage] = [_user('a'), _assistant('b')]
+ assert _is_safe_cutoff(msgs, 10) is True
+
+ def test_no_tool_pairs(self):
+ msgs: list[ModelMessage] = [_user('a'), _assistant('b'), _user('c')]
+ assert _is_safe_cutoff(msgs, 1) is True
+
+ def test_safe_when_both_sides_kept(self):
+ msgs: list[ModelMessage] = [
+ _user('a'),
+ _tool_call('fn', 'tc1'),
+ _tool_return('fn', 'tc1'),
+ _user('b'),
+ ]
+ # Cutting before the tool pair (index 0) is safe: both call and return are kept.
+ assert _is_safe_cutoff(msgs, 0) is True
+
+ def test_unsafe_when_splitting_pair(self):
+ msgs: list[ModelMessage] = [
+ _user('a'),
+ _tool_call('fn', 'tc1'),
+ _tool_return('fn', 'tc1'),
+ _user('b'),
+ ]
+ # Cutting at index 2: call (idx 1) is before cutoff, return (idx 2) is at cutoff (after).
+ assert _is_safe_cutoff(msgs, 2) is False
+
+ def test_safe_when_pair_entirely_discarded(self):
+ msgs: list[ModelMessage] = [
+ _tool_call('fn', 'tc1'),
+ _tool_return('fn', 'tc1'),
+ _user('a'),
+ _assistant('b'),
+ ]
+ # Cutting at 2: both call and return are before cutoff (discarded together).
+ assert _is_safe_cutoff(msgs, 2) is True
+
+
+# ---------------------------------------------------------------------------
+# find_safe_cutoff
+# ---------------------------------------------------------------------------
+
+
+class TestFindSafeCutoff:
+ def test_keep_zero_returns_length(self):
+ msgs: list[ModelMessage] = [_user('a'), _assistant('b')]
+ assert find_safe_cutoff(msgs, 0) == 2
+
+ def test_fewer_messages_than_keep(self):
+ msgs: list[ModelMessage] = [_user('a')]
+ assert find_safe_cutoff(msgs, 5) == 0
+
+ def test_normal_cutoff(self):
+ msgs: list[ModelMessage] = [_user('a'), _assistant('b'), _user('c'), _assistant('d')]
+ # Keep 2 => target cutoff is 2.
+ assert find_safe_cutoff(msgs, 2) == 2
+
+ def test_adjusts_for_tool_pair(self):
+ msgs: list[ModelMessage] = [
+ _user('a'),
+ _tool_call('fn', 'tc1'),
+ _tool_return('fn', 'tc1'),
+ _user('b'),
+ _assistant('c'),
+ ]
+ # Keep 3 => target cutoff is 2, but that splits the tool pair.
+ # Should adjust to 1 (keep tool call and return together).
+ cutoff = find_safe_cutoff(msgs, 3)
+ assert cutoff == 1
+
+
+# ---------------------------------------------------------------------------
+# find_token_cutoff
+# ---------------------------------------------------------------------------
+
+
+class TestFindTokenCutoff:
+ def test_already_within_budget(self):
+ msgs: list[ModelMessage] = [_user('hi')]
+ assert find_token_cutoff(msgs, 999999) == 0
+
+ def test_empty(self):
+ assert find_token_cutoff([], 100) == 0
+
+ def test_trims_to_budget(self):
+ # Each message contributes ~3 tokens (12 chars / 4).
+ msgs: list[ModelMessage] = [_user('x' * 12) for _ in range(20)]
+ cutoff = find_token_cutoff(msgs, 30) # Budget for ~10 messages.
+ assert cutoff > 0
+ remaining = msgs[cutoff:]
+ assert estimate_token_count(remaining) <= 30
+
+ def test_walks_back_over_tool_pair(self):
+ # The token-fit cutoff lands between a tool call and its return; the backward
+ # walk must skip to a safe index that keeps the pair together.
+ msgs: list[ModelMessage] = [
+ _user('a' * 8),
+ _tool_call('fn', 'tc1'), # contributes 'fn' + '{}' = 4 tokens
+ _tool_return('fn', 'tc1', 'b' * 4),
+ _user('c' * 4),
+ ]
+ # messages[2:] = 8 tokens (fits), messages[1:] = 12 (does not) -> candidate is 2,
+ # which splits the pair, so it walks back to 1.
+ assert find_token_cutoff(msgs, 8, tokenizer=len) == 1
+
+
+# ---------------------------------------------------------------------------
+# SlidingWindow
+# ---------------------------------------------------------------------------
+
+
+class TestSlidingWindow:
+ def test_validation_no_trigger(self):
+ with pytest.raises(ValueError, match='At least one of max_messages or max_tokens must be set'):
+ SlidingWindow()
+
+ def test_validation_negative_max_messages(self):
+ with pytest.raises(ValueError, match='max_messages must be positive'):
+ SlidingWindow(max_messages=0)
+
+ def test_validation_negative_max_tokens(self):
+ with pytest.raises(ValueError, match='max_tokens must be positive'):
+ SlidingWindow(max_tokens=-1)
+
+ def test_validation_negative_keep_messages(self):
+ with pytest.raises(ValueError, match='keep_messages must be non-negative'):
+ SlidingWindow(max_messages=10, keep_messages=-1)
+
+ def test_validation_negative_keep_tokens(self):
+ with pytest.raises(ValueError, match='keep_tokens must be non-negative'):
+ SlidingWindow(max_messages=10, keep_tokens=-1)
+
+ @pytest.mark.anyio
+ async def test_no_trim_below_threshold(self):
+ sw = SlidingWindow(max_messages=10, keep_messages=5)
+ messages: list[ModelMessage] = [_user('a'), _assistant('b')]
+ rc = _make_request_context(messages)
+ ctx = _make_ctx()
+ result = await sw.before_model_request(ctx, rc)
+ assert len(result.messages) == 2
+
+ @pytest.mark.anyio
+ async def test_trims_when_above_message_threshold(self):
+ sw = SlidingWindow(max_messages=5, keep_messages=3, preserve_first_user_message=False)
+ messages: list[ModelMessage] = [_user(f'msg-{i}') for i in range(8)]
+ rc = _make_request_context(messages)
+ ctx = _make_ctx()
+ result = await sw.before_model_request(ctx, rc)
+ assert len(result.messages) <= 3
+
+ @pytest.mark.anyio
+ async def test_trims_by_token_threshold(self):
+ sw = SlidingWindow(max_tokens=10, keep_messages=2)
+ messages: list[ModelMessage] = [_user('x' * 40) for _ in range(5)]
+ rc = _make_request_context(messages)
+ ctx = _make_ctx()
+ result = await sw.before_model_request(ctx, rc)
+ assert len(result.messages) < 5
+
+ @pytest.mark.anyio
+ async def test_preserves_tool_pairs(self):
+ sw = SlidingWindow(max_messages=4, keep_messages=2)
+ messages: list[ModelMessage] = [
+ _user('start'),
+ _tool_call('fn', 'tc1'),
+ _tool_return('fn', 'tc1'),
+ _user('end'),
+ _assistant('done'),
+ ]
+ rc = _make_request_context(messages)
+ ctx = _make_ctx()
+ result = await sw.before_model_request(ctx, rc)
+ # Should not split the tool pair.
+ assert _orphan_free(result.messages)
+
+ @pytest.mark.anyio
+ async def test_keep_tokens_mode(self):
+ sw = SlidingWindow(max_messages=3, keep_tokens=10, preserve_first_user_message=False)
+ # Each message = 20 chars = 5 tokens. Total = 50 tokens.
+ messages: list[ModelMessage] = [_user('x' * 20) for _ in range(10)]
+ rc = _make_request_context(messages)
+ ctx = _make_ctx()
+ result = await sw.before_model_request(ctx, rc)
+ assert estimate_token_count(result.messages) <= 10
+ assert len(result.messages) < 10
+
+
+# ---------------------------------------------------------------------------
+# LimitWarner
+# ---------------------------------------------------------------------------
+
+
+class TestLimitWarner:
+ def test_validation_no_limits(self):
+ with pytest.raises(ValueError, match='At least one of'):
+ LimitWarner()
+
+ def test_validation_negative_max_iterations(self):
+ with pytest.raises(ValueError, match='max_iterations must be positive'):
+ LimitWarner(max_iterations=-1)
+
+ def test_validation_negative_max_context_tokens(self):
+ with pytest.raises(ValueError, match='max_context_tokens must be positive'):
+ LimitWarner(max_context_tokens=0)
+
+ def test_validation_negative_max_total_tokens(self):
+ with pytest.raises(ValueError, match='max_total_tokens must be positive'):
+ LimitWarner(max_total_tokens=-5)
+
+ def test_validation_bad_threshold(self):
+ with pytest.raises(ValueError, match='warning_threshold'):
+ LimitWarner(max_iterations=10, warning_threshold=0)
+
+ def test_validation_negative_critical_remaining(self):
+ with pytest.raises(ValueError, match='critical_remaining_iterations'):
+ LimitWarner(max_iterations=10, critical_remaining_iterations=-1)
+
+ def test_validation_empty_warn_on(self):
+ with pytest.raises(ValueError, match='warn_on must not be empty'):
+ LimitWarner(max_iterations=10, warn_on=[])
+
+ def test_validation_warn_on_without_limit(self):
+ with pytest.raises(ValueError, match="'total_tokens' requires"):
+ LimitWarner(max_iterations=10, warn_on=['total_tokens'])
+
+ @pytest.mark.anyio
+ async def test_no_warning_below_threshold(self):
+ lw = LimitWarner(max_iterations=100)
+ messages: list[ModelMessage] = [_user('hi')]
+ rc = _make_request_context(messages)
+ ctx = _make_ctx(requests=10)
+ result = await lw.before_model_request(ctx, rc)
+ # No warning appended.
+ assert len(result.messages) == 1
+
+ @pytest.mark.anyio
+ async def test_iteration_warning_urgent(self):
+ lw = LimitWarner(max_iterations=20, warning_threshold=0.7, critical_remaining_iterations=3)
+ messages: list[ModelMessage] = [_user('hi')]
+ rc = _make_request_context(messages)
+ # 15/20 = 75% usage, 5 remaining > critical_remaining_iterations=3 => URGENT.
+ ctx = _make_ctx(requests=15)
+ result = await lw.before_model_request(ctx, rc)
+ assert len(result.messages) == 2
+ last = result.messages[-1]
+ assert isinstance(last, ModelRequest)
+ text = last.parts[0]
+ assert isinstance(text, UserPromptPart)
+ assert isinstance(text.content, str)
+ assert 'URGENT' in text.content
+ assert '[LimitWarner]' in text.content
+
+ @pytest.mark.anyio
+ async def test_iteration_warning_critical(self):
+ lw = LimitWarner(max_iterations=10, warning_threshold=0.7, critical_remaining_iterations=3)
+ messages: list[ModelMessage] = [_user('hi')]
+ rc = _make_request_context(messages)
+ ctx = _make_ctx(requests=9) # 1 remaining.
+ result = await lw.before_model_request(ctx, rc)
+ last = result.messages[-1]
+ assert isinstance(last, ModelRequest)
+ text = last.parts[0]
+ assert isinstance(text, UserPromptPart)
+ assert isinstance(text.content, str)
+ assert 'CRITICAL' in text.content
+
+ @pytest.mark.anyio
+ async def test_context_window_warning(self):
+ lw = LimitWarner(max_context_tokens=10)
+ # Create a message that exceeds 70% of 10 tokens.
+ messages: list[ModelMessage] = [_user('x' * 40)] # ~10 tokens.
+ rc = _make_request_context(messages)
+ ctx = _make_ctx()
+ result = await lw.before_model_request(ctx, rc)
+ assert len(result.messages) == 2
+
+ @pytest.mark.anyio
+ async def test_total_tokens_warning(self):
+ lw = LimitWarner(max_total_tokens=100)
+ messages: list[ModelMessage] = [_user('hi')]
+ rc = _make_request_context(messages)
+ ctx = _make_ctx(input_tokens=50, output_tokens=30) # 80 total.
+ result = await lw.before_model_request(ctx, rc)
+ assert len(result.messages) == 2
+
+ @pytest.mark.anyio
+ async def test_strips_old_warnings(self):
+ lw = LimitWarner(max_iterations=10, warning_threshold=0.7)
+ old_warning = ModelRequest(parts=[UserPromptPart(content='[LimitWarner]\nOld warning')])
+ messages: list[ModelMessage] = [_user('hi'), old_warning]
+ rc = _make_request_context(messages)
+ ctx = _make_ctx(requests=5) # Below threshold.
+ result = await lw.before_model_request(ctx, rc)
+ # Old warning removed, no new warning added (below threshold).
+ assert len(result.messages) == 1
+
+ @pytest.mark.anyio
+ async def test_multiple_warnings_ordered(self):
+ lw = LimitWarner(max_iterations=10, max_total_tokens=100)
+ messages: list[ModelMessage] = [_user('hi')]
+ rc = _make_request_context(messages)
+ ctx = _make_ctx(requests=8, input_tokens=50, output_tokens=30)
+ result = await lw.before_model_request(ctx, rc)
+ last = result.messages[-1]
+ assert isinstance(last, ModelRequest)
+ text = last.parts[0]
+ assert isinstance(text, UserPromptPart)
+ assert isinstance(text.content, str)
+ # Iterations should come before total_tokens.
+ assert text.content.index('Iterations') < text.content.index('Total tokens')
+
+
+# ---------------------------------------------------------------------------
+# Compaction
+# ---------------------------------------------------------------------------
+
+
+class TestCompaction:
+ def test_validation_no_trigger(self):
+ with pytest.raises(ValueError, match='At least one of max_messages or max_tokens must be set'):
+ SummarizingCompaction(model='test', max_messages=None, max_tokens=None)
+
+ def test_validation_negative_max_messages(self):
+ with pytest.raises(ValueError, match='max_messages must be positive'):
+ SummarizingCompaction(model='test', max_messages=0)
+
+ def test_validation_negative_max_tokens(self):
+ with pytest.raises(ValueError, match='max_tokens must be positive'):
+ SummarizingCompaction(model='test', max_tokens=-1)
+
+ def test_validation_negative_keep_messages(self):
+ with pytest.raises(ValueError, match='keep_messages must be non-negative'):
+ SummarizingCompaction(model='test', max_messages=10, keep_messages=-1)
+
+ def test_validation_negative_keep_tokens(self):
+ with pytest.raises(ValueError, match='keep_tokens must be non-negative'):
+ SummarizingCompaction(model='test', max_messages=10, keep_tokens=-1)
+
+ @pytest.mark.anyio
+ async def test_no_compaction_below_threshold(self):
+ comp = SummarizingCompaction(model='test', max_messages=100)
+ messages: list[ModelMessage] = [_user('hi')]
+ rc = _make_request_context(messages)
+ ctx = _make_ctx()
+ result = await comp.before_model_request(ctx, rc)
+ assert result.messages == messages
+
+ @pytest.mark.anyio
+ async def test_compaction_replaces_old_messages(self):
+ comp = SummarizingCompaction(model='test:m', max_messages=3, keep_messages=1, preserve_first_user_message=False)
+ messages: list[ModelMessage] = [
+ _user('first'),
+ _assistant('response 1'),
+ _user('second'),
+ _assistant('response 2'),
+ _user('third'),
+ ]
+ rc = _make_request_context(messages)
+ ctx = _make_ctx()
+
+ mock_result = AsyncMock()
+ mock_result.output = 'Summary of conversation.'
+
+ with patch('pydantic_ai.Agent') as MockAgent:
+ mock_agent_instance = AsyncMock()
+ mock_agent_instance.run.return_value = mock_result
+ MockAgent.return_value = mock_agent_instance
+
+ result = await comp.before_model_request(ctx, rc)
+
+ # Should have summary message + 1 kept message.
+ assert len(result.messages) == 2
+ first_msg = result.messages[0]
+ assert isinstance(first_msg, ModelRequest)
+ # The summary should be in a SystemPromptPart.
+ sys_parts = [p for p in first_msg.parts if isinstance(p, SystemPromptPart)]
+ assert len(sys_parts) >= 1
+ assert 'Summary of conversation.' in sys_parts[-1].content
+
+ @pytest.mark.anyio
+ async def test_compaction_preserves_system_prompts(self):
+ comp = SummarizingCompaction(model='test:m', max_messages=3, keep_messages=1)
+ messages: list[ModelMessage] = [
+ ModelRequest(parts=[SystemPromptPart(content='You are a helpful assistant.')]),
+ _user('first'),
+ _assistant('response 1'),
+ _user('second'),
+ _assistant('response 2'),
+ ]
+ rc = _make_request_context(messages)
+ ctx = _make_ctx()
+
+ mock_result = AsyncMock()
+ mock_result.output = 'A summary.'
+
+ with patch('pydantic_ai.Agent') as MockAgent:
+ mock_agent_instance = AsyncMock()
+ mock_agent_instance.run.return_value = mock_result
+ MockAgent.return_value = mock_agent_instance
+
+ result = await comp.before_model_request(ctx, rc)
+
+ first_msg = result.messages[0]
+ assert isinstance(first_msg, ModelRequest)
+ # Should have the original system prompt preserved.
+ sys_contents = [p.content for p in first_msg.parts if isinstance(p, SystemPromptPart)]
+ assert 'You are a helpful assistant.' in sys_contents
+
+ @pytest.mark.anyio
+ async def test_compaction_preserves_tool_pairs(self):
+ comp = SummarizingCompaction(model='test:m', max_messages=4, keep_messages=2)
+ messages: list[ModelMessage] = [
+ _user('start'),
+ _tool_call('fn', 'tc1'),
+ _tool_return('fn', 'tc1'),
+ _user('middle'),
+ _assistant('response'),
+ ]
+ rc = _make_request_context(messages)
+ ctx = _make_ctx()
+
+ mock_result = AsyncMock()
+ mock_result.output = 'Summary.'
+
+ with patch('pydantic_ai.Agent') as MockAgent:
+ mock_agent_instance = AsyncMock()
+ mock_agent_instance.run.return_value = mock_result
+ MockAgent.return_value = mock_agent_instance
+
+ result = await comp.before_model_request(ctx, rc)
+
+ # Tool pairs in remaining messages should be intact.
+ assert _orphan_free(result.messages)
+
+ @pytest.mark.anyio
+ async def test_compaction_token_trigger(self):
+ comp = SummarizingCompaction(model='test:m', max_tokens=5, keep_messages=1)
+ messages: list[ModelMessage] = [_user('x' * 40) for _ in range(5)]
+ rc = _make_request_context(messages)
+ ctx = _make_ctx()
+
+ mock_result = AsyncMock()
+ mock_result.output = 'Token-based summary.'
+
+ with patch('pydantic_ai.Agent') as MockAgent:
+ mock_agent_instance = AsyncMock()
+ mock_agent_instance.run.return_value = mock_result
+ MockAgent.return_value = mock_agent_instance
+
+ result = await comp.before_model_request(ctx, rc)
+
+ assert len(result.messages) >= 1
+ # Summary message should exist.
+ first_msg = result.messages[0]
+ assert isinstance(first_msg, ModelRequest)
+
+ @pytest.mark.anyio
+ async def test_compaction_keep_tokens_mode(self):
+ comp = SummarizingCompaction(model='test:m', max_messages=3, keep_tokens=5)
+ messages: list[ModelMessage] = [_user('x' * 40) for _ in range(5)]
+ rc = _make_request_context(messages)
+ ctx = _make_ctx()
+
+ mock_result = AsyncMock()
+ mock_result.output = 'Token-keep summary.'
+
+ with patch('pydantic_ai.Agent') as MockAgent:
+ mock_agent_instance = AsyncMock()
+ mock_agent_instance.run.return_value = mock_result
+ MockAgent.return_value = mock_agent_instance
+
+ result = await comp.before_model_request(ctx, rc)
+
+ assert len(result.messages) >= 1
+
+
+# ---------------------------------------------------------------------------
+# _format_messages
+# ---------------------------------------------------------------------------
+
+
+class TestFormatMessages:
+ def test_user_and_assistant(self):
+ msgs: list[ModelMessage] = [_user('hi'), _assistant('hello')]
+ text = _format_messages(msgs)
+ assert 'User: hi' in text
+ assert 'Assistant: hello' in text
+
+ def test_system_prompt(self):
+ msgs: list[ModelMessage] = [ModelRequest(parts=[SystemPromptPart(content='be helpful')])]
+ text = _format_messages(msgs)
+ assert 'System: be helpful' in text
+
+ def test_tool_call_and_return(self):
+ msgs: list[ModelMessage] = [
+ _tool_call('search', 'tc1'),
+ _tool_return('search', 'tc1', 'found it'),
+ ]
+ text = _format_messages(msgs)
+ assert 'Tool Call [search]' in text
+ assert 'Tool [search]: found it' in text
+
+ def test_long_tool_return_truncated(self):
+ msgs: list[ModelMessage] = [_tool_return('fn', 'tc1', 'x' * 600)]
+ text = _format_messages(msgs)
+ assert '...' in text
+
+
+# ---------------------------------------------------------------------------
+# _extract_system_prompts
+# ---------------------------------------------------------------------------
+
+
+class TestExtractSystemPrompts:
+ def test_extracts_leading_system_parts(self):
+ msgs: list[ModelMessage] = [
+ ModelRequest(parts=[SystemPromptPart(content='sys1')]),
+ _user('hi'),
+ ]
+ parts = _extract_system_prompts(msgs)
+ assert len(parts) == 1
+ assert parts[0].content == 'sys1'
+
+ def test_stops_at_non_system(self):
+ msgs: list[ModelMessage] = [
+ ModelRequest(parts=[SystemPromptPart(content='sys1'), UserPromptPart(content='hi')]),
+ ]
+ parts = _extract_system_prompts(msgs)
+ assert len(parts) == 1
+
+ def test_empty_when_no_system(self):
+ msgs: list[ModelMessage] = [_user('hi')]
+ parts = _extract_system_prompts(msgs)
+ assert parts == []
+
+ def test_stops_at_non_request(self):
+ msgs: list[ModelMessage] = [_assistant('hello'), _user('hi')]
+ parts = _extract_system_prompts(msgs)
+ assert parts == []
+
+
+# ---------------------------------------------------------------------------
+# Package-level exports
+# ---------------------------------------------------------------------------
+
+
+class TestExports:
+ def test_exposed_under_experimental_only(self):
+ import pydantic_ai_harness
+ import pydantic_ai_harness.experimental.compaction as compaction
+
+ names = [
+ 'SlidingWindow',
+ 'ClearToolResults',
+ 'DeduplicateFileReads',
+ 'LimitWarner',
+ 'SummarizingCompaction',
+ 'TieredCompaction',
+ ]
+ for name in names:
+ # Available from the experimental package...
+ assert hasattr(compaction, name)
+ # ...and deliberately NOT from the top-level namespace.
+ assert not hasattr(pydantic_ai_harness, name)
+
+
+# ---------------------------------------------------------------------------
+# Additional coverage — multi-modal content, edge cases
+# ---------------------------------------------------------------------------
+
+
+class TestUserPromptMultiModal:
+ """Cover _user_prompt_text_for_counting and _user_prompt_text for non-string UserContent."""
+
+ def test_estimate_with_text_content_parts(self):
+ from pydantic_ai.messages import TextContent
+
+ part = UserPromptPart(content=[TextContent(content='hello')])
+ msgs: list[ModelMessage] = [ModelRequest(parts=[part])]
+ # 5 chars / 4 = 1 token.
+ assert estimate_token_count(msgs) == 1
+
+ def test_estimate_with_str_content_parts(self):
+ """UserContent can also be plain str items in a sequence."""
+ part = UserPromptPart(content=['hello', 'world'])
+ msgs: list[ModelMessage] = [ModelRequest(parts=[part])]
+ # 10 chars / 4 = 2 tokens.
+ assert estimate_token_count(msgs) == 2
+
+ def test_format_with_text_content(self):
+ from pydantic_ai.messages import TextContent
+
+ part = UserPromptPart(content=[TextContent(content='multi-part')])
+ msgs: list[ModelMessage] = [ModelRequest(parts=[part])]
+ text = _format_messages(msgs)
+ assert 'User: multi-part' in text
+
+ def test_format_with_str_content(self):
+ part = UserPromptPart(content=['one', 'two'])
+ msgs: list[ModelMessage] = [ModelRequest(parts=[part])]
+ text = _format_messages(msgs)
+ assert 'User: one two' in text
+
+ def test_format_empty_sequence(self):
+ part = UserPromptPart(content=[])
+ msgs: list[ModelMessage] = [ModelRequest(parts=[part])]
+ text = _format_messages(msgs)
+ assert 'User: ' in text
+
+
+class TestLimitWarnerEdgeCases:
+ """Cover LimitWarner edge cases for marker detection and stripping."""
+
+ @pytest.mark.anyio
+ async def test_strip_warning_with_only_marker_message(self):
+ """A message composed entirely of a marker part should be removed."""
+ lw = LimitWarner(max_iterations=100)
+ marker_msg = ModelRequest(parts=[UserPromptPart(content='[LimitWarner]\nold')])
+ messages: list[ModelMessage] = [_user('real'), marker_msg]
+ rc = _make_request_context(messages)
+ ctx = _make_ctx(requests=5)
+ result = await lw.before_model_request(ctx, rc)
+ # Marker message should be stripped; only the real message remains.
+ assert len(result.messages) == 1
+
+ @pytest.mark.anyio
+ async def test_strip_warning_system_prompt_marker(self):
+ """Marker in a SystemPromptPart should also be detected."""
+ lw = LimitWarner(max_iterations=100)
+ marker_msg = ModelRequest(parts=[SystemPromptPart(content='[LimitWarner]\nold')])
+ messages: list[ModelMessage] = [_user('real'), marker_msg]
+ rc = _make_request_context(messages)
+ ctx = _make_ctx(requests=5)
+ result = await lw.before_model_request(ctx, rc)
+ assert len(result.messages) == 1
+
+ @pytest.mark.anyio
+ async def test_strip_mixed_parts_keeps_non_marker(self):
+ """A message with both marker and non-marker parts should keep the non-marker parts."""
+ lw = LimitWarner(max_iterations=100)
+ mixed = ModelRequest(
+ parts=[
+ UserPromptPart(content='keep this'),
+ UserPromptPart(content='[LimitWarner]\nremove this'),
+ ]
+ )
+ messages: list[ModelMessage] = [mixed]
+ rc = _make_request_context(messages)
+ ctx = _make_ctx(requests=5)
+ result = await lw.before_model_request(ctx, rc)
+ assert len(result.messages) == 1
+ first = result.messages[0]
+ assert isinstance(first, ModelRequest)
+ assert len(first.parts) == 1
+
+ @pytest.mark.anyio
+ async def test_context_warning_below_threshold(self):
+ """Context window should not warn when below threshold."""
+ lw = LimitWarner(max_context_tokens=1000)
+        messages: list[ModelMessage] = [_user('hi')]  # 2 chars => 0 estimated tokens, well below 70%.
+ rc = _make_request_context(messages)
+ ctx = _make_ctx()
+ result = await lw.before_model_request(ctx, rc)
+ assert len(result.messages) == 1
+
+ @pytest.mark.anyio
+ async def test_total_tokens_warning_critical(self):
+ """Total tokens at or above limit should produce CRITICAL."""
+ lw = LimitWarner(max_total_tokens=100)
+ messages: list[ModelMessage] = [_user('hi')]
+ rc = _make_request_context(messages)
+ ctx = _make_ctx(input_tokens=60, output_tokens=50) # 110 total, above limit.
+ result = await lw.before_model_request(ctx, rc)
+ last = result.messages[-1]
+ assert isinstance(last, ModelRequest)
+ text = last.parts[0]
+ assert isinstance(text, UserPromptPart)
+ assert isinstance(text.content, str)
+ assert 'CRITICAL' in text.content
+
+ @pytest.mark.anyio
+ async def test_context_window_critical(self):
+ """Context window at or above limit should produce CRITICAL."""
+ lw = LimitWarner(max_context_tokens=5)
+ messages: list[ModelMessage] = [_user('x' * 40)] # ~10 tokens, well above 5.
+ rc = _make_request_context(messages)
+ ctx = _make_ctx()
+ result = await lw.before_model_request(ctx, rc)
+ last = result.messages[-1]
+ assert isinstance(last, ModelRequest)
+ text = last.parts[0]
+ assert isinstance(text, UserPromptPart)
+ assert isinstance(text.content, str)
+ assert 'CRITICAL' in text.content
+
+ def test_warn_on_subset(self):
+ """Can configure warn_on to only include specific limits."""
+ lw = LimitWarner(max_iterations=10, max_total_tokens=100, warn_on=['iterations'])
+ assert lw._active_kinds == ('iterations',)
+
+
+class TestCompactionEdgeCases:
+    """Cover SummarizingCompaction edge cases."""
+
+ @pytest.mark.anyio
+ async def test_compaction_cutoff_zero_no_change(self):
+ """When cutoff is 0, no compaction should occur (messages all kept)."""
+ comp = SummarizingCompaction(model='test:m', max_messages=2, keep_messages=10)
+ # Only 3 messages, keep_messages=10 means cutoff=0.
+ messages: list[ModelMessage] = [_user('a'), _assistant('b'), _user('c')]
+ rc = _make_request_context(messages)
+ ctx = _make_ctx()
+ result = await comp.before_model_request(ctx, rc)
+ assert len(result.messages) == 3
+
+
+class TestSlidingWindowEdgeCases:
+ """Cover SlidingWindow edge cases."""
+
+ @pytest.mark.anyio
+ async def test_cutoff_zero_no_trim(self):
+ """When the cutoff resolves to 0, messages should not be trimmed."""
+ sw = SlidingWindow(max_messages=2, keep_messages=10)
+ # 3 messages, but keep_messages=10 => cutoff=0.
+ messages: list[ModelMessage] = [_user('a'), _assistant('b'), _user('c')]
+ rc = _make_request_context(messages)
+ ctx = _make_ctx()
+ result = await sw.before_model_request(ctx, rc)
+ assert len(result.messages) == 3
+
+ @pytest.mark.anyio
+ async def test_token_not_triggered_when_below(self):
+ """Token trigger should not fire below threshold."""
+ sw = SlidingWindow(max_tokens=999999, keep_messages=2)
+ messages: list[ModelMessage] = [_user('hi')]
+ rc = _make_request_context(messages)
+ ctx = _make_ctx()
+ result = await sw.before_model_request(ctx, rc)
+ assert len(result.messages) == 1
+
+
+class TestLimitWarnerMarkerDetection:
+    """Cover _is_marker_part returning False for non-text parts."""
+
+    @pytest.mark.anyio
+    async def test_tool_return_part_not_detected_as_marker(self):
+        """A ToolReturnPart (a non-text part) should not be detected as a marker."""
+ lw = LimitWarner(max_iterations=100)
+ # Create a ModelRequest with a ToolReturnPart (not a marker).
+ messages: list[ModelMessage] = [
+ _user('real'),
+ _tool_return('fn', 'tc1', 'some result'),
+ ]
+ rc = _make_request_context(messages)
+ ctx = _make_ctx(requests=5)
+ result = await lw.before_model_request(ctx, rc)
+ assert len(result.messages) == 2
+
+ @pytest.mark.anyio
+ async def test_strip_preserves_model_responses(self):
+ """ModelResponse messages pass through strip unchanged."""
+ lw = LimitWarner(max_iterations=100)
+ messages: list[ModelMessage] = [
+ _user('hi'),
+ _assistant('response'),
+ ModelRequest(parts=[UserPromptPart(content='[LimitWarner]\nold')]),
+ ]
+ rc = _make_request_context(messages)
+ ctx = _make_ctx(requests=5)
+ result = await lw.before_model_request(ctx, rc)
+ # Marker message removed; user and assistant remain.
+ assert len(result.messages) == 2
+ assert isinstance(result.messages[1], ModelResponse)
+
+
+class TestLimitWarnerTotalTokensBelowThreshold:
+ """Cover _build_total_tokens_warning returning None when below threshold."""
+
+ @pytest.mark.anyio
+ async def test_total_tokens_below_threshold(self):
+ lw = LimitWarner(max_total_tokens=1000)
+ messages: list[ModelMessage] = [_user('hi')]
+ rc = _make_request_context(messages)
+ ctx = _make_ctx(input_tokens=10, output_tokens=10) # 20 total, 2% of 1000.
+ result = await lw.before_model_request(ctx, rc)
+ assert len(result.messages) == 1 # No warning.
+
+
+# ---------------------------------------------------------------------------
+# Tokenizer parameter
+# ---------------------------------------------------------------------------
+
+
+class TestTokenizerParameter:
+    """Tests for the optional tokenizer parameter on estimate_token_count,
+    find_token_cutoff, SlidingWindow, and SummarizingCompaction."""
+
+ def test_estimate_token_count_with_tokenizer(self):
+ """Custom tokenizer should override the heuristic."""
+ msgs: list[ModelMessage] = [_user('hello world')]
+ # Heuristic: 11 chars / 4 = 2 tokens.
+ assert estimate_token_count(msgs) == 2
+        # Custom tokenizer: one token per character, so the result differs from the heuristic.
+        assert estimate_token_count(msgs, tokenizer=len) == 11
+
+ def test_estimate_token_count_tokenizer_called_per_segment(self):
+ """Tokenizer is called once per text segment, results are summed."""
+ calls: list[str] = []
+
+ def tracking_tokenizer(s: str) -> int:
+ calls.append(s)
+ return 10
+
+ msgs: list[ModelMessage] = [_user('a'), _assistant('b')]
+ result = estimate_token_count(msgs, tokenizer=tracking_tokenizer)
+ assert result == 20
+ assert len(calls) == 2
+
+ @pytest.mark.anyio
+ async def test_sliding_window_with_tokenizer(self):
+ """SlidingWindow should use the tokenizer for token-based triggers."""
+ # Custom tokenizer: 1 token per character.
+ sw = SlidingWindow(
+ max_tokens=10,
+ keep_tokens=5,
+ tokenizer=lambda s: len(s),
+ preserve_first_user_message=False,
+ )
+ # Each message has 4 chars = 4 tokens with this tokenizer. 5 messages = 20 tokens.
+ messages: list[ModelMessage] = [_user('abcd') for _ in range(5)]
+ rc = _make_request_context(messages)
+ ctx = _make_ctx()
+ result = await sw.before_model_request(ctx, rc)
+ # With keep_tokens=5 and 4 tokens per message, should keep 1 message.
+ remaining_tokens = estimate_token_count(result.messages, tokenizer=lambda s: len(s))
+ assert remaining_tokens <= 5
+
+ @pytest.mark.anyio
+ async def test_sliding_window_tokenizer_threshold_check(self):
+ """SlidingWindow tokenizer should be used for the trigger check."""
+ # Tokenizer that inflates counts: 100 tokens per char.
+ sw = SlidingWindow(
+ max_tokens=50,
+ keep_messages=1,
+ tokenizer=lambda s: len(s) * 100,
+ preserve_first_user_message=False,
+ )
+        # 2 chars * 100 = 200 tokens per message, so the 2 messages total 400 tokens (> 50).
+        # The trigger fires and keep_messages=1 leaves a single message.
+ messages: list[ModelMessage] = [_user('ab'), _user('cd')]
+ rc = _make_request_context(messages)
+ ctx = _make_ctx()
+ result = await sw.before_model_request(ctx, rc)
+ assert len(result.messages) == 1
+
+ @pytest.mark.anyio
+ async def test_compaction_with_tokenizer(self):
+ """Compaction should use the tokenizer for token-based triggers."""
+ # Tokenizer: 1 token per char.
+ comp = SummarizingCompaction(
+ model='test:m',
+ max_tokens=10,
+ keep_messages=1,
+ tokenizer=lambda s: len(s),
+ preserve_first_user_message=False,
+ incremental=False,
+ )
+ # Each message: 'abcde' = 5 chars = 5 tokens. 4 messages = 20 tokens > 10.
+ messages: list[ModelMessage] = [_user('abcde') for _ in range(4)]
+ rc = _make_request_context(messages)
+ ctx = _make_ctx()
+
+ mock_result = AsyncMock()
+ mock_result.output = 'Token summary.'
+
+ with patch('pydantic_ai.Agent') as MockAgent:
+ mock_agent_instance = AsyncMock()
+ mock_agent_instance.run.return_value = mock_result
+ MockAgent.return_value = mock_agent_instance
+
+ result = await comp.before_model_request(ctx, rc)
+
+ # Should have triggered compaction.
+ assert len(result.messages) >= 1
+ first_msg = result.messages[0]
+ assert isinstance(first_msg, ModelRequest)
+ sys_parts = [p for p in first_msg.parts if isinstance(p, SystemPromptPart)]
+ assert any('Token summary.' in p.content for p in sys_parts)
+
+ def test_find_token_cutoff_with_tokenizer(self):
+ """find_token_cutoff should use the tokenizer."""
+ messages: list[ModelMessage] = [_user('abcde') for _ in range(10)]
+ # Tokenizer: 1 token per char. Each message = 5 tokens.
+ cutoff = find_token_cutoff(messages, 15, tokenizer=lambda s: len(s))
+ remaining = messages[cutoff:]
+ assert estimate_token_count(remaining, tokenizer=lambda s: len(s)) <= 15
+
+
+# ---------------------------------------------------------------------------
+# Preserve first user message
+# ---------------------------------------------------------------------------
+
+
+class TestPreserveFirstUserMessage:
+ """Tests for the preserve_first_user_message parameter."""
+
+ def test_find_first_user_message_found(self):
+ msgs: list[ModelMessage] = [
+ ModelRequest(parts=[SystemPromptPart(content='sys')]),
+ _user('first'),
+ _user('second'),
+ ]
+ result = find_first_user_message(msgs)
+ assert result is not None
+ assert isinstance(result.parts[0], UserPromptPart)
+ assert result.parts[0].content == 'first'
+
+ def test_find_first_user_message_none(self):
+ msgs: list[ModelMessage] = [
+ ModelRequest(parts=[SystemPromptPart(content='sys')]),
+ _assistant('hello'),
+ ]
+ assert find_first_user_message(msgs) is None
+
+ @pytest.mark.anyio
+ async def test_sliding_window_preserves_first_user(self):
+ sw = SlidingWindow(max_messages=3, keep_messages=2, preserve_first_user_message=True)
+ messages: list[ModelMessage] = [
+ _user('original task'),
+ _assistant('got it'),
+ _user('follow-up 1'),
+ _assistant('done'),
+ _user('follow-up 2'),
+ ]
+ rc = _make_request_context(messages)
+ ctx = _make_ctx()
+ result = await sw.before_model_request(ctx, rc)
+ # The first user message ('original task') should be preserved even though
+ # it was outside the keep window.
+ assert 'original task' in _user_texts(result.messages)
+
+ @pytest.mark.anyio
+ async def test_sliding_window_no_duplicate_when_in_window(self):
+ """First user message should not be duplicated if already in the kept window."""
+ sw = SlidingWindow(max_messages=3, keep_messages=5, preserve_first_user_message=True)
+ messages: list[ModelMessage] = [
+ _user('task'),
+ _assistant('ok'),
+ _user('more'),
+ _assistant('done'),
+ ]
+ rc = _make_request_context(messages)
+ ctx = _make_ctx()
+ result = await sw.before_model_request(ctx, rc)
+ assert len(result.messages) == 4 # Nothing trimmed: keep_messages=5 covers all 4 messages.
+
+ @pytest.mark.anyio
+ async def test_sliding_window_disabled_preserve(self):
+ """When preserve_first_user_message=False, first user message is not kept."""
+ sw = SlidingWindow(max_messages=3, keep_messages=1, preserve_first_user_message=False)
+ messages: list[ModelMessage] = [
+ _user('original'),
+ _assistant('a'),
+ _user('b'),
+ _assistant('c'),
+ _user('last'),
+ ]
+ rc = _make_request_context(messages)
+ ctx = _make_ctx()
+ result = await sw.before_model_request(ctx, rc)
+ assert len(result.messages) == 1
+ assert 'original' not in _user_texts(result.messages)
+
+ @pytest.mark.anyio
+ async def test_compaction_preserves_first_user(self):
+ comp = SummarizingCompaction(model='test:m', max_messages=3, keep_messages=1, preserve_first_user_message=True)
+ messages: list[ModelMessage] = [
+ _user('build a web app'),
+ _assistant('response 1'),
+ _user('second'),
+ _assistant('response 2'),
+ _user('third'),
+ ]
+ rc = _make_request_context(messages)
+ ctx = _make_ctx()
+
+ mock_result = AsyncMock()
+ mock_result.output = 'Summary.'
+
+ with patch('pydantic_ai.Agent') as MockAgent:
+ mock_agent_instance = AsyncMock()
+ mock_agent_instance.run.return_value = mock_result
+ MockAgent.return_value = mock_agent_instance
+
+ result = await comp.before_model_request(ctx, rc)
+
+ # Summary message + first user message + 1 kept = 3.
+ assert len(result.messages) == 3
+ # First message is the summary (with system prompts).
+ assert isinstance(result.messages[0], ModelRequest)
+ sys_parts = [p for p in result.messages[0].parts if isinstance(p, SystemPromptPart)]
+ assert any('Summary.' in p.content for p in sys_parts)
+ # Second message is the preserved first user message.
+ assert isinstance(result.messages[1], ModelRequest)
+ user_parts = [p for p in result.messages[1].parts if isinstance(p, UserPromptPart)]
+ assert len(user_parts) == 1
+ assert user_parts[0].content == 'build a web app'
+
+ @pytest.mark.anyio
+ async def test_compaction_no_duplicate_first_user_when_in_window(self):
+ """First user message already in kept window should not be duplicated."""
+ comp = SummarizingCompaction(model='test:m', max_messages=3, keep_messages=5, preserve_first_user_message=True)
+ messages: list[ModelMessage] = [
+ _user('task'),
+ _assistant('ok'),
+ _user('more'),
+ _assistant('done'),
+ ]
+ rc = _make_request_context(messages)
+ ctx = _make_ctx()
+ result = await comp.before_model_request(ctx, rc)
+ # Nothing to compact: keep_messages=5 covers all 4 messages.
+ assert len(result.messages) == 4
+
+ @pytest.mark.anyio
+ async def test_sliding_window_no_user_messages(self):
+ """When there are no user messages, preservation is a no-op."""
+ sw = SlidingWindow(max_messages=2, keep_messages=1, preserve_first_user_message=True)
+ messages: list[ModelMessage] = [
+ _assistant('a'),
+ _assistant('b'),
+ _assistant('c'),
+ ]
+ rc = _make_request_context(messages)
+ ctx = _make_ctx()
+ result = await sw.before_model_request(ctx, rc)
+ assert len(result.messages) == 1
+
+
+# ---------------------------------------------------------------------------
+# Incremental summarization
+# ---------------------------------------------------------------------------
+
+
+class TestIncrementalSummarization:
+ """Tests for the incremental parameter on SummarizingCompaction."""
+
+ def test_extract_previous_summary_found(self):
+ msgs: list[ModelMessage] = [
+ ModelRequest(parts=[SystemPromptPart(content=f'{_SUMMARY_PREFIX}Old summary text.')]),
+ _user('hi'),
+ ]
+ assert _extract_previous_summary(msgs) == 'Old summary text.'
+
+ def test_extract_previous_summary_not_found(self):
+ msgs: list[ModelMessage] = [
+ ModelRequest(parts=[SystemPromptPart(content='Regular system prompt.')]),
+ _user('hi'),
+ ]
+ assert _extract_previous_summary(msgs) is None
+
+ def test_extract_previous_summary_empty_messages(self):
+ assert _extract_previous_summary([]) is None
+
+ def test_extract_previous_summary_skips_non_requests(self):
+ msgs: list[ModelMessage] = [
+ _assistant('hi'),
+ _user('hello'),
+ ]
+ assert _extract_previous_summary(msgs) is None
+
+ @pytest.mark.anyio
+ async def test_incremental_includes_previous_summary(self):
+ """When incremental=True and a prior summary exists, it should be included in the prompt."""
+ comp = SummarizingCompaction(
+ model='test:m',
+ max_messages=3,
+ keep_messages=1,
+ incremental=True,
+ preserve_first_user_message=False,
+ )
+ # Simulate a conversation that already has a summary from prior compaction.
+ messages: list[ModelMessage] = [
+ ModelRequest(parts=[SystemPromptPart(content=f'{_SUMMARY_PREFIX}Previous context here.')]),
+ _user('new input 1'),
+ _assistant('response 1'),
+ _user('new input 2'),
+ _assistant('response 2'),
+ ]
+ rc = _make_request_context(messages)
+ ctx = _make_ctx()
+
+ mock_result = AsyncMock()
+ mock_result.output = 'Extended summary.'
+
+ with patch('pydantic_ai.Agent') as MockAgent:
+ mock_agent_instance = AsyncMock()
+ mock_agent_instance.run.return_value = mock_result
+ MockAgent.return_value = mock_agent_instance
+
+ await comp.before_model_request(ctx, rc)
+
+ # Verify the summarization prompt included the previous summary.
+ call_args = mock_agent_instance.run.call_args
+ prompt_text = call_args[0][0]
+ assert '' in prompt_text
+ assert 'Previous context here.' in prompt_text
+
+ @pytest.mark.anyio
+ async def test_incremental_no_previous_summary(self):
+ """When incremental=True but no prior summary exists, prompt should be plain."""
+ comp = SummarizingCompaction(
+ model='test:m',
+ max_messages=3,
+ keep_messages=1,
+ incremental=True,
+ preserve_first_user_message=False,
+ )
+ messages: list[ModelMessage] = [
+ _user('first'),
+ _assistant('response 1'),
+ _user('second'),
+ _assistant('response 2'),
+ ]
+ rc = _make_request_context(messages)
+ ctx = _make_ctx()
+
+ mock_result = AsyncMock()
+ mock_result.output = 'Fresh summary.'
+
+ with patch('pydantic_ai.Agent') as MockAgent:
+ mock_agent_instance = AsyncMock()
+ mock_agent_instance.run.return_value = mock_result
+ MockAgent.return_value = mock_agent_instance
+
+ await comp.before_model_request(ctx, rc)
+
+ call_args = mock_agent_instance.run.call_args
+ prompt_text = call_args[0][0]
+ assert '' not in prompt_text
+
+ @pytest.mark.anyio
+ async def test_incremental_disabled(self):
+ """When incremental=False, the previous summary should not be included."""
+ comp = SummarizingCompaction(
+ model='test:m',
+ max_messages=3,
+ keep_messages=1,
+ incremental=False,
+ preserve_first_user_message=False,
+ )
+ messages: list[ModelMessage] = [
+ ModelRequest(parts=[SystemPromptPart(content=f'{_SUMMARY_PREFIX}Old summary.')]),
+ _user('new input'),
+ _assistant('response'),
+ _user('another'),
+ _assistant('another response'),
+ ]
+ rc = _make_request_context(messages)
+ ctx = _make_ctx()
+
+ mock_result = AsyncMock()
+ mock_result.output = 'Regenerated summary.'
+
+ with patch('pydantic_ai.Agent') as MockAgent:
+ mock_agent_instance = AsyncMock()
+ mock_agent_instance.run.return_value = mock_result
+ MockAgent.return_value = mock_agent_instance
+
+ await comp.before_model_request(ctx, rc)
+
+ call_args = mock_agent_instance.run.call_args
+ prompt_text = call_args[0][0]
+ assert '' not in prompt_text
+
+ @pytest.mark.anyio
+ async def test_incremental_output_contains_summary(self):
+ """The output after incremental compaction should contain the new summary."""
+ comp = SummarizingCompaction(
+ model='test:m',
+ max_messages=3,
+ keep_messages=1,
+ incremental=True,
+ preserve_first_user_message=False,
+ )
+ messages: list[ModelMessage] = [
+ ModelRequest(parts=[SystemPromptPart(content=f'{_SUMMARY_PREFIX}Old context.')]),
+ _user('a'),
+ _assistant('b'),
+ _user('c'),
+ _assistant('d'),
+ ]
+ rc = _make_request_context(messages)
+ ctx = _make_ctx()
+
+ mock_result = AsyncMock()
+ mock_result.output = 'Extended context summary.'
+
+ with patch('pydantic_ai.Agent') as MockAgent:
+ mock_agent_instance = AsyncMock()
+ mock_agent_instance.run.return_value = mock_result
+ MockAgent.return_value = mock_agent_instance
+
+ result = await comp.before_model_request(ctx, rc)
+
+ first_msg = result.messages[0]
+ assert isinstance(first_msg, ModelRequest)
+ sys_parts = [p for p in first_msg.parts if isinstance(p, SystemPromptPart)]
+ assert any('Extended context summary.' in p.content for p in sys_parts)
+
+
+# ---------------------------------------------------------------------------
+# Helpers for the new strategies
+# ---------------------------------------------------------------------------
+
+
+def _pair(name: str, cid: str, content: str = 'result content here') -> list[ModelMessage]:
+ return [_tool_call(name, cid), _tool_return(name, cid, content)]
+
+
+def _return_contents(messages: list[ModelMessage]) -> list[str]:
+ out: list[str] = []
+ for m in messages:
+ if isinstance(m, ModelRequest):
+ for p in m.parts:
+ if isinstance(p, ToolReturnPart):
+ out.append(str(p.content))
+ return out
+
+
+def _call_args(messages: list[ModelMessage]) -> list[object]:
+ out: list[object] = []
+ for m in messages:
+ if isinstance(m, ModelResponse):
+ for p in m.parts:
+ if isinstance(p, ToolCallPart):
+ out.append(p.args)
+ return out
+
+
+def _user_texts(messages: list[ModelMessage]) -> list[str]:
+ out: list[str] = []
+ for m in messages:
+ if isinstance(m, ModelRequest):
+ for p in m.parts:
+ if isinstance(p, UserPromptPart) and isinstance(p.content, str):
+ out.append(p.content)
+ return out
+
+
+def _orphan_free(messages: list[ModelMessage]) -> bool:
+ """True if every kept tool return has its matching tool call among *messages*."""
+ call_ids: set[str] = set()
+ return_ids: set[str] = set()
+ for m in messages:
+ if isinstance(m, ModelResponse):
+ for p in m.parts:
+ if isinstance(p, ToolCallPart) and p.tool_call_id:
+ call_ids.add(p.tool_call_id)
+ else:
+ for p in m.parts:
+ if isinstance(p, ToolReturnPart):
+ return_ids.add(p.tool_call_id)
+ return return_ids <= call_ids
+
+
+class TestHelperCoverage:
+ """Exercise every branch of the shared test-collection helpers with one diverse input."""
+
+ def test_collection_helpers(self):
+ msgs: list[ModelMessage] = [
+ ModelRequest(parts=[SystemPromptPart(content='s'), UserPromptPart(content='u')]),
+ ModelResponse(parts=[TextPart(content='t'), ToolCallPart(tool_name='fn', args='{}', tool_call_id='c1')]),
+ _tool_return('fn', 'c1', 'r'),
+ ]
+ assert _user_texts(msgs) == ['u']
+ assert _return_contents(msgs) == ['r']
+ assert _call_args(msgs) == ['{}']
+ assert _orphan_free(msgs)
+
+ def test_file_key_edges(self):
+ assert _file_key(ToolCallPart(tool_name='other', args={}, tool_call_id='c')) is None
+ assert _file_key(ToolCallPart(tool_name='read_file', args='not-a-dict', tool_call_id='c')) is None
+ assert _file_key(ToolCallPart(tool_name='read_file', args={'path': 123}, tool_call_id='c')) is None
+ assert _file_key(ToolCallPart(tool_name='read_file', args={'path': 'p.py'}, tool_call_id='c')) == 'p.py'
+
+
+# ---------------------------------------------------------------------------
+# iter_tool_pairs
+# ---------------------------------------------------------------------------
+
+
+class TestIterToolPairs:
+ def test_skips_empty_ids_and_orphan_returns(self):
+ msgs: list[ModelMessage] = [
+ ModelResponse(parts=[ToolCallPart(tool_name='fn', args='{}', tool_call_id='')]),
+ _tool_return('fn', ''), # empty id, no matching call
+ _tool_return('fn', 'orphan'), # return with no matching call
+ _tool_call('g', 'g1'),
+ _tool_return('g', 'g1'),
+ ]
+ pairs = iter_tool_pairs(msgs)
+ assert [p.tool_call_id for p in pairs] == ['g1']
+ assert pairs[0].tool_name == 'g'
+ assert pairs[0].order == 0
+
+
+# ---------------------------------------------------------------------------
+# ClearToolResults
+# ---------------------------------------------------------------------------
+
+
+class TestClearToolResults:
+ def test_validation_no_trigger(self):
+ with pytest.raises(ValueError, match='At least one of max_messages or max_tokens must be set'):
+ ClearToolResults()
+
+ def test_validation_non_positive_max_messages(self):
+ with pytest.raises(ValueError, match='max_messages must be positive'):
+ ClearToolResults(max_messages=0)
+
+ def test_validation_negative_max_tokens(self):
+ with pytest.raises(ValueError, match='max_tokens must be positive'):
+ ClearToolResults(max_tokens=-1)
+
+ def test_validation_negative_keep_pairs(self):
+ with pytest.raises(ValueError, match='keep_pairs must be non-negative'):
+ ClearToolResults(max_messages=1, keep_pairs=-1)
+
+ def test_validation_negative_min_clear_tokens(self):
+ with pytest.raises(ValueError, match='min_clear_tokens must be non-negative'):
+ ClearToolResults(max_messages=1, min_clear_tokens=-1)
+
+ @pytest.mark.anyio
+ async def test_no_clear_below_threshold(self):
+ cap = ClearToolResults(max_messages=100, keep_pairs=0)
+ messages: list[ModelMessage] = [*_pair('fn', 'tc1'), *_pair('fn', 'tc2')]
+ rc = _make_request_context(messages)
+ result = await cap.before_model_request(_make_ctx(), rc)
+ assert result.messages == messages
+
+ @pytest.mark.anyio
+ async def test_clears_old_keeps_recent_pairs(self):
+ cap = ClearToolResults(max_messages=1, keep_pairs=1)
+ messages: list[ModelMessage] = [
+ *_pair('fn', 'tc1'),
+ *_pair('fn', 'tc2'),
+ *_pair('fn', 'tc3'),
+ ]
+ rc = _make_request_context(messages)
+ result = await cap.before_model_request(_make_ctx(), rc)
+ contents = _return_contents(result.messages)
+ assert contents == ['[tool result cleared]', '[tool result cleared]', 'result content here']
+
+ @pytest.mark.anyio
+ async def test_token_trigger(self):
+ cap = ClearToolResults(max_tokens=5, keep_pairs=0)
+ messages: list[ModelMessage] = [*_pair('fn', 'tc1', 'x' * 80)]
+ rc = _make_request_context(messages)
+ result = await cap.before_model_request(_make_ctx(), rc)
+ assert _return_contents(result.messages) == ['[tool result cleared]']
+
+ @pytest.mark.anyio
+ async def test_exclude_tools(self):
+ cap = ClearToolResults(max_messages=1, keep_pairs=0, exclude_tools=frozenset({'keep'}))
+ messages: list[ModelMessage] = [*_pair('drop', 'tc1'), *_pair('keep', 'tc2')]
+ rc = _make_request_context(messages)
+ result = await cap.before_model_request(_make_ctx(), rc)
+ assert _return_contents(result.messages) == ['[tool result cleared]', 'result content here']
+
+ @pytest.mark.anyio
+ async def test_clear_tool_inputs(self):
+ cap = ClearToolResults(max_messages=1, keep_pairs=0, clear_tool_inputs=True)
+ call = ModelResponse(parts=[ToolCallPart(tool_name='fn', args='{"q": "x"}', tool_call_id='tc1')])
+ messages: list[ModelMessage] = [call, _tool_return('fn', 'tc1')]
+ rc = _make_request_context(messages)
+ result = await cap.before_model_request(_make_ctx(), rc)
+ # Cleared args stay JSON-valid so they don't reach a provider as malformed function-args.
+ assert _call_args(result.messages) == ['{}']
+
+ @pytest.mark.anyio
+ async def test_min_clear_tokens_skips_small_gain(self):
+ cap = ClearToolResults(max_messages=1, keep_pairs=0, min_clear_tokens=10_000)
+ messages: list[ModelMessage] = [*_pair('fn', 'tc1', 'tiny')]
+ rc = _make_request_context(messages)
+ result = await cap.before_model_request(_make_ctx(), rc)
+ # Reclaim is far below min_clear_tokens, so nothing is cleared.
+ assert _return_contents(result.messages) == ['tiny']
+
+ @pytest.mark.anyio
+ async def test_min_clear_tokens_proceeds_on_large_gain(self):
+ cap = ClearToolResults(max_messages=1, keep_pairs=0, min_clear_tokens=1)
+ messages: list[ModelMessage] = [*_pair('fn', 'tc1', 'x' * 400)]
+ rc = _make_request_context(messages)
+ result = await cap.before_model_request(_make_ctx(), rc)
+ assert _return_contents(result.messages) == ['[tool result cleared]']
+
+ @pytest.mark.anyio
+ async def test_no_tool_pairs_is_noop(self):
+ cap = ClearToolResults(max_messages=1, keep_pairs=0)
+ messages: list[ModelMessage] = [_user('a'), _assistant('b')]
+ rc = _make_request_context(messages)
+ result = await cap.before_model_request(_make_ctx(), rc)
+ assert result.messages == messages
+
+ @pytest.mark.anyio
+ async def test_idempotent(self):
+ cap = ClearToolResults(max_messages=1, keep_pairs=0, clear_tool_inputs=True)
+ call = ModelResponse(parts=[ToolCallPart(tool_name='fn', args='{"q": "x"}', tool_call_id='tc1')])
+ messages: list[ModelMessage] = [call, _tool_return('fn', 'tc1')]
+ ctx = _make_ctx()
+ once = await cap.compact(messages, ctx)
+ twice = await cap.compact(once, ctx)
+ assert _return_contents(twice) == ['[tool result cleared]']
+ assert _call_args(twice) == ['{}']
+
+
+# ---------------------------------------------------------------------------
+# DeduplicateFileReads
+# ---------------------------------------------------------------------------
+
+
+def _read_call(cid: str, path: str) -> ModelResponse:
+ return ModelResponse(parts=[ToolCallPart(tool_name='read_file', args={'path': path}, tool_call_id=cid)])
+
+
+def _read_return(cid: str, content: str) -> ModelRequest:
+ return ModelRequest(parts=[ToolReturnPart(tool_name='read_file', content=content, tool_call_id=cid)])
+
+
+def _file_key(call: ToolCallPart) -> str | None:
+ if call.tool_name != 'read_file':
+ return None
+ args = call.args
+ if isinstance(args, dict):
+ path = args.get('path')
+ return path if isinstance(path, str) else None
+ return None
+
+
+class TestDeduplicateFileReads:
+ def test_validation_non_positive_max_messages(self):
+ with pytest.raises(ValueError, match='max_messages must be positive'):
+ DeduplicateFileReads(file_key=_file_key, max_messages=0)
+
+ def test_validation_negative_max_tokens(self):
+ with pytest.raises(ValueError, match='max_tokens must be positive'):
+ DeduplicateFileReads(file_key=_file_key, max_tokens=-1)
+
+ @pytest.mark.anyio
+ async def test_keeps_latest_read(self):
+ cap = DeduplicateFileReads(file_key=_file_key)
+ messages: list[ModelMessage] = [
+ _read_call('tc1', 'a.py'),
+ _read_return('tc1', 'first a'),
+ _read_call('tc2', 'b.py'),
+ _read_return('tc2', 'b body'),
+ _read_call('tc3', 'a.py'),
+ _read_return('tc3', 'second a'),
+ ]
+ rc = _make_request_context(messages)
+ result = await cap.before_model_request(_make_ctx(), rc)
+ assert _return_contents(result.messages) == ['[superseded file read]', 'b body', 'second a']
+
+ @pytest.mark.anyio
+ async def test_non_file_read_ignored(self):
+ cap = DeduplicateFileReads(file_key=_file_key)
+ messages: list[ModelMessage] = [
+ *_pair('search', 'tc1'),
+ *_pair('search', 'tc2'),
+ ]
+ rc = _make_request_context(messages)
+ result = await cap.before_model_request(_make_ctx(), rc)
+ # search is not a file read -> file_key returns None -> nothing cleared.
+ assert _return_contents(result.messages) == ['result content here', 'result content here']
+
+ @pytest.mark.anyio
+ async def test_no_duplicates_is_noop(self):
+ cap = DeduplicateFileReads(file_key=_file_key)
+ messages: list[ModelMessage] = [
+ _read_call('tc1', 'a.py'),
+ _read_return('tc1', 'a body'),
+ _read_call('tc2', 'b.py'),
+ _read_return('tc2', 'b body'),
+ ]
+ rc = _make_request_context(messages)
+ result = await cap.before_model_request(_make_ctx(), rc)
+ assert result.messages == messages
+
+ @pytest.mark.anyio
+ async def test_runs_always_without_trigger(self):
+ cap = DeduplicateFileReads(file_key=_file_key)
+ messages: list[ModelMessage] = [
+ _read_call('tc1', 'a.py'),
+ _read_return('tc1', 'first'),
+ _read_call('tc2', 'a.py'),
+ _read_return('tc2', 'second'),
+ ]
+ rc = _make_request_context(messages)
+ result = await cap.before_model_request(_make_ctx(), rc)
+ assert _return_contents(result.messages) == ['[superseded file read]', 'second']
+
+ @pytest.mark.anyio
+ async def test_trigger_gate_not_exceeded(self):
+ cap = DeduplicateFileReads(file_key=_file_key, max_messages=100)
+ messages: list[ModelMessage] = [
+ _read_call('tc1', 'a.py'),
+ _read_return('tc1', 'first'),
+ _read_call('tc2', 'a.py'),
+ _read_return('tc2', 'second'),
+ ]
+ rc = _make_request_context(messages)
+ result = await cap.before_model_request(_make_ctx(), rc)
+ # Below the trigger threshold, so no dedup despite the duplicate.
+ assert result.messages == messages
+
+ @pytest.mark.anyio
+ async def test_trigger_gate_exceeded(self):
+ cap = DeduplicateFileReads(file_key=_file_key, max_messages=1)
+ messages: list[ModelMessage] = [
+ _read_call('tc1', 'a.py'),
+ _read_return('tc1', 'first'),
+ _read_call('tc2', 'a.py'),
+ _read_return('tc2', 'second'),
+ ]
+ rc = _make_request_context(messages)
+ result = await cap.before_model_request(_make_ctx(), rc)
+ assert _return_contents(result.messages) == ['[superseded file read]', 'second']
+
+
+# ---------------------------------------------------------------------------
+# TieredCompaction
+# ---------------------------------------------------------------------------
+
+
+@dataclasses.dataclass
+class _RecordingTier:
+ label: str
+ calls: list[str]
+ drop: int = 0
+
+ async def compact(self, messages: list[ModelMessage], ctx: Any) -> list[ModelMessage]:
+ self.calls.append(self.label)
+ return messages[self.drop :] if self.drop else messages
+
+
+class TestTieredCompaction:
+ def test_validation_empty_tiers(self):
+ with pytest.raises(ValueError, match='tiers must not be empty'):
+ TieredCompaction(tiers=[], target_tokens=10)
+
+ def test_validation_target_tokens(self):
+ with pytest.raises(ValueError, match='target_tokens must be positive'):
+ TieredCompaction(tiers=[ClearToolResults(max_messages=1)], target_tokens=0)
+
+ @pytest.mark.anyio
+ async def test_noop_under_target(self):
+ calls: list[str] = []
+ tier = _RecordingTier('t1', calls)
+ cap = TieredCompaction(tiers=[tier], target_tokens=1_000_000)
+ messages: list[ModelMessage] = [_user('x' * 40)]
+ rc = _make_request_context(messages)
+ result = await cap.before_model_request(_make_ctx(), rc)
+ assert result.messages == messages
+ assert calls == []
+
+ @pytest.mark.anyio
+ async def test_short_circuit_first_tier_suffices(self):
+ calls: list[str] = []
+ # Each message ~10 tokens; 5 messages = 50 tokens. Target 15.
+ t1 = _RecordingTier('t1', calls, drop=4) # leaves 1 message (~10 tokens) <= 15
+ t2 = _RecordingTier('t2', calls, drop=0)
+ cap = TieredCompaction(tiers=[t1, t2], target_tokens=15)
+ messages: list[ModelMessage] = [_user('x' * 40) for _ in range(5)]
+ rc = _make_request_context(messages)
+ result = await cap.before_model_request(_make_ctx(), rc)
+ assert calls == ['t1'] # t2 never reached
+ assert len(result.messages) == 1
+
+ @pytest.mark.anyio
+ async def test_full_escalation(self):
+ calls: list[str] = []
+ t1 = _RecordingTier('t1', calls, drop=1) # 5 -> 4 messages (~40 tokens) still > 15
+ t2 = _RecordingTier('t2', calls, drop=3) # 4 -> 1 message
+ cap = TieredCompaction(tiers=[t1, t2], target_tokens=15)
+ messages: list[ModelMessage] = [_user('x' * 40) for _ in range(5)]
+ rc = _make_request_context(messages)
+ result = await cap.before_model_request(_make_ctx(), rc)
+ assert calls == ['t1', 't2']
+ assert len(result.messages) == 1
+
+ @pytest.mark.anyio
+ async def test_composes_real_strategies(self):
+ # ClearToolResults then SummarizingCompaction, driven by the orchestrator.
+ clear = ClearToolResults(max_messages=1, keep_pairs=0)
+ summarizer = SummarizingCompaction(
+ model='test:m', max_messages=1, keep_messages=1, preserve_first_user_message=False
+ )
+ cap = TieredCompaction(tiers=[clear, summarizer], target_tokens=1)
+ messages: list[ModelMessage] = [*_pair('fn', 'tc1', 'x' * 200), _user('latest')]
+ rc = _make_request_context(messages)
+
+ mock_result = AsyncMock()
+ mock_result.output = 'Tiered summary.'
+ with patch('pydantic_ai.Agent') as MockAgent:
+ mock_agent_instance = AsyncMock()
+ mock_agent_instance.run.return_value = mock_result
+ MockAgent.return_value = mock_agent_instance
+ result = await cap.before_model_request(_make_ctx(), rc)
+
+ first_msg = result.messages[0]
+ assert isinstance(first_msg, ModelRequest)
+ sys_parts = [p for p in first_msg.parts if isinstance(p, SystemPromptPart)]
+ assert any('Tiered summary.' in p.content for p in sys_parts)
+
+
+# ---------------------------------------------------------------------------
+# SummarizingCompaction — model inheritance + structured prompt
+# ---------------------------------------------------------------------------
+
+
+class TestSummarizingCompactionModel:
+ @pytest.mark.anyio
+ async def test_model_inherits_from_ctx_when_none(self):
+ comp = SummarizingCompaction(
+ max_messages=3, keep_messages=1, preserve_first_user_message=False, incremental=False
+ )
+ messages: list[ModelMessage] = [_user('a'), _assistant('b'), _user('c'), _assistant('d')]
+ rc = _make_request_context(messages)
+ ctx = _make_ctx()
+
+ mock_result = AsyncMock()
+ mock_result.output = 'Inherited-model summary.'
+ with patch('pydantic_ai.Agent') as MockAgent:
+ mock_agent_instance = AsyncMock()
+ mock_agent_instance.run.return_value = mock_result
+ MockAgent.return_value = mock_agent_instance
+ await comp.before_model_request(ctx, rc)
+
+ # The summarizer agent was constructed with the running agent's model.
+ assert MockAgent.call_args.args[0] is ctx.model
+ # And its usage is threaded into the parent run for honest accounting.
+ assert mock_agent_instance.run.call_args.kwargs['usage'] is ctx.usage
+
+ def test_default_prompt_has_structured_sections(self):
+ from pydantic_ai_harness.experimental.compaction._summarizing_compaction import _DEFAULT_SUMMARY_PROMPT
+
+ for heading in (
+ '## Intent',
+ '## Key decisions',
+ '## Artifacts',
+ '## Current state',
+ '## Next steps',
+ '## Open questions',
+ ):
+ assert heading in _DEFAULT_SUMMARY_PROMPT
+
+
+# ---------------------------------------------------------------------------
+# Public path — Agent(capabilities=[...])
+# ---------------------------------------------------------------------------
+
+
+class TestPublicPath:
+ @pytest.fixture
+ def anyio_backend(self) -> str:
+ # A full agent.run only needs to be exercised once; the trio backend hits a
+ # TestModel event-loop quirk in core unrelated to compaction.
+ return 'asyncio'
+
+ @pytest.mark.anyio
+ async def test_capabilities_wired_into_agent(self):
+ from pydantic_ai import Agent
+ from pydantic_ai.models.test import TestModel
+
+ agent = Agent(
+ TestModel(),
+ capabilities=[ClearToolResults(max_tokens=1, keep_pairs=0)],
+ )
+ result = await agent.run('hello')
+ assert result.output is not None
+
+
+# ---------------------------------------------------------------------------
+# Remaining branch coverage — defensive paths in shared helpers
+# ---------------------------------------------------------------------------
+
+
+class TestHelperBranchCoverage:
+ def test_prepend_returns_trimmed_when_first_user_not_discarded(self):
+ first = _user('task')
+ messages: list[ModelMessage] = [first, _assistant('a'), _user('b')]
+ # cutoff=0 -> first (idx 0) is not before the cut, so it is left as-is.
+ assert prepend_first_user_message(messages, 0, messages) == messages
+
+ def test_extract_system_prompts_all_system_loop_completes(self):
+ msgs: list[ModelMessage] = [
+ ModelRequest(parts=[SystemPromptPart(content='a')]),
+ ModelRequest(parts=[SystemPromptPart(content='b')]),
+ ]
+ assert [p.content for p in _extract_system_prompts(msgs)] == ['a', 'b']
+
+ def test_collect_and_format_skip_unknown_part_types(self):
+ from pydantic_ai.messages import RetryPromptPart, ThinkingPart
+
+ msgs: list[ModelMessage] = [
+ ModelRequest(parts=[RetryPromptPart(content='retry')]),
+ ModelResponse(parts=[ThinkingPart(content='think')]),
+ ]
+ # Part types the helpers do not handle contribute no countable text but exercise the skip branches.
+ assert estimate_token_count(msgs) == 0
+ assert _format_messages(msgs) == ''
+
+ def test_user_prompt_text_skips_non_text_content(self):
+ from pydantic_ai.messages import ImageUrl
+
+ part = UserPromptPart(content=[ImageUrl(url='https://example.com/y.png'), 'hello'])
+ msgs: list[ModelMessage] = [ModelRequest(parts=[part])]
+ assert estimate_token_count(msgs) == len('hello') // 4
+ assert 'hello' in _format_messages(msgs)
+
+
+class TestSummarizingCompactionPreserveBranches:
+    @pytest.mark.anyio
+    async def test_preserve_with_no_user_messages(self):
+        comp = SummarizingCompaction(
+            model='test:m', max_messages=2, keep_messages=1, preserve_first_user_message=True, incremental=False
+        )
+        messages: list[ModelMessage] = [_assistant('a'), _assistant('b'), _assistant('c')]
+        rc = _make_request_context(messages)
+        ctx = _make_ctx()
+
+        mock_result = AsyncMock()
+        mock_result.output = 'No-user summary.'
+        with patch('pydantic_ai.Agent') as MockAgent:
+            mock_agent_instance = AsyncMock()
+            mock_agent_instance.run.return_value = mock_result
+            MockAgent.return_value = mock_agent_instance
+            result = await comp.before_model_request(ctx, rc)
+
+        # There is no user message to preserve, so the result starts with the summary message.
+        first_msg = result.messages[0]
+        assert isinstance(first_msg, ModelRequest)
+        assert any(isinstance(p, SystemPromptPart) and 'No-user summary.' in p.content for p in first_msg.parts)
+
+    @pytest.mark.anyio
+    async def test_preserve_when_first_user_already_in_tail(self):
+        comp = SummarizingCompaction(
+            model='test:m', max_messages=2, keep_messages=2, preserve_first_user_message=True, incremental=False
+        )
+        messages: list[ModelMessage] = [_assistant('x'), _assistant('y'), _user('only user'), _assistant('z')]
+        rc = _make_request_context(messages)
+        ctx = _make_ctx()
+
+        mock_result = AsyncMock()
+        mock_result.output = 'Tail summary.'
+        with patch('pydantic_ai.Agent') as MockAgent:
+            mock_agent_instance = AsyncMock()
+            mock_agent_instance.run.return_value = mock_result
+            MockAgent.return_value = mock_agent_instance
+            result = await comp.before_model_request(ctx, rc)
+
+        # The only user message is within the kept tail, so it is not duplicated.
+        user_count = sum(
+            1 for m in result.messages if isinstance(m, ModelRequest) for p in m.parts if isinstance(p, UserPromptPart)
+        )
+        assert user_count == 1
diff --git a/tests/experimental/test_warnings.py b/tests/experimental/test_warnings.py
new file mode 100644
index 0000000..9b440d8
--- /dev/null
+++ b/tests/experimental/test_warnings.py
@@ -0,0 +1,35 @@
+"""Tests for the experimental-capability warning convention."""
+
+from __future__ import annotations
+
+import importlib
+import warnings
+
+import pytest
+
+from pydantic_ai_harness.experimental import HarnessExperimentalWarning
+from pydantic_ai_harness.experimental._warn import warn_experimental
+
+
+class TestExperimentalWarning:
+    def test_message_names_feature_and_carries_silence_snippet(self):
+        with pytest.warns(HarnessExperimentalWarning) as rec:
+            warn_experimental('compaction')
+        assert len(rec) == 1
+        msg = str(rec[0].message)
+        assert '`pydantic_ai_harness.experimental.compaction`' in msg
+        # The message must hand the user the exact, category-wide silence line.
+        assert "warnings.filterwarnings('ignore', category=HarnessExperimentalWarning)" in msg
+
+    def test_one_filter_silences_every_capability(self):
+        # A single category filter mutes all experimental warnings; no per-capability lines are needed.
+        with warnings.catch_warnings():
+            warnings.simplefilter('error')  # baseline: any warning is an error
+            warnings.filterwarnings('ignore', category=HarnessExperimentalWarning)
+            warn_experimental('compaction')
+            warn_experimental('some_future_capability')  # also silenced, same filter
+
+    def test_importing_a_capability_warns(self):
+        module = importlib.import_module('pydantic_ai_harness.experimental.compaction')
+        with pytest.warns(HarnessExperimentalWarning):
+            importlib.reload(module)