Skip to content

Add StepPersistence capability for step-event durability across delegates#251

Open
dsfaccini wants to merge 19 commits into
mainfrom
step-persistence
Open

Add StepPersistence capability for step-event durability across delegates#251
dsfaccini wants to merge 19 commits into
mainfrom
step-persistence

Conversation

@dsfaccini
Copy link
Copy Markdown
Contributor

@dsfaccini dsfaccini commented May 24, 2026

Summary

Supersedes #176 (SessionPersistence). Pivots from "save the full session after each run" to step-event persistence + provider-valid continuable snapshots + a tool-effect ledger, and adds a content-addressed media subsystem so snapshots stay small when messages carry large BinaryContent. Design per the discussion on #176 (comment 1, comment 2).

pydanty dogfooding (pydantic/pydantic-ai#5612) surfaced two failure modes the coarse "load-then-save" design cannot address:

  1. Long delegate runs that time out before after_run fires lose all event trail.
  2. Orchestrators that ask a delegate a follow-up question have to rediscover context because each Agent.run is treated as independent.

step_persistence/ — what this delivers

  • StepPersistence capability emitting StepEvents at every boundary: run_started/completed/failed, model_request_started/completed/failed, tool_call_started/completed/failed.
  • ContinuableSnapshot saved only when the message history is provider-valid (every ToolCallPart has a matching ToolReturnPart or tool-bound RetryPromptPart; orphan / duplicate / out-of-order returns rejected). Saved at the end of each CallToolsNode; after_run saves a fallback snapshot only when the run reached no provider-valid boundary.
  • ToolEffectRecord ledger keyed by (run_id, tool_call_id) (started/completed/failed). A run killed between before_tool_execute and after_tool_execute leaves a started record with no terminal update — the unknown_after_crash signal an orchestrator needs before blindly replaying a side-effectful tool.
  • annotate_tool_effect(store, ctx, *, idempotency_key=None, effect_summary=None) for tool bodies that write external state. Resolves run_id from a ContextVar and tool_call_id/tool_name from RunContext, then merges metadata into the in-flight record.
  • Identity model aligned with pydantic_ai: run_id is per-Agent.run (matches RunContext.run_id). For multi-turn logical grouping use conversation_id= on Agent.run(...). parent_run_id is auto-inferred for in-process delegate runs via a ContextVar set in wrap_run. store.list_runs(parent_run_id=..., conversation_id=...) filters by either or both and returns chronological order as a protocol guarantee.
  • Continuation/fork helpers: continue_run(store, run_id=...) / fork_run(store, run_id=...) return the latest provider-valid snapshot's messages; pass to Agent.run(message_history=...).
  • Async StepStore protocol with three backends: InMemoryStepStore, FileStepStore (JSONL events + per-run monotonic snapshot counter), and SqliteStepStore (single-file: runs/events/snapshots/tool-effects + sibling media table; WAL; upsert tool-effects). FileStepStore validates run_id against [A-Za-z0-9_.-]{1,200} (rejecting ..) and dispatches blocking I/O via anyio.to_thread.
  • from_spec(backend='memory'|'file'|'sqlite') with explicit ValueError on unknown backends (no silent fallback to memory).

media/ — content-addressed offload subsystem

Keeps large media out of snapshot JSON; shared URI scheme media+sha256://<hex> (content-addressed → automatic dedup). Reused later by a planned MediaExternalizer capability for in-flight wire-payload reduction.

  • MediaStore protocol + MediaContext (extensible per-operation bag: media_type, filename, metadata).
  • DiskMediaStore — one file per blob, atomic writes, metadata sidecar, traversal-safe key_strategy. Default backend for FileStepStore.
  • SqliteMediaStore — one row per blob, INSERT OR IGNORE dedup, metadata JSON column. Default backend for SqliteStepStore.
  • S3MediaStore — S3 / R2 / MinIO via path-style URLs + hand-rolled AWS SigV4 (no botocore/boto3), x-amz-meta-* metadata. PUT/GET/HEAD only.
  • Walkers externalize_media / restore_media — swap inline BinaryContent ≥ threshold (default 64 KiB) for media+sha256:// markers and back, operating on the serialized JSON shape.
  • public_url resolvers (make_static_public_url, custom sync/async callables) and pluggable key_strategy.

What this PR explicitly does not deliver

Acceptance test

TestCrashMidToolCallContract::test_visible_trail_no_false_continuation_point: a run killed after a tool starts but before a tool return leaves a visible event trail (tool_call_started with no tool_call_completed, a started ToolEffectRecord) but does not expose that point as a valid message_history continuation — latest_snapshot returns the prior provider-valid snapshot.

Review-comment status from #176

Thread Fix
Path traversal in FileSessionStore _validate_id rejects .., separators, empty, oversized IDs
list_sessions drops .meta-suffixed IDs Per-run directory layout removes the collision class
Sync I/O on async hooks StepStore protocol is async; file/sqlite dispatch via anyio.to_thread
self.session_id ignores per-call ctx Three-level identity (conversation_id / run_id / step_index) mirrors pydantic_ai

Test plan

  • make lint && make typecheck && make testcov clean — 100% branch coverage
  • step-persistence + media tests via Agent(..., capabilities=[StepPersistence(...)]) with TestModel (no real model calls)
  • S3MediaStore exercised through VCR cassettes recorded against Cloudflare R2 (pytest-recording); credentials/signatures scrubbed, with a replay leak-canary

Closes #176.

🤖 Generated with Claude Code

…ates

Supersedes PR #176 (SessionPersistence): orchestrators like pydanty need
visible event trails for delegate runs that may time out before a
"save full session after the run" hook can fire, and need to continue or
fork a delegate's prior investigation without rediscovering context. A
single after-run snapshot is too coarse for that use case.

The capability now records (a) append-only StepEvents at every boundary
(run/model-request/tool-call start, completion, failure), (b) a
ContinuableSnapshot only when message history is provider-valid (every
ToolCallPart has a matching ToolReturnPart / RetryPromptPart) — saved
mid-run after CallToolsNode and at after_run, and (c) a ToolEffectRecord
ledger so a run killed between before_tool_execute and
after_tool_execute leaves an `unknown_after_crash`-style record rather
than a falsely-continuable snapshot. Lineage metadata
(parent_run_id, agent_name) ties delegate runs back to their
orchestrator. `continue_run` / `fork_run` helpers load the latest
continuable snapshot for a run.

Backends: InMemoryStepStore (tests) and FileStepStore (JSONL events +
JSON snapshots, with run_id path-traversal validation and
anyio.to_thread for blocking I/O).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@dsfaccini dsfaccini changed the title Add StepPersistence capability for step-event durability across delegates Add StepPersistence capability for step-event durability across delegates May 24, 2026
@dsfaccini dsfaccini mentioned this pull request May 24, 2026
6 tasks
… model

Correctness fixes from pydanty's PR review:

- FileStepStore: snapshot filenames are now a per-run monotonic counter,
  not `ctx.run_step` — `run_step` resets each Agent.run, so re-using a
  `run_id` across calls would let an earlier run's higher step-index
  snapshot mask a later run's lower-step-index one.
- StepStore.get_tool_effect now takes both `run_id` and `tool_call_id`.
  TestModel and other providers can reuse deterministic tool-call ids
  across runs; the previous unscoped lookup let one run's effect leak
  into another's record (including `started_at`).
- is_provider_valid now rejects orphan, duplicate, and out-of-order
  tool returns — the old `set.discard` pattern silently accepted any
  return regardless of whether a matching call was open.

Identity model:

- `run_id` resolution: explicit > `{agent_name}-{8-char-hex}` > UUID.
  Materialised per Agent.run in `for_run`, so reusing one capability
  instance never silently merges runs.
- `parent_run_id` auto-inferred via a module-level ContextVar set in
  `wrap_run`, so an orchestrator's tool that synchronously calls
  `delegate.run(...)` produces a delegate `RunRecord.parent_run_id`
  pointing at the orchestrator's `run_id` with zero threading. Explicit
  `parent_run_id=` still wins.
- `conversation_id` propagated to `StepEvent` and `RunRecord`;
  `store.list_runs(conversation_id=..., parent_run_id=...)` supports
  filtering by either or both. Mirrors pydantic_ai's three-level
  identity (conversation -> run -> step) so "run 1, run 2, run 3" of
  one dialogue is queryable as a group via `conversation_id`.
- `continue_from=` field dropped from the capability. Continuation is
  now only via `continue_run(store, run_id=...)` -> standard
  `Agent.run(message_history=...)`. One way to pass history into
  pydantic_ai, no parallel capability flag.

README rewritten around the final API. New sections: three-level
identity, run lineage with auto-inferred parent, inspecting a run tree,
failure recovery.

Tests: 168 total (up from 64), 100% branch coverage on the package.
New coverage for the snapshot seq counter, cross-run tool-effect
isolation, orphan/duplicate/out-of-order return rejection, ContextVar
parent inference across nested agent.run, conversation_id propagation,
and the agent_name-derived run_id default.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@dsfaccini
Copy link
Copy Markdown
Contributor Author

David's AICA here: pushed 4bf6216 addressing pydanty's correctness review + the ergonomics discussion.

Correctness (P1/P2/P3 from pydanty):

  • FileStepStore snapshot filenames are now a per-run monotonic counter, not ctx.run_step. run_step resets each Agent.run, so the previous filename scheme silently mismapped re-uses of the same run_id — later snapshots could be masked by earlier higher-step-index ones.
  • StepStore.get_tool_effect now requires both run_id and tool_call_id; InMemoryStepStore keys effects by (run_id, tool_call_id); FileStepStore.get_tool_effect reads only the run's own tool_effects.jsonl. Removes cross-run leakage when providers (e.g. TestModel) reuse deterministic tool-call ids.
  • is_provider_valid rejects orphan / duplicate / out-of-order tool returns. The previous set.discard silently accepted any return regardless of whether a matching call was open.

Identity model:

  • run_id resolution order: explicit → {agent_name}-{8-char-hex} → UUID. Materialised in for_run, so reusing one capability instance across runs never silently merges them.
  • parent_run_id auto-inferred via a ContextVar set in wrap_run. Orchestrator → delegate within the same process: zero threading. Explicit parent_run_id= still wins.
  • conversation_id now on StepEvent and RunRecord. store.list_runs(conversation_id=..., parent_run_id=...) filters by either / both. Mirrors pydantic_ai's identity stack (conversation → run → step).
  • continue_from= field dropped — continuation is the standard continue_run(store, ...) + Agent.run(message_history=...) flow. One mechanism, not two.

Tests: 168 total (was 64), 100% branch coverage on the package.

README rewritten around the final API: new sections on the three-level identity model, run-tree inspection, failure recovery, and the parent_run_id auto-inference contract.

OpenTelemetry: the prior research agent confirmed every boundary we touch is already spanned by pydantic_ai's Instrumentation. Wiring pydantic_ai_harness.step_persistence.* attributes onto the active span (no new spans) is the right shape but belongs in a follow-up — I'll open the issue once this lands.

… metadata

Correctness:

- is_provider_valid no longer rejects non-tool RetryPromptParts. Pydantic
  AI emits `RetryPromptPart(tool_name=None)` for output-validation
  failures and providers map those as plain user messages, not tool
  results. The previous check required every RetryPromptPart to resolve
  an open tool call, so a run with one output retry produced no final
  continuable snapshot despite being fully valid.
- StepStore.list_runs now guarantees chronological (started_at ascending)
  ordering across both backends. FileStepStore was previously returning
  directory-name order (lexicographic), so the README's `[-1]` pattern
  for "latest run in conversation" could pick the older run when run ids
  did not sort by recency.
- after_tool_execute and on_tool_execute_error preserve idempotency_key
  and effect_summary from the prior `started` record. Previously the
  terminal record was written without those fields, so any annotation
  the tool body wrote was lost on completion.
- from_spec raises ValueError for unknown backends instead of silently
  falling back to in-memory storage. For a persistence capability,
  turning a typo into accidental non-durability is the wrong failure
  mode.

API:

- New annotate_tool_effect(store, ctx, *, idempotency_key=None,
  effect_summary=None) helper. Tool bodies that write external state
  call it to attach idempotency + effect metadata to the in-flight
  ToolEffectRecord without knowing the (run_id, tool_call_id) plumbing.
  Resolves run_id from a ContextVar set by wrap_run; reads tool_call_id
  / tool_name from RunContext.
- ContextVar moved from `_capability.py` into a new `_context.py` module
  so the helper and the capability can share it without circular imports
  and without crossing the private-name barrier.

Docs: README fixes a non-existent `list_runs(agent_name=None)` call,
documents the chronological-ordering guarantee, and replaces the
hand-wavy "populate fields on the ToolEffectRecord" line with a concrete
`annotate_tool_effect` example.

Tests: 178 total (was 168), 100% branch coverage on the package.
Added coverage for non-tool retry acceptance, chronological list_runs on
both backends, metadata preservation across completed/failed
transitions, annotate_tool_effect under realistic agent.tool, and
from_spec backend validation.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@dsfaccini
Copy link
Copy Markdown
Contributor Author

David's AICA here: pushed 37ae4c7 addressing pydanty's round-2 review.

Correctness:

  • is_provider_valid no longer rejects RetryPromptPart(tool_name=None). Output-validation retries are plain user messages, not tool results — the previous check made any successful run containing one of them unresumable. Now: tool-bound retries still require an open call (orphan/duplicate detection intact); non-tool retries pass through.
  • StepStore.list_runs now guarantees chronological (started_at ascending) ordering across both backends. FileStepStore was returning directory-name order, so the README's [-1] pattern could pick the older run when ids didn't sort by recency.
  • after_tool_execute / on_tool_execute_error preserve idempotency_key and effect_summary from the prior started record. Annotations a tool wrote during its execution survive the terminal transition.
  • from_spec raises ValueError for unknown backends instead of silently using memory. Typo → accidental non-durability was the wrong failure mode.

API:

  • New annotate_tool_effect(store, ctx, *, idempotency_key=None, effect_summary=None) helper. Tool bodies call it to attach metadata to the in-flight ToolEffectRecord without knowing the (run_id, tool_call_id) plumbing — run_id comes from a ContextVar set by wrap_run, tool_call_id / tool_name from RunContext. Re-exported from pydantic_ai_harness.
  • ContextVar moved to a new _context.py module so the helper and capability share it without crossing a private-name boundary or risking circular imports.

README: fixed the non-existent list_runs(agent_name=None) call, documented the chronological-ordering guarantee, replaced the hand-wavy "populate fields on the ToolEffectRecord" line with a concrete annotate_tool_effect example.

Tests: 178 total (was 168), 100% branch coverage. New coverage for non-tool retry acceptance, chronological list_runs on both backends, metadata preservation across started → completed/failed, annotate_tool_effect under realistic agent.tool, from_spec backend validation.

@aristide1997
Copy link
Copy Markdown

Are you planning to add other backends like SQL or DynamoDB? And how would you handle files (e.g. BinaryContent) in the message history? @dsfaccini

dsfaccini and others added 2 commits May 24, 2026 15:18
…r layout

The class docstring still showed snapshots/<step_index>.json from the
pre-fix layout, but both the README and _next_snapshot_seq document the
monotonic counter. Bring the class docstring in line.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…haring

Pydanty round-3 review:

- README continuation and lineage examples queried `list_runs(conversation_id=...)`
  on conversations the earlier `.run(...)` calls never set, so the
  examples crashed with IndexError on `[-1]`. Pass the conversation_id to
  the earlier calls so the lookup actually works.
- The capability docstring claimed reusing a `StepPersistence` instance
  across `Agent.run` calls does NOT share the id. That is true only for
  the auto-derived (`agent_name`-prefixed or `ctx.run_id`) cases — an
  explicit `run_id=` is shared across every `.run()` by design, since
  that is the orchestrator pattern where the caller owns one logical
  identity across turns. Rewrite the resolution-order docs to spell out
  which cases share and which don't, and when to pick each.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@dsfaccini
Copy link
Copy Markdown
Contributor Author

David's AICA here: round-3 cleanup pushed in 0dafedb, plus PR body refreshed.

  • README continuation + lineage examples now pass conversation_id= to their earlier .run(...) calls — they would previously crash on [-1] because the query targeted a conversation that was never recorded.
  • Capability docstring clarifies the run_id resolution: explicit values are shared across .run() calls by design (orchestrator-owned identity); auto-derived (agent_name-prefixed or ctx.run_id) is per-call uniqueness. The earlier "does NOT silently merge" line was only true for the auto-derived cases.
  • PR body refreshed to drop the stale continue_from= reference and bump the test count to the current 92 step-persistence (178 project total).

All checks still green.

…call, not shared)

