Skip to content

pydantic_ai.capabilities

ToolSearchFunc module-attribute

ToolSearchFunc = Callable[
    [
        RunContext[AgentDepsT],
        Sequence[str],
        Sequence["ToolDefinition"],
    ],
    Sequence[str] | Awaitable[Sequence[str]],
]

Custom search function for ToolSearch's strategy field.

Takes the run context, the list of search queries, and the deferred tool definitions, and returns the matching tool names ordered by relevance. Both sync and async implementations are accepted.

Usage ToolSearchFunc[AgentDepsT].

ToolSearchLocalStrategy module-attribute

ToolSearchLocalStrategy = Literal['keywords']

Named local tool search strategy.

'keywords' opts into the built-in keyword-overlap algorithm explicitly — use this to lock in the current local algorithm rather than the None default (which lets Pydantic AI pick the best algorithm per provider and may change over time).

Future local strategies (e.g. local BM25, TF-IDF, regex) will join this Literal as they're added; the single-member shape today is forward-compat scaffolding.

ToolSearchNativeStrategy module-attribute

ToolSearchNativeStrategy = Literal['bm25', 'regex']

Named provider-native tool search strategy.

'bm25' and 'regex' correspond to Anthropic's server-side tool search variants. OpenAI's Responses API does not expose distinct named native strategies, so these values are rejected by the OpenAI adapter.

ToolSearchStrategy module-attribute

Strategy value accepted by ToolSearch.strategy.

  • 'keywords': force the local keyword-overlap algorithm regardless of provider.
  • 'bm25' / 'regex': force a specific provider-native strategy (Anthropic). The request fails on providers that can't honor the choice.
  • Callable (ctx, queries, tools) -> names: custom search function. Used locally, and also by the native "client-executed" surface on providers that support it (Anthropic custom tool-reference blocks, OpenAI ToolSearchToolParam(execution='client')).

None is not part of the union — it's accepted as the default on the ToolSearch.strategy field and means "let Pydantic AI pick"; see that field's docstring for details.

OutputContext dataclass

Context about the output being processed, passed to output hooks.

Source code in pydantic_ai_slim/pydantic_ai/output.py
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
@dataclass
class OutputContext:
    """Context about the output being processed, passed to output hooks."""

    mode: OutputMode
    """The schema's output mode ('text', 'native', 'prompted', 'tool', 'image', 'auto').

    This reflects the configured schema, not the format of this particular response. For
    example, a `ToolOutputSchema` with a `text_processor` (hybrid mode) reports `'tool'`
    even if the model returned text — check [`tool_call`][pydantic_ai.output.OutputContext.tool_call]
    to distinguish."""
    output_type: type[Any] | None
    """The resolved output type (e.g. MyModel, str). For output functions, the function's input type (what the model produces)."""
    object_def: OutputObjectDefinition | None
    """The output object definition (schema, name, description), if structured output."""
    has_function: bool
    """Whether there's an output function to call in the execute step."""
    function_name: str | None = None
    """Name of the output function that will run, when known. `None` for union processors that dispatch
    by output subtype, or when the schema has no function."""
    tool_call: ToolCallPart | None = None
    """The tool call part, for tool-based output. `None` when the current output did not arrive via a tool call (text or image)."""
    tool_def: ToolDefinition | None = None
    """The tool definition, for tool-based output. `None` when the current output did not arrive via a tool call."""
    allows_text: bool = False
    """Whether the schema accepts text output (including via a `text_processor` on a `ToolOutputSchema`)."""
    allows_image: bool = False
    """Whether the schema accepts image output."""
    allows_deferred_tools: bool = False
    """Whether the schema accepts deferred tool requests as output."""

mode instance-attribute

mode: OutputMode

The schema's output mode ('text', 'native', 'prompted', 'tool', 'image', 'auto').

This reflects the configured schema, not the format of this particular response. For example, a ToolOutputSchema with a text_processor (hybrid mode) reports 'tool' even if the model returned text — check tool_call to distinguish.

output_type instance-attribute

output_type: type[Any] | None

The resolved output type (e.g. MyModel, str). For output functions, the function's input type (what the model produces).

object_def instance-attribute

object_def: OutputObjectDefinition | None

The output object definition (schema, name, description), if structured output.

has_function instance-attribute

has_function: bool

Whether there's an output function to call in the execute step.

function_name class-attribute instance-attribute

function_name: str | None = None

Name of the output function that will run, when known. None for union processors that dispatch by output subtype, or when the schema has no function.

tool_call class-attribute instance-attribute

tool_call: ToolCallPart | None = None

The tool call part, for tool-based output. None when the current output did not arrive via a tool call (text or image).

tool_def class-attribute instance-attribute

tool_def: ToolDefinition | None = None

The tool definition, for tool-based output. None when the current output did not arrive via a tool call.

allows_text class-attribute instance-attribute

allows_text: bool = False

Whether the schema accepts text output (including via a text_processor on a ToolOutputSchema).

allows_image class-attribute instance-attribute

allows_image: bool = False

Whether the schema accepts image output.

allows_deferred_tools class-attribute instance-attribute

allows_deferred_tools: bool = False

Whether the schema accepts deferred tool requests as output.

CapabilityFunc module-attribute

A sync/async function which takes a run context and returns a capability.

DynamicCapability dataclass

Bases: AbstractCapability[AgentDepsT]

A capability that builds another capability dynamically using a function that takes the run context.

The factory is called once per agent run from for_run. The returned capability replaces this wrapper for the rest of the run, so its instructions, model settings, toolset, native tools, and hooks all flow through normally.

Pass a CapabilityFunc directly to Agent(capabilities=[...]) or agent.run(capabilities=[...]) and it will be wrapped in a DynamicCapability automatically.

Source code in pydantic_ai_slim/pydantic_ai/capabilities/_dynamic.py
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
@dataclass
class DynamicCapability(AbstractCapability[AgentDepsT]):
    """A capability that builds another capability dynamically using a function that takes the run context.

    The factory is called once per agent run from
    [`for_run`][pydantic_ai.capabilities.AbstractCapability.for_run]. The returned
    capability replaces this wrapper for the rest of the run, so its
    instructions, model settings, toolset, native tools, and hooks all flow
    through normally.

    Pass a [`CapabilityFunc`][pydantic_ai.capabilities.CapabilityFunc] directly
    to `Agent(capabilities=[...])` or `agent.run(capabilities=[...])` and it
    will be wrapped in a `DynamicCapability` automatically.
    """

    capability_func: CapabilityFunc[AgentDepsT]
    """The function that takes the run context and returns a capability or `None`."""

    async def for_run(self, ctx: RunContext[AgentDepsT]) -> AbstractCapability[AgentDepsT]:
        capability = self.capability_func(ctx)
        if inspect.isawaitable(capability):
            capability = await capability
        if capability is None:
            return self
        return await capability.for_run(ctx)

capability_func instance-attribute

capability_func: CapabilityFunc[AgentDepsT]

The function that takes the run context and returns a capability or None.

ToolSearch dataclass

Bases: AbstractCapability[AgentDepsT]

Capability that provides tool discovery for large toolsets.

Tools marked with defer_loading=True are hidden from the model until discovered. Auto-injected into every agent — zero overhead when no deferred tools exist.

When the model supports native tool search (Anthropic BM25/regex, OpenAI Responses), discovery is handled by the provider: the deferred tools are sent with defer_loading on the wire and the provider exposes them once they've been discovered. Otherwise, discovery happens locally via a search_tools function that the model can call.

On providers that support a native "client-executed" surface (Anthropic, OpenAI), the discovery message is delivered append-only — prompt cache is preserved across discovery turns, so growing the message history with discovered-tool results does not invalidate the cached prefix.

from collections.abc import Sequence

from pydantic_ai import Agent, RunContext, Tool
from pydantic_ai.capabilities import ToolSearch
from pydantic_ai.tools import ToolDefinition


# Tools become deferred via `defer_loading=True`. They stay hidden from the model
# until tool search discovers them.
def get_weather(city: str) -> str:
    ...


weather_tool = Tool(get_weather, defer_loading=True)

# Default: native search on supporting providers, local keyword matching elsewhere.
agent = Agent('anthropic:claude-sonnet-4-6', tools=[weather_tool], capabilities=[ToolSearch()])

# Force a specific Anthropic native strategy; errors on providers that can't honor it.
agent = Agent(
    'anthropic:claude-sonnet-4-6',
    tools=[weather_tool],
    capabilities=[ToolSearch(strategy='regex')],
)

# Always run the local keyword-overlap algorithm, regardless of provider.
agent = Agent(
    'anthropic:claude-sonnet-4-6',
    tools=[weather_tool],
    capabilities=[ToolSearch(strategy='keywords')],
)

# Custom search function — used locally, and by provider-native "client-executed"
# modes when supported.
def my_search(
    ctx: RunContext, queries: Sequence[str], tools: Sequence[ToolDefinition]
) -> list[str]:
    return [
        t.name
        for t in tools
        if any(q.lower() in (t.description or '').lower() for q in queries)
    ]

agent = Agent(
    'anthropic:claude-sonnet-4-6',
    tools=[weather_tool],
    capabilities=[ToolSearch(strategy=my_search)],
)
Source code in pydantic_ai_slim/pydantic_ai/capabilities/_tool_search.py
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
@dataclass
class ToolSearch(AbstractCapability[AgentDepsT]):
    """Capability that provides tool discovery for large toolsets.

    Tools marked with `defer_loading=True` are hidden from the model until discovered.
    Auto-injected into every agent — zero overhead when no deferred tools exist.

    When the model supports native tool search (Anthropic BM25/regex, OpenAI Responses),
    discovery is handled by the provider: the deferred tools are sent with `defer_loading`
    on the wire and the provider exposes them once they've been discovered. Otherwise,
    discovery happens locally via a `search_tools` function that the model can call.

    On providers that support a native "client-executed" surface (Anthropic, OpenAI),
    the discovery message is delivered append-only — prompt cache is preserved across
    discovery turns, so growing the message history with discovered-tool results does
    not invalidate the cached prefix.

    ```python
    from collections.abc import Sequence

    from pydantic_ai import Agent, RunContext, Tool
    from pydantic_ai.capabilities import ToolSearch
    from pydantic_ai.tools import ToolDefinition


    # Tools become deferred via `defer_loading=True`. They stay hidden from the model
    # until tool search discovers them.
    def get_weather(city: str) -> str:
        ...


    weather_tool = Tool(get_weather, defer_loading=True)

    # Default: native search on supporting providers, local keyword matching elsewhere.
    agent = Agent('anthropic:claude-sonnet-4-6', tools=[weather_tool], capabilities=[ToolSearch()])

    # Force a specific Anthropic native strategy; errors on providers that can't honor it.
    agent = Agent(
        'anthropic:claude-sonnet-4-6',
        tools=[weather_tool],
        capabilities=[ToolSearch(strategy='regex')],
    )

    # Always run the local keyword-overlap algorithm, regardless of provider.
    agent = Agent(
        'anthropic:claude-sonnet-4-6',
        tools=[weather_tool],
        capabilities=[ToolSearch(strategy='keywords')],
    )

    # Custom search function — used locally, and by provider-native "client-executed"
    # modes when supported.
    def my_search(
        ctx: RunContext, queries: Sequence[str], tools: Sequence[ToolDefinition]
    ) -> list[str]:
        return [
            t.name
            for t in tools
            if any(q.lower() in (t.description or '').lower() for q in queries)
        ]

    agent = Agent(
        'anthropic:claude-sonnet-4-6',
        tools=[weather_tool],
        capabilities=[ToolSearch(strategy=my_search)],
    )
    ```
    """

    strategy: ToolSearchStrategy[AgentDepsT] | None = None
    """The search strategy to use.

    * `None` (default): let Pydantic AI pick the best strategy for the current provider
      — native on supporting models (Anthropic BM25, OpenAI server-executed tool search),
      local keyword matching elsewhere. The choice may change in future versions.
    * `'keywords'`: always use the local keyword-overlap algorithm. Still prompt-cache
      compatible on providers that expose a "client-executed" native surface (Anthropic,
      OpenAI): the algorithm rides the same `defer_loading` wire as a custom callable,
      so the tool list stays stable across discovery rounds and the cached prefix is
      preserved.
    * `'bm25'` / `'regex'`: force a specific Anthropic native strategy. Raises on
      providers that can't honor the choice (including OpenAI, which has no named
      native strategies).
    * Callable `(ctx, queries, tools) -> names`: custom search function (sync or async).
      Used locally, and by the native "client-executed" surface on providers that support
      it (Anthropic custom tool-reference blocks, OpenAI `execution='client'`).
    """

    max_results: int = 10
    """Maximum number of matches returned by the local search algorithm."""

    tool_description: str | None = None
    """Custom description for the local `search_tools` function shown to the model."""

    parameter_description: str | None = None
    """Custom description for the `queries` parameter on the local `search_tools` function."""

    _search_fn: ToolSearchFunc[AgentDepsT] | None = field(init=False, repr=False, default=None)

    def __post_init__(self) -> None:
        # `'keywords'` and a callable strategy both run their algorithm on our side and
        # both engage the provider's "client-executed" native mode where supported, so
        # they share a `_search_fn` that the toolset routes through `_run_search_fn`.
        # The named strategies `'bm25'` / `'regex'` only take effect server-side
        # (Anthropic) — no local implementation today — and `None` falls through to
        # the toolset's default keyword-overlap algorithm.
        if self.strategy == 'keywords':
            self._search_fn = keywords_search_fn
        elif callable(self.strategy):
            self._search_fn = self.strategy
        else:
            self._search_fn = None

    def get_ordering(self) -> CapabilityOrdering:
        return CapabilityOrdering(position='outermost')

    def get_native_tools(self) -> Sequence[AgentNativeTool[AgentDepsT]]:
        # `'keywords'` and a callable strategy both register the `'custom'` builtin so
        # the provider's "client-executed" native mode engages where supported (cache
        # benefit on Anthropic and OpenAI), and silently fall back to the local
        # `search_tools` function tool elsewhere via `optional=True`. Same dispatch
        # path differs only in *which* algorithm runs as `_search_fn`.
        if self.strategy == 'keywords' or callable(self.strategy):
            return [ToolSearchTool(strategy='custom', optional=True)]
        # `None` means "pick the best native option available, otherwise fall back
        # locally" — `optional=True` so the swap silently falls back on unsupported
        # models.
        elif self.strategy is None:
            return [ToolSearchTool(optional=True)]
        # Explicit named native strategy (`'bm25'` / `'regex'`). The user committed
        # to a specific algorithm, so `optional=False`: if the model can't honor it,
        # the request must error rather than silently substitute a different algorithm.
        #
        # Assumes no local implementation of bm25/regex exists — if we ever port either
        # to Python, the strategy should join the `'keywords'` branch above so models
        # without native support can still honor the choice via the local path.
        else:
            named: ToolSearchNativeStrategy = self.strategy
            return [ToolSearchTool(strategy=named, optional=False)]

    def get_wrapper_toolset(self, toolset: AbstractToolset[AgentDepsT]) -> AbstractToolset[AgentDepsT]:
        # For explicit named native strategies (`'bm25'` / `'regex'`) the
        # `ToolSearchTool` builtin is registered with `optional=False` (see
        # `get_native_tools` above), so `prepare_request` will raise on a model
        # without native support. To make that raise actually fire — and to avoid
        # emitting a redundant `search_tools` function tool alongside the native
        # builtin on supported providers — the toolset must NOT emit the local
        # `search_tools` function at all in this mode. We signal that via
        # `enable_fallback=False` for the named-native strategies; `None`,
        # `'keywords'`, and callable strategies all have a real local
        # implementation and keep `search_tools` wired up.
        #
        # Always wrap with `ToolSearchToolset` so the deferred corpus is exposed
        # via the per-tool `with_native='tool_search'` flag — the wrapper toolset
        # is what teaches `_resolve_builtin_tool_swap` which function tools belong
        # to the tool-search corpus, regardless of whether `search_tools` itself is
        # emitted.
        return ToolSearchToolset(
            wrapped=toolset,
            search_fn=self._search_fn,
            max_results=self.max_results,
            tool_description=self.tool_description,
            parameter_description=self.parameter_description,
            enable_fallback=self.strategy not in ('bm25', 'regex'),
        )

strategy class-attribute instance-attribute

strategy: ToolSearchStrategy[AgentDepsT] | None = None

The search strategy to use.

  • None (default): let Pydantic AI pick the best strategy for the current provider — native on supporting models (Anthropic BM25, OpenAI server-executed tool search), local keyword matching elsewhere. The choice may change in future versions.
  • 'keywords': always use the local keyword-overlap algorithm. Still prompt-cache compatible on providers that expose a "client-executed" native surface (Anthropic, OpenAI): the algorithm rides the same defer_loading wire as a custom callable, so the tool list stays stable across discovery rounds and the cached prefix is preserved.
  • 'bm25' / 'regex': force a specific Anthropic native strategy. Raises on providers that can't honor the choice (including OpenAI, which has no named native strategies).
  • Callable (ctx, queries, tools) -> names: custom search function (sync or async). Used locally, and by the native "client-executed" surface on providers that support it (Anthropic custom tool-reference blocks, OpenAI execution='client').

max_results class-attribute instance-attribute

max_results: int = 10

Maximum number of matches returned by the local search algorithm.

tool_description class-attribute instance-attribute

tool_description: str | None = None

Custom description for the local search_tools function shown to the model.

parameter_description class-attribute instance-attribute

parameter_description: str | None = None

Custom description for the queries parameter on the local search_tools function.

AbstractCapability dataclass

Bases: ABC, Generic[AgentDepsT]

Abstract base class for agent capabilities.

A capability is a reusable, composable unit of agent behavior that can provide instructions, model settings, tools, and request/response hooks.

Lifecycle: capabilities are passed to an Agent at construction time, where most get_* methods are called to collect static configuration (instructions, model settings, toolsets, native tools). The exception is get_wrapper_toolset, which is called per-run during toolset assembly. Then, on each model request during a run, the before_model_request and after_model_request hooks are called to allow dynamic adjustments.

See the capabilities documentation for built-in capabilities.

get_serialization_name and from_spec support YAML/JSON specs (via [Agent.from_spec][pydantic_ai.Agent.from_spec]); they have sensible defaults and typically don't need to be overridden.

