feat: input and output guardrails (block, redact, retry) #249
dsfaccini wants to merge 17 commits into
block_message now accepts a callable so the refusal text can reflect the prompt/output that tripped the guard, rather than being frozen at construction time. InputGuard's sequential path moves from before_model_request into wrap_model_request, so a single hook covers both sequential and parallel modes instead of two hooks each branching on `parallel`. Tests move to tests/guardrails/ to match the tests/<capability>/ layout.
A guard now returns either a bare bool or a GuardResult carrying a refusal message, replacing the separate block_message constructor field. The message is produced when the guard decides, so it can reflect the guard's own reasoning rather than a string frozen at construction time. Guards (and the GuardResult path) may optionally take a RunContext as a first parameter, detected from the signature like pydantic-ai's output validators, so deps- and history-aware guards are possible without closing over globals. Prompt/output-only guards are unchanged.
A guard now reports one of four outcomes via GuardResult classmethods (bool shorthand still works): allow, block, replace, retry.
- replace lets a guard redact rather than refuse: InputGuard rewrites the prompt sent to the model, OutputGuard substitutes the output.
- retry lets OutputGuard send a bad output back to the model; OutputGuard moves from after_run to after_output_process so it can raise ModelRetry and return a modified output.
- replace and block emit spans on the run tracer so a redaction or refusal is visible in Logfire; redacted content is included only when RunContext.trace_include_content is set.
InputGuard replace requires sequential mode and retry is rejected as a usage error, since neither is meaningful for input.
The coverage gate measures test files too; the _prompt_text helper had unreached branches. Drop it and assert on message parts inline.
A pydantic-ai-correctness review surfaced two streaming points. Verified both empirically:
- InputGuard(parallel=True) works under run_stream(): no deadlock.
- OutputGuard GuardResult.retry() is unsupported under run_stream(): pydantic-ai does not retry output while streaming, so a retry verdict surfaces as UnexpectedModelBehavior.
Document the retry limitation and that OutputGuard screens only the final output (partial chunks reach the caller first while streaming). Note that input redaction also rewrites persisted history and targets text prompts. Add streaming tests for both guards to lock the behavior in.
The test proving InputGuard(parallel=True) does not deadlock under run_stream() had no timeout — a reintroduced deadlock would hang CI instead of failing. Wrap it in asyncio.wait_for and document the reviewed concern it guards against.
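The timeout pattern described in this commit can be sketched with the standard library alone. This is an illustration, not the PR's test: `run_guarded_stream` and `hangs_forever` are hypothetical stand-ins for the streamed agent run and for a reintroduced deadlock.

```python
import asyncio


async def run_guarded_stream() -> str:
    """Hypothetical stand-in for the streamed agent run under test."""
    await asyncio.sleep(0)  # yield to the event loop once
    return 'ok'


async def run_with_deadline(timeout: float = 5.0) -> str:
    # wait_for cancels the inner coroutine and raises TimeoutError on expiry,
    # so a hang fails the test instead of stalling CI.
    return await asyncio.wait_for(run_guarded_stream(), timeout=timeout)


async def hangs_forever() -> None:
    """Simulates the deadlock the timeout protects against."""
    await asyncio.Event().wait()  # never set, so this never returns


async def deadlock_is_reported() -> bool:
    try:
        await asyncio.wait_for(hangs_forever(), timeout=0.05)
    except asyncio.TimeoutError:
        return True
    return False
```

The timeout value is a judgment call: long enough that a slow CI runner does not trip it, short enough that a real hang fails quickly.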
def _takes_ctx(func: Callable[..., object]) -> bool:
We could import this from pydantic-ai. Although it is private, we run the suite, so we won't break the harness.
Claude here: pydantic-ai does not expose a shared takes_ctx helper — _output.py::OutputValidator and _system_prompt.py each inline the same len(inspect.signature(...).parameters) > N check. Our _takes_ctx mirrors that convention exactly. Happy to swap if a public helper lands.
Oh yeah, it doesn't. We'll need to import from a private module, which I am fine with in this case given it is our private module.
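For readers following the thread, the parameter-count convention under discussion can be sketched as below. This assumes a guard takes either `(value)` or `(ctx, value)`; the function name `takes_ctx` and the threshold of one parameter are choices made for this sketch, not necessarily what the PR or pydantic-ai uses.

```python
import inspect
from typing import Callable


def takes_ctx(func: Callable[..., object]) -> bool:
    """Return True when `func` accepts a context before the inspected value.

    Assumption for this sketch: guards take (value) or (ctx, value), so more
    than one parameter means the first one is the context.
    """
    return len(inspect.signature(func).parameters) > 1


def prompt_only(prompt: str) -> bool:
    # Guard that inspects only the prompt.
    return 'secret' not in prompt


def ctx_aware(ctx: object, prompt: str) -> bool:
    # Guard that also receives a context object as its first argument.
    return bool(prompt)
```

A count-based check is simple but treats any extra parameter as the context, so a guard with an optional second argument would be misclassified.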
GuardOutcome = bool | GuardResult
"""What a guard callable returns: a bare `bool` (`True` = allow), or a `GuardResult`."""
The public docstrings could use another look; they could be simpler.
outcome = guard(ctx, value) if _takes_ctx(guard) else guard(value)
if inspect.isawaitable(outcome):
    outcome = await outcome
if isinstance(outcome, GuardResult):
Isn't this then enforced based on the type?
Claude here: the type annotation is just a hint — runtime discrimination of the bool | GuardResult union needs the isinstance check to narrow. Python doesn't enforce union types at runtime.
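A self-contained sketch of that normalization step, for illustration only. `Verdict` is a hypothetical stand-in for the PR's `GuardResult`, and mapping a bare `False` to a block is an assumption (the PR only states that `True` means allow).

```python
import asyncio
import inspect
from dataclasses import dataclass
from typing import Any, Callable, Optional


@dataclass
class Verdict:
    """Hypothetical stand-in for the PR's GuardResult."""

    action: str
    message: Optional[str] = None


async def evaluate(guard: Callable[..., Any], value: object) -> Verdict:
    outcome = guard(value)
    if inspect.isawaitable(outcome):  # async guards hand back an awaitable
        outcome = await outcome
    if isinstance(outcome, Verdict):  # runtime narrowing of the union
        return outcome
    # Bare bool shorthand: True allows; False is assumed to block.
    return Verdict(action='allow' if outcome else 'block')


def evaluate_sync(guard: Callable[..., Any], value: object) -> Verdict:
    """Convenience wrapper for callers without a running event loop."""
    return asyncio.run(evaluate(guard, value))
```

The `isinstance` check is what lets one code path serve both return shapes; the annotation alone tells the type checker about the union but does nothing at runtime.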
original prompt.
Scope: the guard runs exactly once per run — on the first model request —
and evaluates the original user prompt. Subsequent model requests in the
Claude has become increasingly verbose. I am sure it could explain the same thing in fewer words. I dunno if they wanna increase their token costs with this ://
raise UserError('InputGuard could not find a user prompt to redact in the request.')
_trace_redaction(ctx, direction='input', original=prompt, replacement=replacement)
async def wrap_model_request(
I see something tricky here: I think this capability needs to be positioned innermost, so that no user-morphed messages can sneak by and we can be confident it runs only after all the other capabilities are done.
OutputGuard would similarly need to be outermost, but there are outermost capabilities already, so it needs to be wrapped by instrumentation or something? Look at the deferred caps PR (pydantic/pydantic-ai#5230) for reference; it ended up doing something similar without breaking instrumentation.
Claude here: applied. InputGuard.get_ordering() → position='innermost' so any message-morphing capability runs first and the guard sees the final prompt. OutputGuard.get_ordering() → position='outermost', wrapped_by=[Instrumentation] so the guard's block/redact spans are always captured by an enclosing Instrumentation span regardless of user list order. Tests added (TestInputGuardOrdering, TestOutputGuardOrdering).
if ctx.partial_output:
    return output
verdict = await _evaluate(self.guard, ctx, output)
if verdict.action == 'allow':
Can we use match instead?
Also, there are multiple if conditions, but this is a verdict, right? More than one cannot be true.
"""Record a zero-duration span marking a guardrail refusal."""
ctx.tracer.start_span(
    f'guardrail blocked {direction}',
    attributes={'guardrail.direction': direction, 'guardrail.action': 'block', 'guardrail.message': message},
Is it all right for us to dump the message as is? Is there a risk that the LLM explains in the message why it blocked it and ends up leaking it anyway?
Claude here: applied — _trace_block now gates guardrail.message behind RunContext.trace_include_content, matching how _trace_redaction handles content. Default traces stay content-free; debugging can opt in via include_content. The user-facing refusal text (the SkipModelRequest response / OutputBlocked exception) is unchanged.
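The gating described in this reply can be illustrated without a tracer by building the span attributes separately. This is a sketch: `block_span_attributes` is a hypothetical helper, and `include_content` stands in for the `trace_include_content` flag on the run context.

```python
from typing import Dict


def block_span_attributes(direction: str, message: str, include_content: bool) -> Dict[str, str]:
    """Build attributes for a 'guardrail blocked' span."""
    attributes = {'guardrail.direction': direction, 'guardrail.action': 'block'}
    if include_content:
        # The refusal text may quote the guarded value, so it is opt-in.
        attributes['guardrail.message'] = message
    return attributes
```

With `include_content=False` the span still records that a block happened and in which direction, which is usually enough for alerting.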
_DEFAULT_OUTPUT_RETRY_MESSAGE = 'Output rejected by output guardrail.'
@dataclass
Let us use frozen and kw_only
replacement: object | None = None
"""For `replace`, the value substituted for the inspected one."""
We should add a `__post_init__` to verify the integrity of this result.
finally:
    for task in (guard_task, handler_task):
        if not task.done():
            task.cancel()
Worth letting Claude check whether this needs a shield: anyio-level cancellations from the graph can seep in and raise cancellations here, which would crash the cancellation. I am still wrapping my head around it, so I am not sure that is 100% possible.
Claude here: investigated with a regression test (test_no_dangling_tasks_on_outer_cancellation) that cancels the outer task mid-flight and asserts no leftover guard/handler tasks. It passes without asyncio.shield — the outer cancel is consumed by the asyncio.wait above the finally, so task.cancel() + await asyncio.gather(...) complete cleanly. Adding shield would be defensive but could mask a genuine re-cancellation; the test locks the current behavior in.
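The race-and-clean-up shape discussed in this thread can be approximated with the standard library as follows. This is a simplified sketch, not the PR's code: `race`, its callable parameters and the `'blocked'` return value are hypothetical, and it does not reproduce the outer-cancellation scenario the regression test covers.

```python
import asyncio
from typing import Awaitable, Callable


async def race(
    guard: Callable[[], Awaitable[bool]],
    handler: Callable[[], Awaitable[str]],
) -> str:
    guard_task = asyncio.ensure_future(guard())
    handler_task = asyncio.ensure_future(handler())
    try:
        # Run both concurrently and wake up as soon as either finishes.
        await asyncio.wait({guard_task, handler_task}, return_when=asyncio.FIRST_COMPLETED)
        if not await guard_task:
            return 'blocked'
        return await handler_task
    finally:
        for task in (guard_task, handler_task):
            if not task.done():
                task.cancel()
        # Let cancelled tasks unwind; return_exceptions swallows their errors.
        await asyncio.gather(guard_task, handler_task, return_exceptions=True)
```

Awaiting the `gather` in `finally` is what keeps tasks from dangling after the function returns; whether that await itself needs `asyncio.shield` is the open question in this thread.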
if verdict.action == 'block':
    message = verdict.message or _DEFAULT_INPUT_BLOCK_MESSAGE
    _trace_block(ctx, direction='input', message=message)
    raise SkipModelRequest(ModelResponse(parts=[TextPart(content=message)]))
If we do this then the message remains in history and the model can see it in the next turn because we only go through the messages and pick the last prompt for InputGuard?
Claude here: this is intended SkipModelRequest behavior in pydantic-ai — the canned ModelResponse is the assistant's turn, and a multi-turn chat that skips it would be broken. Per the SkipModelRequest docstring, the response is preserved in history. Callers that don't want the refusal carried forward can filter result.all_messages() or start a fresh agent for the next turn.
adtyavrdhn left a comment
Structurally great, have a few concerns.
Address @adtyavrdhn's review on #249.
- `InputGuard.get_ordering()` → `position='innermost'` so any message-morphing capability runs first and the guard sees the final prompt the model will receive.
- `OutputGuard.get_ordering()` → `position='outermost', wrapped_by=[Instrumentation]` so the guard's block/redact spans are always captured by an enclosing `Instrumentation` span regardless of user list order.
- `GuardResult` is `frozen=True, kw_only=True` with a `__post_init__` that rejects field combinations the four-outcome contract does not allow (e.g. `replace` without a replacement). `block` with no message stays valid: the default kicks in at the use site.
- `_run_guard` and `after_output_process` dispatch via `match action:` with `assert_never` exhaustiveness guards.
- `_trace_block` gates the refusal `message` attribute behind `ctx.trace_include_content`, matching `_trace_redaction`, because the message can quote sensitive content from the guarded value.
- New tests: ordering declarations, `__post_init__` validation, frozen enforcement, outer-cancellation no-leak regression guard (which confirms `asyncio.shield` around the cleanup `gather` is not needed, since the outer cancel is already consumed by `asyncio.wait`).
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Adds two guardrail capabilities with a minimal, callable-based API.
Supersedes #219. Original work by @DEENUU1 (Vstorm). This PR is #219 plus a `main` merge and a review-driven redesign, opened from a branch on this repo so it can land while the original fork branch is unreachable for push. Full credit to @DEENUU1; review input from @Kludex and @adtyavrdhn carried over.
What it does
- `InputGuard(guard, parallel=False)`: runs before the first model request.
- `OutputGuard(guard)`: runs as the model output is processed (`after_output_process`).
A `guard` is any sync/async callable. It receives the inspected value (the prompt, or the output), optionally preceded by a `RunContext` (signature-detected, like pydantic-ai's output validators). It returns a bare `bool` (`True` = allow) or a `GuardResult`.
Guard outcomes
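`GuardResult`, built via classmethods:
| Outcome | `InputGuard` | `OutputGuard` |
| --- | --- | --- |
| `allow()` | continues | continues |
| `block(message=None)` | refuses (`SkipModelRequest`) | raises `OutputBlocked` |
| `replace(value)` | rewrites the prompt sent to the model (sequential mode only) | substitutes the output |
| `retry(message)` | rejected as a usage error | sends the output back to the model (`ModelRetry`) |
A guard that raises propagates the exception as a hard failure.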
Observability
`replace` and `block` emit spans on the run's OpenTelemetry tracer (`guardrail redacted input`, `guardrail blocked output`, …) with `guardrail.*` attributes, so redactions and refusals are visible in Logfire. Redacted content is attached only when `RunContext.trace_include_content` is set. `retry` needs no special tracing: the retried request appears in the trace on its own.
Notable design points
- `OutputGuard` uses `after_output_process` (not `after_run`) so it can redact and trigger `ModelRetry`; it runs on the final output only, not streaming partials.
- `InputGuard` `replace` requires sequential mode (a parallel guard races a model call already started with the original prompt); `retry` is rejected for input.
- `parallel=True` trades tokens for latency: sequential never calls the model on a blocked prompt.
Follow-ups
Prepackaged LLM-based guardrails and Presidio/Azure/OpenAI moderation docs are tracked in #248.
Checks
`make format && make lint && make typecheck` clean. Guardrails: 49 tests, 100% branch coverage; full suite green.