Pydanty round-4 review: the prior round documented explicit `run_id` as
shared across `.run()` calls on one capability instance — that framing
caused real correctness gaps. The `ToolEffectRecord` ledger is keyed by
`(run_id, tool_call_id)` and providers reuse deterministic tool-call ids
(e.g. `TestModel` emits `pyd_ai_tool_call_id__{name}`), so a second
`.run()` overwrites the first's effect record under the same key — the
`unknown_after_crash` signal from turn 1 disappears when turn 2 lands.

Realign:

- `run_id` is per-`Agent.run`, matching `pydantic_ai.RunContext.run_id`.
- For multi-turn logical grouping, use `conversation_id=` on
  `Agent.run(...)` — that is the pyai-native primitive. The orchestrator
  pattern is `conversation_id='orch'` with each turn auto-deriving its
  own `run_id`.
- Explicit `run_id=` remains supported but is documented as single-shot
  (testing, replay, debugging). Reusing it across calls is a caller
  contract violation, not an implementation feature.

Code is unchanged — the implementation was already correct under the
right contract. Only the docs were misleading.

Tests:
- `TestRunIdIsPerCall::test_multi_turn_orchestrator_uses_conversation_id`
  exercises the recommended pattern: three turns sharing a
  `conversation_id`, three distinct auto-derived `run_id`s, all
  queryable as a group.