Source code in pydantic_ai_slim/pydantic_ai/capabilities/abstract.py
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
@dataclass
class AbstractCapability(ABC, Generic[AgentDepsT]):
    """Abstract base class for agent capabilities.

    A capability is a reusable, composable unit of agent behavior that can provide
    instructions, model settings, tools, and request/response hooks.

    Lifecycle: capabilities are passed to an [`Agent`][pydantic_ai.Agent] at construction time, where
    most `get_*` methods are called to collect static configuration (instructions, model
    settings, toolsets, native tools). The exception is
    [`get_wrapper_toolset`][pydantic_ai.capabilities.AbstractCapability.get_wrapper_toolset],
    which is called per-run during toolset assembly. Then, on each model request during a
    run, the [`before_model_request`][pydantic_ai.capabilities.AbstractCapability.before_model_request]
    and [`after_model_request`][pydantic_ai.capabilities.AbstractCapability.after_model_request]
    hooks are called to allow dynamic adjustments.

    See the [capabilities documentation](capabilities.md) for built-in capabilities.

    [`get_serialization_name`][pydantic_ai.capabilities.AbstractCapability.get_serialization_name]
    and [`from_spec`][pydantic_ai.capabilities.AbstractCapability.from_spec] support
    YAML/JSON specs (via [`Agent.from_spec`][pydantic_ai.Agent.from_spec]); they have
    sensible defaults and typically don't need to be overridden.
    """

    def apply(self, visitor: Callable[[AbstractCapability[AgentDepsT]], None]) -> None:
        """Run a visitor function on all leaf capabilities in this tree.

        For a single capability, calls the visitor on itself.
        Overridden by [`CombinedCapability`][pydantic_ai.capabilities.CombinedCapability]
        to recursively visit all child capabilities, and by
        [`WrapperCapability`][pydantic_ai.capabilities.WrapperCapability]
        to delegate to the wrapped capability.
        """
        visitor(self)

    @property
    def has_wrap_node_run(self) -> bool:
        """Whether this capability (or any sub-capability) overrides wrap_node_run."""
        return type(self).wrap_node_run is not AbstractCapability.wrap_node_run

    @property
    def has_wrap_run_event_stream(self) -> bool:
        """Whether this capability (or any sub-capability) overrides wrap_run_event_stream."""
        return type(self).wrap_run_event_stream is not AbstractCapability.wrap_run_event_stream

    @classmethod
    def get_serialization_name(cls) -> str | None:
        """Return the name used for spec serialization (CamelCase class name by default).

        Return None to opt out of spec-based construction.
        """
        return cls.__name__

    @classmethod
    def from_spec(cls, *args: Any, **kwargs: Any) -> AbstractCapability[Any]:
        """Create from spec arguments. Default: `cls(*args, **kwargs)`.

        Override when `__init__` takes non-serializable types.
        """
        return cls(*args, **kwargs)

    def get_ordering(self) -> CapabilityOrdering | None:
        """Return ordering constraints for this capability, or `None` for default behavior.

        Override to declare a fixed position (`'outermost'` / `'innermost'`),
        relative ordering (`wraps` / `wrapped_by` other capability types or instances),
        or dependency requirements (`requires`).

        [`CombinedCapability`][pydantic_ai.capabilities.CombinedCapability] uses
        these to topologically sort its children at construction time.
        """
        return None

    async def for_run(self, ctx: RunContext[AgentDepsT]) -> AbstractCapability[AgentDepsT]:
        """Return the capability instance to use for this agent run.

        Called once per run, before `get_*()` re-extraction and before any hooks fire.
        Override to return a fresh instance for per-run state isolation.
        Default: return `self` (shared across runs).
        """
        return self

    def get_instructions(self) -> AgentInstructions[AgentDepsT] | None:
        """Return instructions to include in the system prompt, or None.

        This method is called once at agent construction time. To get dynamic
        per-request behavior, return a callable that receives
        [`RunContext`][pydantic_ai.tools.RunContext] or a
        [`TemplateStr`][pydantic_ai.TemplateStr] — not a dynamic string.
        """
        return None

    def get_model_settings(self) -> AgentModelSettings[AgentDepsT] | None:
        """Return model settings to merge into the agent's defaults, or None.

        This method is called once at agent construction time. Return a static
        `ModelSettings` dict when the settings don't change between requests.
        Return a callable that receives [`RunContext`][pydantic_ai.tools.RunContext]
        when settings need to vary per step (e.g. based on `ctx.run_step` or `ctx.deps`).

        When the callable is invoked, `ctx.model_settings` contains the merged
        result of all layers resolved before this capability (model defaults and
        agent-level settings). The returned dict is merged on top of that.
        """
        return None

    def get_toolset(self) -> AgentToolset[AgentDepsT] | None:
        """Return a toolset to register with the agent, or None."""
        return None

    def get_native_tools(self) -> Sequence[AgentNativeTool[AgentDepsT]]:
        """Return native tools to register with the agent."""
        return []

    def get_wrapper_toolset(self, toolset: AbstractToolset[AgentDepsT]) -> AbstractToolset[AgentDepsT] | None:
        """Wrap the agent's assembled toolset, or return None to leave it unchanged.

        Called per-run with the combined non-output toolset (after the
        [`prepare_tools`][pydantic_ai.capabilities.AbstractCapability.prepare_tools] hook
        has already wrapped it). Output tools are added separately and are not included.

        Unlike the other `get_*` methods which are called once at agent construction,
        this is called each run (after [`for_run`][pydantic_ai.capabilities.AbstractCapability.for_run]).
        When multiple capabilities provide wrappers, they follow middleware semantics:
        the first capability in the list wraps outermost (matching `wrap_*` hooks).

        Use this to apply cross-cutting toolset wrappers like
        [`PreparedToolset`][pydantic_ai.toolsets.PreparedToolset],
        [`FilteredToolset`][pydantic_ai.toolsets.FilteredToolset],
        or custom [`WrapperToolset`][pydantic_ai.toolsets.WrapperToolset] subclasses.
        """
        return None

    # --- Tool preparation hooks ---

    async def prepare_tools(
        self,
        ctx: RunContext[AgentDepsT],
        tool_defs: list[ToolDefinition],
    ) -> list[ToolDefinition]:
        """Filter or modify function tool definitions for this step.

        Receives **function** tools only. For [output tools][pydantic_ai.output.ToolOutput],
        override
        [`prepare_output_tools`][pydantic_ai.capabilities.AbstractCapability.prepare_output_tools]
        — it runs separately, with `ctx.retry`/`ctx.max_retries` reflecting the **output**
        retry budget instead of the function-tool budget.

        Return a filtered or modified list. The result flows into both the model's request
        parameters and `ToolManager.tools`, so filtering also blocks tool execution.
        """
        return tool_defs

    async def prepare_output_tools(
        self,
        ctx: RunContext[AgentDepsT],
        tool_defs: list[ToolDefinition],
    ) -> list[ToolDefinition]:
        """Filter or modify output tool definitions for this step.

        Receives only [output tools][pydantic_ai.output.ToolOutput]. `ctx.retry` and
        `ctx.max_retries` reflect the **output** retry budget (agent-level
        `max_output_retries`), matching the output hook lifecycle.

        Return a filtered or modified list. The result flows into both the model's request
        parameters and `ToolManager.tools`, so filtering also blocks tool execution.
        """
        return tool_defs

    # --- Run lifecycle hooks ---

    async def before_run(
        self,
        ctx: RunContext[AgentDepsT],
    ) -> None:
        """Called before the agent run starts. Observe-only; use wrap_run for modification."""

    async def after_run(
        self,
        ctx: RunContext[AgentDepsT],
        *,
        result: AgentRunResult[Any],
    ) -> AgentRunResult[Any]:
        """Called after the agent run completes. Can modify the result."""
        return result

    async def wrap_run(
        self,
        ctx: RunContext[AgentDepsT],
        *,
        handler: WrapRunHandler,
    ) -> AgentRunResult[Any]:
        """Wraps the entire agent run. `handler()` executes the run.

        If `handler()` raises and this method catches the exception and
        returns a result instead, the error is suppressed and the recovery
        result is used.

        If this method does not call `handler()` (short-circuit), the run
        is skipped and the returned result is used directly.

        Note: if the caller cancels the run (e.g. by breaking out of an
        `iter()` loop), this method receives an `asyncio.CancelledError`.
        Implementations that hold resources should handle cleanup accordingly.
        """
        return await handler()

    async def on_run_error(
        self,
        ctx: RunContext[AgentDepsT],
        *,
        error: BaseException,
    ) -> AgentRunResult[Any]:
        """Called when the agent run fails with an exception.

        This is the error counterpart to
        [`after_run`][pydantic_ai.capabilities.AbstractCapability.after_run]:
        while `after_run` is called on success, `on_run_error` is called on
        failure (after [`wrap_run`][pydantic_ai.capabilities.AbstractCapability.wrap_run]
        has had its chance to recover).

        **Raise** the original `error` (or a different exception) to propagate it.
        **Return** an [`AgentRunResult`][pydantic_ai.run.AgentRunResult] to suppress
        the error and recover the run.

        Not called for `GeneratorExit` or `KeyboardInterrupt`.
        """
        raise error

    # --- Node run lifecycle hooks ---

    async def before_node_run(
        self,
        ctx: RunContext[AgentDepsT],
        *,
        node: AgentNode[AgentDepsT],
    ) -> AgentNode[AgentDepsT]:
        """Called before each graph node executes. Can observe or replace the node."""
        return node

    async def after_node_run(
        self,
        ctx: RunContext[AgentDepsT],
        *,
        node: AgentNode[AgentDepsT],
        result: NodeResult[AgentDepsT],
    ) -> NodeResult[AgentDepsT]:
        """Called after each graph node succeeds. Can modify the result (next node or `End`)."""
        return result

    async def wrap_node_run(
        self,
        ctx: RunContext[AgentDepsT],
        *,
        node: AgentNode[AgentDepsT],
        handler: WrapNodeRunHandler[AgentDepsT],
    ) -> NodeResult[AgentDepsT]:
        """Wraps execution of each agent graph node (run step).

        Called for every node in the agent graph (`UserPromptNode`,
        `ModelRequestNode`, `CallToolsNode`).  `handler(node)` executes
        the node and returns the next node (or `End`).

        Override to inspect or modify nodes before execution, inspect or modify
        the returned next node, call `handler` multiple times (retry), or
        return a different node to redirect graph progression.

        Note: this hook fires when using [`agent.run()`][pydantic_ai.Agent.run],
        [`agent.run_stream()`][pydantic_ai.Agent.run_stream], and when manually driving
        an [`agent.iter()`][pydantic_ai.Agent.iter] run with
        [`next()`][pydantic_ai.result.AgentRun.next], but it does **not** fire when
        iterating over the run with bare `async for` (which yields stream events, not
        node results).

        When using `agent.run()` with `event_stream_handler`, the handler wraps both
        streaming and graph advancement (i.e. the model call happens inside the wrapper).
        When using `agent.run_stream()`, the handler wraps only graph advancement — streaming
        happens before the wrapper because `run_stream()` must yield the stream to the caller
        while the stream context is still open, which cannot happen from inside a callback.
        """
        return await handler(node)

    async def on_node_run_error(
        self,
        ctx: RunContext[AgentDepsT],
        *,
        node: AgentNode[AgentDepsT],
        error: Exception,
    ) -> NodeResult[AgentDepsT]:
        """Called when a graph node fails with an exception.

        This is the error counterpart to
        [`after_node_run`][pydantic_ai.capabilities.AbstractCapability.after_node_run].

        **Raise** the original `error` (or a different exception) to propagate it.
        **Return** a next node or `End` to recover and continue the graph.

        Useful for recovering from
        [`UnexpectedModelBehavior`][pydantic_ai.exceptions.UnexpectedModelBehavior]
        by redirecting to a different node (e.g. retry with different model settings).
        """
        raise error

    # --- Event stream hook ---

    async def wrap_run_event_stream(
        self,
        ctx: RunContext[AgentDepsT],
        *,
        stream: AsyncIterable[AgentStreamEvent],
    ) -> AsyncIterable[AgentStreamEvent]:
        """Wraps the event stream for a streamed node. Can observe or transform events.

        Note: when this method is overridden (or [`Hooks.on.event`][pydantic_ai.capabilities.hooks.Hooks.on]
        / [`Hooks.on.run_event_stream`][pydantic_ai.capabilities.hooks.Hooks.on] are registered),
        [`agent.run()`][pydantic_ai.Agent.run] automatically enables streaming mode so this hook
        fires even without an explicit `event_stream_handler`.
        """
        async for event in stream:
            yield event

    # --- Model request lifecycle hooks ---

    async def before_model_request(
        self,
        ctx: RunContext[AgentDepsT],
        request_context: ModelRequestContext,
    ) -> ModelRequestContext:
        """Called before each model request. Can modify messages, settings, and parameters."""
        return request_context

    async def after_model_request(
        self,
        ctx: RunContext[AgentDepsT],
        *,
        request_context: ModelRequestContext,
        response: ModelResponse,
    ) -> ModelResponse:
        """Called after each model response. Can modify the response before further processing.

        Raise [`ModelRetry`][pydantic_ai.exceptions.ModelRetry] to reject the response and
        ask the model to try again. The original response is still appended to message history
        so the model can see what it said. Retries count against the output side of the agent's retry budget.
        """
        return response

    async def wrap_model_request(
        self,
        ctx: RunContext[AgentDepsT],
        *,
        request_context: ModelRequestContext,
        handler: WrapModelRequestHandler,
    ) -> ModelResponse:
        """Wraps the model request. handler() calls the model.

        Raise [`ModelRetry`][pydantic_ai.exceptions.ModelRetry] to skip `on_model_request_error`
        and directly retry the model request with a retry prompt. If the handler was called,
        the model response is preserved in history for context (same as `after_model_request`).
        """
        return await handler(request_context)

    async def on_model_request_error(
        self,
        ctx: RunContext[AgentDepsT],
        *,
        request_context: ModelRequestContext,
        error: Exception,
    ) -> ModelResponse:
        """Called when a model request fails with an exception.

        This is the error counterpart to
        [`after_model_request`][pydantic_ai.capabilities.AbstractCapability.after_model_request].

        **Raise** the original `error` (or a different exception) to propagate it.
        **Return** a [`ModelResponse`][pydantic_ai.messages.ModelResponse] to suppress
        the error and use the response as if the model call succeeded.
        **Raise** [`ModelRetry`][pydantic_ai.exceptions.ModelRetry] to retry the model request
        with a retry prompt instead of recovering or propagating.

        Not called for [`SkipModelRequest`][pydantic_ai.exceptions.SkipModelRequest]
        or [`ModelRetry`][pydantic_ai.exceptions.ModelRetry].
        """
        raise error

    # --- Tool validate lifecycle hooks ---

    async def before_tool_validate(
        self,
        ctx: RunContext[AgentDepsT],
        *,
        call: ToolCallPart,
        tool_def: ToolDefinition,
        args: RawToolArgs,
    ) -> RawToolArgs:
        """Modify raw args before validation.

        Raise [`ModelRetry`][pydantic_ai.exceptions.ModelRetry] to skip validation and
        ask the model to redo the tool call.
        """
        return args

    async def after_tool_validate(
        self,
        ctx: RunContext[AgentDepsT],
        *,
        call: ToolCallPart,
        tool_def: ToolDefinition,
        args: ValidatedToolArgs,
    ) -> ValidatedToolArgs:
        """Modify validated args. Called only on successful validation.

        Raise [`ModelRetry`][pydantic_ai.exceptions.ModelRetry] to reject the validated args
        and ask the model to redo the tool call.
        """
        return args

    async def wrap_tool_validate(
        self,
        ctx: RunContext[AgentDepsT],
        *,
        call: ToolCallPart,
        tool_def: ToolDefinition,
        args: RawToolArgs,
        handler: WrapToolValidateHandler,
    ) -> ValidatedToolArgs:
        """Wraps tool argument validation. handler() runs the validation."""
        return await handler(args)

    async def on_tool_validate_error(
        self,
        ctx: RunContext[AgentDepsT],
        *,
        call: ToolCallPart,
        tool_def: ToolDefinition,
        args: RawToolArgs,
        error: ValidationError | ModelRetry,
    ) -> ValidatedToolArgs:
        """Called when tool argument validation fails.

        This is the error counterpart to
        [`after_tool_validate`][pydantic_ai.capabilities.AbstractCapability.after_tool_validate].
        Fires for [`ValidationError`][pydantic.ValidationError] (schema mismatch) and
        [`ModelRetry`][pydantic_ai.exceptions.ModelRetry] (custom validator rejection).

        **Raise** the original `error` (or a different exception) to propagate it.
        **Return** validated args to suppress the error and continue as if validation passed.

        Not called for [`SkipToolValidation`][pydantic_ai.exceptions.SkipToolValidation].
        """
        raise error

    # --- Tool execute lifecycle hooks ---

    async def before_tool_execute(
        self,
        ctx: RunContext[AgentDepsT],
        *,
        call: ToolCallPart,
        tool_def: ToolDefinition,
        args: ValidatedToolArgs,
    ) -> ValidatedToolArgs:
        """Modify validated args before execution.

        Raise [`ModelRetry`][pydantic_ai.exceptions.ModelRetry] to skip execution and
        ask the model to redo the tool call.
        """
        return args

    async def after_tool_execute(
        self,
        ctx: RunContext[AgentDepsT],
        *,
        call: ToolCallPart,
        tool_def: ToolDefinition,
        args: ValidatedToolArgs,
        result: Any,
    ) -> Any:
        """Modify result after execution.

        Raise [`ModelRetry`][pydantic_ai.exceptions.ModelRetry] to reject the tool result
        and ask the model to redo the tool call.
        """
        return result

    async def wrap_tool_execute(
        self,
        ctx: RunContext[AgentDepsT],
        *,
        call: ToolCallPart,
        tool_def: ToolDefinition,
        args: ValidatedToolArgs,
        handler: WrapToolExecuteHandler,
    ) -> Any:
        """Wraps tool execution. handler() runs the tool."""
        return await handler(args)

    async def on_tool_execute_error(
        self,
        ctx: RunContext[AgentDepsT],
        *,
        call: ToolCallPart,
        tool_def: ToolDefinition,
        args: ValidatedToolArgs,
        error: Exception,
    ) -> Any:
        """Called when tool execution fails with an exception.

        This is the error counterpart to
        [`after_tool_execute`][pydantic_ai.capabilities.AbstractCapability.after_tool_execute].

        **Raise** the original `error` (or a different exception) to propagate it.
        **Return** any value to suppress the error and use it as the tool result.
        **Raise** [`ModelRetry`][pydantic_ai.exceptions.ModelRetry] to ask the model to
        redo the tool call instead of recovering or propagating.

        Not called for control flow exceptions
        ([`SkipToolExecution`][pydantic_ai.exceptions.SkipToolExecution],
        [`CallDeferred`][pydantic_ai.exceptions.CallDeferred],
        [`ApprovalRequired`][pydantic_ai.exceptions.ApprovalRequired])
        or retry signals ([`ToolRetryError`][pydantic_ai.exceptions.ToolRetryError]
        from [`ModelRetry`][pydantic_ai.exceptions.ModelRetry]).
        Use [`wrap_tool_execute`][pydantic_ai.capabilities.AbstractCapability.wrap_tool_execute]
        to intercept retries.
        """
        raise error

    # --- Output validate lifecycle hooks ---

    async def before_output_validate(
        self,
        ctx: RunContext[AgentDepsT],
        *,
        output_context: OutputContext,
        output: RawOutput,
    ) -> RawOutput:
        """Modify raw model output before validation/parsing.

        The primary hook for pre-parse repair and normalization of model output.
        Fires only for structured output that requires parsing: prompted, native,
        tool, and union output. Does **not** fire for plain text or image output.

        For structured text output, `output` is the raw text string from the model.
        For tool output, `output` is the raw tool arguments (string or dict).

        Raise [`ModelRetry`][pydantic_ai.exceptions.ModelRetry] to skip validation and
        ask the model to try again with a custom message.

        During streaming, this hook fires on every partial validation attempt as well as
        the final result. Check `ctx.partial_output` to distinguish and avoid expensive
        work on partial results.
        """
        return output

    async def after_output_validate(
        self,
        ctx: RunContext[AgentDepsT],
        *,
        output_context: OutputContext,
        output: Any,
    ) -> Any:
        """Modify validated output after successful parsing. Called only on success.

        `output` is the **semantic value** the model was asked to produce — e.g., a
        `MyModel` instance for `output_type=MyModel`, or `42` for `output_type=int`, or
        the input to a single-arg output function. For multi-arg output functions, this
        is the `dict` of arguments (the genuine multi-value input).

        Note: this differs from *tool* hooks (`after_tool_validate`), which always see
        `dict[str, Any]` — tool args follow the schema contract. Output hooks see the
        semantic output value, regardless of how it's internally represented during
        validation.

        Raise [`ModelRetry`][pydantic_ai.exceptions.ModelRetry] to reject the validated
        output and ask the model to try again.
        """
        return output

    async def wrap_output_validate(
        self,
        ctx: RunContext[AgentDepsT],
        *,
        output_context: OutputContext,
        output: RawOutput,
        handler: WrapOutputValidateHandler,
    ) -> Any:
        """Wraps output validation. handler(output) performs the validation.

        [`ModelRetry`][pydantic_ai.exceptions.ModelRetry] from within the handler goes to
        [`on_output_validate_error`][pydantic_ai.capabilities.AbstractCapability.on_output_validate_error].
        `ModelRetry` raised directly (not from the handler) bypasses the error hook.
        """
        return await handler(output)

    async def on_output_validate_error(
        self,
        ctx: RunContext[AgentDepsT],
        *,
        output_context: OutputContext,
        output: RawOutput,
        error: ValidationError | ModelRetry,
    ) -> Any:
        """Called when output validation fails.

        This is the error counterpart to
        [`after_output_validate`][pydantic_ai.capabilities.AbstractCapability.after_output_validate].

        **Raise** the original `error` (or a different exception) to propagate it.
        **Return** validated output to suppress the error and continue.
        """
        raise error

    # --- Output process lifecycle hooks ---

    async def before_output_process(
        self,
        ctx: RunContext[AgentDepsT],
        *,
        output_context: OutputContext,
        output: Any,
    ) -> Any:
        """Modify validated output before processing (extraction, output function call).

        `output` is the **semantic value** — e.g., a `MyModel` instance or `42`, matching
        `after_output_validate`. For multi-arg output functions, it's the `dict` of args.
        See [`after_output_validate`][pydantic_ai.capabilities.AbstractCapability.after_output_validate]
        for a full explanation of the semantic-value contract.

        Raise [`ModelRetry`][pydantic_ai.exceptions.ModelRetry] to skip processing and
        ask the model to try again.
        """
        return output

    async def after_output_process(
        self,
        ctx: RunContext[AgentDepsT],
        *,
        output_context: OutputContext,
        output: Any,
    ) -> Any:
        """Modify result after output processing.

        Raise [`ModelRetry`][pydantic_ai.exceptions.ModelRetry] to reject the result
        and ask the model to try again.
        """
        return output

    async def wrap_output_process(
        self,
        ctx: RunContext[AgentDepsT],
        *,
        output_context: OutputContext,
        output: Any,
        handler: WrapOutputProcessHandler,
    ) -> Any:
        """Wraps output processing. handler(output) runs extraction + output function call.

        [`ModelRetry`][pydantic_ai.exceptions.ModelRetry] bypasses
        [`on_output_process_error`][pydantic_ai.capabilities.AbstractCapability.on_output_process_error]
        (treated as control flow, not an error).

        During streaming, this fires only when partial validation succeeds, and on the
        final result. Check `ctx.partial_output` to skip expensive work on partial results.
        """
        return await handler(output)

    async def on_output_process_error(
        self,
        ctx: RunContext[AgentDepsT],
        *,
        output_context: OutputContext,
        output: Any,
        error: Exception,
    ) -> Any:
        """Called when output processing fails with an exception.

        This is the error counterpart to
        [`after_output_process`][pydantic_ai.capabilities.AbstractCapability.after_output_process].

        **Raise** the original `error` (or a different exception) to propagate it.
        **Return** any value to suppress the error and use it as the output.

        Not called for retry signals ([`ToolRetryError`][pydantic_ai.exceptions.ToolRetryError]
        from [`ModelRetry`][pydantic_ai.exceptions.ModelRetry]).
        """
        raise error

    # --- Deferred tool call hooks ---

    async def handle_deferred_tool_calls(
        self,
        ctx: RunContext[AgentDepsT],
        *,
        requests: DeferredToolRequests,
    ) -> DeferredToolResults | None:
        """Handle deferred tool calls (approval-required or externally-executed) inline during an agent run.

        Called by [`ToolManager`][pydantic_ai.tool_manager.ToolManager] when:

        - a tool raises [`ApprovalRequired`][pydantic_ai.exceptions.ApprovalRequired] or
          [`CallDeferred`][pydantic_ai.exceptions.CallDeferred] during execution, or
        - the model calls a tool registered with `requires_approval=True` (see
          [Human-in-the-Loop Tool Approval](../deferred-tools.md#human-in-the-loop-tool-approval))
          or a tool backed by [external execution](../deferred-tools.md#external-tool-execution).

        Uses accumulation dispatch: each capability in the chain receives remaining
        unresolved requests and can resolve some or all of them. Results are merged
        and unresolved calls are passed to the next capability.

        **Return** a [`DeferredToolResults`][pydantic_ai.tools.DeferredToolResults] to resolve
        some or all calls.
        **Return** `None` to leave all calls unresolved.
        """
        return None

    # --- Convenience methods ---

    def prefix_tools(self, prefix: str) -> PrefixTools[AgentDepsT]:
        """Returns a new capability that wraps this one and prefixes its tool names.

        Only this capability's tools are prefixed; other agent tools are unaffected.
        """
        from .prefix_tools import PrefixTools

        return PrefixTools(wrapped=self, prefix=prefix)

apply

apply(
    visitor: Callable[
        [AbstractCapability[AgentDepsT]], None
    ],
) -> None

Run a visitor function on all leaf capabilities in this tree.

For a single capability, calls the visitor on itself. Overridden by CombinedCapability to recursively visit all child capabilities, and by WrapperCapability to delegate to the wrapped capability.

Source code in pydantic_ai_slim/pydantic_ai/capabilities/abstract.py
156
157
158
159
160
161
162
163
164
165
def apply(self, visitor: Callable[[AbstractCapability[AgentDepsT]], None]) -> None:
    """Run a visitor function on all leaf capabilities in this tree.

    For a single capability, calls the visitor on itself.
    Overridden by [`CombinedCapability`][pydantic_ai.capabilities.CombinedCapability]
    to recursively visit all child capabilities, and by
    [`WrapperCapability`][pydantic_ai.capabilities.WrapperCapability]
    to delegate to the wrapped capability.
    """
    visitor(self)

has_wrap_node_run property

has_wrap_node_run: bool

Whether this capability (or any sub-capability) overrides wrap_node_run.

has_wrap_run_event_stream property

has_wrap_run_event_stream: bool

Whether this capability (or any sub-capability) overrides wrap_run_event_stream.

get_serialization_name classmethod

get_serialization_name() -> str | None

Return the name used for spec serialization (CamelCase class name by default).

Return None to opt out of spec-based construction.

Source code in pydantic_ai_slim/pydantic_ai/capabilities/abstract.py
177
178
179
180
181
182
183
@classmethod
def get_serialization_name(cls) -> str | None:
    """Return the name used for spec serialization (CamelCase class name by default).

    Return None to opt out of spec-based construction.
    """
    return cls.__name__

from_spec classmethod

from_spec(
    *args: Any, **kwargs: Any
) -> AbstractCapability[Any]

Create from spec arguments. Default: cls(*args, **kwargs).

Override when __init__ takes non-serializable types.

Source code in pydantic_ai_slim/pydantic_ai/capabilities/abstract.py
185
186
187
188
189
190
191
@classmethod
def from_spec(cls, *args: Any, **kwargs: Any) -> AbstractCapability[Any]:
    """Create from spec arguments. Default: `cls(*args, **kwargs)`.

    Override when `__init__` takes non-serializable types.
    """
    return cls(*args, **kwargs)

get_ordering

get_ordering() -> CapabilityOrdering | None

Return ordering constraints for this capability, or None for default behavior.

Override to declare a fixed position ('outermost' / 'innermost'), relative ordering (wraps / wrapped_by other capability types or instances), or dependency requirements (requires).

CombinedCapability uses these to topologically sort its children at construction time.

Source code in pydantic_ai_slim/pydantic_ai/capabilities/abstract.py
193
194
195
196
197
198
199
200
201
202
203
def get_ordering(self) -> CapabilityOrdering | None:
    """Return ordering constraints for this capability, or `None` for default behavior.

    Override to declare a fixed position (`'outermost'` / `'innermost'`),
    relative ordering (`wraps` / `wrapped_by` other capability types or instances),
    or dependency requirements (`requires`).

    [`CombinedCapability`][pydantic_ai.capabilities.CombinedCapability] uses
    these to topologically sort its children at construction time.
    """
    return None

for_run async

Return the capability instance to use for this agent run.

Called once per run, before get_*() re-extraction and before any hooks fire. Override to return a fresh instance for per-run state isolation. Default: return self (shared across runs).

Source code in pydantic_ai_slim/pydantic_ai/capabilities/abstract.py
205
206
207
208
209
210
211
212
async def for_run(self, ctx: RunContext[AgentDepsT]) -> AbstractCapability[AgentDepsT]:
    """Return the capability instance to use for this agent run.

    Called once per run, before `get_*()` re-extraction and before any hooks fire.
    Override to return a fresh instance for per-run state isolation.
    Default: return `self` (shared across runs).
    """
    return self

get_instructions

get_instructions() -> AgentInstructions[AgentDepsT] | None

Return instructions to include in the system prompt, or None.

This method is called once at agent construction time. To get dynamic per-request behavior, return a callable that receives RunContext or a [TemplateStr][pydantic_ai.TemplateStr] — not a dynamic string.

Source code in pydantic_ai_slim/pydantic_ai/capabilities/abstract.py
214
215
216
217
218
219
220
221
222
def get_instructions(self) -> AgentInstructions[AgentDepsT] | None:
    """Return instructions to include in the system prompt, or None.

    This method is called once at agent construction time. To get dynamic
    per-request behavior, return a callable that receives
    [`RunContext`][pydantic_ai.tools.RunContext] or a
    [`TemplateStr`][pydantic_ai.TemplateStr] — not a dynamic string.
    """
    return None

get_model_settings

get_model_settings() -> (
    AgentModelSettings[AgentDepsT] | None
)

Return model settings to merge into the agent's defaults, or None.

This method is called once at agent construction time. Return a static ModelSettings dict when the settings don't change between requests. Return a callable that receives RunContext when settings need to vary per step (e.g. based on ctx.run_step or ctx.deps).

When the callable is invoked, ctx.model_settings contains the merged result of all layers resolved before this capability (model defaults and agent-level settings). The returned dict is merged on top of that.

Source code in pydantic_ai_slim/pydantic_ai/capabilities/abstract.py
224
225
226
227
228
229
230
231
232
233
234
235
236
def get_model_settings(self) -> AgentModelSettings[AgentDepsT] | None:
    """Return model settings to merge into the agent's defaults, or None.

    This method is called once at agent construction time. Return a static
    `ModelSettings` dict when the settings don't change between requests.
    Return a callable that receives [`RunContext`][pydantic_ai.tools.RunContext]
    when settings need to vary per step (e.g. based on `ctx.run_step` or `ctx.deps`).

    When the callable is invoked, `ctx.model_settings` contains the merged
    result of all layers resolved before this capability (model defaults and
    agent-level settings). The returned dict is merged on top of that.
    """
    return None

get_toolset

get_toolset() -> AgentToolset[AgentDepsT] | None

Return a toolset to register with the agent, or None.

Source code in pydantic_ai_slim/pydantic_ai/capabilities/abstract.py
238
239
240
def get_toolset(self) -> AgentToolset[AgentDepsT] | None:
    """Return a toolset to register with the agent, or None."""
    return None

get_native_tools

get_native_tools() -> Sequence[AgentNativeTool[AgentDepsT]]

Return native tools to register with the agent.

Source code in pydantic_ai_slim/pydantic_ai/capabilities/abstract.py
242
243
244
def get_native_tools(self) -> Sequence[AgentNativeTool[AgentDepsT]]:
    """Return native tools to register with the agent."""
    return []

get_wrapper_toolset

get_wrapper_toolset(
    toolset: AbstractToolset[AgentDepsT],
) -> AbstractToolset[AgentDepsT] | None

Wrap the agent's assembled toolset, or return None to leave it unchanged.

Called per-run with the combined non-output toolset (after the prepare_tools hook has already wrapped it). Output tools are added separately and are not included.

Unlike the other get_* methods which are called once at agent construction, this is called each run (after for_run). When multiple capabilities provide wrappers, they follow middleware semantics: the first capability in the list wraps outermost (matching wrap_* hooks).

Use this to apply cross-cutting toolset wrappers like PreparedToolset, FilteredToolset, or custom WrapperToolset subclasses.

Source code in pydantic_ai_slim/pydantic_ai/capabilities/abstract.py
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
def get_wrapper_toolset(self, toolset: AbstractToolset[AgentDepsT]) -> AbstractToolset[AgentDepsT] | None:
    """Wrap the agent's assembled toolset, or return None to leave it unchanged.

    Called per-run with the combined non-output toolset (after the
    [`prepare_tools`][pydantic_ai.capabilities.AbstractCapability.prepare_tools] hook
    has already wrapped it). Output tools are added separately and are not included.

    Unlike the other `get_*` methods which are called once at agent construction,
    this is called each run (after [`for_run`][pydantic_ai.capabilities.AbstractCapability.for_run]).
    When multiple capabilities provide wrappers, they follow middleware semantics:
    the first capability in the list wraps outermost (matching `wrap_*` hooks).

    Use this to apply cross-cutting toolset wrappers like
    [`PreparedToolset`][pydantic_ai.toolsets.PreparedToolset],
    [`FilteredToolset`][pydantic_ai.toolsets.FilteredToolset],
    or custom [`WrapperToolset`][pydantic_ai.toolsets.WrapperToolset] subclasses.
    """
    return None

prepare_tools async

prepare_tools(
    ctx: RunContext[AgentDepsT],
    tool_defs: list[ToolDefinition],
) -> list[ToolDefinition]

Filter or modify function tool definitions for this step.

Receives function tools only. For output tools, override prepare_output_tools — it runs separately, with ctx.retry/ctx.max_retries reflecting the output retry budget instead of the function-tool budget.

Return a filtered or modified list. The result flows into both the model's request parameters and ToolManager.tools, so filtering also blocks tool execution.

Source code in pydantic_ai_slim/pydantic_ai/capabilities/abstract.py
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
async def prepare_tools(
    self,
    ctx: RunContext[AgentDepsT],
    tool_defs: list[ToolDefinition],
) -> list[ToolDefinition]:
    """Filter or modify function tool definitions for this step.

    Receives **function** tools only. For [output tools][pydantic_ai.output.ToolOutput],
    override
    [`prepare_output_tools`][pydantic_ai.capabilities.AbstractCapability.prepare_output_tools]
    — it runs separately, with `ctx.retry`/`ctx.max_retries` reflecting the **output**
    retry budget instead of the function-tool budget.

    Return a filtered or modified list. The result flows into both the model's request
    parameters and `ToolManager.tools`, so filtering also blocks tool execution.
    """
    return tool_defs

prepare_output_tools async

prepare_output_tools(
    ctx: RunContext[AgentDepsT],
    tool_defs: list[ToolDefinition],
) -> list[ToolDefinition]

Filter or modify output tool definitions for this step.

Receives only output tools. ctx.retry and ctx.max_retries reflect the output retry budget (agent-level max_output_retries), matching the output hook lifecycle.

Return a filtered or modified list. The result flows into both the model's request parameters and ToolManager.tools, so filtering also blocks tool execution.

Source code in pydantic_ai_slim/pydantic_ai/capabilities/abstract.py
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
async def prepare_output_tools(
    self,
    ctx: RunContext[AgentDepsT],
    tool_defs: list[ToolDefinition],
) -> list[ToolDefinition]:
    """Filter or modify output tool definitions for this step.

    Receives only [output tools][pydantic_ai.output.ToolOutput]. `ctx.retry` and
    `ctx.max_retries` reflect the **output** retry budget (agent-level
    `max_output_retries`), matching the output hook lifecycle.

    Return a filtered or modified list. The result flows into both the model's request
    parameters and `ToolManager.tools`, so filtering also blocks tool execution.
    """
    return tool_defs

before_run async

before_run(ctx: RunContext[AgentDepsT]) -> None

Called before the agent run starts. Observe-only; use wrap_run for modification.

Source code in pydantic_ai_slim/pydantic_ai/capabilities/abstract.py
303
304
305
306
307
async def before_run(
    self,
    ctx: RunContext[AgentDepsT],
) -> None:
    """Called before the agent run starts. Observe-only; use wrap_run for modification."""

after_run async

after_run(
    ctx: RunContext[AgentDepsT],
    *,
    result: AgentRunResult[Any]
) -> AgentRunResult[Any]

Called after the agent run completes. Can modify the result.

Source code in pydantic_ai_slim/pydantic_ai/capabilities/abstract.py
309
310
311
312
313
314
315
316
async def after_run(
    self,
    ctx: RunContext[AgentDepsT],
    *,
    result: AgentRunResult[Any],
) -> AgentRunResult[Any]:
    """Called after the agent run completes. Can modify the result."""
    return result

wrap_run async

wrap_run(
    ctx: RunContext[AgentDepsT], *, handler: WrapRunHandler
) -> AgentRunResult[Any]

Wraps the entire agent run. handler() executes the run.

If handler() raises and this method catches the exception and returns a result instead, the error is suppressed and the recovery result is used.

If this method does not call handler() (short-circuit), the run is skipped and the returned result is used directly.

Note: if the caller cancels the run (e.g. by breaking out of an iter() loop), this method receives an asyncio.CancelledError. Implementations that hold resources should handle cleanup accordingly.

Source code in pydantic_ai_slim/pydantic_ai/capabilities/abstract.py
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
async def wrap_run(
    self,
    ctx: RunContext[AgentDepsT],
    *,
    handler: WrapRunHandler,
) -> AgentRunResult[Any]:
    """Wraps the entire agent run. `handler()` executes the run.

    If `handler()` raises and this method catches the exception and
    returns a result instead, the error is suppressed and the recovery
    result is used.

    If this method does not call `handler()` (short-circuit), the run
    is skipped and the returned result is used directly.

    Note: if the caller cancels the run (e.g. by breaking out of an
    `iter()` loop), this method receives an `asyncio.CancelledError`.
    Implementations that hold resources should handle cleanup accordingly.
    """
    return await handler()

on_run_error async

on_run_error(
    ctx: RunContext[AgentDepsT], *, error: BaseException
) -> AgentRunResult[Any]

Called when the agent run fails with an exception.

This is the error counterpart to after_run: while after_run is called on success, on_run_error is called on failure (after wrap_run has had its chance to recover).

Raise the original error (or a different exception) to propagate it. Return an AgentRunResult to suppress the error and recover the run.

Not called for GeneratorExit or KeyboardInterrupt.

Source code in pydantic_ai_slim/pydantic_ai/capabilities/abstract.py
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
async def on_run_error(
    self,
    ctx: RunContext[AgentDepsT],
    *,
    error: BaseException,
) -> AgentRunResult[Any]:
    """Called when the agent run fails with an exception.

    This is the error counterpart to
    [`after_run`][pydantic_ai.capabilities.AbstractCapability.after_run]:
    while `after_run` is called on success, `on_run_error` is called on
    failure (after [`wrap_run`][pydantic_ai.capabilities.AbstractCapability.wrap_run]
    has had its chance to recover).

    **Raise** the original `error` (or a different exception) to propagate it.
    **Return** an [`AgentRunResult`][pydantic_ai.run.AgentRunResult] to suppress
    the error and recover the run.

    Not called for `GeneratorExit` or `KeyboardInterrupt`.
    """
    raise error

before_node_run async

before_node_run(
    ctx: RunContext[AgentDepsT],
    *,
    node: AgentNode[AgentDepsT]
) -> AgentNode[AgentDepsT]

Called before each graph node executes. Can observe or replace the node.

Source code in pydantic_ai_slim/pydantic_ai/capabilities/abstract.py
363
364
365
366
367
368
369
370
async def before_node_run(
    self,
    ctx: RunContext[AgentDepsT],
    *,
    node: AgentNode[AgentDepsT],
) -> AgentNode[AgentDepsT]:
    """Called before each graph node executes. Can observe or replace the node."""
    return node

after_node_run async

after_node_run(
    ctx: RunContext[AgentDepsT],
    *,
    node: AgentNode[AgentDepsT],
    result: NodeResult[AgentDepsT]
) -> NodeResult[AgentDepsT]

Called after each graph node succeeds. Can modify the result (next node or End).

Source code in pydantic_ai_slim/pydantic_ai/capabilities/abstract.py
372
373
374
375
376
377
378
379
380
async def after_node_run(
    self,
    ctx: RunContext[AgentDepsT],
    *,
    node: AgentNode[AgentDepsT],
    result: NodeResult[AgentDepsT],
) -> NodeResult[AgentDepsT]:
    """Called after each graph node succeeds. Can modify the result (next node or `End`)."""
    return result

wrap_node_run async

Wraps execution of each agent graph node (run step).

Called for every node in the agent graph (UserPromptNode, ModelRequestNode, CallToolsNode). handler(node) executes the node and returns the next node (or End).

Override to inspect or modify nodes before execution, inspect or modify the returned next node, call handler multiple times (retry), or return a different node to redirect graph progression.

Note: this hook fires when using [agent.run()][pydantic_ai.Agent.run], [agent.run_stream()][pydantic_ai.Agent.run_stream], and when manually driving an [agent.iter()][pydantic_ai.Agent.iter] run with [next()][pydantic_ai.result.AgentRun.next], but it does not fire when iterating over the run with bare async for (which yields stream events, not node results).

When using agent.run() with event_stream_handler, the handler wraps both streaming and graph advancement (i.e. the model call happens inside the wrapper). When using agent.run_stream(), the handler wraps only graph advancement — streaming happens before the wrapper because run_stream() must yield the stream to the caller while the stream context is still open, which cannot happen from inside a callback.

Source code in pydantic_ai_slim/pydantic_ai/capabilities/abstract.py
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
async def wrap_node_run(
    self,
    ctx: RunContext[AgentDepsT],
    *,
    node: AgentNode[AgentDepsT],
    handler: WrapNodeRunHandler[AgentDepsT],
) -> NodeResult[AgentDepsT]:
    """Wraps execution of each agent graph node (run step).

    Called for every node in the agent graph (`UserPromptNode`,
    `ModelRequestNode`, `CallToolsNode`).  `handler(node)` executes
    the node and returns the next node (or `End`).

    Override to inspect or modify nodes before execution, inspect or modify
    the returned next node, call `handler` multiple times (retry), or
    return a different node to redirect graph progression.

    Note: this hook fires when using [`agent.run()`][pydantic_ai.Agent.run],
    [`agent.run_stream()`][pydantic_ai.Agent.run_stream], and when manually driving
    an [`agent.iter()`][pydantic_ai.Agent.iter] run with
    [`next()`][pydantic_ai.result.AgentRun.next], but it does **not** fire when
    iterating over the run with bare `async for` (which yields stream events, not
    node results).

    When using `agent.run()` with `event_stream_handler`, the handler wraps both
    streaming and graph advancement (i.e. the model call happens inside the wrapper).
    When using `agent.run_stream()`, the handler wraps only graph advancement — streaming
    happens before the wrapper because `run_stream()` must yield the stream to the caller
    while the stream context is still open, which cannot happen from inside a callback.
    """
    return await handler(node)

on_node_run_error async

on_node_run_error(
    ctx: RunContext[AgentDepsT],
    *,
    node: AgentNode[AgentDepsT],
    error: Exception
) -> NodeResult[AgentDepsT]

Called when a graph node fails with an exception.

This is the error counterpart to after_node_run.

Raise the original error (or a different exception) to propagate it. Return a next node or End to recover and continue the graph.

Useful for recovering from UnexpectedModelBehavior by redirecting to a different node (e.g. retry with different model settings).

Source code in pydantic_ai_slim/pydantic_ai/capabilities/abstract.py
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
async def on_node_run_error(
    self,
    ctx: RunContext[AgentDepsT],
    *,
    node: AgentNode[AgentDepsT],
    error: Exception,
) -> NodeResult[AgentDepsT]:
    """Called when a graph node fails with an exception.

    This is the error counterpart to
    [`after_node_run`][pydantic_ai.capabilities.AbstractCapability.after_node_run].

    **Raise** the original `error` (or a different exception) to propagate it.
    **Return** a next node or `End` to recover and continue the graph.

    Useful for recovering from
    [`UnexpectedModelBehavior`][pydantic_ai.exceptions.UnexpectedModelBehavior]
    by redirecting to a different node (e.g. retry with different model settings).
    """
    raise error

wrap_run_event_stream async

wrap_run_event_stream(
    ctx: RunContext[AgentDepsT],
    *,
    stream: AsyncIterable[AgentStreamEvent]
) -> AsyncIterable[AgentStreamEvent]

Wraps the event stream for a streamed node. Can observe or transform events.

Note: when this method is overridden (or Hooks.on.event / Hooks.on.run_event_stream are registered), [agent.run()][pydantic_ai.Agent.run] automatically enables streaming mode so this hook fires even without an explicit event_stream_handler.

Source code in pydantic_ai_slim/pydantic_ai/capabilities/abstract.py
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
async def wrap_run_event_stream(
    self,
    ctx: RunContext[AgentDepsT],
    *,
    stream: AsyncIterable[AgentStreamEvent],
) -> AsyncIterable[AgentStreamEvent]:
    """Wraps the event stream for a streamed node. Can observe or transform events.

    Note: when this method is overridden (or [`Hooks.on.event`][pydantic_ai.capabilities.hooks.Hooks.on]
    / [`Hooks.on.run_event_stream`][pydantic_ai.capabilities.hooks.Hooks.on] are registered),
    [`agent.run()`][pydantic_ai.Agent.run] automatically enables streaming mode so this hook
    fires even without an explicit `event_stream_handler`.
    """
    async for event in stream:
        yield event

before_model_request async

before_model_request(
    ctx: RunContext[AgentDepsT],
    request_context: ModelRequestContext,
) -> ModelRequestContext

Called before each model request. Can modify messages, settings, and parameters.

Source code in pydantic_ai_slim/pydantic_ai/capabilities/abstract.py
455
456
457
458
459
460
461
async def before_model_request(
    self,
    ctx: RunContext[AgentDepsT],
    request_context: ModelRequestContext,
) -> ModelRequestContext:
    """Called before each model request. Can modify messages, settings, and parameters."""
    return request_context

after_model_request async

after_model_request(
    ctx: RunContext[AgentDepsT],
    *,
    request_context: ModelRequestContext,
    response: ModelResponse
) -> ModelResponse

Called after each model response. Can modify the response before further processing.

Raise ModelRetry to reject the response and ask the model to try again. The original response is still appended to message history so the model can see what it said. Retries count against the output side of the agent's retry budget.

Source code in pydantic_ai_slim/pydantic_ai/capabilities/abstract.py
463
464
465
466
467
468
469
470
471
472
473
474
475
476
async def after_model_request(
    self,
    ctx: RunContext[AgentDepsT],
    *,
    request_context: ModelRequestContext,
    response: ModelResponse,
) -> ModelResponse:
    """Called after each model response. Can modify the response before further processing.

    Raise [`ModelRetry`][pydantic_ai.exceptions.ModelRetry] to reject the response and
    ask the model to try again. The original response is still appended to message history
    so the model can see what it said. Retries count against the output side of the agent's retry budget.
    """
    return response

wrap_model_request async

wrap_model_request(
    ctx: RunContext[AgentDepsT],
    *,
    request_context: ModelRequestContext,
    handler: WrapModelRequestHandler
) -> ModelResponse

Wraps the model request. handler() calls the model.

Raise ModelRetry to skip on_model_request_error and directly retry the model request with a retry prompt. If the handler was called, the model response is preserved in history for context (same as after_model_request).

Source code in pydantic_ai_slim/pydantic_ai/capabilities/abstract.py
478
479
480
481
482
483
484
485
486
487
488
489
490
491
async def wrap_model_request(
    self,
    ctx: RunContext[AgentDepsT],
    *,
    request_context: ModelRequestContext,
    handler: WrapModelRequestHandler,
) -> ModelResponse:
    """Wraps the model request. handler() calls the model.

    Raise [`ModelRetry`][pydantic_ai.exceptions.ModelRetry] to skip `on_model_request_error`
    and directly retry the model request with a retry prompt. If the handler was called,
    the model response is preserved in history for context (same as `after_model_request`).
    """
    return await handler(request_context)

on_model_request_error async

on_model_request_error(
    ctx: RunContext[AgentDepsT],
    *,
    request_context: ModelRequestContext,
    error: Exception
) -> ModelResponse

Called when a model request fails with an exception.

This is the error counterpart to after_model_request.

Raise the original error (or a different exception) to propagate it. Return a ModelResponse to suppress the error and use the response as if the model call succeeded. Raise ModelRetry to retry the model request with a retry prompt instead of recovering or propagating.

Not called for SkipModelRequest or ModelRetry.

Source code in pydantic_ai_slim/pydantic_ai/capabilities/abstract.py
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
async def on_model_request_error(
    self,
    ctx: RunContext[AgentDepsT],
    *,
    request_context: ModelRequestContext,
    error: Exception,
) -> ModelResponse:
    """Called when a model request fails with an exception.

    This is the error counterpart to
    [`after_model_request`][pydantic_ai.capabilities.AbstractCapability.after_model_request].

    **Raise** the original `error` (or a different exception) to propagate it.
    **Return** a [`ModelResponse`][pydantic_ai.messages.ModelResponse] to suppress
    the error and use the response as if the model call succeeded.
    **Raise** [`ModelRetry`][pydantic_ai.exceptions.ModelRetry] to retry the model request
    with a retry prompt instead of recovering or propagating.

    Not called for [`SkipModelRequest`][pydantic_ai.exceptions.SkipModelRequest]
    or [`ModelRetry`][pydantic_ai.exceptions.ModelRetry].
    """
    raise error

before_tool_validate async

before_tool_validate(
    ctx: RunContext[AgentDepsT],
    *,
    call: ToolCallPart,
    tool_def: ToolDefinition,
    args: RawToolArgs
) -> RawToolArgs

Modify raw args before validation.

Raise ModelRetry to skip validation and ask the model to redo the tool call.

Source code in pydantic_ai_slim/pydantic_ai/capabilities/abstract.py
518
519
520
521
522
523
524
525
526
527
528
529
530
531
async def before_tool_validate(
    self,
    ctx: RunContext[AgentDepsT],
    *,
    call: ToolCallPart,
    tool_def: ToolDefinition,
    args: RawToolArgs,
) -> RawToolArgs:
    """Modify raw args before validation.

    Raise [`ModelRetry`][pydantic_ai.exceptions.ModelRetry] to skip validation and
    ask the model to redo the tool call.
    """
    return args

after_tool_validate async

after_tool_validate(
    ctx: RunContext[AgentDepsT],
    *,
    call: ToolCallPart,
    tool_def: ToolDefinition,
    args: ValidatedToolArgs
) -> ValidatedToolArgs

Modify validated args. Called only on successful validation.

Raise ModelRetry to reject the validated args and ask the model to redo the tool call.

Source code in pydantic_ai_slim/pydantic_ai/capabilities/abstract.py
533
534
535
536
537
538
539
540
541
542
543
544
545
546
async def after_tool_validate(
    self,
    ctx: RunContext[AgentDepsT],
    *,
    call: ToolCallPart,
    tool_def: ToolDefinition,
    args: ValidatedToolArgs,
) -> ValidatedToolArgs:
    """Modify validated args. Called only on successful validation.

    Raise [`ModelRetry`][pydantic_ai.exceptions.ModelRetry] to reject the validated args
    and ask the model to redo the tool call.
    """
    return args

wrap_tool_validate async

wrap_tool_validate(
    ctx: RunContext[AgentDepsT],
    *,
    call: ToolCallPart,
    tool_def: ToolDefinition,
    args: RawToolArgs,
    handler: WrapToolValidateHandler
) -> ValidatedToolArgs

Wraps tool argument validation. handler() runs the validation.

Source code in pydantic_ai_slim/pydantic_ai/capabilities/abstract.py
548
549
550
551
552
553
554
555
556
557
558
async def wrap_tool_validate(
    self,
    ctx: RunContext[AgentDepsT],
    *,
    call: ToolCallPart,
    tool_def: ToolDefinition,
    args: RawToolArgs,
    handler: WrapToolValidateHandler,
) -> ValidatedToolArgs:
    """Wraps tool argument validation. handler() runs the validation."""
    return await handler(args)

on_tool_validate_error async

on_tool_validate_error(
    ctx: RunContext[AgentDepsT],
    *,
    call: ToolCallPart,
    tool_def: ToolDefinition,
    args: RawToolArgs,
    error: ValidationError | ModelRetry
) -> ValidatedToolArgs

Called when tool argument validation fails.

This is the error counterpart to after_tool_validate. Fires for [ValidationError][pydantic.ValidationError] (schema mismatch) and ModelRetry (custom validator rejection).

Raise the original error (or a different exception) to propagate it. Return validated args to suppress the error and continue as if validation passed.

Not called for SkipToolValidation.

Source code in pydantic_ai_slim/pydantic_ai/capabilities/abstract.py
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
async def on_tool_validate_error(
    self,
    ctx: RunContext[AgentDepsT],
    *,
    call: ToolCallPart,
    tool_def: ToolDefinition,
    args: RawToolArgs,
    error: ValidationError | ModelRetry,
) -> ValidatedToolArgs:
    """Called when tool argument validation fails.

    This is the error counterpart to
    [`after_tool_validate`][pydantic_ai.capabilities.AbstractCapability.after_tool_validate].
    Fires for [`ValidationError`][pydantic.ValidationError] (schema mismatch) and
    [`ModelRetry`][pydantic_ai.exceptions.ModelRetry] (custom validator rejection).

    **Raise** the original `error` (or a different exception) to propagate it.
    **Return** validated args to suppress the error and continue as if validation passed.

    Not called for [`SkipToolValidation`][pydantic_ai.exceptions.SkipToolValidation].
    """
    raise error

before_tool_execute async

before_tool_execute(
    ctx: RunContext[AgentDepsT],
    *,
    call: ToolCallPart,
    tool_def: ToolDefinition,
    args: ValidatedToolArgs
) -> ValidatedToolArgs

Modify validated args before execution.

Raise ModelRetry to skip execution and ask the model to redo the tool call.

Source code in pydantic_ai_slim/pydantic_ai/capabilities/abstract.py
585
586
587
588
589
590
591
592
593
594
595
596
597
598
async def before_tool_execute(
    self,
    ctx: RunContext[AgentDepsT],
    *,
    call: ToolCallPart,
    tool_def: ToolDefinition,
    args: ValidatedToolArgs,
) -> ValidatedToolArgs:
    """Modify validated args before execution.

    Raise [`ModelRetry`][pydantic_ai.exceptions.ModelRetry] to skip execution and
    ask the model to redo the tool call.
    """
    return args

after_tool_execute async

after_tool_execute(
    ctx: RunContext[AgentDepsT],
    *,
    call: ToolCallPart,
    tool_def: ToolDefinition,
    args: ValidatedToolArgs,
    result: Any
) -> Any

Modify result after execution.

Raise ModelRetry to reject the tool result and ask the model to redo the tool call.

Source code in pydantic_ai_slim/pydantic_ai/capabilities/abstract.py
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
async def after_tool_execute(
    self,
    ctx: RunContext[AgentDepsT],
    *,
    call: ToolCallPart,
    tool_def: ToolDefinition,
    args: ValidatedToolArgs,
    result: Any,
) -> Any:
    """Modify result after execution.

    Raise [`ModelRetry`][pydantic_ai.exceptions.ModelRetry] to reject the tool result
    and ask the model to redo the tool call.
    """
    return result

wrap_tool_execute async

wrap_tool_execute(
    ctx: RunContext[AgentDepsT],
    *,
    call: ToolCallPart,
    tool_def: ToolDefinition,
    args: ValidatedToolArgs,
    handler: WrapToolExecuteHandler
) -> Any

Wraps tool execution. handler() runs the tool.

Source code in pydantic_ai_slim/pydantic_ai/capabilities/abstract.py
616
617
618
619
620
621
622
623
624
625
626
async def wrap_tool_execute(
    self,
    ctx: RunContext[AgentDepsT],
    *,
    call: ToolCallPart,
    tool_def: ToolDefinition,
    args: ValidatedToolArgs,
    handler: WrapToolExecuteHandler,
) -> Any:
    """Wraps tool execution. handler() runs the tool."""
    return await handler(args)

on_tool_execute_error async

on_tool_execute_error(
    ctx: RunContext[AgentDepsT],
    *,
    call: ToolCallPart,
    tool_def: ToolDefinition,
    args: ValidatedToolArgs,
    error: Exception
) -> Any

Called when tool execution fails with an exception.

This is the error counterpart to after_tool_execute.

Raise the original error (or a different exception) to propagate it. Return any value to suppress the error and use it as the tool result. Raise ModelRetry to ask the model to redo the tool call instead of recovering or propagating.

Not called for control flow exceptions (SkipToolExecution, CallDeferred, ApprovalRequired) or retry signals (ToolRetryError from ModelRetry). Use wrap_tool_execute to intercept retries.

Source code in pydantic_ai_slim/pydantic_ai/capabilities/abstract.py
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
async def on_tool_execute_error(
    self,
    ctx: RunContext[AgentDepsT],
    *,
    call: ToolCallPart,
    tool_def: ToolDefinition,
    args: ValidatedToolArgs,
    error: Exception,
) -> Any:
    """Called when tool execution fails with an exception.

    This is the error counterpart to
    [`after_tool_execute`][pydantic_ai.capabilities.AbstractCapability.after_tool_execute].

    **Raise** the original `error` (or a different exception) to propagate it.
    **Return** any value to suppress the error and use it as the tool result.
    **Raise** [`ModelRetry`][pydantic_ai.exceptions.ModelRetry] to ask the model to
    redo the tool call instead of recovering or propagating.

    Not called for control flow exceptions
    ([`SkipToolExecution`][pydantic_ai.exceptions.SkipToolExecution],
    [`CallDeferred`][pydantic_ai.exceptions.CallDeferred],
    [`ApprovalRequired`][pydantic_ai.exceptions.ApprovalRequired])
    or retry signals ([`ToolRetryError`][pydantic_ai.exceptions.ToolRetryError]
    from [`ModelRetry`][pydantic_ai.exceptions.ModelRetry]).
    Use [`wrap_tool_execute`][pydantic_ai.capabilities.AbstractCapability.wrap_tool_execute]
    to intercept retries.
    """
    raise error

before_output_validate async

before_output_validate(
    ctx: RunContext[AgentDepsT],
    *,
    output_context: OutputContext,
    output: RawOutput
) -> RawOutput

Modify raw model output before validation/parsing.

The primary hook for pre-parse repair and normalization of model output. Fires only for structured output that requires parsing: prompted, native, tool, and union output. Does not fire for plain text or image output.

For structured text output, output is the raw text string from the model. For tool output, output is the raw tool arguments (string or dict).

Raise ModelRetry to skip validation and ask the model to try again with a custom message.

During streaming, this hook fires on every partial validation attempt as well as the final result. Check ctx.partial_output to distinguish and avoid expensive work on partial results.

Source code in pydantic_ai_slim/pydantic_ai/capabilities/abstract.py
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
async def before_output_validate(
    self,
    ctx: RunContext[AgentDepsT],
    *,
    output_context: OutputContext,
    output: RawOutput,
) -> RawOutput:
    """Modify raw model output before validation/parsing.

    The primary hook for pre-parse repair and normalization of model output.
    Fires only for structured output that requires parsing: prompted, native,
    tool, and union output. Does **not** fire for plain text or image output.

    For structured text output, `output` is the raw text string from the model.
    For tool output, `output` is the raw tool arguments (string or dict).

    Raise [`ModelRetry`][pydantic_ai.exceptions.ModelRetry] to skip validation and
    ask the model to try again with a custom message.

    During streaming, this hook fires on every partial validation attempt as well as
    the final result. Check `ctx.partial_output` to distinguish and avoid expensive
    work on partial results.
    """
    return output

after_output_validate async

after_output_validate(
    ctx: RunContext[AgentDepsT],
    *,
    output_context: OutputContext,
    output: Any
) -> Any

Modify validated output after successful parsing. Called only on success.

output is the semantic value the model was asked to produce — e.g., a MyModel instance for output_type=MyModel, or 42 for output_type=int, or the input to a single-arg output function. For multi-arg output functions, this is the dict of arguments (the genuine multi-value input).

Note: this differs from tool hooks (after_tool_validate), which always see dict[str, Any] — tool args follow the schema contract. Output hooks see the semantic output value, regardless of how it's internally represented during validation.

Raise ModelRetry to reject the validated output and ask the model to try again.

Source code in pydantic_ai_slim/pydantic_ai/capabilities/abstract.py
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
async def after_output_validate(
    self,
    ctx: RunContext[AgentDepsT],
    *,
    output_context: OutputContext,
    output: Any,
) -> Any:
    """Modify validated output after successful parsing. Called only on success.

    `output` is the **semantic value** the model was asked to produce — e.g., a
    `MyModel` instance for `output_type=MyModel`, or `42` for `output_type=int`, or
    the input to a single-arg output function. For multi-arg output functions, this
    is the `dict` of arguments (the genuine multi-value input).

    Note: this differs from *tool* hooks (`after_tool_validate`), which always see
    `dict[str, Any]` — tool args follow the schema contract. Output hooks see the
    semantic output value, regardless of how it's internally represented during
    validation.

    Raise [`ModelRetry`][pydantic_ai.exceptions.ModelRetry] to reject the validated
    output and ask the model to try again.
    """
    return output

wrap_output_validate async

wrap_output_validate(
    ctx: RunContext[AgentDepsT],
    *,
    output_context: OutputContext,
    output: RawOutput,
    handler: WrapOutputValidateHandler
) -> Any

Wraps output validation. handler(output) performs the validation.

ModelRetry from within the handler goes to on_output_validate_error. ModelRetry raised directly (not from the handler) bypasses the error hook.

Source code in pydantic_ai_slim/pydantic_ai/capabilities/abstract.py
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
async def wrap_output_validate(
    self,
    ctx: RunContext[AgentDepsT],
    *,
    output_context: OutputContext,
    output: RawOutput,
    handler: WrapOutputValidateHandler,
) -> Any:
    """Wraps output validation. handler(output) performs the validation.

    [`ModelRetry`][pydantic_ai.exceptions.ModelRetry] from within the handler goes to
    [`on_output_validate_error`][pydantic_ai.capabilities.AbstractCapability.on_output_validate_error].
    `ModelRetry` raised directly (not from the handler) bypasses the error hook.
    """
    return await handler(output)

on_output_validate_error async

on_output_validate_error(
    ctx: RunContext[AgentDepsT],
    *,
    output_context: OutputContext,
    output: RawOutput,
    error: ValidationError | ModelRetry
) -> Any

Called when output validation fails.

This is the error counterpart to after_output_validate.

Raise the original error (or a different exception) to propagate it. Return validated output to suppress the error and continue.

Source code in pydantic_ai_slim/pydantic_ai/capabilities/abstract.py
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
async def on_output_validate_error(
    self,
    ctx: RunContext[AgentDepsT],
    *,
    output_context: OutputContext,
    output: RawOutput,
    error: ValidationError | ModelRetry,
) -> Any:
    """Called when output validation fails.

    This is the error counterpart to
    [`after_output_validate`][pydantic_ai.capabilities.AbstractCapability.after_output_validate].

    **Raise** the original `error` (or a different exception) to propagate it.
    **Return** validated output to suppress the error and continue.
    """
    raise error

before_output_process async

before_output_process(
    ctx: RunContext[AgentDepsT],
    *,
    output_context: OutputContext,
    output: Any
) -> Any

Modify validated output before processing (extraction, output function call).

output is the semantic value — e.g., a MyModel instance or 42, matching after_output_validate. For multi-arg output functions, it's the dict of args. See after_output_validate for a full explanation of the semantic-value contract.

Raise ModelRetry to skip processing and ask the model to try again.

Source code in pydantic_ai_slim/pydantic_ai/capabilities/abstract.py
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
async def before_output_process(
    self,
    ctx: RunContext[AgentDepsT],
    *,
    output_context: OutputContext,
    output: Any,
) -> Any:
    """Modify validated output before processing (extraction, output function call).

    `output` is the **semantic value** — e.g., a `MyModel` instance or `42`, matching
    `after_output_validate`. For multi-arg output functions, it's the `dict` of args.
    See [`after_output_validate`][pydantic_ai.capabilities.AbstractCapability.after_output_validate]
    for a full explanation of the semantic-value contract.

    Raise [`ModelRetry`][pydantic_ai.exceptions.ModelRetry] to skip processing and
    ask the model to try again.
    """
    return output

after_output_process async

after_output_process(
    ctx: RunContext[AgentDepsT],
    *,
    output_context: OutputContext,
    output: Any
) -> Any

Modify result after output processing.

Raise ModelRetry to reject the result and ask the model to try again.

Source code in pydantic_ai_slim/pydantic_ai/capabilities/abstract.py
764
765
766
767
768
769
770
771
772
773
774
775
776
async def after_output_process(
    self,
    ctx: RunContext[AgentDepsT],
    *,
    output_context: OutputContext,
    output: Any,
) -> Any:
    """Modify result after output processing.

    Raise [`ModelRetry`][pydantic_ai.exceptions.ModelRetry] to reject the result
    and ask the model to try again.
    """
    return output

wrap_output_process async

wrap_output_process(
    ctx: RunContext[AgentDepsT],
    *,
    output_context: OutputContext,
    output: Any,
    handler: WrapOutputProcessHandler
) -> Any

Wraps output processing. handler(output) runs extraction + output function call.

ModelRetry bypasses on_output_process_error (treated as control flow, not an error).

During streaming, this fires only when partial validation succeeds, and on the final result. Check ctx.partial_output to skip expensive work on partial results.

Source code in pydantic_ai_slim/pydantic_ai/capabilities/abstract.py
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
async def wrap_output_process(
    self,
    ctx: RunContext[AgentDepsT],
    *,
    output_context: OutputContext,
    output: Any,
    handler: WrapOutputProcessHandler,
) -> Any:
    """Wraps output processing. handler(output) runs extraction + output function call.

    [`ModelRetry`][pydantic_ai.exceptions.ModelRetry] bypasses
    [`on_output_process_error`][pydantic_ai.capabilities.AbstractCapability.on_output_process_error]
    (treated as control flow, not an error).

    During streaming, this fires only when partial validation succeeds, and on the
    final result. Check `ctx.partial_output` to skip expensive work on partial results.
    """
    return await handler(output)

on_output_process_error async

on_output_process_error(
    ctx: RunContext[AgentDepsT],
    *,
    output_context: OutputContext,
    output: Any,
    error: Exception
) -> Any

Called when output processing fails with an exception.

This is the error counterpart to after_output_process.

Raise the original error (or a different exception) to propagate it. Return any value to suppress the error and use it as the output.

Not called for retry signals (ToolRetryError from ModelRetry).

Source code in pydantic_ai_slim/pydantic_ai/capabilities/abstract.py
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
async def on_output_process_error(
    self,
    ctx: RunContext[AgentDepsT],
    *,
    output_context: OutputContext,
    output: Any,
    error: Exception,
) -> Any:
    """Called when output processing fails with an exception.

    This is the error counterpart to
    [`after_output_process`][pydantic_ai.capabilities.AbstractCapability.after_output_process].

    **Raise** the original `error` (or a different exception) to propagate it.
    **Return** any value to suppress the error and use it as the output.

    Not called for retry signals ([`ToolRetryError`][pydantic_ai.exceptions.ToolRetryError]
    from [`ModelRetry`][pydantic_ai.exceptions.ModelRetry]).
    """
    raise error

handle_deferred_tool_calls async

handle_deferred_tool_calls(
    ctx: RunContext[AgentDepsT],
    *,
    requests: DeferredToolRequests
) -> DeferredToolResults | None

Handle deferred tool calls (approval-required or externally-executed) inline during an agent run.

Called by [ToolManager][pydantic_ai.tool_manager.ToolManager] when:

Uses accumulation dispatch: each capability in the chain receives remaining unresolved requests and can resolve some or all of them. Results are merged and unresolved calls are passed to the next capability.

Return a DeferredToolResults to resolve some or all calls. Return None to leave all calls unresolved.

Source code in pydantic_ai_slim/pydantic_ai/capabilities/abstract.py
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
async def handle_deferred_tool_calls(
    self,
    ctx: RunContext[AgentDepsT],
    *,
    requests: DeferredToolRequests,
) -> DeferredToolResults | None:
    """Handle deferred tool calls (approval-required or externally-executed) inline during an agent run.

    Called by [`ToolManager`][pydantic_ai.tool_manager.ToolManager] when:

    - a tool raises [`ApprovalRequired`][pydantic_ai.exceptions.ApprovalRequired] or
      [`CallDeferred`][pydantic_ai.exceptions.CallDeferred] during execution, or
    - the model calls a tool registered with `requires_approval=True` (see
      [Human-in-the-Loop Tool Approval](../deferred-tools.md#human-in-the-loop-tool-approval))
      or a tool backed by [external execution](../deferred-tools.md#external-tool-execution).

    Uses accumulation dispatch: each capability in the chain receives remaining
    unresolved requests and can resolve some or all of them. Results are merged
    and unresolved calls are passed to the next capability.

    **Return** a [`DeferredToolResults`][pydantic_ai.tools.DeferredToolResults] to resolve
    some or all calls.
    **Return** `None` to leave all calls unresolved.
    """
    return None

prefix_tools

prefix_tools(prefix: str) -> PrefixTools[AgentDepsT]

Returns a new capability that wraps this one and prefixes its tool names.

Only this capability's tools are prefixed; other agent tools are unaffected.

Source code in pydantic_ai_slim/pydantic_ai/capabilities/abstract.py
848
849
850
851
852
853
854
855
def prefix_tools(self, prefix: str) -> PrefixTools[AgentDepsT]:
    """Returns a new capability that wraps this one and prefixes its tool names.

    Only this capability's tools are prefixed; other agent tools are unaffected.
    """
    from .prefix_tools import PrefixTools

    return PrefixTools(wrapped=self, prefix=prefix)

AgentNode module-attribute

AgentNode: TypeAlias = (
    "_agent_graph.AgentNode[AgentDepsT, Any]"
)

Type alias for an agent graph node (UserPromptNode, ModelRequestNode, CallToolsNode).

CapabilityOrdering dataclass

Ordering constraints for a capability within a combined capability chain.

Capabilities follow middleware semantics: the first capability in the list is the outermost layer, wrapping all others. Declare ordering constraints via get_ordering to control a capability's position in the chain regardless of how the user lists them.

When a CombinedCapability is constructed, it topologically sorts its children to satisfy these constraints, preserving user-provided order as a tiebreaker.