- `TestRunIdIsPerCall::test_explicit_run_id_reuse_collides_ledger` locks
  down the misuse contract: reusing one explicit `run_id` across two
  `.run()` calls produces colliding effect records under the
  `(run_id, tool_call_id)` key. The behavior is documented; the test
  exists so a future refactor cannot silently change it.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@dsfaccini
Copy link
Copy Markdown
Contributor Author

David's AICA here: round-4 cleanup pushed in eb5b334.

Pydanty caught a real correctness gap in my prior framing: the README said explicit run_id is "shared across every .run() by design" (the orchestrator pattern). But ToolEffectRecord is keyed by (run_id, tool_call_id), and providers reuse deterministic tool-call ids, so two .run() calls with the same explicit run_id collide — the unknown_after_crash signal from turn 1 vanishes when turn 2 lands.

Aligned with pydantic_ai's identity semantics:

  • run_id is per-Agent.run, same as RunContext.run_id.
  • conversation_id (pyai-native) is the multi-turn grouping primitive. The orchestrator pattern is now conversation_id='orch-conv' per turn, with each turn auto-deriving its own run_id.
  • Explicit run_id= stays for single-shot use (testing / replay / debugging). Reusing it is a caller-contract violation, documented as such.

Code is unchanged — the implementation was already correct under the right contract. Only the README + capability docstring were misleading.