Source code in pydantic_ai_slim/pydantic_ai/capabilities/abstract.py
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
@dataclass
class CapabilityOrdering:
    """Ordering constraints for a capability within a combined capability chain.

    Capabilities follow middleware semantics: the first capability in the list is the
    **outermost** layer, wrapping all others. Declare ordering constraints via
    [`get_ordering`][pydantic_ai.capabilities.AbstractCapability.get_ordering]
    to control a capability's position in the chain regardless of how the user lists them.

    When a [`CombinedCapability`][pydantic_ai.capabilities.CombinedCapability] is
    constructed, it topologically sorts its children to satisfy these constraints,
    preserving user-provided order as a tiebreaker.
    """

    position: CapabilityPosition | None = None
    """Fixed position in the chain, or `None` for user-provided order."""

    wraps: Sequence[CapabilityRef] = ()
    """This capability wraps around (is outside of) these capabilities in the middleware chain.

    Each entry can be a capability **type** (matches all instances of that type via `issubclass`)
    or a specific capability **instance** (matches by identity via `is`).

    Note: instance refs use identity (`is`) matching, so if a capability's
    [`for_run`][pydantic_ai.capabilities.AbstractCapability.for_run] returns a
    new instance, refs to the original will no longer match. Use type refs
    when the target capability uses per-run state isolation.
    """

    wrapped_by: Sequence[CapabilityRef] = ()
    """This capability is wrapped by (is inside of) these capabilities in the middleware chain.

    Each entry can be a capability **type** (matches all instances of that type via `issubclass`)
    or a specific capability **instance** (matches by identity via `is`).

    Note: instance refs use identity (`is`) matching, so if a capability's
    [`for_run`][pydantic_ai.capabilities.AbstractCapability.for_run] returns a
    new instance, refs to the original will no longer match. Use type refs
    when the target capability uses per-run state isolation.
    """

    requires: Sequence[type[AbstractCapability[Any]]] = ()
    """These types must be present in the chain (no ordering implied)."""

position class-attribute instance-attribute

position: CapabilityPosition | None = None

Fixed position in the chain, or None for user-provided order.

wraps class-attribute instance-attribute

wraps: Sequence[CapabilityRef] = ()

This capability wraps around (is outside of) these capabilities in the middleware chain.

Each entry can be a capability type (matches all instances of that type via issubclass) or a specific capability instance (matches by identity via is).

Note: instance refs use identity (is) matching, so if a capability's for_run returns a new instance, refs to the original will no longer match. Use type refs when the target capability uses per-run state isolation.

wrapped_by class-attribute instance-attribute

wrapped_by: Sequence[CapabilityRef] = ()

This capability is wrapped by (is inside of) these capabilities in the middleware chain.

Each entry can be a capability type (matches all instances of that type via issubclass) or a specific capability instance (matches by identity via is).

Note: instance refs use identity (is) matching, so if a capability's for_run returns a new instance, refs to the original will no longer match. Use type refs when the target capability uses per-run state isolation.

requires class-attribute instance-attribute

requires: Sequence[type[AbstractCapability[Any]]] = ()

These types must be present in the chain (no ordering implied).

CapabilityPosition module-attribute

CapabilityPosition = Literal['outermost', 'innermost']

Position tier for a capability in the middleware chain.

  • 'outermost': in the outermost tier, before all non-outermost capabilities. Multiple capabilities can declare 'outermost'; original list order breaks ties within the tier, and wraps/wrapped_by edges refine order further.
  • 'innermost': in the innermost tier, after all non-innermost capabilities. Same tie-breaking rules apply.

CapabilityRef module-attribute

CapabilityRef: TypeAlias = (
    "type[AbstractCapability[Any]] | AbstractCapability[Any]"
)

Reference to a capability — either a type (matches all instances of that type) or a specific instance (matches by identity).

NodeResult module-attribute

NodeResult: TypeAlias = (
    "_agent_graph.AgentNode[AgentDepsT, Any] | End[FinalResult[Any]]"
)

Type alias for the result of executing an agent graph node: either the next node or End.

RawOutput module-attribute

RawOutput: TypeAlias = str | dict[str, Any]

Type alias for raw output data (text or tool args).

RawToolArgs module-attribute

RawToolArgs: TypeAlias = str | dict[str, Any]

Type alias for raw (pre-validation) tool arguments.

ValidatedToolArgs module-attribute

ValidatedToolArgs: TypeAlias = dict[str, Any]

Type alias for validated tool arguments.

WrapModelRequestHandler module-attribute

WrapModelRequestHandler: TypeAlias = (
    "Callable[[ModelRequestContext], Awaitable[ModelResponse]]"
)

Handler type for wrap_model_request.

WrapNodeRunHandler module-attribute

WrapNodeRunHandler: TypeAlias = (
    "Callable[[_agent_graph.AgentNode[AgentDepsT, Any]], Awaitable[_agent_graph.AgentNode[AgentDepsT, Any] | End[FinalResult[Any]]]]"
)

Handler type for wrap_node_run.

WrapOutputProcessHandler module-attribute

WrapOutputProcessHandler: TypeAlias = Callable[
    [Any], Awaitable[Any]
]

Handler type for wrap_output_process.

WrapOutputValidateHandler module-attribute

WrapOutputValidateHandler: TypeAlias = Callable[
    [RawOutput], Awaitable[Any]
]

Handler type for wrap_output_validate.

WrapRunHandler module-attribute

WrapRunHandler: TypeAlias = (
    "Callable[[], Awaitable[AgentRunResult[Any]]]"
)

Handler type for wrap_run.

WrapToolExecuteHandler module-attribute

WrapToolExecuteHandler: TypeAlias = Callable[
    [ValidatedToolArgs], Awaitable[Any]
]

Handler type for wrap_tool_execute.

WrapToolValidateHandler module-attribute

WrapToolValidateHandler: TypeAlias = Callable[
    [RawToolArgs], Awaitable[ValidatedToolArgs]
]

Handler type for wrap_tool_validate.

CombinedCapability dataclass

Bases: AbstractCapability[AgentDepsT]

A capability that combines multiple capabilities.

Source code in pydantic_ai_slim/pydantic_ai/capabilities/combined.py
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
@dataclass
class CombinedCapability(AbstractCapability[AgentDepsT]):
    """A capability that combines multiple capabilities."""

    capabilities: Sequence[AbstractCapability[AgentDepsT]]

    def __post_init__(self) -> None:
        # Splat any nested `CombinedCapability` so leaves participate as siblings in the
        # outer ordering pass. Without this, a nested `CombinedCapability` whose leaves
        # span both `outermost` and `innermost` tiers would force `_effective_ordering`
        # to merge them into a single position and raise `Conflicting positions`.
        flat: list[AbstractCapability[AgentDepsT]] = []
        for cap in self.capabilities:
            if isinstance(cap, CombinedCapability):
                flat.extend(cap.capabilities)
            else:
                flat.append(cap)
        self.capabilities = flat
        if any(leaf.get_ordering() is not None for leaf in collect_leaves(self)):
            self.capabilities = sort_capabilities(list(self.capabilities))

    def apply(self, visitor: Callable[[AbstractCapability[AgentDepsT]], None]) -> None:
        for cap in self.capabilities:
            cap.apply(visitor)

    @property
    def has_wrap_node_run(self) -> bool:
        return any(c.has_wrap_node_run for c in self.capabilities)

    @property
    def has_wrap_run_event_stream(self) -> bool:
        return any(c.has_wrap_run_event_stream for c in self.capabilities)

    async def for_run(self, ctx: RunContext[AgentDepsT]) -> AbstractCapability[AgentDepsT]:
        new_caps = await gather(*(c.for_run(ctx) for c in self.capabilities))
        if all(new is old for new, old in zip(new_caps, self.capabilities)):
            return self
        return replace(self, capabilities=list(new_caps))

    def get_instructions(self) -> AgentInstructions[AgentDepsT] | None:
        instructions: list[str | _system_prompt.SystemPromptFunc[AgentDepsT]] = []
        for capability in self.capabilities:
            instructions.extend(normalize_instructions(capability.get_instructions()))
        return instructions or None

    def get_model_settings(self) -> ModelSettings | Callable[[RunContext[AgentDepsT]], ModelSettings] | None:
        # Collect settings in order, preserving each capability's position in the merge chain.
        # Each entry is either a static dict or a dynamic callable.
        settings_chain: list[ModelSettings | Callable[[RunContext[AgentDepsT]], ModelSettings]] = []
        for capability in self.capabilities:
            cap_settings = capability.get_model_settings()
            if cap_settings is not None:
                settings_chain.append(cap_settings)
        if not settings_chain:
            return None
        if all(not callable(s) for s in settings_chain):
            # All static — merge eagerly
            merged: ModelSettings | None = None
            for s in settings_chain:
                merged = merge_model_settings(merged, s)  # type: ignore[arg-type]
            return merged

        def resolve(ctx: RunContext[AgentDepsT]) -> ModelSettings:
            merged: ModelSettings | None = None
            for entry in settings_chain:
                # Mutate ctx.model_settings so each dynamic entry sees the
                # accumulated settings from all prior layers.
                ctx.model_settings = merge_model_settings(ctx.model_settings, merged)
                resolved = entry(ctx) if callable(entry) else entry
                merged = merge_model_settings(merged, resolved)
            # Update ctx.model_settings to include the final entry's contribution
            ctx.model_settings = merge_model_settings(ctx.model_settings, merged)
            return merged if merged is not None else ModelSettings()

        return resolve

    def get_toolset(self) -> AgentToolset[AgentDepsT] | None:
        toolsets: list[AbstractToolset[AgentDepsT]] = []
        for capability in self.capabilities:
            toolset = capability.get_toolset()
            if toolset is None:
                pass
            elif isinstance(toolset, AbstractToolset):
                # Pyright can't narrow Callable type aliases out of unions after isinstance check
                toolsets.append(toolset)  # pyright: ignore[reportUnknownArgumentType]
            else:
                toolsets.append(DynamicToolset[AgentDepsT](toolset_func=toolset))
        return CombinedToolset(toolsets) if toolsets else None

    def get_native_tools(self) -> Sequence[AgentNativeTool[AgentDepsT]]:
        native_tools: list[AgentNativeTool[AgentDepsT]] = []
        for capability in self.capabilities:
            native_tools.extend(capability.get_native_tools() or [])
        return native_tools

    def get_wrapper_toolset(self, toolset: AbstractToolset[AgentDepsT]) -> AbstractToolset[AgentDepsT] | None:
        wrapped = toolset
        any_wrapped = False
        for capability in reversed(self.capabilities):
            result = capability.get_wrapper_toolset(wrapped)
            if result is not None:
                wrapped = result
                any_wrapped = True
        return wrapped if any_wrapped else None

    # --- Tool preparation hooks ---

    async def prepare_tools(
        self,
        ctx: RunContext[AgentDepsT],
        tool_defs: list[ToolDefinition],
    ) -> list[ToolDefinition]:
        for capability in self.capabilities:
            tool_defs = await capability.prepare_tools(ctx, tool_defs)
        return tool_defs

    async def prepare_output_tools(
        self,
        ctx: RunContext[AgentDepsT],
        tool_defs: list[ToolDefinition],
    ) -> list[ToolDefinition]:
        for capability in self.capabilities:
            tool_defs = await capability.prepare_output_tools(ctx, tool_defs)
        return tool_defs

    # --- Run lifecycle hooks ---

    async def before_run(
        self,
        ctx: RunContext[AgentDepsT],
    ) -> None:
        for capability in self.capabilities:
            await capability.before_run(ctx)

    async def after_run(
        self,
        ctx: RunContext[AgentDepsT],
        *,
        result: AgentRunResult[Any],
    ) -> AgentRunResult[Any]:
        for capability in reversed(self.capabilities):
            result = await capability.after_run(ctx, result=result)
        return result

    async def wrap_run(
        self,
        ctx: RunContext[AgentDepsT],
        *,
        handler: Callable[[], Awaitable[AgentRunResult[Any]]],
    ) -> AgentRunResult[Any]:
        chain = handler
        for cap in reversed(self.capabilities):
            chain = _make_run_wrap(cap, ctx, chain)
        return await chain()

    async def on_run_error(
        self,
        ctx: RunContext[AgentDepsT],
        *,
        error: BaseException,
    ) -> AgentRunResult[Any]:
        for capability in reversed(self.capabilities):
            try:
                return await capability.on_run_error(ctx, error=error)
            except BaseException as new_error:
                error = new_error
        raise error

    # --- Node run lifecycle hooks ---

    async def before_node_run(
        self,
        ctx: RunContext[AgentDepsT],
        *,
        node: _agent_graph.AgentNode[AgentDepsT, Any],
    ) -> _agent_graph.AgentNode[AgentDepsT, Any]:
        for capability in self.capabilities:
            node = await capability.before_node_run(ctx, node=node)
        return node

    async def after_node_run(
        self,
        ctx: RunContext[AgentDepsT],
        *,
        node: _agent_graph.AgentNode[AgentDepsT, Any],
        result: _agent_graph.AgentNode[AgentDepsT, Any] | End[FinalResult[Any]],
    ) -> _agent_graph.AgentNode[AgentDepsT, Any] | End[FinalResult[Any]]:
        for capability in reversed(self.capabilities):
            result = await capability.after_node_run(ctx, node=node, result=result)
        return result

    async def wrap_node_run(
        self,
        ctx: RunContext[AgentDepsT],
        *,
        node: _agent_graph.AgentNode[AgentDepsT, Any],
        handler: Callable[
            [_agent_graph.AgentNode[AgentDepsT, Any]],
            Awaitable[_agent_graph.AgentNode[AgentDepsT, Any] | End[FinalResult[Any]]],
        ],
    ) -> _agent_graph.AgentNode[AgentDepsT, Any] | End[FinalResult[Any]]:
        chain = handler
        for cap in reversed(self.capabilities):
            chain = _make_node_run_wrap(cap, ctx, chain)
        return await chain(node)

    async def on_node_run_error(
        self,
        ctx: RunContext[AgentDepsT],
        *,
        node: _agent_graph.AgentNode[AgentDepsT, Any],
        error: Exception,
    ) -> _agent_graph.AgentNode[AgentDepsT, Any] | End[FinalResult[Any]]:
        for capability in reversed(self.capabilities):
            try:
                return await capability.on_node_run_error(ctx, node=node, error=error)
            except Exception as new_error:
                error = new_error
        raise error

    # --- Event stream hook ---

    async def wrap_run_event_stream(
        self,
        ctx: RunContext[AgentDepsT],
        *,
        stream: AsyncIterable[AgentStreamEvent],
    ) -> AsyncIterable[AgentStreamEvent]:
        for cap in reversed(self.capabilities):
            stream = cap.wrap_run_event_stream(ctx, stream=stream)
        async for event in stream:
            yield event

    # --- Model request lifecycle hooks ---

    async def before_model_request(
        self,
        ctx: RunContext[AgentDepsT],
        request_context: ModelRequestContext,
    ) -> ModelRequestContext:
        for capability in self.capabilities:
            request_context = await capability.before_model_request(ctx, request_context)
        return request_context

    async def after_model_request(
        self,
        ctx: RunContext[AgentDepsT],
        *,
        request_context: ModelRequestContext,
        response: ModelResponse,
    ) -> ModelResponse:
        for capability in reversed(self.capabilities):
            response = await capability.after_model_request(ctx, request_context=request_context, response=response)
        return response

    async def wrap_model_request(
        self,
        ctx: RunContext[AgentDepsT],
        *,
        request_context: ModelRequestContext,
        handler: Callable[[ModelRequestContext], Awaitable[ModelResponse]],
    ) -> ModelResponse:
        chain = handler
        for cap in reversed(self.capabilities):
            chain = _make_model_request_wrap(cap, ctx, chain)
        return await chain(request_context)

    async def on_model_request_error(
        self,
        ctx: RunContext[AgentDepsT],
        *,
        request_context: ModelRequestContext,
        error: Exception,
    ) -> ModelResponse:
        for capability in reversed(self.capabilities):
            try:
                return await capability.on_model_request_error(ctx, request_context=request_context, error=error)
            except Exception as new_error:
                error = new_error
        raise error

    # --- Tool validate lifecycle hooks ---

    async def before_tool_validate(
        self,
        ctx: RunContext[AgentDepsT],
        *,
        call: ToolCallPart,
        tool_def: ToolDefinition,
        args: str | dict[str, Any],
    ) -> str | dict[str, Any]:
        for capability in self.capabilities:
            args = await capability.before_tool_validate(ctx, call=call, tool_def=tool_def, args=args)
        return args

    async def after_tool_validate(
        self,
        ctx: RunContext[AgentDepsT],
        *,
        call: ToolCallPart,
        tool_def: ToolDefinition,
        args: dict[str, Any],
    ) -> dict[str, Any]:
        for capability in reversed(self.capabilities):
            args = await capability.after_tool_validate(ctx, call=call, tool_def=tool_def, args=args)
        return args

    async def wrap_tool_validate(
        self,
        ctx: RunContext[AgentDepsT],
        *,
        call: ToolCallPart,
        tool_def: ToolDefinition,
        args: str | dict[str, Any],
        handler: Callable[[str | dict[str, Any]], Awaitable[dict[str, Any]]],
    ) -> dict[str, Any]:
        chain = handler
        for cap in reversed(self.capabilities):
            chain = _make_tool_validate_wrap(cap, ctx, call, tool_def, chain)
        return await chain(args)

    async def on_tool_validate_error(
        self,
        ctx: RunContext[AgentDepsT],
        *,
        call: ToolCallPart,
        tool_def: ToolDefinition,
        args: str | dict[str, Any],
        error: ValidationError | ModelRetry,
    ) -> dict[str, Any]:
        for capability in reversed(self.capabilities):
            try:
                return await capability.on_tool_validate_error(
                    ctx, call=call, tool_def=tool_def, args=args, error=error
                )
            except (ValidationError, ModelRetry) as new_error:
                error = new_error
            except (
                Exception
            ):  # pragma: no cover — defensive; on_tool_validate_error shouldn't raise non-validation errors
                raise
        raise error

    # --- Tool execute lifecycle hooks ---

    async def before_tool_execute(
        self,
        ctx: RunContext[AgentDepsT],
        *,
        call: ToolCallPart,
        tool_def: ToolDefinition,
        args: dict[str, Any],
    ) -> dict[str, Any]:
        for capability in self.capabilities:
            args = await capability.before_tool_execute(ctx, call=call, tool_def=tool_def, args=args)
        return args

    async def after_tool_execute(
        self,
        ctx: RunContext[AgentDepsT],
        *,
        call: ToolCallPart,
        tool_def: ToolDefinition,
        args: dict[str, Any],
        result: Any,
    ) -> Any:
        for capability in reversed(self.capabilities):
            result = await capability.after_tool_execute(ctx, call=call, tool_def=tool_def, args=args, result=result)
        return result

    async def wrap_tool_execute(
        self,
        ctx: RunContext[AgentDepsT],
        *,
        call: ToolCallPart,
        tool_def: ToolDefinition,
        args: dict[str, Any],
        handler: Callable[[dict[str, Any]], Awaitable[Any]],
    ) -> Any:
        chain = handler
        for cap in reversed(self.capabilities):
            chain = _make_tool_execute_wrap(cap, ctx, call, tool_def, chain)
        return await chain(args)

    async def on_tool_execute_error(
        self,
        ctx: RunContext[AgentDepsT],
        *,
        call: ToolCallPart,
        tool_def: ToolDefinition,
        args: dict[str, Any],
        error: Exception,
    ) -> Any:
        for capability in reversed(self.capabilities):
            try:
                return await capability.on_tool_execute_error(ctx, call=call, tool_def=tool_def, args=args, error=error)
            except Exception as new_error:
                error = new_error
        raise error

    # --- Output validate lifecycle hooks ---

    async def before_output_validate(
        self,
        ctx: RunContext[AgentDepsT],
        *,
        output_context: OutputContext,
        output: RawOutput,
    ) -> RawOutput:
        for capability in self.capabilities:
            output = await capability.before_output_validate(ctx, output_context=output_context, output=output)
        return output

    async def after_output_validate(
        self,
        ctx: RunContext[AgentDepsT],
        *,
        output_context: OutputContext,
        output: Any,
    ) -> Any:
        for capability in reversed(self.capabilities):
            output = await capability.after_output_validate(ctx, output_context=output_context, output=output)
        return output

    async def wrap_output_validate(
        self,
        ctx: RunContext[AgentDepsT],
        *,
        output_context: OutputContext,
        output: RawOutput,
        handler: WrapOutputValidateHandler,
    ) -> Any:
        chain = handler
        for cap in reversed(self.capabilities):
            chain = _make_output_validate_wrap(cap, ctx, output_context, chain)
        return await chain(output)

    async def on_output_validate_error(
        self,
        ctx: RunContext[AgentDepsT],
        *,
        output_context: OutputContext,
        output: RawOutput,
        error: ValidationError | ModelRetry,
    ) -> Any:
        for capability in reversed(self.capabilities):
            try:
                return await capability.on_output_validate_error(
                    ctx, output_context=output_context, output=output, error=error
                )
            except (ValidationError, ModelRetry) as new_error:
                error = new_error
            except Exception:  # pragma: no cover — defensive
                raise
        raise error

    # --- Output process lifecycle hooks ---

    async def before_output_process(
        self,
        ctx: RunContext[AgentDepsT],
        *,
        output_context: OutputContext,
        output: Any,
    ) -> Any:
        for capability in self.capabilities:
            output = await capability.before_output_process(ctx, output_context=output_context, output=output)
        return output

    async def after_output_process(
        self,
        ctx: RunContext[AgentDepsT],
        *,
        output_context: OutputContext,
        output: Any,
    ) -> Any:
        for capability in reversed(self.capabilities):
            output = await capability.after_output_process(ctx, output_context=output_context, output=output)
        return output

    async def wrap_output_process(
        self,
        ctx: RunContext[AgentDepsT],
        *,
        output_context: OutputContext,
        output: Any,
        handler: WrapOutputProcessHandler,
    ) -> Any:
        chain = handler
        for cap in reversed(self.capabilities):
            chain = _make_output_process_wrap(cap, ctx, output_context, chain)
        return await chain(output)

    async def on_output_process_error(
        self,
        ctx: RunContext[AgentDepsT],
        *,
        output_context: OutputContext,
        output: Any,
        error: Exception,
    ) -> Any:
        for capability in reversed(self.capabilities):
            try:
                return await capability.on_output_process_error(
                    ctx, output_context=output_context, output=output, error=error
                )
            except Exception as new_error:
                error = new_error
        raise error

    async def handle_deferred_tool_calls(
        self,
        ctx: RunContext[AgentDepsT],
        *,
        requests: DeferredToolRequests,
    ) -> DeferredToolResults | None:
        accumulated = DeferredToolResults()
        remaining = requests
        any_handled = False
        for capability in self.capabilities:
            result = await capability.handle_deferred_tool_calls(ctx, requests=remaining)
            if result is None or not (result.approvals or result.calls):
                continue
            any_handled = True
            accumulated.update(result)
            remaining_or_none = remaining.remaining(result)
            if remaining_or_none is None:
                break
            remaining = remaining_or_none
        return accumulated if any_handled else None

HandleDeferredToolCalls dataclass

Bases: AbstractCapability[AgentDepsT]

Resolves deferred tool calls inline during an agent run using a handler function.

When tools require approval or external execution, the agent normally pauses the run and returns DeferredToolRequests as output. This capability intercepts deferred tool calls, calls the provided handler to resolve them, and continues the agent run automatically.

The handler receives the RunContext and the DeferredToolRequests. It may return DeferredToolResults with results for some or all pending calls, or return None to decline handling (the next capability in the chain gets a chance, otherwise the calls bubble up as DeferredToolRequests output).

Example
from pydantic_ai import Agent
from pydantic_ai.capabilities import HandleDeferredToolCalls
from pydantic_ai.tools import DeferredToolRequests, DeferredToolResults, RunContext


async def handle_deferred(
    ctx: RunContext, requests: DeferredToolRequests
) -> DeferredToolResults:
    # Auto-approve all tools that need approval
    return requests.build_results(approve_all=True)


agent = Agent(
    'openai:gpt-5',
    capabilities=[HandleDeferredToolCalls(handler=handle_deferred)],
)
Source code in pydantic_ai_slim/pydantic_ai/capabilities/deferred_tool_handler.py
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
@dataclass
class HandleDeferredToolCalls(AbstractCapability[AgentDepsT]):
    """Resolves deferred tool calls inline during an agent run using a handler function.

    When tools require approval or external execution, the agent normally pauses the run
    and returns [`DeferredToolRequests`][pydantic_ai.tools.DeferredToolRequests] as output.
    This capability intercepts deferred tool calls, calls the provided handler to resolve
    them, and continues the agent run automatically.

    The handler receives the [`RunContext`][pydantic_ai.tools.RunContext] and the
    [`DeferredToolRequests`][pydantic_ai.tools.DeferredToolRequests]. It may return
    [`DeferredToolResults`][pydantic_ai.tools.DeferredToolResults] with results for
    some or all pending calls, or return `None` to decline handling (the next capability
    in the chain gets a chance, otherwise the calls bubble up as `DeferredToolRequests`
    output).

    Example:
        ```python
        from pydantic_ai import Agent
        from pydantic_ai.capabilities import HandleDeferredToolCalls
        from pydantic_ai.tools import DeferredToolRequests, DeferredToolResults, RunContext


        async def handle_deferred(
            ctx: RunContext, requests: DeferredToolRequests
        ) -> DeferredToolResults:
            # Auto-approve all tools that need approval
            return requests.build_results(approve_all=True)


        agent = Agent(
            'openai:gpt-5',
            capabilities=[HandleDeferredToolCalls(handler=handle_deferred)],
        )
        ```
    """

    handler: Callable[
        [RunContext[AgentDepsT], DeferredToolRequests],
        DeferredToolResults | None | Awaitable[DeferredToolResults | None],
    ]
    """The handler function that resolves deferred tool requests.

    Receives the run context and the deferred tool requests, and returns
    [`DeferredToolResults`][pydantic_ai.tools.DeferredToolResults] with results for some
    or all pending calls, or `None` to decline handling. Can be sync or async.
    """

    @classmethod
    def get_serialization_name(cls) -> str | None:
        return None

    async def handle_deferred_tool_calls(
        self,
        ctx: RunContext[AgentDepsT],
        *,
        requests: DeferredToolRequests,
    ) -> DeferredToolResults | None:
        result = self.handler(ctx, requests)
        if inspect.isawaitable(result):
            return await result
        return result

handler instance-attribute

The handler function that resolves deferred tool requests.

Receives the run context and the deferred tool requests, and returns DeferredToolResults with results for some or all pending calls, or None to decline handling. Can be sync or async.

Hooks

Bases: AbstractCapability[AgentDepsT]

Register hook functions via decorators or constructor kwargs.

For extension developers building reusable capabilities, subclass AbstractCapability directly. For application code that needs a few hooks without the ceremony of a subclass, use Hooks.

Example using decorators:

hooks = Hooks()

@hooks.on.before_model_request
async def log_request(ctx, request_context):
    print(f'Request: {request_context}')
    return request_context

agent = Agent('openai:gpt-5', capabilities=[hooks])

Example using constructor kwargs:

agent = Agent('openai:gpt-5', capabilities=[
    Hooks(before_model_request=log_request)
])
Source code in pydantic_ai_slim/pydantic_ai/capabilities/hooks.py
 713
 714
 715
 716
 717
 718
 719
 720
 721
 722
 723
 724
 725
 726
 727
 728
 729
 730
 731
 732
 733
 734
 735
 736
 737
 738
 739
 740
 741
 742
 743
 744
 745
 746
 747
 748
 749
 750
 751
 752
 753
 754
 755
 756
 757
 758
 759
 760
 761
 762
 763
 764
 765
 766
 767
 768
 769
 770
 771
 772
 773
 774
 775
 776
 777
 778
 779
 780
 781
 782
 783
 784
 785
 786
 787
 788
 789
 790
 791
 792
 793
 794
 795
 796
 797
 798
 799
 800
 801
 802
 803
 804
 805
 806
 807
 808
 809
 810
 811
 812
 813
 814
 815
 816
 817
 818
 819
 820
 821
 822
 823
 824
 825
 826
 827
 828
 829
 830
 831
 832
 833
 834
 835
 836
 837
 838
 839
 840
 841
 842
 843
 844
 845
 846
 847
 848
 849
 850
 851
 852
 853
 854
 855
 856
 857
 858
 859
 860
 861
 862
 863
 864
 865
 866
 867
 868
 869
 870
 871
 872
 873
 874
 875
 876
 877
 878
 879
 880
 881
 882
 883
 884
 885
 886
 887
 888
 889
 890
 891
 892
 893
 894
 895
 896
 897
 898
 899
 900
 901
 902
 903
 904
 905
 906
 907
 908
 909
 910
 911
 912
 913
 914
 915
 916
 917
 918
 919
 920
 921
 922
 923
 924
 925
 926
 927
 928
 929
 930
 931
 932
 933
 934
 935
 936
 937
 938
 939
 940
 941
 942
 943
 944
 945
 946
 947
 948
 949
 950
 951
 952
 953
 954
 955
 956
 957
 958
 959
 960
 961
 962
 963
 964
 965
 966
 967
 968
 969
 970
 971
 972
 973
 974
 975
 976
 977
 978
 979
 980
 981
 982
 983
 984
 985
 986
 987
 988
 989
 990
 991
 992
 993
 994
 995
 996
 997
 998
 999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169
1170
1171
1172
1173
1174
1175
1176
1177
1178
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188
1189
1190
1191
1192
1193
1194
1195
1196
1197
1198
1199
1200
1201
1202
1203
1204
1205
1206
1207
1208
1209
1210
1211
1212
1213
1214
1215
1216
1217
1218
1219
1220
1221
1222
1223
1224
1225
1226
1227
1228
1229
1230
1231
1232
1233
1234
1235
1236
1237
1238
1239
1240
1241
1242
1243
1244
1245
1246
1247
1248
1249
1250
1251
1252
1253
1254
1255
class Hooks(AbstractCapability[AgentDepsT]):
    """Register hook functions via decorators or constructor kwargs.

    For extension developers building reusable capabilities, subclass
    [`AbstractCapability`][pydantic_ai.capabilities.AbstractCapability] directly.
    For application code that needs a few hooks without the ceremony of a subclass,
    use `Hooks`.

    Example using decorators:

    ```python {test="skip" lint="skip"}
    hooks = Hooks()

    @hooks.on.before_model_request
    async def log_request(ctx, request_context):
        print(f'Request: {request_context}')
        return request_context

    agent = Agent('openai:gpt-5', capabilities=[hooks])
    ```

    Example using constructor kwargs:

    ```python {test="skip" lint="skip"}
    agent = Agent('openai:gpt-5', capabilities=[
        Hooks(before_model_request=log_request)
    ])
    ```
    """

    _registry: dict[str, list[_HookEntry[Any]]]

    def __init__(
        self,
        *,
        # Run lifecycle
        before_run: BeforeRunHookFunc | None = None,
        after_run: AfterRunHookFunc | None = None,
        run: WrapRunHookFunc | None = None,
        run_error: OnRunErrorHookFunc | None = None,
        # Node lifecycle
        before_node_run: BeforeNodeRunHookFunc | None = None,
        after_node_run: AfterNodeRunHookFunc | None = None,
        node_run: WrapNodeRunHookFunc | None = None,
        node_run_error: OnNodeRunErrorHookFunc | None = None,
        # Event stream
        run_event_stream: WrapRunEventStreamHookFunc | None = None,
        event: OnEventHookFunc | None = None,
        # Model request
        before_model_request: BeforeModelRequestHookFunc | None = None,
        after_model_request: AfterModelRequestHookFunc | None = None,
        model_request: WrapModelRequestHookFunc | None = None,
        model_request_error: OnModelRequestErrorHookFunc | None = None,
        # Tool preparation
        prepare_tools: PrepareToolsHookFunc | None = None,
        prepare_output_tools: PrepareOutputToolsHookFunc | None = None,
        # Tool validation
        before_tool_validate: BeforeToolValidateHookFunc | None = None,
        after_tool_validate: AfterToolValidateHookFunc | None = None,
        tool_validate: WrapToolValidateHookFunc | None = None,
        tool_validate_error: OnToolValidateErrorHookFunc | None = None,
        # Tool execution
        before_tool_execute: BeforeToolExecuteHookFunc | None = None,
        after_tool_execute: AfterToolExecuteHookFunc | None = None,
        tool_execute: WrapToolExecuteHookFunc | None = None,
        tool_execute_error: OnToolExecuteErrorHookFunc | None = None,
        # Output validation
        before_output_validate: BeforeOutputValidateHookFunc | None = None,
        after_output_validate: AfterOutputValidateHookFunc | None = None,
        output_validate: WrapOutputValidateHookFunc | None = None,
        output_validate_error: OnOutputValidateErrorHookFunc | None = None,
        # Output processing
        before_output_process: BeforeOutputProcessHookFunc | None = None,
        after_output_process: AfterOutputProcessHookFunc | None = None,
        output_process: WrapOutputProcessHookFunc | None = None,
        output_process_error: OnOutputProcessErrorHookFunc | None = None,
        # Deferred tool calls
        deferred_tool_calls: HandleDeferredToolCallsHookFunc | None = None,
        # Ordering
        ordering: CapabilityOrdering | None = None,
    ):
        self._ordering = ordering
        self._registry = {}
        # Map constructor kwarg names to internal registry keys (AbstractCapability method names)
        _kwargs: dict[str, Any] = {
            'before_run': before_run,
            'after_run': after_run,
            'wrap_run': run,
            'on_run_error': run_error,
            'before_node_run': before_node_run,
            'after_node_run': after_node_run,
            'wrap_node_run': node_run,
            'on_node_run_error': node_run_error,
            'wrap_run_event_stream': run_event_stream,
            '_on_event': event,
            'before_model_request': before_model_request,
            'after_model_request': after_model_request,
            'wrap_model_request': model_request,
            'on_model_request_error': model_request_error,
            'prepare_tools': prepare_tools,
            'prepare_output_tools': prepare_output_tools,
            'before_tool_validate': before_tool_validate,
            'after_tool_validate': after_tool_validate,
            'wrap_tool_validate': tool_validate,
            'on_tool_validate_error': tool_validate_error,
            'before_tool_execute': before_tool_execute,
            'after_tool_execute': after_tool_execute,
            'wrap_tool_execute': tool_execute,
            'on_tool_execute_error': tool_execute_error,
            'before_output_validate': before_output_validate,
            'after_output_validate': after_output_validate,
            'wrap_output_validate': output_validate,
            'on_output_validate_error': output_validate_error,
            'before_output_process': before_output_process,
            'after_output_process': after_output_process,
            'wrap_output_process': output_process,
            'on_output_process_error': output_process_error,
            'handle_deferred_tool_calls': deferred_tool_calls,
        }
        for key, func in _kwargs.items():
            if func is not None:
                self._registry.setdefault(key, []).append(_HookEntry(func))

    @cached_property
    def on(self) -> _HookRegistration[AgentDepsT]:
        """Decorator namespace for registering hook functions."""
        return _HookRegistration(self)

    def _get(self, key: str) -> list[_HookEntry[Any]]:
        return self._registry.get(key, [])

    @property
    def has_wrap_node_run(self) -> bool:
        return bool(self._get('wrap_node_run'))

    @property
    def has_wrap_run_event_stream(self) -> bool:
        return bool(self._get('wrap_run_event_stream') or self._get('_on_event'))

    def get_ordering(self) -> CapabilityOrdering | None:
        return self._ordering

    @classmethod
    def get_serialization_name(cls) -> str | None:
        return None

    def __repr__(self) -> str:
        registered = {k: len(v) for k, v in self._registry.items() if v}
        return f'Hooks({registered})'

    # --- AbstractCapability method overrides ---
    # These dispatch to registered hook functions in self._registry.

    async def before_run(self, ctx: RunContext[AgentDepsT]) -> None:
        for entry in self._get('before_run'):
            await _call_entry(entry, 'before_run', ctx)

    async def after_run(self, ctx: RunContext[AgentDepsT], *, result: AgentRunResult[Any]) -> AgentRunResult[Any]:
        for entry in self._get('after_run'):
            result = await _call_entry(entry, 'after_run', ctx, result=result)
        return result

    async def wrap_run(self, ctx: RunContext[AgentDepsT], *, handler: WrapRunHandler) -> AgentRunResult[Any]:
        entries = self._get('wrap_run')
        if not entries:
            return await handler()
        chain: Callable[..., Any] = handler
        for entry in reversed(entries):
            chain = _make_wrap_link(entry, 'wrap_run', ctx, {}, chain, None)
        return await chain()

    async def on_run_error(self, ctx: RunContext[AgentDepsT], *, error: BaseException) -> AgentRunResult[Any]:
        for entry in self._get('on_run_error'):
            try:
                return await _call_entry(entry, 'on_run_error', ctx, error=error)
            except BaseException as new_error:
                error = new_error
        raise error

    async def before_node_run(
        self, ctx: RunContext[AgentDepsT], *, node: AgentNode[AgentDepsT]
    ) -> AgentNode[AgentDepsT]:
        for entry in self._get('before_node_run'):
            node = await _call_entry(entry, 'before_node_run', ctx, node=node)
        return node

    async def after_node_run(
        self,
        ctx: RunContext[AgentDepsT],
        *,
        node: AgentNode[AgentDepsT],
        result: NodeResult[AgentDepsT],
    ) -> NodeResult[AgentDepsT]:
        for entry in self._get('after_node_run'):
            result = await _call_entry(entry, 'after_node_run', ctx, node=node, result=result)
        return result

    async def wrap_node_run(
        self,
        ctx: RunContext[AgentDepsT],
        *,
        node: AgentNode[AgentDepsT],
        handler: WrapNodeRunHandler[AgentDepsT],
    ) -> NodeResult[AgentDepsT]:
        entries = self._get('wrap_node_run')
        if not entries:
            return await handler(node)
        chain: Callable[..., Any] = handler
        for entry in reversed(entries):
            chain = _make_wrap_link(entry, 'wrap_node_run', ctx, {}, chain, 'node')
        return await chain(node)

    async def on_node_run_error(
        self, ctx: RunContext[AgentDepsT], *, node: AgentNode[AgentDepsT], error: Exception
    ) -> NodeResult[AgentDepsT]:
        for entry in self._get('on_node_run_error'):
            try:
                return await _call_entry(entry, 'on_node_run_error', ctx, node=node, error=error)
            except Exception as new_error:
                error = new_error
        raise error

    async def wrap_run_event_stream(
        self, ctx: RunContext[AgentDepsT], *, stream: AsyncIterable[AgentStreamEvent]
    ) -> AsyncIterable[AgentStreamEvent]:
        # First, wrap with per-event callbacks (innermost)
        event_entries = self._get('_on_event')
        if event_entries:
            stream = _event_callback_stream(ctx, stream, event_entries)
        # Then chain explicit stream wrappers (outermost)
        for entry in reversed(self._get('wrap_run_event_stream')):
            stream = entry.func(ctx, stream=stream)
        async for event in stream:
            yield event

    async def before_model_request(
        self, ctx: RunContext[AgentDepsT], request_context: ModelRequestContext
    ) -> ModelRequestContext:
        for entry in self._get('before_model_request'):
            request_context = await _call_entry(entry, 'before_model_request', ctx, request_context)
        return request_context

    async def after_model_request(
        self,
        ctx: RunContext[AgentDepsT],
        *,
        request_context: ModelRequestContext,
        response: ModelResponse,
    ) -> ModelResponse:
        for entry in self._get('after_model_request'):
            response = await _call_entry(
                entry, 'after_model_request', ctx, request_context=request_context, response=response
            )
        return response

    async def wrap_model_request(
        self,
        ctx: RunContext[AgentDepsT],
        *,
        request_context: ModelRequestContext,
        handler: WrapModelRequestHandler,
    ) -> ModelResponse:
        entries = self._get('wrap_model_request')
        if not entries:
            return await handler(request_context)
        chain: Callable[..., Any] = handler
        for entry in reversed(entries):
            chain = _make_wrap_link(entry, 'wrap_model_request', ctx, {}, chain, 'request_context')
        return await chain(request_context)

    async def on_model_request_error(
        self, ctx: RunContext[AgentDepsT], *, request_context: ModelRequestContext, error: Exception
    ) -> ModelResponse:
        for entry in self._get('on_model_request_error'):
            try:
                return await _call_entry(
                    entry, 'on_model_request_error', ctx, request_context=request_context, error=error
                )
            except Exception as new_error:
                error = new_error
        raise error

    async def prepare_tools(self, ctx: RunContext[AgentDepsT], tool_defs: list[ToolDefinition]) -> list[ToolDefinition]:
        for entry in self._get('prepare_tools'):
            tool_defs = await _call_entry(entry, 'prepare_tools', ctx, tool_defs)
        return tool_defs

    async def prepare_output_tools(
        self, ctx: RunContext[AgentDepsT], tool_defs: list[ToolDefinition]
    ) -> list[ToolDefinition]:
        for entry in self._get('prepare_output_tools'):
            tool_defs = await _call_entry(entry, 'prepare_output_tools', ctx, tool_defs)
        return tool_defs

    async def before_tool_validate(
        self, ctx: RunContext[AgentDepsT], *, call: ToolCallPart, tool_def: ToolDefinition, args: RawToolArgs
    ) -> RawToolArgs:
        for entry in _filter_tool_entries(self._get('before_tool_validate'), call=call):
            args = await _call_entry(entry, 'before_tool_validate', ctx, call=call, tool_def=tool_def, args=args)
        return args

    async def after_tool_validate(
        self, ctx: RunContext[AgentDepsT], *, call: ToolCallPart, tool_def: ToolDefinition, args: ValidatedToolArgs
    ) -> ValidatedToolArgs:
        for entry in _filter_tool_entries(self._get('after_tool_validate'), call=call):
            args = await _call_entry(entry, 'after_tool_validate', ctx, call=call, tool_def=tool_def, args=args)
        return args

    async def wrap_tool_validate(
        self,
        ctx: RunContext[AgentDepsT],
        *,
        call: ToolCallPart,
        tool_def: ToolDefinition,
        args: RawToolArgs,
        handler: WrapToolValidateHandler,
    ) -> ValidatedToolArgs:
        entries = _filter_tool_entries(self._get('wrap_tool_validate'), call=call)
        if not entries:
            return await handler(args)
        chain: Callable[..., Any] = handler
        for entry in reversed(entries):
            chain = _make_wrap_link(
                entry, 'wrap_tool_validate', ctx, {'call': call, 'tool_def': tool_def}, chain, 'args'
            )
        return await chain(args)

    async def on_tool_validate_error(
        self,
        ctx: RunContext[AgentDepsT],
        *,
        call: ToolCallPart,
        tool_def: ToolDefinition,
        args: RawToolArgs,
        error: ValidationError | ModelRetry,
    ) -> ValidatedToolArgs:
        for entry in _filter_tool_entries(self._get('on_tool_validate_error'), call=call):
            try:
                return await _call_entry(
                    entry, 'on_tool_validate_error', ctx, call=call, tool_def=tool_def, args=args, error=error
                )
            except (ValidationError, ModelRetry) as new_error:
                error = new_error
        raise error

    async def before_tool_execute(
        self, ctx: RunContext[AgentDepsT], *, call: ToolCallPart, tool_def: ToolDefinition, args: ValidatedToolArgs
    ) -> ValidatedToolArgs:
        for entry in _filter_tool_entries(self._get('before_tool_execute'), call=call):
            args = await _call_entry(entry, 'before_tool_execute', ctx, call=call, tool_def=tool_def, args=args)
        return args

    async def after_tool_execute(
        self,
        ctx: RunContext[AgentDepsT],
        *,
        call: ToolCallPart,
        tool_def: ToolDefinition,
        args: ValidatedToolArgs,
        result: Any,
    ) -> Any:
        for entry in _filter_tool_entries(self._get('after_tool_execute'), call=call):
            result = await _call_entry(
                entry, 'after_tool_execute', ctx, call=call, tool_def=tool_def, args=args, result=result
            )
        return result

    async def wrap_tool_execute(
        self,
        ctx: RunContext[AgentDepsT],
        *,
        call: ToolCallPart,
        tool_def: ToolDefinition,
        args: ValidatedToolArgs,
        handler: WrapToolExecuteHandler,
    ) -> Any:
        entries = _filter_tool_entries(self._get('wrap_tool_execute'), call=call)
        if not entries:
            return await handler(args)
        chain: Callable[..., Any] = handler
        for entry in reversed(entries):
            chain = _make_wrap_link(
                entry, 'wrap_tool_execute', ctx, {'call': call, 'tool_def': tool_def}, chain, 'args'
            )
        return await chain(args)

    async def on_tool_execute_error(
        self,
        ctx: RunContext[AgentDepsT],
        *,
        call: ToolCallPart,
        tool_def: ToolDefinition,
        args: ValidatedToolArgs,
        error: Exception,
    ) -> Any:
        for entry in _filter_tool_entries(self._get('on_tool_execute_error'), call=call):
            try:
                return await _call_entry(
                    entry, 'on_tool_execute_error', ctx, call=call, tool_def=tool_def, args=args, error=error
                )
            except Exception as new_error:
                error = new_error
        raise error

    async def before_output_validate(
        self, ctx: RunContext[AgentDepsT], *, output_context: OutputContext, output: RawOutput
    ) -> RawOutput:
        for entry in self._get('before_output_validate'):
            output = await _call_entry(
                entry, 'before_output_validate', ctx, output_context=output_context, output=output
            )
        return output

    async def after_output_validate(
        self,
        ctx: RunContext[AgentDepsT],
        *,
        output_context: OutputContext,
        output: Any,
    ) -> Any:
        for entry in self._get('after_output_validate'):
            output = await _call_entry(
                entry, 'after_output_validate', ctx, output_context=output_context, output=output
            )
        return output

    async def wrap_output_validate(
        self,
        ctx: RunContext[AgentDepsT],
        *,
        output_context: OutputContext,
        output: RawOutput,
        handler: WrapOutputValidateHandler,
    ) -> Any:
        entries = self._get('wrap_output_validate')
        if not entries:
            return await handler(output)
        chain: Callable[..., Any] = handler
        for entry in reversed(entries):
            chain = _make_wrap_link(
                entry, 'wrap_output_validate', ctx, {'output_context': output_context}, chain, 'output'
            )
        return await chain(output)

    async def on_output_validate_error(
        self,
        ctx: RunContext[AgentDepsT],
        *,
        output_context: OutputContext,
        output: RawOutput,
        error: ValidationError | ModelRetry,
    ) -> Any:
        for entry in self._get('on_output_validate_error'):
            try:
                return await _call_entry(
                    entry,
                    'on_output_validate_error',
                    ctx,
                    output_context=output_context,
                    output=output,
                    error=error,
                )
            except (ValidationError, ModelRetry) as new_error:
                error = new_error
        raise error

    async def before_output_process(
        self, ctx: RunContext[AgentDepsT], *, output_context: OutputContext, output: Any
    ) -> Any:
        for entry in self._get('before_output_process'):
            output = await _call_entry(
                entry, 'before_output_process', ctx, output_context=output_context, output=output
            )
        return output

    async def after_output_process(
        self, ctx: RunContext[AgentDepsT], *, output_context: OutputContext, output: Any
    ) -> Any:
        for entry in self._get('after_output_process'):
            output = await _call_entry(
                entry,
                'after_output_process',
                ctx,
                output_context=output_context,
                output=output,
            )
        return output

    async def wrap_output_process(
        self,
        ctx: RunContext[AgentDepsT],
        *,
        output_context: OutputContext,
        output: Any,
        handler: WrapOutputProcessHandler,
    ) -> Any:
        entries = self._get('wrap_output_process')
        if not entries:
            return await handler(output)
        chain: Callable[..., Any] = handler
        for entry in reversed(entries):
            chain = _make_wrap_link(
                entry, 'wrap_output_process', ctx, {'output_context': output_context}, chain, 'output'
            )
        return await chain(output)

    async def on_output_process_error(
        self,
        ctx: RunContext[AgentDepsT],
        *,
        output_context: OutputContext,
        output: Any,
        error: Exception,
    ) -> Any:
        for entry in self._get('on_output_process_error'):
            try:
                return await _call_entry(
                    entry, 'on_output_process_error', ctx, output_context=output_context, output=output, error=error
                )
            except Exception as new_error:
                error = new_error
        raise error

    async def handle_deferred_tool_calls(
        self,
        ctx: RunContext[AgentDepsT],
        *,
        requests: DeferredToolRequests,
    ) -> DeferredToolResults | None:
        accumulated = DeferredToolResults()
        remaining = requests
        any_handled = False
        for entry in self._get('handle_deferred_tool_calls'):
            result = await _call_entry(entry, 'handle_deferred_tool_calls', ctx, requests=remaining)
            if result is None or not (result.approvals or result.calls):
                continue
            any_handled = True
            accumulated.update(result)
            remaining_or_none = remaining.remaining(result)
            if remaining_or_none is None:
                break
            remaining = remaining_or_none
        return accumulated if any_handled else None

on cached property

on: _HookRegistration[AgentDepsT]

Decorator namespace for registering hook functions.

HookTimeoutError

Bases: TimeoutError

Raised when a hook function exceeds its configured timeout.

Source code in pydantic_ai_slim/pydantic_ai/capabilities/hooks.py
64
65
66
67
68
69
70
71
class HookTimeoutError(TimeoutError):
    """Raised when a hook function exceeds its configured timeout."""

    def __init__(self, hook_name: str, func_name: str, timeout: float):
        self.hook_name = hook_name
        self.func_name = func_name
        self.timeout = timeout
        super().__init__(f'Hook {hook_name!r} function {func_name!r} timed out after {timeout}s')

ImageGeneration dataclass

Bases: NativeOrLocalTool[AgentDepsT]

Image generation capability.

Uses the model's native image generation when available. When the model doesn't support it and fallback_model is provided, falls back to a local tool that delegates to a subagent running the specified image-capable model.

Image generation settings (quality, size, etc.) are forwarded to the ImageGenerationTool used by both the native and the local fallback subagent. When passing a custom native instance, its settings are also used for the fallback subagent; capability-level fields override any native instance settings.

Source code in pydantic_ai_slim/pydantic_ai/capabilities/image_generation.py
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
@dataclass(init=False)
class ImageGeneration(NativeOrLocalTool[AgentDepsT]):
    """Image generation capability.

    Uses the model's native image generation when available. When the model doesn't
    support it and `fallback_model` is provided, falls back to a local tool that
    delegates to a subagent running the specified image-capable model.

    Image generation settings (`quality`, `size`, etc.) are forwarded to the
    [`ImageGenerationTool`][pydantic_ai.native_tools.ImageGenerationTool] used by
    both the native and the local fallback subagent. When passing a custom `native`
    instance, its settings are also used for the fallback subagent; capability-level
    fields override any `native` instance settings.
    """

    fallback_model: ImageGenerationFallbackModel
    """Model to use for image generation when the agent's model doesn't support it natively.

    Must be a model that supports image generation via the
    [`ImageGenerationTool`][pydantic_ai.native_tools.ImageGenerationTool] native tool.
    This requires a conversational model with image generation support, not a dedicated
    image-only API. Examples:

    * `'openai-responses:gpt-5.4'` — OpenAI model with image generation support
    * `'google:gemini-3-pro-image-preview'` — Google image generation model

    Can be a model name string, `Model` instance, or a callable taking `RunContext`
    that returns a `Model` instance.
    """

    # Keep these fields in sync with ImageGenerationTool in native_tools.py.

    action: Literal['generate', 'edit', 'auto'] | None
    """Whether to generate a new image or edit an existing image.

    Supported by: OpenAI Responses. Default: `'auto'`.
    """

    background: Literal['transparent', 'opaque', 'auto'] | None
    """Background type for the generated image.

    Supported by: OpenAI Responses. `'transparent'` only supported for `'png'` and `'webp'`.
    """

    input_fidelity: Literal['high', 'low'] | None
    """Input fidelity for matching style/features of input images.

    Supported by: OpenAI Responses. Default: `'low'`.
    """

    moderation: Literal['auto', 'low'] | None
    """Moderation level for the generated image.

    Supported by: OpenAI Responses.
    """

    image_model: ImageGenerationModelName | None
    """The image generation model to use.

    Supported by: OpenAI Responses.
    """

    output_compression: int | None
    """Compression level for the output image.

    Supported by: OpenAI Responses (jpeg/webp, default: 100), Google Cloud (jpeg, default: 75).
    """

    output_format: Literal['png', 'webp', 'jpeg'] | None
    """Output format of the generated image.

    Supported by: OpenAI Responses (default: `'png'`), Google Cloud.
    """

    quality: Literal['low', 'medium', 'high', 'auto'] | None
    """Quality of the generated image.

    Supported by: OpenAI Responses.
    """

    size: Literal['auto', '1024x1024', '1024x1536', '1536x1024', '512', '1K', '2K', '4K'] | None
    """Size of the generated image.

    Supported by: OpenAI Responses (`'auto'`, `'1024x1024'`, `'1024x1536'`, `'1536x1024'`),
    Google (`'512'`, `'1K'`, `'2K'`, `'4K'`).
    """

    aspect_ratio: ImageAspectRatio | None
    """Aspect ratio for generated images.

    Supported by: Google (Gemini), OpenAI Responses (maps `'1:1'`, `'2:3'`, `'3:2'` to sizes).
    """

    def __init__(
        self,
        *,
        native: ImageGenerationTool
        | Callable[[RunContext[AgentDepsT]], Awaitable[ImageGenerationTool | None] | ImageGenerationTool | None]
        | bool = True,
        local: Tool[AgentDepsT] | Callable[..., Any] | Literal[False] | None = None,
        fallback_model: Model
        | KnownModelName
        | str
        | Callable[[RunContext[AgentDepsT]], Awaitable[Model] | Model]
        | None = None,
        action: Literal['generate', 'edit', 'auto'] | None = None,
        background: Literal['transparent', 'opaque', 'auto'] | None = None,
        input_fidelity: Literal['high', 'low'] | None = None,
        moderation: Literal['auto', 'low'] | None = None,
        image_model: ImageGenerationModelName | None = None,
        output_compression: int | None = None,
        output_format: Literal['png', 'webp', 'jpeg'] | None = None,
        quality: Literal['low', 'medium', 'high', 'auto'] | None = None,
        size: Literal['auto', '1024x1024', '1024x1536', '1536x1024', '512', '1K', '2K', '4K'] | None = None,
        aspect_ratio: ImageAspectRatio | None = None,
    ) -> None:
        if fallback_model is not None and local is not None:
            raise UserError(
                'ImageGeneration: cannot specify both `fallback_model` and `local` — '
                'use `fallback_model` for the default subagent fallback, or `local` for a custom tool'
            )
        self.native = native
        self.local = local
        self.fallback_model = fallback_model
        self.action = action
        self.background = background
        self.input_fidelity = input_fidelity
        self.moderation = moderation
        self.image_model = image_model
        self.output_compression = output_compression
        self.output_format = output_format
        self.quality = quality
        self.size = size
        self.aspect_ratio = aspect_ratio
        self.__post_init__()

    def _image_gen_kwargs(self) -> dict[str, Any]:
        """Collect non-None ImageGenerationTool config fields."""
        kwargs: dict[str, Any] = {}
        if self.action is not None:
            kwargs['action'] = self.action
        if self.background is not None:
            kwargs['background'] = self.background
        if self.input_fidelity is not None:
            kwargs['input_fidelity'] = self.input_fidelity
        if self.moderation is not None:
            kwargs['moderation'] = self.moderation
        if self.image_model is not None:
            kwargs['model'] = self.image_model
        if self.output_compression is not None:
            kwargs['output_compression'] = self.output_compression
        if self.output_format is not None:
            kwargs['output_format'] = self.output_format
        if self.quality is not None:
            kwargs['quality'] = self.quality
        if self.size is not None:
            kwargs['size'] = self.size
        if self.aspect_ratio is not None:
            kwargs['aspect_ratio'] = self.aspect_ratio
        return kwargs

    def _default_native(self) -> ImageGenerationTool:
        return ImageGenerationTool(**self._image_gen_kwargs())

    def _native_unique_id(self) -> str:
        return ImageGenerationTool.kind

    def _resolved_native(self) -> ImageGenerationTool:
        """Get the ImageGenerationTool for the fallback, with capability-level overrides applied."""
        base = self.native if isinstance(self.native, ImageGenerationTool) else ImageGenerationTool()
        overrides = self._image_gen_kwargs()
        if not overrides:
            return base
        return replace(base, **overrides)

    def _default_local(self) -> Tool[AgentDepsT] | AbstractToolset[AgentDepsT] | None:
        if self.fallback_model is None:
            return None
        from pydantic_ai.common_tools.image_generation import image_generation_tool

        return image_generation_tool(model=self.fallback_model, native_tool=self._resolved_native())

fallback_model instance-attribute

fallback_model: ImageGenerationFallbackModel = (
    fallback_model
)

Model to use for image generation when the agent's model doesn't support it natively.

Must be a model that supports image generation via the ImageGenerationTool native tool. This requires a conversational model with image generation support, not a dedicated image-only API. Examples:

  • 'openai-responses:gpt-5.4' — OpenAI model with image generation support
  • 'google:gemini-3-pro-image-preview' — Google image generation model

Can be a model name string, Model instance, or a callable taking RunContext that returns a Model instance.

action instance-attribute

action: Literal['generate', 'edit', 'auto'] | None = action

Whether to generate a new image or edit an existing image.

Supported by: OpenAI Responses. Default: 'auto'.

background instance-attribute

background: (
    Literal["transparent", "opaque", "auto"] | None
) = background

Background type for the generated image.

Supported by: OpenAI Responses. 'transparent' only supported for 'png' and 'webp'.

input_fidelity instance-attribute

input_fidelity: Literal["high", "low"] | None = (
    input_fidelity
)

Input fidelity for matching style/features of input images.

Supported by: OpenAI Responses. Default: 'low'.

moderation instance-attribute

moderation: Literal['auto', 'low'] | None = moderation

Moderation level for the generated image.

Supported by: OpenAI Responses.

image_model instance-attribute

image_model: ImageGenerationModelName | None = image_model

The image generation model to use.

Supported by: OpenAI Responses.

output_compression instance-attribute

output_compression: int | None = output_compression

Compression level for the output image.

Supported by: OpenAI Responses (jpeg/webp, default: 100), Google Cloud (jpeg, default: 75).

output_format instance-attribute

output_format: Literal["png", "webp", "jpeg"] | None = (
    output_format
)

Output format of the generated image.

Supported by: OpenAI Responses (default: 'png'), Google Cloud.

quality instance-attribute

quality: Literal["low", "medium", "high", "auto"] | None = (
    quality
)

Quality of the generated image.

Supported by: OpenAI Responses.

size instance-attribute

size: (
    Literal[
        "auto",
        "1024x1024",
        "1024x1536",
        "1536x1024",
        "512",
        "1K",
        "2K",
        "4K",
    ]
    | None
) = size

Size of the generated image.

Supported by: OpenAI Responses ('auto', '1024x1024', '1024x1536', '1536x1024'), Google ('512', '1K', '2K', '4K').

aspect_ratio instance-attribute

aspect_ratio: ImageAspectRatio | None = aspect_ratio

Aspect ratio for generated images.

Supported by: Google (Gemini), OpenAI Responses (maps '1:1', '2:3', '3:2' to sizes).

IncludeToolReturnSchemas dataclass

Bases: AbstractCapability[AgentDepsT]

Capability that includes return schemas for selected tools.

When added to an agent's capabilities, this sets include_return_schema to True on matching tool definitions, causing the model to receive return type information for those tools.

For models that natively support return schemas (e.g. Google Gemini), the schema is passed as a structured field. For other models, it is injected into the tool description as JSON text.

Per-tool overrides (Tool(..., include_return_schema=False)) take precedence — this capability only sets the flag on tools that haven't explicitly opted out.

from pydantic_ai import Agent
from pydantic_ai.capabilities import IncludeToolReturnSchemas