Two new tests:

  • TestRunIdIsPerCall::test_multi_turn_orchestrator_uses_conversation_id — the recommended pattern, three turns sharing a conversation_id, three distinct run_ids.
  • TestRunIdIsPerCall::test_explicit_run_id_reuse_collides_ledger — locks down the misuse contract so a future refactor can't silently change it.

94 step-persistence tests, 100% branch coverage on the package. Lint + typecheck + CI all green.

Pyai-aligned review flagged this as a P3 explainer: pydantic_ai already
has three single-slot cross-run signals (RUN_ID_BAGGAGE_KEY, ctx.run_id,
_CURRENT_RUN_CONTEXT). All three get overwritten by the inner
Instrumentation.wrap_run before any nested capability can see the parent
identity. A separate harness-local ContextVar, snapshotted before our
own wrap_run rebinds it, is the only correct mechanism today. Spell
this out so the next reader doesn't try to 'simplify' it.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@dsfaccini
Copy link
Copy Markdown
Contributor Author

David's AICA here: pyai-aligned subagent review came back. Verdict: none blocking merge.

The subagent read both trees (pydantic-ai/ base + this PR) and checked every place the capability could be reinventing primitives pyai already exposes. Summary of the relevant findings:

  • Message-history persistence — not reinvented. We round-trip via ModelMessagesTypeAdapter and resume via the public Agent.run(message_history=...). pydantic_ai.durable_exec/ is engine-specific, not a generic snapshot store.
  • Event emission — pyai's Instrumentation already spans every boundary, but it writes to OTel, not a queryable durable ledger. Distinct concern, justifies its own sink.
  • RunContext usage — all five fields read (run_id, run_step, conversation_id, tool_call_id, tool_name) are documented public fields.
  • Parent-run-id ContextVarjustified. Pyai's three cross-run signals (RUN_ID_BAGGAGE_KEY, RunContext.run_id, _CURRENT_RUN_CONTEXT) are all single-slot and overwritten by the inner Instrumentation.wrap_run. A separate ContextVar snapshotted in for_run is the only correct mechanism today. Added an explainer comment to for_run so the next reader doesn't try to "simplify" it (675633e).
  • StepStore protocol surface — no pyai analogue. durable_exec engines own their own state stores.
  • is_provider_valid — no pyai equivalent we can call. _agent_graph enforces the invariant by construction but doesn't expose a public validator. Proposing it as pyai core: Expose public message_history validator for provider-valid histories pydantic-ai#5637.
  • Hook choice — correct. before/after/on_run, before/after/on_model_request, before/after/on_tool_execute, after_node_run for mid-run snapshots. wrap_run justifiably used only to push the ContextVar.
  • for_run materialisation — adjacent to but consistent with how Instrumentation uses it.
  • conversation_id semantics — faithful to pyai's _agent_graph.resolve_conversation_id.

Follow-ups opened (none merge-blocking):

Plus the explainer commit 675633e. That's the only in-PR change from this round.

Identity model + 178/180 test suite still green. Ready for review.

Pydanty round-5 review accepted the docs-only contract but flagged that
"documented but not enforced" is a soft spot. Enforce it: `before_run`
calls `store.get_run(run_id=...)` when the user supplied an explicit
`run_id`, and raises `ValueError` if a record with that id already
exists. The auto-derived cases cannot trigger this check (each call
materialises a fresh id in `for_run`).

The check is one extra store read per Agent.run when an explicit run_id
is set, only. The error message points the caller at `conversation_id`
for multi-turn grouping.