agent = Agent('openai:gpt-5', capabilities=[IncludeToolReturnSchemas()])
Source code in pydantic_ai_slim/pydantic_ai/capabilities/include_return_schemas.py
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
@dataclass
class IncludeToolReturnSchemas(AbstractCapability[AgentDepsT]):
    """Capability that includes return schemas for selected tools.

    When added to an agent's capabilities, this sets
    [`include_return_schema`][pydantic_ai.tools.ToolDefinition.include_return_schema]
    to `True` on matching tool definitions, causing the model to receive
    return type information for those tools.

    For models that natively support return schemas (e.g. Google Gemini), the
    schema is passed as a structured field.  For other models, it is injected
    into the tool description as JSON text.

    Per-tool overrides (`Tool(..., include_return_schema=False)`) take
    precedence — this capability only sets the flag on tools that haven't
    explicitly opted out.

    ```python
    from pydantic_ai import Agent
    from pydantic_ai.capabilities import IncludeToolReturnSchemas

    agent = Agent('openai:gpt-5', capabilities=[IncludeToolReturnSchemas()])
    ```
    """

    tools: ToolSelector[AgentDepsT] = 'all'
    """Which tools should have their return schemas included.

    - `'all'` (default): every tool gets its return schema included.
    - `Sequence[str]`: only tools whose names are listed.
    - `dict[str, Any]`: matches tools whose metadata deeply includes the specified key-value pairs.
    - Callable `(ctx, tool_def) -> bool`: custom sync or async predicate.
    """

    @classmethod
    def get_serialization_name(cls) -> str | None:
        return 'IncludeToolReturnSchemas'

    def get_wrapper_toolset(self, toolset: AbstractToolset[AgentDepsT]) -> AbstractToolset[AgentDepsT]:
        selector = self.tools

        async def _include_return_schemas(
            ctx: RunContext[AgentDepsT], tool_defs: list[ToolDefinition]
        ) -> list[ToolDefinition]:
            resolved: list[ToolDefinition] = []
            for td in tool_defs:
                # Only set the flag on tools that haven't explicitly opted in or out
                if td.include_return_schema is None and await matches_tool_selector(selector, ctx, td):
                    td = replace(td, include_return_schema=True)
                resolved.append(td)
            return resolved

        return PreparedToolset(toolset, _include_return_schemas)

tools class-attribute instance-attribute

tools: ToolSelector[AgentDepsT] = 'all'

Which tools should have their return schemas included.

  • 'all' (default): every tool gets its return schema included.
  • Sequence[str]: only tools whose names are listed.
  • dict[str, Any]: matches tools whose metadata deeply includes the specified key-value pairs.
  • Callable (ctx, tool_def) -> bool: custom sync or async predicate.

Instrumentation dataclass

Bases: AbstractCapability[Any]

Capability that instruments agent runs with OpenTelemetry/Logfire tracing.

When added to an agent via capabilities=[Instrumentation(...)], this capability creates OpenTelemetry spans for the agent run, model requests, and tool executions.

Other capabilities can add attributes to these spans using the OpenTelemetry API (opentelemetry.trace.get_current_span().set_attribute(key, value)).

Source code in pydantic_ai_slim/pydantic_ai/capabilities/instrumentation.py
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
@dataclass
class Instrumentation(AbstractCapability[Any]):
    """Capability that instruments agent runs with OpenTelemetry/Logfire tracing.

    When added to an agent via `capabilities=[Instrumentation(...)]`, this capability
    creates OpenTelemetry spans for the agent run, model requests, and tool executions.

    Other capabilities can add attributes to these spans using the OpenTelemetry API
    (`opentelemetry.trace.get_current_span().set_attribute(key, value)`).
    """

    settings: InstrumentationSettings = field(default_factory=lambda: _default_settings())
    """OTel/Logfire instrumentation settings. Defaults to `InstrumentationSettings()`,
    which uses the global `TracerProvider` (typically configured by `logfire.configure()`)."""

    # Per-run state (set in `for_run`, mutated by `wrap_model_request`). `for_run`
    # returns a shallow copy via `replace(self)` for per-run isolation. These fields
    # are updated as the run progresses and assume sequential model requests within
    # a run — if the agent loop ever issues concurrent model requests, accesses to
    # these fields would race.
    _agent_name: str = field(default='agent', repr=False, init=False)
    _new_message_index: int = field(default=0, repr=False, init=False)
    _last_messages: list[ModelMessage] | None = field(default=None, repr=False, init=False)
    _last_model_request_parameters: ModelRequestParameters | None = field(default=None, repr=False, init=False)
    # Resolved once from `self.settings.version` in `__post_init__` and preserved across
    # `dataclasses.replace` calls in `for_run` (which only touches init=True fields).
    _instrumentation_names: InstrumentationNames = field(
        default_factory=lambda: InstrumentationNames.for_version(DEFAULT_INSTRUMENTATION_VERSION),
        repr=False,
        init=False,
    )

    def __post_init__(self) -> None:
        self._instrumentation_names = InstrumentationNames.for_version(self.settings.version)

    def get_ordering(self) -> CapabilityOrdering:
        return CapabilityOrdering(position='outermost')

    @classmethod
    def from_spec(cls, **kwargs: Any) -> Instrumentation:
        """Build an `Instrumentation` capability from a YAML/JSON spec.

        Accepts the serializable subset of [`InstrumentationSettings`][pydantic_ai.models.instrumented.InstrumentationSettings]
        kwargs (`include_binary_content`, `include_content`, `version`,
        `use_aggregated_usage_attribute_names`). The OTel `tracer_provider` and `meter_provider`
        fields can't be expressed in YAML and default to the global providers (typically configured
        via `logfire.configure()`).

        YAML form:

            capabilities:
              - Instrumentation: {}                # default settings
              - Instrumentation:
                  version: 2
                  include_content: false
        """
        from pydantic_ai.models.instrumented import InstrumentationSettings

        return cls(settings=InstrumentationSettings(**kwargs))

    async def for_run(self, ctx: RunContext[Any]) -> Instrumentation:
        """Return a fresh copy for per-run state isolation."""
        inst = replace(self)
        inst._agent_name = (ctx.agent.name if ctx.agent else None) or 'agent'
        inst._new_message_index = len(ctx.messages)
        return inst

    # ------------------------------------------------------------------
    # wrap_run — agent run span
    # ------------------------------------------------------------------

    async def wrap_run(
        self,
        ctx: RunContext[AgentDepsT],
        *,
        handler: WrapRunHandler,
    ) -> AgentRunResult[Any]:
        settings = self.settings
        names = self._instrumentation_names
        agent_name = self._agent_name

        span_attributes: dict[str, Any] = {
            'model_name': ctx.model.model_name if ctx.model else 'no-model',
            'agent_name': agent_name,
            'gen_ai.agent.name': agent_name,
            'gen_ai.agent.call.id': ctx.run_id or '',
            'gen_ai.conversation.id': ctx.conversation_id or '',
            'gen_ai.operation.name': 'invoke_agent',
            'logfire.msg': f'{agent_name} run',
        }

        if ctx.agent is not None:  # pragma: no branch
            rendered = ctx.agent.render_description(ctx.deps)
            if rendered is not None:
                span_attributes['gen_ai.agent.description'] = rendered

        with settings.tracer.start_as_current_span(
            names.get_agent_run_span_name(agent_name),
            attributes=span_attributes,
        ) as span:
            otel_ctx = _otel_set_baggage('gen_ai.agent.name', agent_name)
            otel_ctx = _otel_set_baggage('gen_ai.agent.call.id', ctx.run_id or '', context=otel_ctx)
            otel_ctx = _otel_set_baggage('gen_ai.conversation.id', ctx.conversation_id or '', context=otel_ctx)
            token = _otel_attach(otel_ctx)
            result: AgentRunResult[Any] | None = None
            try:
                result = await handler()

                if settings.include_content and span.is_recording():
                    span.set_attribute(
                        'final_result',
                        (
                            result.output
                            if isinstance(result.output, str)
                            else to_json(serialize_any(result.output)).decode()
                        ),
                    )

                return result
            finally:
                _otel_detach(token)
                if span.is_recording():
                    # Get current messages and metadata from the result (which holds the up-to-date state).
                    # ctx.messages/ctx.metadata may be stale because the run state is mutated during execution.
                    if result is not None:
                        message_history = result.all_messages()
                        metadata = result.metadata
                    else:
                        # On error, use the last messages seen during model requests.
                        message_history = self._last_messages or ctx.messages
                        metadata = ctx.metadata
                    span.set_attributes(self._run_span_end_attributes(ctx, message_history, metadata))

    def _run_span_end_attributes(
        self,
        ctx: RunContext[Any],
        message_history: list[ModelMessage],
        metadata: dict[str, Any] | None,
    ) -> dict[str, str | int | float | bool]:
        """Compute the end-of-run span attributes."""
        settings = self.settings
        new_message_index = self._new_message_index

        last_instructions = get_instructions(message_history, self._last_model_request_parameters)
        attrs: dict[str, Any] = {
            'pydantic_ai.all_messages': to_json(settings.messages_to_otel_messages(list(message_history))).decode(),
            **settings.system_instructions_attributes(last_instructions),
        }

        if new_message_index > 0:
            attrs['pydantic_ai.new_message_index'] = new_message_index

        if any(
            (isinstance(m, ModelRequest) and m.instructions is not None and m.instructions != last_instructions)
            for m in message_history[new_message_index:]
        ):
            attrs['pydantic_ai.variable_instructions'] = True

        if metadata is not None:
            attrs['metadata'] = to_json(serialize_any(metadata)).decode()

        usage_attrs = (
            {
                k.replace('gen_ai.usage.', 'gen_ai.aggregated_usage.', 1): v
                for k, v in ctx.usage.opentelemetry_attributes().items()
            }
            if settings.use_aggregated_usage_attribute_names
            else ctx.usage.opentelemetry_attributes()
        )

        return {
            **usage_attrs,
            **attrs,
            'logfire.json_schema': to_json(
                {
                    'type': 'object',
                    'properties': {
                        **{k: {'type': 'array'} if isinstance(v, str) else {} for k, v in attrs.items()},
                        'final_result': {'type': 'object'},
                    },
                }
            ).decode(),
        }

    # ------------------------------------------------------------------
    # wrap_model_request — model request span
    # ------------------------------------------------------------------

    async def wrap_model_request(
        self,
        ctx: RunContext[AgentDepsT],
        *,
        request_context: ModelRequestContext,
        handler: WrapModelRequestHandler,
    ) -> ModelResponse:
        # Track the latest messages so _run_span_end_attributes has them on error paths
        # (ctx.messages may be stale because UserPromptNode replaces the list reference).
        self._last_messages = request_context.messages

        with open_model_request_span(self.settings, request_context) as (finish, prepared_request_context):
            # Stash for `_run_span_end_attributes`: feeding the parameters into
            # `get_instructions` lets it use the canonical `instruction_parts` source
            # (which includes prompted-output template instructions and is properly sorted)
            # instead of falling back to reading `ModelRequest.instructions` from history.
            self._last_model_request_parameters = prepared_request_context.model_request_parameters

            response = await handler(request_context)
            finish(response)
            return response

    # ------------------------------------------------------------------
    # wrap_tool_execute — tool execution span
    # ------------------------------------------------------------------

    def _tool_span_attributes(self, call: ToolCallPart) -> dict[str, Any]:
        """Build the span attributes shared by `wrap_tool_execute` and `wrap_output_process`.

        Both spans use `gen_ai.operation.name='execute_tool'` and the same `gen_ai.tool.*`
        attributes — they only differ in how the result is serialized and which exceptions
        are special-cased, which stays in the call-site `try/except`.
        """
        names = self._instrumentation_names
        include_content = self.settings.include_content
        return {
            'gen_ai.operation.name': 'execute_tool',
            'gen_ai.tool.name': call.tool_name,
            'gen_ai.tool.call.id': call.tool_call_id,
            **({names.tool_arguments_attr: call.args_as_json_str()} if include_content else {}),
            **get_agent_run_baggage_attributes(),
            'logfire.msg': f'running tool: {call.tool_name}',
            'logfire.json_schema': to_json(
                {
                    'type': 'object',
                    'properties': {
                        **(
                            {
                                names.tool_arguments_attr: {'type': 'object'},
                                names.tool_result_attr: {'type': 'object'},
                            }
                            if include_content
                            else {}
                        ),
                        'gen_ai.tool.name': {},
                        'gen_ai.tool.call.id': {},
                    },
                }
            ).decode(),
        }

    async def _run_tool_span(
        self,
        *,
        span_name: str,
        attributes: dict[str, Any],
        action: Callable[[], Awaitable[Any]],
        serialize_result: Callable[[Any], str],
        handle_tool_control_flow: bool = False,
    ) -> Any:
        """Open a `gen_ai`-flavoured tool/output span around `action`.

        Records the serialized result on success (when `include_content` is enabled and
        the span is recording), records the exception and sets status `ERROR` on failure.

        When `handle_tool_control_flow` is True, the helper additionally special-cases
        `CallDeferred`/`ApprovalRequired` (deferrals are control flow, not errors) and
        records `ToolRetryError`'s retry prompt as the tool result before re-raising.
        Output-function spans leave that flag off — `ToolRetryError` is treated as a
        plain error there because the retry prompt is recorded on the surrounding
        request/agent spans, and `CallDeferred`/`ApprovalRequired` never reach output
        processing.
        """
        settings = self.settings
        names = self._instrumentation_names
        include_content = settings.include_content

        with settings.tracer.start_as_current_span(
            span_name,
            attributes=attributes,
            record_exception=False,
            set_status_on_exception=False,
        ) as span:
            try:
                result = await action()
            except (CallDeferred, ApprovalRequired) as exc:
                if not handle_tool_control_flow:
                    span.record_exception(exc, escaped=True)
                    span.set_status(StatusCode.ERROR)
                    raise
                # Deferrals are control flow, not errors: capture the deferral name (and
                # metadata when available) as span attributes, and only mark the span
                # ERROR for older instrumentation versions that expected that shape.
                span.set_attribute(names.tool_deferral_name_attr, type(exc).__name__)
                if include_content and span.is_recording() and exc.metadata is not None:
                    try:
                        metadata_str = to_json(exc.metadata).decode()
                    except (TypeError, ValueError):
                        metadata_str = repr(exc.metadata)
                    span.set_attribute(names.tool_deferral_metadata_attr, metadata_str)
                if settings.version < 5:
                    span.record_exception(exc, escaped=True)
                    span.set_status(StatusCode.ERROR)
                raise
            except ToolRetryError as e:
                if handle_tool_control_flow and include_content and span.is_recording():
                    # Tool retries are surfaced as model-visible errors; record the prompt
                    # the model will see as the tool result before re-raising.
                    span.set_attribute(names.tool_result_attr, e.tool_retry.model_response())
                span.record_exception(e, escaped=True)
                span.set_status(StatusCode.ERROR)
                raise
            except BaseException as e:
                span.record_exception(e, escaped=True)
                span.set_status(StatusCode.ERROR)
                raise

            if include_content and span.is_recording():
                span.set_attribute(
                    names.tool_result_attr,
                    result if isinstance(result, str) else serialize_result(result),
                )

        return result

    async def wrap_tool_execute(
        self,
        ctx: RunContext[AgentDepsT],
        *,
        call: ToolCallPart,
        tool_def: ToolDefinition,
        args: ValidatedToolArgs,
        handler: WrapToolExecuteHandler,
    ) -> Any:
        return await self._run_tool_span(
            span_name=self._instrumentation_names.get_tool_span_name(call.tool_name),
            attributes=self._tool_span_attributes(call),
            action=lambda: handler(args),
            serialize_result=lambda value: tool_return_ta.dump_json(value).decode(),
            handle_tool_control_flow=True,
        )

    # ------------------------------------------------------------------
    # wrap_output_process — output tool execution span (tool-mode only)
    # ------------------------------------------------------------------

    async def wrap_output_process(
        self,
        ctx: RunContext[AgentDepsT],
        *,
        output_context: OutputContext,
        output: Any,
        handler: WrapOutputProcessHandler,
    ) -> Any:
        """Emit a span for output-function execution.

        Output processing for plain validation (no function) is not span-worthy — the
        validated value is the model's response itself, no user code ran. We open a
        span only when an output function will execute, regardless of whether the
        output arrived via a tool call. The span name reflects the function (or tool
        name when the function name is unavailable, e.g. union processors).
        """
        if not output_context.has_function:
            return await handler(output)

        names = self._instrumentation_names
        include_content = self.settings.include_content
        tool_call = output_context.tool_call
        # Tool-mode output: the registered tool name (e.g. `final_result`) is what the
        # model called, so use it as the span target. For non-tool output, fall back to
        # the function name (when known) or a generic placeholder.
        span_target = tool_call.tool_name if tool_call else (output_context.function_name or 'output_function')

        attributes: dict[str, Any] = {
            'gen_ai.operation.name': 'execute_tool',
            'gen_ai.tool.name': span_target,
            **get_agent_run_baggage_attributes(),
            'logfire.msg': f'running output function: {span_target}',
        }
        if tool_call is not None and tool_call.tool_call_id:
            attributes['gen_ai.tool.call.id'] = tool_call.tool_call_id
        if include_content:
            attributes[names.tool_arguments_attr] = to_json(output).decode()

        attributes['logfire.json_schema'] = to_json(
            {
                'type': 'object',
                'properties': {
                    **(
                        {
                            names.tool_arguments_attr: {'type': 'object'},
                            names.tool_result_attr: {'type': 'object'},
                        }
                        if include_content
                        else {}
                    ),
                    'gen_ai.tool.name': {},
                    **({'gen_ai.tool.call.id': {}} if tool_call is not None and tool_call.tool_call_id else {}),
                },
            }
        ).decode()

        return await self._run_tool_span(
            span_name=names.get_output_tool_span_name(span_target),
            attributes=attributes,
            action=lambda: handler(output),
            serialize_result=lambda value: to_json(serialize_any(value)).decode(),
        )

settings class-attribute instance-attribute

settings: InstrumentationSettings = field(
    default_factory=lambda: _default_settings()
)

OTel/Logfire instrumentation settings. Defaults to InstrumentationSettings(), which uses the global TracerProvider (typically configured by logfire.configure()).

from_spec classmethod

from_spec(**kwargs: Any) -> Instrumentation

Build an Instrumentation capability from a YAML/JSON spec.

Accepts the serializable subset of InstrumentationSettings kwargs (include_binary_content, include_content, version, use_aggregated_usage_attribute_names). The OTel tracer_provider and meter_provider fields can't be expressed in YAML and default to the global providers (typically configured via logfire.configure()).

YAML form:

capabilities:
  - Instrumentation: {}                # default settings
  - Instrumentation:
      version: 2
      include_content: false
Source code in pydantic_ai_slim/pydantic_ai/capabilities/instrumentation.py
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
@classmethod
def from_spec(cls, **kwargs: Any) -> Instrumentation:
    """Build an `Instrumentation` capability from a YAML/JSON spec.

    Accepts the serializable subset of [`InstrumentationSettings`][pydantic_ai.models.instrumented.InstrumentationSettings]
    kwargs (`include_binary_content`, `include_content`, `version`,
    `use_aggregated_usage_attribute_names`). The OTel `tracer_provider` and `meter_provider`
    fields can't be expressed in YAML and default to the global providers (typically configured
    via `logfire.configure()`).

    YAML form:

        capabilities:
          - Instrumentation: {}                # default settings
          - Instrumentation:
              version: 2
              include_content: false
    """
    from pydantic_ai.models.instrumented import InstrumentationSettings

    return cls(settings=InstrumentationSettings(**kwargs))

for_run async

for_run(ctx: RunContext[Any]) -> Instrumentation

Return a fresh copy for per-run state isolation.

Source code in pydantic_ai_slim/pydantic_ai/capabilities/instrumentation.py
112
113
114
115
116
117
async def for_run(self, ctx: RunContext[Any]) -> Instrumentation:
    """Return a fresh copy for per-run state isolation."""
    inst = replace(self)
    inst._agent_name = (ctx.agent.name if ctx.agent else None) or 'agent'
    inst._new_message_index = len(ctx.messages)
    return inst

wrap_output_process async

wrap_output_process(
    ctx: RunContext[AgentDepsT],
    *,
    output_context: OutputContext,
    output: Any,
    handler: WrapOutputProcessHandler
) -> Any

Emit a span for output-function execution.

Output processing for plain validation (no function) is not span-worthy — the validated value is the model's response itself, no user code ran. We open a span only when an output function will execute, regardless of whether the output arrived via a tool call. The span name reflects the function (or tool name when the function name is unavailable, e.g. union processors).

Source code in pydantic_ai_slim/pydantic_ai/capabilities/instrumentation.py
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
async def wrap_output_process(
    self,
    ctx: RunContext[AgentDepsT],
    *,
    output_context: OutputContext,
    output: Any,
    handler: WrapOutputProcessHandler,
) -> Any:
    """Emit a span for output-function execution.

    Output processing for plain validation (no function) is not span-worthy — the
    validated value is the model's response itself, no user code ran. We open a
    span only when an output function will execute, regardless of whether the
    output arrived via a tool call. The span name reflects the function (or tool
    name when the function name is unavailable, e.g. union processors).
    """
    if not output_context.has_function:
        return await handler(output)

    names = self._instrumentation_names
    include_content = self.settings.include_content
    tool_call = output_context.tool_call
    # Tool-mode output: the registered tool name (e.g. `final_result`) is what the
    # model called, so use it as the span target. For non-tool output, fall back to
    # the function name (when known) or a generic placeholder.
    span_target = tool_call.tool_name if tool_call else (output_context.function_name or 'output_function')

    attributes: dict[str, Any] = {
        'gen_ai.operation.name': 'execute_tool',
        'gen_ai.tool.name': span_target,
        **get_agent_run_baggage_attributes(),
        'logfire.msg': f'running output function: {span_target}',
    }
    if tool_call is not None and tool_call.tool_call_id:
        attributes['gen_ai.tool.call.id'] = tool_call.tool_call_id
    if include_content:
        attributes[names.tool_arguments_attr] = to_json(output).decode()

    attributes['logfire.json_schema'] = to_json(
        {
            'type': 'object',
            'properties': {
                **(
                    {
                        names.tool_arguments_attr: {'type': 'object'},
                        names.tool_result_attr: {'type': 'object'},
                    }
                    if include_content
                    else {}
                ),
                'gen_ai.tool.name': {},
                **({'gen_ai.tool.call.id': {}} if tool_call is not None and tool_call.tool_call_id else {}),
            },
        }
    ).decode()

    return await self._run_tool_span(
        span_name=names.get_output_tool_span_name(span_target),
        attributes=attributes,
        action=lambda: handler(output),
        serialize_result=lambda value: to_json(serialize_any(value)).decode(),
    )

MCP dataclass

Bases: NativeOrLocalTool[AgentDepsT]

MCP server capability.

The primary entry point for using MCP servers with Pydantic AI. Runs the MCP server locally — keeps credentials, hooks, and tracing under your control — and accepts any MCPToolset input (URL, fastmcp.Client, transport, in-process FastMCP server, script path, etc.) directly via local=.

Pass url= for HTTP-based servers; the same URL can also be advertised to providers that support native MCP via native=True. For non-URL local clients, omit url= and pass the client/toolset as local=. Pass native=True, local=False for strict native-only (no local at all — works without the mcp extra).

Source code in pydantic_ai_slim/pydantic_ai/capabilities/mcp.py
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
@dataclass(init=False)
class MCP(NativeOrLocalTool[AgentDepsT]):
    """MCP server capability.

    The primary entry point for using MCP servers with Pydantic AI. Runs the MCP server
    locally — keeps credentials, hooks, and tracing under your control — and accepts any
    [`MCPToolset`][pydantic_ai.mcp.MCPToolset] input (URL, `fastmcp.Client`, transport,
    in-process `FastMCP` server, script path, etc.) directly via `local=`.

    Pass `url=` for HTTP-based servers; the same URL can also be advertised to providers
    that support native MCP via `native=True`. For non-URL local clients, omit `url=` and
    pass the client/toolset as `local=`. Pass `native=True, local=False` for strict
    native-only (no local at all — works without the `mcp` extra).
    """

    url: str | None
    """The URL of the MCP server.

    Required when using native MCP. Optional when using a local-only client via `local=`."""

    id: str | None
    """Unique identifier for the MCP server. Defaults to a slug derived from the URL."""

    authorization_token: str | None
    """Authorization header value for MCP server requests. Passed to both native and local."""

    headers: dict[str, str] | None
    """HTTP headers for MCP server requests. Passed to both native and local."""

    allowed_tools: list[str] | None
    """Filter to only these tools. Applied to both native and local."""

    description: str | None
    """Description of the MCP server. Native-only; ignored by local tools."""

    def __init__(
        self,
        url: str | None = None,
        *,
        native: MCPServerTool
        | Callable[[RunContext[AgentDepsT]], Awaitable[MCPServerTool | None] | MCPServerTool | None]
        | bool = False,
        local: MCPToolsetClient | MCPToolset[AgentDepsT] | Callable[..., Any] | bool | None = None,
        id: str | None = None,
        authorization_token: str | None = None,
        headers: dict[str, str] | None = None,
        allowed_tools: list[str] | None = None,
        description: str | None = None,
    ) -> None:
        # Native MCP requires a URL only when the capability auto-constructs an `MCPServerTool`
        # (i.e. `native=True`). Explicit `MCPServerTool(...)` instances and per-run callables
        # carry their own URL, so the capability's `url=` isn't needed in those cases.
        if url is None and native is True:
            raise UserError(
                'MCP(native=True) requires `url=` — native MCP needs a URL to give the model. '
                "Pass `url='https://…'`, pass an explicit `native=MCPServerTool(url='…', …)` "
                'instance, or for local-only use leave `native` at its default of `False` and '
                'pass `local=` (e.g. an `MCPToolset`, `fastmcp.Client`, transport, in-process '
                '`FastMCP` server, or script path).'
            )

        self.url = url
        self.native = native
        # Non-string runtime `local=` inputs the base class doesn't recognize (Path, transport,
        # FastMCP server, pre-built `fastmcp.Client`, `AnyUrl`, etc.) are wrapped into an
        # `MCPToolset` here. Strings flow through `_resolve_local_strategy` below; pre-built
        # toolsets, callables, bools, and `None` pass through to `NativeOrLocalTool` unchanged.
        # Reaching this branch implies a fastmcp-typed object, which can only exist when the `mcp`
        # extra (and hence fastmcp) is installed; the module-level `MCPToolset` is the real class.
        if (
            local is not None
            and not isinstance(local, (bool, str))
            and not isinstance(local, AbstractToolset)
            and not callable(local)
        ):
            local = MCPToolset(local, include_instructions=True)
        self.local = local
        self.id = id
        self.authorization_token = authorization_token
        self.headers = headers
        self.allowed_tools = allowed_tools
        self.description = description
        self.__post_init__()

    @cached_property
    def _resolved_id(self) -> str:
        if self.id:
            return self.id
        # `_resolved_id` is only read through native paths, which require a URL (enforced in `__init__`).
        assert self.url is not None
        # Include hostname to avoid collisions (e.g. two /sse URLs on different hosts)
        parsed = urlparse(self.url)
        path = parsed.path.rstrip('/')
        slug = path.split('/')[-1] if path else ''
        host = parsed.hostname or ''
        return f'{host}-{slug}' if slug else host or self.url

    def _default_native(self) -> MCPServerTool:
        # `native is True` requires `url is not None` (enforced in `__init__`).
        assert self.url is not None
        return MCPServerTool(
            id=self._resolved_id,
            url=self.url,
            authorization_token=self.authorization_token,
            headers=self.headers,
            allowed_tools=self.allowed_tools,
            description=self.description,
        )

    def _native_unique_id(self) -> str:
        return f'mcp_server:{self._resolved_id}'

    def _default_local(self) -> Tool[AgentDepsT] | AbstractToolset[AgentDepsT] | None:
        if self.url is None:
            # No URL → no way to derive a default local; the user must have passed `local=` directly.
            return None
        return self._build_local(self.url)

    def _resolve_local_strategy(self, name: str | bool) -> Tool[AgentDepsT] | AbstractToolset[AgentDepsT]:
        # MCP has no named string strategies. `local=True` uses the URL from `MCP(url=...)`; a
        # string is treated as an override URL and validated to match — we only accept actual URLs
        # here so the same value can roundtrip through `from_spec`/`AgentSpec` and be served as a
        # native MCP tool by models that support it. Local-only inputs that aren't URLs (script
        # paths, `fastmcp.Client` instances, etc.) must be passed as `local=MCPToolset(...)` instead.
        if isinstance(name, str):
            _require_url(name)
            return self._build_local(name)
        if self.url is None:
            raise UserError(
                'MCP(local=True) requires `url=` to derive the local transport from. '
                "Pass `url='https://…'`, or pass a concrete local client/toolset as `local=`."
            )
        return self._build_local(self.url)

    def _build_local(self, url: str) -> Tool[AgentDepsT] | AbstractToolset[AgentDepsT]:
        # Merge authorization_token into headers for local connection.
        local_headers = dict(self.headers or {})
        if self.authorization_token:
            local_headers['Authorization'] = self.authorization_token

        try:
            # `MCPToolset` infers SSE vs Streamable HTTP from the URL.
            from pydantic_ai.mcp import MCPToolset

            return MCPToolset(url, headers=local_headers or None, include_instructions=True)
        except ImportError as e:
            raise UserError(
                'Please install the `mcp` package to run MCP servers locally, you can use the '
                '`mcp` optional group — `pip install "pydantic-ai-slim[mcp]"`. '
                'For native-only MCP (no local — no extra needed), pass '
                "`MCP(url='…', native=True, local=False)`."
            ) from e

    def get_toolset(self) -> AbstractToolset[AgentDepsT] | None:
        toolset = super().get_toolset()
        if toolset is not None and self.allowed_tools is not None:
            allowed = set(self.allowed_tools)
            return toolset.filtered(lambda _ctx, tool_def: tool_def.name in allowed)
        return toolset

    @classmethod
    def from_spec(
        cls,
        url: str,
        *,
        native: MCPServerTool | bool = False,
        local: str | bool | None = None,
        id: str | None = None,
        authorization_token: str | None = None,
        headers: dict[str, str] | None = None,
        allowed_tools: list[str] | None = None,
        description: str | None = None,
    ) -> MCP[AgentDepsT]:
        """Construct an `MCP` capability from spec-serializable args.

        Restricts the runtime-wide `local=` union to the JSON/YAML-serializable subset
        (`str | bool | None`) so `AgentSpec` schema generation works, and requires `url=` (which
        is optional at runtime when `local=` is a concrete non-URL client). Non-serializable
        runtime values like `fastmcp.Client`, `ClientTransport`, or pre-built `MCPToolset`
        instances can still be passed to `MCP(...)` directly — they just can't roundtrip through
        a spec file.
        """
        return cls(
            url,
            native=native,
            local=local,
            id=id,
            authorization_token=authorization_token,
            headers=headers,
            allowed_tools=allowed_tools,
            description=description,
        )

url instance-attribute

url: str | None = url

The URL of the MCP server.

Required when using native MCP. Optional when using a local-only client via local=.

id instance-attribute

id: str | None = id

Unique identifier for the MCP server. Defaults to a slug derived from the URL.

authorization_token instance-attribute

authorization_token: str | None = authorization_token

Authorization header value for MCP server requests. Passed to both native and local.

headers instance-attribute

headers: dict[str, str] | None = headers

HTTP headers for MCP server requests. Passed to both native and local.

allowed_tools instance-attribute

allowed_tools: list[str] | None = allowed_tools

Filter to only these tools. Applied to both native and local.

description instance-attribute

description: str | None = description

Description of the MCP server. Native-only; ignored by local tools.

from_spec classmethod

from_spec(
    url: str,
    *,
    native: MCPServerTool | bool = False,
    local: str | bool | None = None,
    id: str | None = None,
    authorization_token: str | None = None,
    headers: dict[str, str] | None = None,
    allowed_tools: list[str] | None = None,
    description: str | None = None
) -> MCP[AgentDepsT]

Construct an MCP capability from spec-serializable args.

Restricts the runtime-wide local= union to the JSON/YAML-serializable subset (str | bool | None) so AgentSpec schema generation works, and requires url= (which is optional at runtime when local= is a concrete non-URL client). Non-serializable runtime values like fastmcp.Client, ClientTransport, or pre-built MCPToolset instances can still be passed to MCP(...) directly — they just can't roundtrip through a spec file.

Source code in pydantic_ai_slim/pydantic_ai/capabilities/mcp.py
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
@classmethod
def from_spec(
    cls,
    url: str,
    *,
    native: MCPServerTool | bool = False,
    local: str | bool | None = None,
    id: str | None = None,
    authorization_token: str | None = None,
    headers: dict[str, str] | None = None,
    allowed_tools: list[str] | None = None,
    description: str | None = None,
) -> MCP[AgentDepsT]:
    """Construct an `MCP` capability from spec-serializable args.

    Restricts the runtime-wide `local=` union to the JSON/YAML-serializable subset
    (`str | bool | None`) so `AgentSpec` schema generation works, and requires `url=` (which
    is optional at runtime when `local=` is a concrete non-URL client). Non-serializable
    runtime values like `fastmcp.Client`, `ClientTransport`, or pre-built `MCPToolset`
    instances can still be passed to `MCP(...)` directly — they just can't roundtrip through
    a spec file.
    """
    return cls(
        url,
        native=native,
        local=local,
        id=id,
        authorization_token=authorization_token,
        headers=headers,
        allowed_tools=allowed_tools,
        description=description,
    )

NativeOrLocalTool dataclass

Bases: AbstractCapability[AgentDepsT]

Capability that pairs a provider-native tool with a local fallback.

When the model supports the native tool, the local fallback is removed. When the model doesn't support the native tool, it is removed and the local tool stays.

Can be used directly:

from pydantic_ai.capabilities import NativeOrLocalTool

cap = NativeOrLocalTool(native=WebSearchTool(), local=my_search_func)

Or subclassed to set defaults by overriding _default_native, _default_local, and _requires_native. The built-in WebSearch, WebFetch, and ImageGeneration capabilities are all subclasses.

Source code in pydantic_ai_slim/pydantic_ai/capabilities/native_or_local.py
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
@dataclass(init=False)
class NativeOrLocalTool(AbstractCapability[AgentDepsT]):
    """Capability that pairs a provider-native tool with a local fallback.

    When the model supports the native tool, the local fallback is removed.
    When the model doesn't support the native tool, it is removed and the local tool stays.

    Can be used directly:

    ```python {test="skip" lint="skip"}
    from pydantic_ai.capabilities import NativeOrLocalTool

    cap = NativeOrLocalTool(native=WebSearchTool(), local=my_search_func)
    ```

    Or subclassed to set defaults by overriding `_default_native`, `_default_local`,
    and `_requires_native`.
    The built-in [`WebSearch`][pydantic_ai.capabilities.WebSearch],
    [`WebFetch`][pydantic_ai.capabilities.WebFetch], and
    [`ImageGeneration`][pydantic_ai.capabilities.ImageGeneration] capabilities
    are all subclasses.
    """

    native: AgentNativeTool[AgentDepsT] | bool = True
    """Configure the provider-native tool.

    - `True` (default): use the default native tool configuration (subclasses only).
    - `False`: disable the native tool; always use the local tool.
    - An `AbstractNativeTool` instance: use this specific configuration.
    - A callable (`NativeToolFunc`): dynamically create the native tool per-run via `RunContext`.
    """

    local: str | Tool[AgentDepsT] | Callable[..., Any] | AbstractToolset[AgentDepsT] | bool | None = None
    """Configure the local fallback tool.

    - `None` (default): auto-detect a local fallback via `_default_local`.
    - `True`: opt in to the default local fallback (resolved via `_resolve_local_strategy`).
    - `False`: disable the local fallback; only use the native tool.
    - A named strategy (e.g. `'duckduckgo'`): resolved via `_resolve_local_strategy` in subclasses.
    - A `Tool` or `AbstractToolset` instance: use this specific local tool.
    - A bare callable: automatically wrapped in a `Tool`.
    """

    def __init__(
        self,
        *,
        native: AgentNativeTool[AgentDepsT] | bool = True,
        local: str | Tool[AgentDepsT] | Callable[..., Any] | AbstractToolset[AgentDepsT] | bool | None = None,
    ) -> None:
        self.native = native
        self.local = local
        self.__post_init__()

    def __post_init__(self) -> None:
        if self.native is False and self.local is False:
            raise UserError(f'{type(self).__name__}: both `native` and `local` cannot be False')

        # Resolve native=True → default instance (subclass hook)
        if self.native is True:
            default = self._default_native()
            if default is None:
                raise UserError(
                    f'{type(self).__name__}: native=True requires a subclass that overrides '
                    f'`_default_native()`, or pass an `AbstractNativeTool` instance directly'
                )
            self.native = default

        # Resolve local: None → default, True/str → named strategy, callable → Tool
        if self.local is None:
            self.local = self._default_local()
        elif self.local is True or isinstance(self.local, str):
            self.local = self._resolve_local_strategy(self.local)
        elif self.local is False:
            pass
        elif callable(self.local) and not isinstance(self.local, (Tool, AbstractToolset)):
            self.local = Tool(self.local)

        # Catch contradictory config: native disabled but constraint fields require it.
        # Checked first because adding `local=` can't fix it — the user needs to either drop
        # the constraint or re-enable native.
        if self.native is False and self._requires_native():
            raise UserError(f'{type(self).__name__}: constraint fields require the native tool, but native=False')

        # Disallow `native=False` without an explicit local — would produce a silent no-op capability.
        if self.native is False and self.local is None:  # pyright: ignore[reportUnknownMemberType]
            raise UserError(
                f'{type(self).__name__}(native=False) requires an explicit local tool — '
                'pass `local=...` (e.g. a strategy string, `True`, a callable, or a `Tool`/`AbstractToolset`).'
            )

    # --- Subclass hooks (not abstract — direct use is supported) ---

    def _default_native(self) -> AbstractNativeTool | None:
        """Create the default native tool instance.

        Override in subclasses. Returns None by default (direct use requires
        passing an explicit `AbstractNativeTool` instance as `native`).
        """
        return None

    def _native_unique_id(self) -> str:
        """The unique_id used for `unless_native` on local tool definitions.

        By default, derived from the native tool's `unique_id` property.
        Override in subclasses for custom behavior.
        """
        native = self.native
        if isinstance(native, AbstractNativeTool):
            return native.unique_id
        raise UserError(
            f'{type(self).__name__}: cannot derive native unique_id — override `_native_unique_id()` in your subclass'
        )

    def _default_local(self) -> Tool[AgentDepsT] | AbstractToolset[AgentDepsT] | None:
        """Auto-detect a local fallback. Override in subclasses that have one."""
        return None

    def _resolve_local_strategy(self, name: str | bool) -> Tool[AgentDepsT] | AbstractToolset[AgentDepsT]:
        """Resolve a named local strategy (e.g. `'duckduckgo'`) or `local=True` to a concrete tool.

        Override in subclasses that expose named strategies. The default implementation raises
        `UserError`.
        """
        raise UserError(
            f'{type(self).__name__}: `local={name!r}` is not supported. '
            'Pass a `Tool`, `AbstractToolset`, or callable directly.'
        )

    def _requires_native(self) -> bool:
        """Return True if capability-level constraint fields require the native tool.

        When True, the local fallback is suppressed. If the model doesn't support
        the native tool, `UserError` is raised — preventing silent constraint violation.

        Override in subclasses that expose native-only constraint fields
        (e.g. `allowed_domains`, `blocked_domains`).
        """
        return False

    # --- Shared logic ---

    def get_native_tools(self) -> Sequence[AgentNativeTool[AgentDepsT]]:
        if self.native is False:
            return []
        # After __post_init__, native=True is resolved to an AbstractNativeTool instance
        assert not isinstance(self.native, bool)
        return [self.native]

    def get_toolset(self) -> AbstractToolset[AgentDepsT] | None:
        local = self.local
        if local is None or local is False or self._requires_native():
            return None

        # local is Tool | AbstractToolset after __post_init__ resolution
        toolset: AbstractToolset[AgentDepsT] = (
            cast(AbstractToolset[AgentDepsT], local)
            if isinstance(local, AbstractToolset)
            else FunctionToolset([cast(Tool[AgentDepsT], local)])
        )

        if self.native is not False:
            uid = self._native_unique_id()

            async def _add_unless_native(
                ctx: RunContext[AgentDepsT], tool_defs: list[ToolDefinition]
            ) -> list[ToolDefinition]:
                return [replace(d, unless_native=uid) for d in tool_defs]

            return PreparedToolset(wrapped=toolset, prepare_func=_add_unless_native)
        return toolset

native class-attribute instance-attribute

native: AgentNativeTool[AgentDepsT] | bool = native

Configure the provider-native tool.

  • True (default): use the default native tool configuration (subclasses only).
  • False: disable the native tool; always use the local tool.
  • An AbstractNativeTool instance: use this specific configuration.
  • A callable (NativeToolFunc): dynamically create the native tool per-run via RunContext.

local class-attribute instance-attribute

local: (
    str
    | Tool[AgentDepsT]
    | Callable[..., Any]
    | AbstractToolset[AgentDepsT]
    | bool
    | None
) = local

Configure the local fallback tool.

  • None (default): auto-detect a local fallback via _default_local.
  • True: opt in to the default local fallback (resolved via _resolve_local_strategy).
  • False: disable the local fallback; only use the native tool.
  • A named strategy (e.g. 'duckduckgo'): resolved via _resolve_local_strategy in subclasses.
  • A Tool or AbstractToolset instance: use this specific local tool.
  • A bare callable: automatically wrapped in a Tool.

NativeTool dataclass

Bases: AbstractCapability[AgentDepsT]

A capability that registers a native tool with the agent.

Wraps a single AgentNativeTool — either a static AbstractNativeTool instance or a callable that dynamically produces one.

Equivalent to passing the tool through Agent(capabilities=[NativeTool(my_tool)]). For provider-adaptive use (with a local fallback), see NativeOrLocalTool or its subclasses like WebSearch.

Source code in pydantic_ai_slim/pydantic_ai/capabilities/native_tool.py
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
@dataclass
class NativeTool(AbstractCapability[AgentDepsT]):
    """A capability that registers a native tool with the agent.

    Wraps a single [`AgentNativeTool`][pydantic_ai.tools.AgentNativeTool] — either a static
    [`AbstractNativeTool`][pydantic_ai.native_tools.AbstractNativeTool] instance or a callable
    that dynamically produces one.

    Equivalent to passing the tool through `Agent(capabilities=[NativeTool(my_tool)])`. For
    provider-adaptive use (with a local fallback), see [`NativeOrLocalTool`][pydantic_ai.capabilities.NativeOrLocalTool]
    or its subclasses like [`WebSearch`][pydantic_ai.capabilities.WebSearch].
    """

    tool: AgentNativeTool[AgentDepsT]

    def get_native_tools(self) -> Sequence[AgentNativeTool[AgentDepsT]]:
        return [self.tool]

    @classmethod
    def from_spec(cls, tool: AbstractNativeTool | None = None, **kwargs: Any) -> NativeTool[Any]:
        """Create from spec.

        Supports two YAML forms:

        - Flat: `{NativeTool: {kind: web_search, search_context_size: high}}`
        - Explicit: `{NativeTool: {tool: {kind: web_search}}}`
        """
        if tool is not None:
            validated = _NATIVE_TOOL_ADAPTER.validate_python(tool)
        elif kwargs:
            validated = _NATIVE_TOOL_ADAPTER.validate_python(kwargs)
        else:
            raise TypeError(
                '`NativeTool.from_spec()` requires either a `tool` argument or keyword arguments'
                ' specifying the native tool type (e.g. `kind="web_search"`)'
            )
        return cls(tool=validated)

from_spec classmethod

from_spec(
    tool: AbstractNativeTool | None = None, **kwargs: Any
) -> NativeTool[Any]

Create from spec.

Supports two YAML forms:

  • Flat: {NativeTool: {kind: web_search, search_context_size: high}}
  • Explicit: {NativeTool: {tool: {kind: web_search}}}
Source code in pydantic_ai_slim/pydantic_ai/capabilities/native_tool.py
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
@classmethod
def from_spec(cls, tool: AbstractNativeTool | None = None, **kwargs: Any) -> NativeTool[Any]:
    """Create from spec.

    Supports two YAML forms:

    - Flat: `{NativeTool: {kind: web_search, search_context_size: high}}`
    - Explicit: `{NativeTool: {tool: {kind: web_search}}}`
    """
    if tool is not None:
        validated = _NATIVE_TOOL_ADAPTER.validate_python(tool)
    elif kwargs:
        validated = _NATIVE_TOOL_ADAPTER.validate_python(kwargs)
    else:
        raise TypeError(
            '`NativeTool.from_spec()` requires either a `tool` argument or keyword arguments'
            ' specifying the native tool type (e.g. `kind="web_search"`)'
        )
    return cls(tool=validated)

PrefixTools dataclass

Bases: WrapperCapability[AgentDepsT]

A capability that wraps another capability and prefixes its tool names.

Only the wrapped capability's tools are prefixed; other agent tools are unaffected.

from pydantic_ai import Agent
from pydantic_ai.capabilities import PrefixTools, Toolset
from pydantic_ai.toolsets import FunctionToolset

toolset = FunctionToolset()

agent = Agent(
    'openai:gpt-5',
    capabilities=[
        PrefixTools(
            wrapped=Toolset(toolset),
            prefix='ns',
        ),
    ],
)
Source code in pydantic_ai_slim/pydantic_ai/capabilities/prefix_tools.py
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
@dataclass
class PrefixTools(WrapperCapability[AgentDepsT]):
    """A capability that wraps another capability and prefixes its tool names.

    Only the wrapped capability's tools are prefixed; other agent tools are unaffected.

    ```python
    from pydantic_ai import Agent
    from pydantic_ai.capabilities import PrefixTools, Toolset
    from pydantic_ai.toolsets import FunctionToolset

    toolset = FunctionToolset()

    agent = Agent(
        'openai:gpt-5',
        capabilities=[
            PrefixTools(
                wrapped=Toolset(toolset),
                prefix='ns',
            ),
        ],
    )
    ```
    """

    prefix: str

    @classmethod
    def get_serialization_name(cls) -> str | None:
        return 'PrefixTools'

    @classmethod
    def from_spec(cls, *, prefix: str, capability: CapabilitySpec) -> PrefixTools[Any]:
        """Create from spec with a nested capability specification.

        Args:
            prefix: The prefix to add to tool names (e.g. `'mcp'` turns `'search'` into `'mcp_search'`).
            capability: A capability spec (same format as entries in the `capabilities` list).
        """
        from pydantic_ai.agent.spec import load_capability_from_nested_spec

        wrapped = load_capability_from_nested_spec(capability)
        return cls(wrapped=wrapped, prefix=prefix)

    def get_toolset(self) -> AgentToolset[AgentDepsT] | None:
        toolset = super().get_toolset()
        if toolset is None:
            return None
        if isinstance(toolset, AbstractToolset):
            # Pyright can't narrow Callable type aliases out of unions after isinstance check
            return PrefixedToolset(toolset, prefix=self.prefix)  # pyright: ignore[reportUnknownArgumentType]
        # ToolsetFunc callable — wrap in DynamicToolset so PrefixedToolset can delegate
        return PrefixedToolset(DynamicToolset[AgentDepsT](toolset_func=toolset), prefix=self.prefix)

from_spec classmethod

from_spec(
    *, prefix: str, capability: CapabilitySpec
) -> PrefixTools[Any]

Create from spec with a nested capability specification.

Parameters:

Name Type Description Default
prefix str

The prefix to add to tool names (e.g. 'mcp' turns 'search' into 'mcp_search').

required
capability CapabilitySpec

A capability spec (same format as entries in the capabilities list).

required
Source code in pydantic_ai_slim/pydantic_ai/capabilities/prefix_tools.py
46
47
48
49
50
51
52
53
54
55
56
57
@classmethod
def from_spec(cls, *, prefix: str, capability: CapabilitySpec) -> PrefixTools[Any]:
    """Create from spec with a nested capability specification.

    Args:
        prefix: The prefix to add to tool names (e.g. `'mcp'` turns `'search'` into `'mcp_search'`).
        capability: A capability spec (same format as entries in the `capabilities` list).
    """
    from pydantic_ai.agent.spec import load_capability_from_nested_spec

    wrapped = load_capability_from_nested_spec(capability)
    return cls(wrapped=wrapped, prefix=prefix)

PrepareOutputTools dataclass

Bases: AbstractCapability[AgentDepsT]

Capability that filters or modifies output tool definitions using a callable.

Mirrors PrepareTools for output tools. ctx.retry/ctx.max_retries reflect the output retry budget (max_output_retries), matching the output hook lifecycle.

from pydantic_ai import Agent, RunContext
from pydantic_ai.capabilities import PrepareOutputTools
from pydantic_ai.output import ToolOutput
from pydantic_ai.tools import ToolDefinition


async def only_after_first_step(
    ctx: RunContext, tool_defs: list[ToolDefinition]
) -> list[ToolDefinition] | None:
    return tool_defs if ctx.run_step > 0 else []


agent = Agent(
    'openai:gpt-5',
    output_type=ToolOutput(str),
    capabilities=[PrepareOutputTools(only_after_first_step)],
)
Source code in pydantic_ai_slim/pydantic_ai/capabilities/prepare_tools.py
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
@dataclass
class PrepareOutputTools(AbstractCapability[AgentDepsT]):
    """Capability that filters or modifies output tool definitions using a callable.

    Mirrors [`PrepareTools`][pydantic_ai.capabilities.PrepareTools] for
    [output tools][pydantic_ai.output.ToolOutput]. `ctx.retry`/`ctx.max_retries` reflect
    the **output** retry budget (`max_output_retries`), matching the output hook lifecycle.

    ```python
    from pydantic_ai import Agent, RunContext
    from pydantic_ai.capabilities import PrepareOutputTools
    from pydantic_ai.output import ToolOutput
    from pydantic_ai.tools import ToolDefinition


    async def only_after_first_step(
        ctx: RunContext, tool_defs: list[ToolDefinition]
    ) -> list[ToolDefinition] | None:
        return tool_defs if ctx.run_step > 0 else []


    agent = Agent(
        'openai:gpt-5',
        output_type=ToolOutput(str),
        capabilities=[PrepareOutputTools(only_after_first_step)],
    )
    ```
    """

    prepare_func: ToolsPrepareFunc[AgentDepsT]

    @classmethod
    def get_serialization_name(cls) -> str | None:
        return None  # Not spec-serializable (takes a callable)

    async def prepare_output_tools(
        self, ctx: RunContext[AgentDepsT], tool_defs: list[ToolDefinition]
    ) -> list[ToolDefinition]:
        return await _call_prepare_func(self.prepare_func, ctx, tool_defs)

PrepareTools dataclass

Bases: AbstractCapability[AgentDepsT]

Capability that filters or modifies function tool definitions using a callable.

Wraps a ToolsPrepareFunc as a capability. Filters/modifies function tools only; for output tools use PrepareOutputTools.

from pydantic_ai import Agent, RunContext
from pydantic_ai.capabilities import PrepareTools
from pydantic_ai.tools import ToolDefinition


async def hide_admin_tools(
    ctx: RunContext, tool_defs: list[ToolDefinition]
) -> list[ToolDefinition] | None:
    return [td for td in tool_defs if not td.name.startswith('admin_')]


agent = Agent('openai:gpt-5', capabilities=[PrepareTools(hide_admin_tools)])
Source code in pydantic_ai_slim/pydantic_ai/capabilities/prepare_tools.py
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
@dataclass
class PrepareTools(AbstractCapability[AgentDepsT]):
    """Capability that filters or modifies function tool definitions using a callable.

    Wraps a [`ToolsPrepareFunc`][pydantic_ai.tools.ToolsPrepareFunc] as a capability.
    Filters/modifies **function** tools only; for output tools use
    [`PrepareOutputTools`][pydantic_ai.capabilities.PrepareOutputTools].

    ```python
    from pydantic_ai import Agent, RunContext
    from pydantic_ai.capabilities import PrepareTools
    from pydantic_ai.tools import ToolDefinition


    async def hide_admin_tools(
        ctx: RunContext, tool_defs: list[ToolDefinition]
    ) -> list[ToolDefinition] | None:
        return [td for td in tool_defs if not td.name.startswith('admin_')]


    agent = Agent('openai:gpt-5', capabilities=[PrepareTools(hide_admin_tools)])
    ```
    """

    prepare_func: ToolsPrepareFunc[AgentDepsT]

    @classmethod
    def get_serialization_name(cls) -> str | None:
        return None  # Not spec-serializable (takes a callable)

    async def prepare_tools(self, ctx: RunContext[AgentDepsT], tool_defs: list[ToolDefinition]) -> list[ToolDefinition]:
        return await _call_prepare_func(self.prepare_func, ctx, tool_defs)

ProcessEventStream dataclass

Bases: AbstractCapability[AgentDepsT]

A capability that forwards the agent's event stream to a user-provided async handler.

The handler receives the stream of AgentStreamEvents emitted during model streaming and tool execution for each ModelRequestNode and CallToolsNode. Two forms are supported:

  • An EventStreamHandler — an async def returning None. Events are forwarded to the handler while also being passed through unchanged to the rest of the capability chain, so multiple handlers (and the top-level event_stream_handler argument) can all see the same stream without changing each other's view. A handler that returns early stops receiving events but does not affect downstream consumers; a handler that raises propagates the exception to the rest of the run. Events are delivered synchronously, so a slow handler back-pressures the rest of the stream.
  • An [EventStreamProcessor][pydantic_ai.agent.EventStreamProcessor] — an async generator yielding AgentStreamEvents. The events it yields replace the inner stream for downstream wrappers and consumers, so it can modify, drop, or add events.

When this capability is registered, [agent.run()][pydantic_ai.Agent.run] automatically enables streaming so the handler fires without requiring an explicit event_stream_handler argument.

Durable execution

Under the current durable-execution integrations (Temporal, DBOS, Prefect), model streaming happens inside an activity/step rather than in the outer agent loop. This capability's wrap_run_event_stream hook fires for tool-call events and the final post-streaming batch, but it does not see individual model-response events live — the underlying durable model consumes those inside the activity before returning. The in-flight event_stream_handler parameter does still observe the live events; a future refactor threading the capability chain through the activity boundary is being explored in #4977.

Source code in pydantic_ai_slim/pydantic_ai/capabilities/process_event_stream.py
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
@dataclass
class ProcessEventStream(AbstractCapability[AgentDepsT]):
    """A capability that forwards the agent's event stream to a user-provided async handler.

    The handler receives the stream of [`AgentStreamEvent`][pydantic_ai.messages.AgentStreamEvent]s
    emitted during model streaming and tool execution for each `ModelRequestNode` and
    `CallToolsNode`. Two forms are supported:

    - An [`EventStreamHandler`][pydantic_ai.agent.EventStreamHandler] — an `async def`
      returning `None`. Events are forwarded to the handler while also being passed
      through unchanged to the rest of the capability chain, so multiple handlers (and
      the top-level `event_stream_handler` argument) can all see the same stream without
      changing each other's view. A handler that returns early stops receiving events
      but does not affect downstream consumers; a handler that raises propagates the
      exception to the rest of the run. Events are delivered synchronously, so a slow
      handler back-pressures the rest of the stream.
    - An [`EventStreamProcessor`][pydantic_ai.agent.EventStreamProcessor] — an async
      generator yielding [`AgentStreamEvent`][pydantic_ai.messages.AgentStreamEvent]s.
      The events it yields replace the inner stream for downstream wrappers and consumers,
      so it can modify, drop, or add events.

    When this capability is registered, [`agent.run()`][pydantic_ai.Agent.run] automatically
    enables streaming so the handler fires without requiring an explicit `event_stream_handler`
    argument.

    !!! note "Durable execution"

        Under the current durable-execution integrations
        ([Temporal][pydantic_ai.durable_exec.temporal.TemporalAgent],
        [DBOS][pydantic_ai.durable_exec.dbos.DBOSAgent],
        [Prefect][pydantic_ai.durable_exec.prefect.PrefectAgent]), model streaming happens
        inside an activity/step rather than in the outer agent loop. This capability's
        `wrap_run_event_stream` hook fires for tool-call events and the final post-streaming
        batch, but it does **not** see individual model-response events live — the underlying
        durable model consumes those inside the activity before returning. The in-flight
        `event_stream_handler` parameter does still observe the live events; a future
        refactor threading the capability chain through the activity boundary is being
        explored in [#4977](https://github.com/pydantic/pydantic-ai/pull/4977).
    """

    handler: EventStreamHandlerFunc[AgentDepsT] | EventStreamProcessorFunc[AgentDepsT]

    async def wrap_run_event_stream(
        self,
        ctx: RunContext[AgentDepsT],
        *,
        stream: AsyncIterable[AgentStreamEvent],
    ) -> AsyncIterable[AgentStreamEvent]:
        # Probe the handler: the processor form returns an AsyncIterator directly, while
        # the observer form returns an awaitable. Introspecting the return is robust for
        # both plain functions and callable instances, unlike `inspect.isasyncgenfunction`.
        probe = self.handler(ctx, stream)
        if isinstance(probe, AsyncIterator):
            async for event in probe:
                yield event
            return

        # Observer: the probe is a coroutine we haven't awaited. Close it (nothing has
        # run yet) and re-invoke the handler with the teed receive stream.
        cast('Coroutine[Any, Any, None]', probe).close()

        observer = cast('EventStreamHandlerFunc[AgentDepsT]', self.handler)
        send_stream, receive_stream = anyio.create_memory_object_stream[AgentStreamEvent]()

        async def run_handler() -> None:
            async with receive_stream:
                await observer(ctx, receive_stream)

        async with anyio.create_task_group() as tg:
            tg.start_soon(run_handler)
            async with send_stream:
                handler_alive = True
                async for event in stream:
                    if handler_alive:
                        try:
                            await send_stream.send(event)
                        except (anyio.BrokenResourceError, anyio.ClosedResourceError):
                            # Handler bailed early; keep forwarding events downstream.
                            handler_alive = False
                    yield event

    @classmethod
    def get_serialization_name(cls) -> str | None:
        return None  # Not spec-serializable (takes a callable)

ProcessHistory dataclass

Bases: AbstractCapability[AgentDepsT]

A capability that processes message history before model requests.

Source code in pydantic_ai_slim/pydantic_ai/capabilities/process_history.py
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
@dataclass
class ProcessHistory(AbstractCapability[AgentDepsT]):
    """A capability that processes message history before model requests."""

    processor: HistoryProcessorFunc[AgentDepsT]

    async def before_model_request(
        self,
        ctx: RunContext[AgentDepsT],
        request_context: ModelRequestContext,
    ) -> ModelRequestContext:
        request_context.messages = await _run_history_processor(self.processor, ctx, request_context.messages)

        return request_context

    @classmethod
    def get_serialization_name(cls) -> str | None:
        return None  # Not spec-serializable (takes a callable)

ReinjectSystemPrompt dataclass

Bases: AbstractCapability[AgentDepsT]

Capability that reinjects the agent's configured system_prompt when missing from history.

Ensures the agent's configured system_prompt is present at the head of the first ModelRequest on every model request.

Intended for callers that reconstruct a message_history from a source that doesn't round-trip system prompts — UI frontends, database persistence layers, conversation compaction pipelines. By default, if any SystemPromptPart is already present anywhere in the history (for example, preserved from a prior run or handed off from another agent), this capability leaves the messages untouched so that existing system prompts remain authoritative. Set replace_existing=True to instead strip any existing SystemPromptParts before prepending the agent's configured prompt — useful when the history comes from an untrusted source (such as a UI frontend) and the server's prompt must win.

The UI adapters automatically add this capability in manage_system_prompt='server' mode with replace_existing=True. Add it explicitly with Agent(..., capabilities=[ReinjectSystemPrompt()]) or per-run via the capabilities= argument on Agent.run to get the same behavior anywhere.