Test renamed from `test_explicit_run_id_reuse_collides_ledger` to
`test_explicit_run_id_reuse_raises` — asserts the second `.run()` raises
and the first run's records survive untouched.

README + capability docstring updated: the misuse path is now "raises"
not "caller's contract."

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@dsfaccini
Copy link
Copy Markdown
Contributor Author

David's AICA here: pushed ece83eb addressing the round-5 residual.

The reviewer accepted the docs-only contract but noted "documented but not enforced." Enforce: before_run checks the store for an existing RunRecord with the configured run_id when one was supplied explicitly, and raises ValueError if found. Auto-derived run_ids can't trigger the check because each Agent.run materialises a fresh id in for_run.

Error message points the caller at conversation_id for multi-turn grouping, mirroring the README's guidance:

StepPersistence: run_id 'shared' is already in the store. Explicit
`run_id` is single-shot; pass `conversation_id=` to `Agent.run` for
multi-turn grouping instead.

One extra store read per Agent.run, only when explicit run_id is set. TestRunIdIsPerCall::test_explicit_run_id_reuse_raises locks it down. README + capability docstring updated to say "raises" instead of "caller's contract."

94 step-persistence tests, 100% branch coverage, lint + typecheck + CI green.

Also: PR #176 closed in favour of this one (with your earlier permission).

dsfaccini and others added 2 commits May 24, 2026 16:03
Two patterns that match the existing CLAUDE.local.md ignore convention:

- AGENTS.local.md — canonical local-instructions file (CLAUDE.local.md
  is symlinked to it where the worktree follows the same
  AGENTS.md/CLAUDE.md symlink pattern).
- .agents/skills/branch-context/ — per-worktree decisions log
  (`pr-decisions.md`) and the skill's local SKILL.md. Pattern lifted
  from `~/pydantic/ai/base/.claude/skills/branch-context/` where pyai
  uses an identical setup.

Neither is intended to land in PRs — they record cross-iteration design
calls so future AICA sessions in this worktree don't silently undo them.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Adds a new `pydantic_ai_harness.media` package (MediaStore protocol +
DiskMediaStore / SqliteMediaStore / S3MediaStore) and wires it into the
file/sqlite step-persistence backends so large BinaryContent payloads
get externalized out of snapshot JSON / table rows by default.

Defaults are zero-config: FileStepStore writes blobs under
`<root>/media/<sha256>.bin`; SqliteStepStore writes them to a sibling
`media` table in the same DB. Threshold is 64 KiB and URI scheme is
`media+sha256://<hex>` so blobs are content-addressed across stores.
Pass `media_store=None` to keep bytes inline, or a custom `MediaStore`
to redirect (e.g. `S3MediaStore` for R2 / AWS / MinIO).

S3MediaStore handrolls SigV4 over httpx to avoid a botocore/boto3
dependency. Verified working against Cloudflare R2.

`StepPersistence.from_spec(backend='sqlite', database=...)` now resolves.

180 → 261 tests, 100% branch coverage maintained.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@dsfaccini
Copy link
Copy Markdown
Contributor Author

David's AICA here: pushed 0cc038e — addresses @aristide1997's question about other backends + binary content.

New: pydantic_ai_harness.media packageMediaStore protocol + four concrete stores (DiskMediaStore, SqliteMediaStore, S3MediaStore, plus URI helpers). URI scheme is media+sha256://<hex> so blobs are content-addressed across stores (move from disk to S3 = one-line swap, same URIs resolve).

New: SqliteStepStore(database=...) — single-file SQLite backend with runs/events/snapshots/tool_effects/media tables. tool_effects upserts per (run_id, tool_call_id) matching InMemoryStepStore semantics. Snapshots use AUTOINCREMENT seq for the same anti-collision guarantee as FileStepStore._next_snapshot_seq. WAL mode enabled. from_spec(backend='sqlite', database='runs.db') resolves.

Wired into existing stores by default: FileStepStore externalizes BinaryContent ≥ 64 KiB to <root>/media/<sha256>.bin; SqliteStepStore writes blobs to the same DB's media table. Round-trip is transparent — latest_snapshot().messages[*] returns BinaryContent with original bytes. Pass media_store=None to opt out, or pass any MediaStore to redirect.

S3-compatible store works against AWS / R2 / MinIO. Handrolled SigV4 (no botocore/boto3 dep). PUT/GET/HEAD only — multipart, lifecycle, and listing are out of scope for v1. Verified against Cloudflare R2.

# Zero-config — both backends Just Work for media
StepPersistence.from_spec(backend='sqlite', database='runs.db')

# Override the store (e.g. ship media to R2, keep the SQLite ledger local)
from pydantic_ai_harness.media import S3MediaStore
SqliteStepStore(
    database='runs.db',
    media_store=S3MediaStore(
        bucket='...', endpoint='https://<acc>.r2.cloudflarestorage.com',
        region='auto', access_key_id='...', secret_access_key='...',
    ),
)