Source code in pydantic_ai_slim/pydantic_ai/capabilities/reinject_system_prompt.py
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
@dataclass
class ReinjectSystemPrompt(AbstractCapability[AgentDepsT]):
    """Capability that reinjects the agent's configured `system_prompt` when missing from history.

    Ensures the agent's configured `system_prompt` is present at the head of the first
    `ModelRequest` on every model request.

    Intended for callers that reconstruct a `message_history` from a source that doesn't
    round-trip system prompts — UI frontends, database persistence layers, conversation
    compaction pipelines. By default, if any `SystemPromptPart` is already present anywhere
    in the history (for example, preserved from a prior run or handed off from another
    agent), this capability leaves the messages untouched so that existing system prompts
    remain authoritative. Set `replace_existing=True` to instead strip any existing
    `SystemPromptPart`s before prepending the agent's configured prompt — useful when the
    history comes from an untrusted source (such as a UI frontend) and the server's prompt
    must win.

    The UI adapters automatically add this capability in `manage_system_prompt='server'` mode
    with `replace_existing=True`. Add it explicitly with
    `Agent(..., capabilities=[ReinjectSystemPrompt()])` or per-run via the `capabilities=`
    argument on [`Agent.run`][pydantic_ai.agent.AbstractAgent.run] to get the same behavior
    anywhere.
    """

    replace_existing: bool = False
    """If `True`, strip any existing `SystemPromptPart`s from the history before prepending
    the agent's configured prompt. If `False` (the default), the capability is a no-op when
    any `SystemPromptPart` is already present."""

    async def before_model_request(
        self,
        ctx: RunContext[AgentDepsT],
        request_context: ModelRequestContext,
    ) -> ModelRequestContext:
        messages = request_context.messages
        if self.replace_existing:
            _strip_system_prompts(messages)
        elif _has_system_prompt(messages):
            return request_context
        if ctx.agent is None:
            return request_context  # pragma: no cover — ctx.agent is always set during an agent run
        sys_parts = await ctx.agent.system_prompt_parts(
            deps=ctx.deps,
            model=ctx.model,
            message_history=messages,
            prompt=ctx.prompt,
            usage=ctx.usage,
            model_settings=ctx.model_settings,
        )
        if sys_parts:
            _prepend_to_first_request(messages, sys_parts)
        return request_context

replace_existing class-attribute instance-attribute

replace_existing: bool = False

If True, strip any existing SystemPromptParts from the history before prepending the agent's configured prompt. If False (the default), the capability is a no-op when any SystemPromptPart is already present.

SetToolMetadata dataclass

Bases: AbstractCapability[AgentDepsT]

Capability that merges metadata key-value pairs onto selected tools.

from pydantic_ai import Agent
from pydantic_ai.capabilities import SetToolMetadata

agent = Agent('openai:gpt-5', capabilities=[SetToolMetadata(code_mode=True)])
Source code in pydantic_ai_slim/pydantic_ai/capabilities/set_tool_metadata.py
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
@dataclass
class SetToolMetadata(AbstractCapability[AgentDepsT]):
    """Capability that merges metadata key-value pairs onto selected tools.

    ```python
    from pydantic_ai import Agent
    from pydantic_ai.capabilities import SetToolMetadata

    agent = Agent('openai:gpt-5', capabilities=[SetToolMetadata(code_mode=True)])
    ```
    """

    tools: ToolSelector[AgentDepsT] = 'all'
    metadata: dict[str, Any] = field(default_factory=dict[str, Any])

    def __init__(self, *, tools: ToolSelector[AgentDepsT] = 'all', **metadata: Any) -> None:
        self.tools = tools
        self.metadata = metadata

    @classmethod
    def get_serialization_name(cls) -> str | None:
        return 'SetToolMetadata'

    def get_wrapper_toolset(self, toolset: AbstractToolset[AgentDepsT]) -> AbstractToolset[AgentDepsT]:
        selector = self.tools
        metadata = self.metadata

        async def _set_metadata(ctx: RunContext[AgentDepsT], tool_defs: list[ToolDefinition]) -> list[ToolDefinition]:
            resolved: list[ToolDefinition] = []
            for td in tool_defs:
                if await matches_tool_selector(selector, ctx, td):
                    td = replace(td, metadata={**(td.metadata or {}), **metadata})
                resolved.append(td)
            return resolved

        return PreparedToolset(toolset, _set_metadata)

Thinking dataclass

Bases: AbstractCapability[Any]

Enables and configures model thinking/reasoning.

Uses the unified thinking setting in ModelSettings to work portably across providers. Provider-specific thinking settings (e.g., anthropic_thinking, openai_reasoning_effort) take precedence when both are set.

Source code in pydantic_ai_slim/pydantic_ai/capabilities/thinking.py
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
@dataclass
class Thinking(AbstractCapability[Any]):
    """Enables and configures model thinking/reasoning.

    Uses the unified `thinking` setting in
    [`ModelSettings`][pydantic_ai.settings.ModelSettings] to work portably across providers.
    Provider-specific thinking settings (e.g., `anthropic_thinking`,
    `openai_reasoning_effort`) take precedence when both are set.
    """

    effort: ThinkingLevel = True
    """The thinking effort level.

    - `True`: Enable thinking with the provider's default effort.
    - `False`: Disable thinking (silently ignored on always-on models).
    - `'minimal'`/`'low'`/`'medium'`/`'high'`/`'xhigh'`: Enable thinking at a specific effort level.
    """

    def get_model_settings(self) -> ModelSettings | None:
        return ModelSettings(thinking=self.effort)

effort class-attribute instance-attribute

effort: ThinkingLevel = True

The thinking effort level.

  • True: Enable thinking with the provider's default effort.
  • False: Disable thinking (silently ignored on always-on models).
  • 'minimal'/'low'/'medium'/'high'/'xhigh': Enable thinking at a specific effort level.

ThreadExecutor dataclass

Bases: AbstractCapability[Any]

Use a custom executor for running sync functions in threads.

By default, sync tool functions and other sync callbacks are run in threads using [anyio.to_thread.run_sync][], which creates ephemeral threads. In long-running servers (e.g. FastAPI), this can lead to thread accumulation under sustained load.

This capability provides a bounded ThreadPoolExecutor (or any Executor) to use instead, scoped to agent runs:

from concurrent.futures import ThreadPoolExecutor

from pydantic_ai import Agent
from pydantic_ai.capabilities import ThreadExecutor

executor = ThreadPoolExecutor(max_workers=16, thread_name_prefix='agent-worker')
agent = Agent('openai:gpt-5.2', capabilities=[ThreadExecutor(executor)])

To set an executor for all agents globally, use Agent.using_thread_executor().

Source code in pydantic_ai_slim/pydantic_ai/capabilities/thread_executor.py
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
@dataclass
class ThreadExecutor(AbstractCapability[Any]):
    """Use a custom executor for running sync functions in threads.

    By default, sync tool functions and other sync callbacks are run in threads using
    [`anyio.to_thread.run_sync`][anyio.to_thread.run_sync], which creates ephemeral threads.
    In long-running servers (e.g. FastAPI), this can lead to thread accumulation under sustained load.

    This capability provides a bounded [`ThreadPoolExecutor`][concurrent.futures.ThreadPoolExecutor]
    (or any [`Executor`][concurrent.futures.Executor]) to use instead, scoped to agent runs:

    ```python
    from concurrent.futures import ThreadPoolExecutor

    from pydantic_ai import Agent
    from pydantic_ai.capabilities import ThreadExecutor

    executor = ThreadPoolExecutor(max_workers=16, thread_name_prefix='agent-worker')
    agent = Agent('openai:gpt-5.2', capabilities=[ThreadExecutor(executor)])
    ```

    To set an executor for all agents globally, use
    [`Agent.using_thread_executor()`][pydantic_ai.agent.AbstractAgent.using_thread_executor].
    """

    executor: Executor
    """The executor to use for running sync functions."""

    @classmethod
    def get_serialization_name(cls) -> str | None:
        return None

    async def wrap_run(
        self,
        ctx: RunContext[AgentDepsT],
        *,
        handler: WrapRunHandler,
    ) -> AgentRunResult[Any]:
        with _utils.using_thread_executor(self.executor):
            return await handler()

executor instance-attribute

executor: Executor

The executor to use for running sync functions.

Toolset dataclass

Bases: AbstractCapability[AgentDepsT]

A capability that provides a toolset.

Source code in pydantic_ai_slim/pydantic_ai/capabilities/toolset.py
 9
10
11
12
13
14
15
16
17
18
19
20
@dataclass
class Toolset(AbstractCapability[AgentDepsT]):
    """A capability that provides a toolset."""

    toolset: AgentToolset[AgentDepsT]

    @classmethod
    def get_serialization_name(cls) -> str | None:
        return None  # Not spec-serializable (takes a callable)

    def get_toolset(self) -> AgentToolset[AgentDepsT] | None:
        return self.toolset

WebFetch dataclass

Bases: NativeOrLocalTool[AgentDepsT]

URL fetching capability.

Uses the model's native URL fetching and raises UserError on models that don't support it natively. Pass local=True to opt into a local fallback (requires the web-fetch optional group):

pip install "pydantic-ai-slim[web-fetch]"
Source code in pydantic_ai_slim/pydantic_ai/capabilities/web_fetch.py
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
@dataclass(init=False)
class WebFetch(NativeOrLocalTool[AgentDepsT]):
    """URL fetching capability.

    Uses the model's native URL fetching and raises `UserError` on models that
    don't support it natively. Pass `local=True` to opt into a local fallback
    (requires the `web-fetch` optional group):

    ```bash
    pip install "pydantic-ai-slim[web-fetch]"
    ```
    """

    allowed_domains: list[str] | None
    """Only fetch from these domains. Enforced locally when native is unavailable."""

    blocked_domains: list[str] | None
    """Never fetch from these domains. Enforced locally when native is unavailable."""

    max_uses: int | None
    """Maximum number of fetches per run. Requires native support."""

    enable_citations: bool | None
    """Enable citations for fetched content. Native-only; ignored by local tools."""

    max_content_tokens: int | None
    """Maximum content length in tokens. Native-only; ignored by local tools."""

    def __init__(
        self,
        *,
        native: WebFetchTool
        | Callable[[RunContext[AgentDepsT]], Awaitable[WebFetchTool | None] | WebFetchTool | None]
        | bool = True,
        local: Tool[AgentDepsT] | Callable[..., Any] | bool | None = None,
        allowed_domains: list[str] | None = None,
        blocked_domains: list[str] | None = None,
        max_uses: int | None = None,
        enable_citations: bool | None = None,
        max_content_tokens: int | None = None,
    ) -> None:
        self.native = native
        self.local = local
        self.allowed_domains = allowed_domains
        self.blocked_domains = blocked_domains
        self.max_uses = max_uses
        self.enable_citations = enable_citations
        self.max_content_tokens = max_content_tokens
        self.__post_init__()

    def _default_native(self) -> WebFetchTool:
        kwargs: dict[str, Any] = {}
        if self.allowed_domains is not None:
            kwargs['allowed_domains'] = self.allowed_domains
        if self.blocked_domains is not None:
            kwargs['blocked_domains'] = self.blocked_domains
        if self.max_uses is not None:
            kwargs['max_uses'] = self.max_uses
        if self.enable_citations is not None:
            kwargs['enable_citations'] = self.enable_citations
        if self.max_content_tokens is not None:
            kwargs['max_content_tokens'] = self.max_content_tokens
        return WebFetchTool(**kwargs)

    def _native_unique_id(self) -> str:
        return WebFetchTool.kind

    def _resolve_local_strategy(self, name: str | bool) -> Tool[AgentDepsT] | AbstractToolset[AgentDepsT]:
        if name is True:
            try:
                from pydantic_ai.common_tools.web_fetch import web_fetch_tool
            except ImportError as e:
                raise UserError(
                    'WebFetch(local=True) requires the `web-fetch` optional group — '
                    '`pip install "pydantic-ai-slim[web-fetch]"`.'
                ) from e
            return web_fetch_tool(
                allowed_domains=self.allowed_domains,
                blocked_domains=self.blocked_domains,
            )
        raise UserError(
            f'WebFetch(local={name!r}) is not a known strategy. '
            'Pass `local=True` for the default markdownify-based tool, or a Tool/callable directly.'
        )

    def _requires_native(self) -> bool:
        return self.max_uses is not None

allowed_domains instance-attribute

allowed_domains: list[str] | None = allowed_domains

Only fetch from these domains. Enforced locally when native is unavailable.

blocked_domains instance-attribute

blocked_domains: list[str] | None = blocked_domains

Never fetch from these domains. Enforced locally when native is unavailable.

max_uses instance-attribute

max_uses: int | None = max_uses

Maximum number of fetches per run. Requires native support.

enable_citations instance-attribute

enable_citations: bool | None = enable_citations

Enable citations for fetched content. Native-only; ignored by local tools.

max_content_tokens instance-attribute

max_content_tokens: int | None = max_content_tokens

Maximum content length in tokens. Native-only; ignored by local tools.

WebSearch dataclass

Bases: NativeOrLocalTool[AgentDepsT]

Web search capability.

Uses the model's native web search and raises UserError on models that don't support it natively. Pass local='duckduckgo' (or local=True) to opt into a local DuckDuckGo fallback — requires the duckduckgo optional group:

pip install "pydantic-ai-slim[duckduckgo]"

local= also accepts any callable, Tool, or AbstractToolset for a custom fallback.

Source code in pydantic_ai_slim/pydantic_ai/capabilities/web_search.py
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
@dataclass(init=False)
class WebSearch(NativeOrLocalTool[AgentDepsT]):
    """Web search capability.

    Uses the model's native web search and raises `UserError` on models that
    don't support it natively. Pass `local='duckduckgo'` (or `local=True`) to opt into a
    local DuckDuckGo fallback — requires the `duckduckgo` optional group:

    ```bash
    pip install "pydantic-ai-slim[duckduckgo]"
    ```

    `local=` also accepts any callable, `Tool`, or `AbstractToolset` for a custom fallback.
    """

    search_context_size: Literal['low', 'medium', 'high'] | None
    """Controls how much context is retrieved from the web. Native-only; ignored by local tools."""

    user_location: WebSearchUserLocation | None
    """Localize search results based on user location. Native-only; ignored by local tools."""

    blocked_domains: list[str] | None
    """Domains to exclude from results. Requires native support."""

    allowed_domains: list[str] | None
    """Only include results from these domains. Requires native support."""

    max_uses: int | None
    """Maximum number of web searches per run. Requires native support."""

    def __init__(
        self,
        *,
        native: WebSearchTool
        | Callable[[RunContext[AgentDepsT]], Awaitable[WebSearchTool | None] | WebSearchTool | None]
        | bool = True,
        local: WebSearchLocalStrategy | Tool[AgentDepsT] | Callable[..., Any] | bool | None = None,
        search_context_size: Literal['low', 'medium', 'high'] | None = None,
        user_location: WebSearchUserLocation | None = None,
        blocked_domains: list[str] | None = None,
        allowed_domains: list[str] | None = None,
        max_uses: int | None = None,
    ) -> None:
        self.native = native
        self.local = local
        self.search_context_size = search_context_size
        self.user_location = user_location
        self.blocked_domains = blocked_domains
        self.allowed_domains = allowed_domains
        self.max_uses = max_uses
        self.__post_init__()

    def _default_native(self) -> WebSearchTool:
        kwargs: dict[str, Any] = {}
        if self.search_context_size is not None:
            kwargs['search_context_size'] = self.search_context_size
        if self.user_location is not None:
            kwargs['user_location'] = self.user_location
        if self.blocked_domains is not None:
            kwargs['blocked_domains'] = self.blocked_domains
        if self.allowed_domains is not None:
            kwargs['allowed_domains'] = self.allowed_domains
        if self.max_uses is not None:
            kwargs['max_uses'] = self.max_uses
        return WebSearchTool(**kwargs)

    def _native_unique_id(self) -> str:
        return WebSearchTool.kind

    def _resolve_local_strategy(self, name: str | bool) -> Tool[AgentDepsT] | AbstractToolset[AgentDepsT]:
        # True → the default strategy (DuckDuckGo)
        strategy = 'duckduckgo' if name is True else name
        if strategy == 'duckduckgo':
            try:
                from pydantic_ai.common_tools.duckduckgo import duckduckgo_search_tool
            except ImportError as e:
                raise UserError(
                    "WebSearch(local='duckduckgo') requires the `duckduckgo` optional group — "
                    '`pip install "pydantic-ai-slim[duckduckgo]"`.'
                ) from e
            return duckduckgo_search_tool()
        raise UserError(
            f'WebSearch(local={name!r}) is not a known strategy. '
            "Supported: 'duckduckgo' (or `local=True`). Or pass a Tool/callable directly."
        )

    def _requires_native(self) -> bool:
        return self.blocked_domains is not None or self.allowed_domains is not None or self.max_uses is not None

search_context_size instance-attribute

search_context_size: (
    Literal["low", "medium", "high"] | None
) = search_context_size

Controls how much context is retrieved from the web. Native-only; ignored by local tools.

user_location instance-attribute

user_location: WebSearchUserLocation | None = user_location

Localize search results based on user location. Native-only; ignored by local tools.

blocked_domains instance-attribute

blocked_domains: list[str] | None = blocked_domains

Domains to exclude from results. Requires native support.

allowed_domains instance-attribute

allowed_domains: list[str] | None = allowed_domains

Only include results from these domains. Requires native support.

max_uses instance-attribute

max_uses: int | None = max_uses

Maximum number of web searches per run. Requires native support.

WrapperCapability dataclass

Bases: AbstractCapability[AgentDepsT]

A capability that wraps another capability and delegates all methods.

Analogous to WrapperToolset for toolsets. Subclass and override specific methods to modify behavior while delegating the rest.

Source code in pydantic_ai_slim/pydantic_ai/capabilities/wrapper.py
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
@dataclass
class WrapperCapability(AbstractCapability[AgentDepsT]):
    """A capability that wraps another capability and delegates all methods.

    Analogous to [`WrapperToolset`][pydantic_ai.toolsets.WrapperToolset] for toolsets.
    Subclass and override specific methods to modify behavior while delegating the rest.
    """

    wrapped: AbstractCapability[AgentDepsT]

    def apply(self, visitor: Callable[[AbstractCapability[AgentDepsT]], None]) -> None:
        self.wrapped.apply(visitor)

    @classmethod
    def get_serialization_name(cls) -> str | None:
        return None

    @property
    def has_wrap_node_run(self) -> bool:
        return type(self).wrap_node_run is not WrapperCapability.wrap_node_run or self.wrapped.has_wrap_node_run

    @property
    def has_wrap_run_event_stream(self) -> bool:
        return (
            type(self).wrap_run_event_stream is not WrapperCapability.wrap_run_event_stream
            or self.wrapped.has_wrap_run_event_stream
        )

    async def for_run(self, ctx: RunContext[AgentDepsT]) -> AbstractCapability[AgentDepsT]:
        new_wrapped = await self.wrapped.for_run(ctx)
        if new_wrapped is self.wrapped:
            return self
        return replace(self, wrapped=new_wrapped)

    # --- Get methods ---

    def get_instructions(self) -> AgentInstructions[AgentDepsT] | None:
        return self.wrapped.get_instructions()

    def get_model_settings(self) -> AgentModelSettings[AgentDepsT] | None:
        return self.wrapped.get_model_settings()

    def get_toolset(self) -> AgentToolset[AgentDepsT] | None:
        return self.wrapped.get_toolset()

    def get_native_tools(self) -> Sequence[AgentNativeTool[AgentDepsT]]:
        return self.wrapped.get_native_tools()

    def get_wrapper_toolset(self, toolset: AbstractToolset[AgentDepsT]) -> AbstractToolset[AgentDepsT] | None:
        return self.wrapped.get_wrapper_toolset(toolset)

    async def prepare_tools(
        self,
        ctx: RunContext[AgentDepsT],
        tool_defs: list[ToolDefinition],
    ) -> list[ToolDefinition]:
        return await self.wrapped.prepare_tools(ctx, tool_defs)

    async def prepare_output_tools(
        self,
        ctx: RunContext[AgentDepsT],
        tool_defs: list[ToolDefinition],
    ) -> list[ToolDefinition]:
        return await self.wrapped.prepare_output_tools(ctx, tool_defs)

    # --- Run lifecycle hooks ---

    async def before_run(self, ctx: RunContext[AgentDepsT]) -> None:
        await self.wrapped.before_run(ctx)

    async def after_run(
        self,
        ctx: RunContext[AgentDepsT],
        *,
        result: AgentRunResult[Any],
    ) -> AgentRunResult[Any]:
        return await self.wrapped.after_run(ctx, result=result)

    async def wrap_run(
        self,
        ctx: RunContext[AgentDepsT],
        *,
        handler: WrapRunHandler,
    ) -> AgentRunResult[Any]:
        return await self.wrapped.wrap_run(ctx, handler=handler)

    async def on_run_error(
        self,
        ctx: RunContext[AgentDepsT],
        *,
        error: BaseException,
    ) -> AgentRunResult[Any]:
        return await self.wrapped.on_run_error(ctx, error=error)

    # --- Node run lifecycle hooks ---

    async def before_node_run(
        self,
        ctx: RunContext[AgentDepsT],
        *,
        node: AgentNode[AgentDepsT],
    ) -> AgentNode[AgentDepsT]:
        return await self.wrapped.before_node_run(ctx, node=node)

    async def after_node_run(
        self,
        ctx: RunContext[AgentDepsT],
        *,
        node: AgentNode[AgentDepsT],
        result: NodeResult[AgentDepsT],
    ) -> NodeResult[AgentDepsT]:
        return await self.wrapped.after_node_run(ctx, node=node, result=result)

    async def wrap_node_run(
        self,
        ctx: RunContext[AgentDepsT],
        *,
        node: AgentNode[AgentDepsT],
        handler: WrapNodeRunHandler[AgentDepsT],
    ) -> NodeResult[AgentDepsT]:
        return await self.wrapped.wrap_node_run(ctx, node=node, handler=handler)

    async def on_node_run_error(
        self,
        ctx: RunContext[AgentDepsT],
        *,
        node: AgentNode[AgentDepsT],
        error: Exception,
    ) -> NodeResult[AgentDepsT]:
        return await self.wrapped.on_node_run_error(ctx, node=node, error=error)

    # --- Event stream hook ---

    async def wrap_run_event_stream(
        self,
        ctx: RunContext[AgentDepsT],
        *,
        stream: AsyncIterable[AgentStreamEvent],
    ) -> AsyncIterable[AgentStreamEvent]:
        async for event in self.wrapped.wrap_run_event_stream(ctx, stream=stream):
            yield event

    # --- Model request lifecycle hooks ---

    async def before_model_request(
        self,
        ctx: RunContext[AgentDepsT],
        request_context: ModelRequestContext,
    ) -> ModelRequestContext:
        return await self.wrapped.before_model_request(ctx, request_context)

    async def after_model_request(
        self,
        ctx: RunContext[AgentDepsT],
        *,
        request_context: ModelRequestContext,
        response: ModelResponse,
    ) -> ModelResponse:
        return await self.wrapped.after_model_request(ctx, request_context=request_context, response=response)

    async def wrap_model_request(
        self,
        ctx: RunContext[AgentDepsT],
        *,
        request_context: ModelRequestContext,
        handler: WrapModelRequestHandler,
    ) -> ModelResponse:
        return await self.wrapped.wrap_model_request(ctx, request_context=request_context, handler=handler)

    async def on_model_request_error(
        self,
        ctx: RunContext[AgentDepsT],
        *,
        request_context: ModelRequestContext,
        error: Exception,
    ) -> ModelResponse:
        return await self.wrapped.on_model_request_error(ctx, request_context=request_context, error=error)

    # --- Tool validate lifecycle hooks ---

    async def before_tool_validate(
        self,
        ctx: RunContext[AgentDepsT],
        *,
        call: ToolCallPart,
        tool_def: ToolDefinition,
        args: RawToolArgs,
    ) -> RawToolArgs:
        return await self.wrapped.before_tool_validate(ctx, call=call, tool_def=tool_def, args=args)

    async def after_tool_validate(
        self,
        ctx: RunContext[AgentDepsT],
        *,
        call: ToolCallPart,
        tool_def: ToolDefinition,
        args: ValidatedToolArgs,
    ) -> ValidatedToolArgs:
        return await self.wrapped.after_tool_validate(ctx, call=call, tool_def=tool_def, args=args)

    async def wrap_tool_validate(
        self,
        ctx: RunContext[AgentDepsT],
        *,
        call: ToolCallPart,
        tool_def: ToolDefinition,
        args: RawToolArgs,
        handler: WrapToolValidateHandler,
    ) -> ValidatedToolArgs:
        return await self.wrapped.wrap_tool_validate(ctx, call=call, tool_def=tool_def, args=args, handler=handler)

    async def on_tool_validate_error(
        self,
        ctx: RunContext[AgentDepsT],
        *,
        call: ToolCallPart,
        tool_def: ToolDefinition,
        args: RawToolArgs,
        error: ValidationError | ModelRetry,
    ) -> ValidatedToolArgs:
        return await self.wrapped.on_tool_validate_error(ctx, call=call, tool_def=tool_def, args=args, error=error)

    # --- Tool execute lifecycle hooks ---

    async def before_tool_execute(
        self,
        ctx: RunContext[AgentDepsT],
        *,
        call: ToolCallPart,
        tool_def: ToolDefinition,
        args: ValidatedToolArgs,
    ) -> ValidatedToolArgs:
        return await self.wrapped.before_tool_execute(ctx, call=call, tool_def=tool_def, args=args)

    async def after_tool_execute(
        self,
        ctx: RunContext[AgentDepsT],
        *,
        call: ToolCallPart,
        tool_def: ToolDefinition,
        args: ValidatedToolArgs,
        result: Any,
    ) -> Any:
        return await self.wrapped.after_tool_execute(ctx, call=call, tool_def=tool_def, args=args, result=result)

    async def wrap_tool_execute(
        self,
        ctx: RunContext[AgentDepsT],
        *,
        call: ToolCallPart,
        tool_def: ToolDefinition,
        args: ValidatedToolArgs,
        handler: WrapToolExecuteHandler,
    ) -> Any:
        return await self.wrapped.wrap_tool_execute(ctx, call=call, tool_def=tool_def, args=args, handler=handler)

    async def on_tool_execute_error(
        self,
        ctx: RunContext[AgentDepsT],
        *,
        call: ToolCallPart,
        tool_def: ToolDefinition,
        args: ValidatedToolArgs,
        error: Exception,
    ) -> Any:
        return await self.wrapped.on_tool_execute_error(ctx, call=call, tool_def=tool_def, args=args, error=error)

    # --- Output validate lifecycle hooks ---

    async def before_output_validate(
        self,
        ctx: RunContext[AgentDepsT],
        *,
        output_context: OutputContext,
        output: RawOutput,
    ) -> RawOutput:
        return await self.wrapped.before_output_validate(ctx, output_context=output_context, output=output)

    async def after_output_validate(
        self,
        ctx: RunContext[AgentDepsT],
        *,
        output_context: OutputContext,
        output: Any,
    ) -> Any:
        return await self.wrapped.after_output_validate(ctx, output_context=output_context, output=output)

    async def wrap_output_validate(
        self,
        ctx: RunContext[AgentDepsT],
        *,
        output_context: OutputContext,
        output: RawOutput,
        handler: WrapOutputValidateHandler,
    ) -> Any:
        return await self.wrapped.wrap_output_validate(
            ctx, output_context=output_context, output=output, handler=handler
        )

    async def on_output_validate_error(
        self,
        ctx: RunContext[AgentDepsT],
        *,
        output_context: OutputContext,
        output: RawOutput,
        error: ValidationError | ModelRetry,
    ) -> Any:
        return await self.wrapped.on_output_validate_error(
            ctx, output_context=output_context, output=output, error=error
        )

    # --- Output process lifecycle hooks ---

    async def before_output_process(
        self,
        ctx: RunContext[AgentDepsT],
        *,
        output_context: OutputContext,
        output: Any,
    ) -> Any:
        return await self.wrapped.before_output_process(ctx, output_context=output_context, output=output)

    async def after_output_process(
        self,
        ctx: RunContext[AgentDepsT],
        *,
        output_context: OutputContext,
        output: Any,
    ) -> Any:
        return await self.wrapped.after_output_process(ctx, output_context=output_context, output=output)

    async def wrap_output_process(
        self,
        ctx: RunContext[AgentDepsT],
        *,
        output_context: OutputContext,
        output: Any,
        handler: WrapOutputProcessHandler,
    ) -> Any:
        return await self.wrapped.wrap_output_process(
            ctx, output_context=output_context, output=output, handler=handler
        )

    async def on_output_process_error(
        self,
        ctx: RunContext[AgentDepsT],
        *,
        output_context: OutputContext,
        output: Any,
        error: Exception,
    ) -> Any:
        return await self.wrapped.on_output_process_error(
            ctx, output_context=output_context, output=output, error=error
        )

    async def handle_deferred_tool_calls(
        self,
        ctx: RunContext[AgentDepsT],
        *,
        requests: DeferredToolRequests,
    ) -> DeferredToolResults | None:
        return await self.wrapped.handle_deferred_tool_calls(ctx, requests=requests)

AgentCapability module-attribute

A capability or a CapabilityFunc that takes a run context and returns one.

Use as the item type for Agent(capabilities=[...]) and agent.run(capabilities=[...]). Functions are wrapped in a DynamicCapability automatically.

CAPABILITY_TYPES module-attribute

CAPABILITY_TYPES: dict[
    str, type[AbstractCapability[Any]]
] = {
    name: cls
    for cls in (
        NativeTool,
        ImageGeneration,
        IncludeToolReturnSchemas,
        Instrumentation,
        MCP,
        PrefixTools,
        PrepareTools,
        ProcessHistory,
        ReinjectSystemPrompt,
        SetToolMetadata,
        Thinking,
        ToolSearch,
        Toolset,
        WebFetch,
        WebSearch,
    )
    if (name := (get_serialization_name())) is not None
}

Registry of all capability types that have a serialization name, mapping name to class.