What's deliberately not in this PR:

  • A MediaExternalizer capability that rewrites BinaryContent → URL parts before the model sees them (wire-payload / token-cost reduction). The MediaStore protocol is the substrate for it; the capability lands separately so it can iterate without breaking SP.
  • A DynamoDB adapter (needs botocore, and we don't dogfood it). README notes how to implement your own.
  • A shared harness.storage adapter layer. Deferred until ≥3 capability stores duplicate the same disk/S3 boilerplate.

180 → 261 tests, 100% branch coverage. Lint + typecheck + CI green. Updated README with the defaults table + override examples + opt-out pattern.

Branch-context skill logged the load-bearing decisions (URI scheme, threshold, sqlite schema, S3 v1 scope, deferred MediaExternalizer).

cc @aristide1997 — does this design land where you needed it?

Adds replay-driven tests under `tests/media/test_s3_cassettes.py` that
exercise `S3MediaStore.put/get/exists` against pre-recorded Cloudflare
R2 responses. CI runs them without any S3 creds via the committed
cassettes under `tests/media/cassettes/`.

Sanitisation policy:

- `before_record_request`/`before_record_response` swap the real R2
  account-id subdomain and bucket name for fixed placeholders
  (`account.r2.cloudflarestorage.com`, `harness-test-bucket`)
- `Authorization` and `x-amz-date` filtered to `REDACTED`
- CF-RAY, x-amz-version-id, x-amz-checksum-*, x-amz-request-id headers
  dropped (none load-bearing for tests; some carry identifying info)
- Non-2xx response bodies blanked (R2's gzipped XML error envelope
  leaks the bucket name; our code only checks status code)

The `s3_credentials` fixture uses `os.environ.get(NAME, PLACEHOLDER)`
per field, so real R2 creds are used when recording locally with `.env`
loaded, and the placeholder constants match the scrubbed cassettes
during replay. Because the placeholders are fixed, any scrubber miss
during a future re-record shows up as a replay URL mismatch — built-in
canary against credential / private-data leakage in committed cassettes.

Adds `pytest-recording` (pulls `vcrpy`) to the dev deps.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@dsfaccini
Copy link
Copy Markdown
Contributor Author

David's AICA here: pushed a98820f — VCR cassettes for S3MediaStore against R2.

Verified live first. Before recording cassettes I ran a manual smoke against your R2 bucket (PUT → HEAD → GET → bytes-back) to confirm the SigV4 implementation actually works end-to-end. The previous round only had mock-transport tests, which is too weak a signal for a handrolled signer.

Now in CI. Five cassettes under tests/media/cassettes/test_s3_cassettes/ replay real R2 responses for PUT / HEAD-present / HEAD-404 / GET / GET-404. No creds needed at replay time.

Sanitisation:

  • before_record_request / before_record_response rewrite the real account-id subdomain and bucket name to fixed placeholders (account.r2.cloudflarestorage.com, harness-test-bucket) before the cassette hits disk
  • Authorization and x-amz-dateREDACTED
  • Drop noisy/identifying headers: CF-RAY, x-amz-version-id, x-amz-checksum-*, x-amz-request-id, x-amz-id-2
  • Blank non-2xx response bodies (R2 ships its error XML gzipped and that envelope mentions the bucket; we only read status_code on those paths)

The s3_credentials fixture is the leakage canaryos.environ.get(NAME, PLACEHOLDER) per field. With .env loaded → real values flow through → scrubber rewrites cassette. Without env → placeholder values match the committed cassettes. If the scrubber ever misses a field on re-record, replay fails the URL match because the test's URL (built from the placeholder) won't equal the cassette URL (containing the leaked real value). This pattern is reusable wherever the suite wants to keep a value private — proposed it in the branch-context skill too.

To re-record (when SigV4 / store behavior changes):

~/.claude/scripts/env-run .env -- uv run pytest tests/media/test_s3_cassettes.py --record-mode=once

(R2 only accepts region='auto'; the fixture hardcodes it so the S3_REGION=earth in your .env doesn't surface.)

Coverage: still 100% branch (266/266 tests passing, 2 skipped — the original TestS3MediaStoreLive that runs against live R2 outside VCR).

Docs/HTML touched as you asked, not exploded:

  • README (pydantic_ai_harness/step_persistence/README.md) — already had the "Persisting media" section from the previous commit
  • HTML catalog (/Users/david/step-persistence-usages.html) — intro mentions the third backend + media package; section 11 absorbed sqlite + media (no new section); section 12 (from_spec) shows backend='sqlite'

Re your question on blobs without MediaExternalizer: only at-rest externalization is shipped today. The model still receives full BinaryContent bytes (no wire-payload reduction). MediaExternalizer capability — the in-flight rewrite that takes the wire load off the model request — is the planned follow-up; it'll reuse the same MediaStore protocol, so the bottom layer is locked in.

Adds `MediaStore.public_url(uri) -> str | None` plus a `public_url=`
constructor parameter on every concrete store. The parameter accepts a
sync or async callable; the store auto-detects and awaits if needed.

This is the bottom-layer primitive for the forthcoming `MediaExternalizer`
capability — that capability will call `store.public_url(...)` per
externalized blob and swap `BinaryContent` for `ImageUrl` / `AudioUrl`
parts before the model sees the message. The callable shape covers both
static URLs (public bucket / CDN — use `make_static_public_url`
helper) and dynamic URLs (presigned, per-request signing — pass any
async callable with TTL captured in its closure).

Why a callable rather than a static config: a public bucket's URL host
is not derivable from the bucket creds (R2 public buckets use
`pub-<hash>.r2.dev`, AWS public buckets use a different scheme than the
path-style endpoint we sign for). The URL is always user-supplied
information, so a callable is the right primitive — same shape for the
static and presigned cases, and `get` stays untouched (it serves the
harness's internal byte fetch, not the model's external HTTP fetch).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
… operations

Adds `MediaContext` (frozen, kw-only dataclass with `media_type`, `filename`,
`metadata`) and threads it through every `MediaStore` method and both user
callables (`PublicUrlResolver`, `KeyStrategy`). New context fields can be
added non-breakingly; existing call sites and resolvers keep working.

Also adds:
- `KeyStrategy = Callable[[str, MediaContext], str]` for per-store layout
  control. Default `default_key_strategy` produces `<sha256>.bin`. Disk store
  validates the result against `..` traversal.
- `metadata` persistence on `SqliteMediaStore` (new JSON column) and
  `S3MediaStore` (signed `x-amz-meta-*` headers, ASCII key validation).
  Disk store explicitly does NOT persist metadata in v1 — sidecar / xattr
  options each have load-bearing drawbacks; we ship nothing rather than a
  half-true persistence promise.
- `make_static_public_url(...)` updated to the new `(uri, ctx)` signature.

The shift is motivated by the same principle as pydantic_ai's
`RunContext`: extension via fields on a context bag rather than via
breaking signature changes. Every new requirement (TTL hints for
presigned URLs, audit ids, response-header overrides, etc.) becomes a
field addition, not an API revision.

Cassettes from the previous commit replay unchanged — match-on does not
include the signed headers and the request URLs are stable.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@dsfaccini
Copy link
Copy Markdown
Contributor Author

David's AICA here: pushed 92d3997 — devx + extensibility refactor on the media surface.

What changed

  • MediaContext — frozen dataclass (media_type, filename, metadata) threaded through every MediaStore method and both user callables. Future fields (TTL, response headers, audit ids, …) are added non-breakingly. Same shape pyai uses with RunContext.
  • key_strategy callable on DiskMediaStore / SqliteMediaStore / S3MediaStore — controls the backend storage path. Default is <sha256>.bin. Disk validates against .. traversal. Documents the read-time caveat for context-dependent strategies.
  • Metadata persistence — Sqlite (new JSON column) + S3 (signed x-amz-meta-*, ASCII-key-validated). Disk explicitly does not persist in v1 — xattr / sidecar each have load-bearing drawbacks; we ship nothing rather than a half-true promise.
  • PublicUrlResolver signature is now (uri, MediaContext) → str | None (sync or async). make_static_public_url ignores context; user resolvers can vary URLs by media_type etc.

What this implies for the MediaExternalizer follow-up

Issue #254 is refreshed to reflect the new bottom layer:

  • All hooks the capability needs (MediaStore.put/get/public_url) take MediaContext
  • The capability passes MediaContext(media_type=part.media_type, filename=part.filename) per request — no awkward parameter threading
  • Three modalities: bytes / URL / UploadedFile (the third you flagged — pyai.UploadedFile is in v2-main, supported by OpenAI / Anthropic / Google / Bedrock / Xai)
  • UploadRegistry is its own protocol (separate concern from MediaStore) — content-hash × provider_name → file_id

Quality bar

  • 100% branch coverage (3872 stmts, 380 branches)
  • Lint + typecheck green
  • Cassettes from earlier commit replay unchanged — the only headers that vary (Authorization, x-amz-date, new signed metadata headers) are filtered from the match-on set, and request URLs / bodies are stable

Files touched

pydantic_ai_harness/media/_store.py    (MediaContext + KeyStrategy + Disk/Sqlite refactor)
pydantic_ai_harness/media/_s3.py        (key_strategy + metadata headers + signer extra-headers)
pydantic_ai_harness/media/_walker.py    (pass MediaContext to put)
pydantic_ai_harness/media/__init__.py   (exports)
pydantic_ai_harness/step_persistence/README.md  (MediaContext + key_strategy sections)
tests/media/test_media.py               (new tests for key_strategy, traversal, metadata)
tests/media/test_s3_cassettes.py        (use MediaContext)

Branch-context skill logged three new decisions (MediaContext pattern, key_strategy callable, metadata persistence boundary).

Issue #254 stays the consolidated home for the MediaExternalizer capability spec.

@dsfaccini
Copy link
Copy Markdown
Contributor Author

hey @aristide1997 that was a good question, I included some storage backends and the ability to roundtrip media automatically. doing my best here to get a good abstraction in. would you feel confident using this branch in your workflows and providing feedback while we release it? I'll merge it when I've done a thorough final review but I'll personally be using it for a while before we actually release it anyway.

…ategy

Adds `MediaStore.get_metadata(uri) -> Mapping[str, str]` to the protocol
and implements it on all three concrete stores:

- `DiskMediaStore`: writes a sidecar `<resolved>.meta.json` alongside
  the blob on put (atomic tmp + rename), reads it back on
  `get_metadata`. Returns `{}` when no metadata was supplied. v1 had
  documented this as a deliberate gap — sidecar JSON is straightforward
  and the xattr / ADS drawbacks don't apply.
- `SqliteMediaStore`: `SELECT metadata FROM <table> WHERE sha256=?` +
  `json.loads`. Raises `FileNotFoundError` for unknown URIs.
- `S3MediaStore`: HEAD + collects `x-amz-meta-*` response headers,
  strips the prefix. Reuses the existing 404 / non-2xx error shape.

Drops `key_strategy=` from `SqliteMediaStore`. The digest is the primary
key by content-addressing construction — a user-chosen key would either
break dedup or be a no-op. Kept on Disk + S3 where bucket / directory
layout is a real concern.

README + branch-context entries updated to reflect: all three stores
round-trip metadata; key_strategy is Disk + S3 only.

Coverage stays at 100% branch.
@dsfaccini
Copy link
Copy Markdown
Contributor Author

David's AICA here: post-merge-prep bloat review trimmed 92d3997 and closed the compatibility gaps it had left open.

Closed (277eeaa):

  • MediaStore.get_metadata(uri) -> Mapping[str, str] on the protocol
  • DiskMediaStore writes sidecar <resolved>.meta.json atomically on put, reads it back on get_metadata (xattr drawbacks don't apply to sidecars — was an honest gap, not a constraint)
  • SqliteMediaStore.get_metadata reads the JSON column
  • S3MediaStore.get_metadata HEAD + collects x-amz-meta-* response headers
  • Round-trip tests across all three stores

Dropped:

  • key_strategy= parameter on SqliteMediaStore. Vestigial — the digest is the primary key by content-addressing construction, so a user-chosen key either breaks dedup or is a no-op. Kept on Disk + S3 where bucket / directory layout is a real concern.

README + branch-context decision entries updated. Coverage stays at 100% branch; cassettes replay unchanged (0.13s).

PR is ready to merge from my end.

@dsfaccini
Copy link
Copy Markdown
Contributor Author

David's AICA here: integration review from pydanty after wiring this branch into pydantic.work + the managed dogfooding agents.

Verdict: #251 is the right replacement for #176 for pydanty's failure mode. Step events + provider-valid snapshots during the run solve the timeout-before-after-run gap, and the identity model now matches pydantic-ai well: run_id per Agent.run, conversation_id for multi-turn grouping, parent_run_id for delegate lineage.

One non-blocking implementation note I found while reading the current branch: DiskMediaStore._path_for() rejects .. in a custom key_strategy, but should also reject absolute paths. In Python, root / absolute_path discards root, so a malicious or buggy strategy can escape the store directory even without ... Not blocking pydanty because pydanty uses SqliteStepStore, but worth fixing before/after merge.

Pydanty-side integration notes I am handling in pydantic.work / pydanty-dogfooding:

  • The managed agents correctly use SqliteStepStore(database=artifact_dir("step-persistence") / "runs.db") via dogfooding_step_persistence_capability(...).
  • The team worker exposes that DB as a runtime artifact, but the current Dogfooding artifact content endpoint reads sandbox files as UTF-8 text. I am adding binary-safe artifact serving so the SQLite DB is actually retrievable from the UI.
  • I am tightening the runtime dependency probe so pydanty reinstalls when the installed harness lacks StepPersistence / SqliteStepStore, not merely when import pydantic_ai_harness works.

After those land, the remaining product-level step is to actually use continue_run(...) / shared conversation_id for delegate respawn. The current wiring records enough evidence to inspect and recover; it does not by itself stop repeated delegates from rediscovering context.

@aristide1997
Copy link
Copy Markdown

Thanks for adding the S3 store for binary content, that would undoubtedly come in very handy for chat applications.

Regarding supporting any sort of cloud/database storage for text history though: is that something you’d ever consider adding as first-party (e.g. psql), or something you’d want devs to implement themselves?

I’m happy to give this a shot if you think the surface is not going to change, is it something you’re not thinking of shipping soon? @dsfaccini

@dsfaccini
Copy link
Copy Markdown
Contributor Author

hey @aristide1997 , so: postgres and any other RDBMS should be simple enough to use with the Sqlite one (maybe I can do another iteration to abstract that enough that people can simply pass in their engine (sqlalchemy), I just really wanted to avoid extra deps, so people may need to install that separately)

on dynamodb (more broadly non-sql storage backends), I don't use them so I don't feel confident implementing it without either knowing what I'm doing nor actually battle testing it in my own usage. I may have claude include an example snippet for people to create their own NoSql storage adapter

@aristide1997
Copy link
Copy Markdown

Hi @dsfaccini

That makes sense, DynamoDB has quirks that make it slightly harder to implement, like a 400KB snapshot ceiling. What about using S3 as the whole backend, like AWS’s Strands Agents does? Cheap storage, very little infra to manage.

https://github.com/strands-agents/sdk-python/blob/main/src/strands/session/s3_session_manager.py

@dsfaccini
Copy link
Copy Markdown
Contributor Author

hey @aristide1997 I already added an s3 backed media externalizer so having an s3 storage is no problem at all lol, let me get back to you

`Path(root) / absolute_path` returns `absolute_path` — the root is
silently discarded — so a custom `key_strategy` returning `/etc/passwd`
(or similar) escapes the store directory even though the previous check
only blocked `..`. Tighten the validator to reject both shapes.

Caught by pydanty during its #251 integration review.
@dsfaccini
Copy link
Copy Markdown
Contributor Author

David's AICA here: pushed 3dafb96 for the absolute-path issue pydanty flagged.

Path(root) / absolute_path silently discards root in Python — verified locally that Path('/tmp/root') / '/etc/passwd' returns Path('/etc/passwd'). The previous ..-only check missed that shape. Tightened the validator to reject both is_absolute() and .. segments, added a regression test for the absolute case, updated the DiskMediaStore docstring to spell out both constraints.

Still 100% branch coverage. Doesn't affect SqliteStepStore users since SQLite goes through the DB path, but worth fixing for anyone configuring a custom key_strategy on DiskMediaStore.

David SF and others added 4 commits June 2, 2026 00:17
# Conflicts:
#	pydantic_ai_harness/__init__.py
#	pyproject.toml
#	uv.lock
…ad-affinity

The terminal CallToolsNode already saves the final provider-valid snapshot
with the correct step_index. after_run was re-saving the same tail stamped
with step_index=0 (ctx.run_step is reset by then), so latest_snapshot
reported a misleading step and every run wrote a duplicate. Track whether a
node snapshot was taken via a task-local ContextVar and make after_run a
fallback that only fires when the run reached no provider-valid boundary.

Also document that a caller-owned sqlite connection= must set
check_same_thread=False (store SQL runs on anyio worker threads), on both
SqliteStepStore and SqliteMediaStore, and correct the WAL-on-every-connection
claim.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…nown fields

S3MediaStore signed `_canonical_uri(path)` (each segment percent-encoded) but
sent the raw path, letting httpx apply looser encoding. A custom key_prefix /
key_strategy emitting reserved chars (`@`, `(`, `=`, ...) diverged from the
signed path -> SignatureDoesNotMatch. Send the canonical bytes via httpx
`raw_path` so signer and sender agree. Default `<hex>.bin` keys are unaffected.

The externalize/restore walker hardcoded the BinaryContent key set, silently
dropping any field pydantic_ai adds upstream. Copy the node and swap only
`data` <-> marker keys so unknown fields round-trip. Adds tests for reserved-char
path agreement, unknown-field preservation, and restore over a pruned blob.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
# Conflicts:
#	pydantic_ai_harness/__init__.py
#	uv.lock
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants