Skip to content

pydantic_evals.online

Online evaluation — attach evaluators to live functions for automatic background evaluation.

This module provides the infrastructure for running evaluators on production (or staging) traffic. The same Evaluator instances used with Dataset.evaluate() work here, the difference is in how they are wired up (decorator vs dataset) rather than what they are.

Example:

from dataclasses import dataclass

from pydantic_evals.evaluators import Evaluator, EvaluatorContext
from pydantic_evals.online import evaluate


@dataclass
class IsNonEmpty(Evaluator):
    def evaluate(self, ctx: EvaluatorContext) -> bool:
        return bool(ctx.output)


@evaluate(IsNonEmpty())
async def my_function(x: int) -> int:
    return x

CallbackSink

An EvaluationSink that delegates to a user-provided callable.

The callback receives the results, failures, and context. Other fields on the SinkPayload (such as span_reference and target) are not passed — use a custom EvaluationSink implementation if you need them.

Source code in pydantic_evals/pydantic_evals/_online.py
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
class CallbackSink:
    """An `EvaluationSink` that delegates to a user-provided callable.

    The callback receives the results, failures, and context. Other fields on
    the [`SinkPayload`][pydantic_evals.online.SinkPayload] (such as
    `span_reference` and `target`) are not passed — use a custom
    `EvaluationSink` implementation if you need them.
    """

    def __init__(self, callback: SinkCallback) -> None:
        self.callback = callback

    async def submit(self, payload: SinkPayload) -> None:
        result = self.callback(payload.results, payload.failures, payload.context)
        if inspect.isawaitable(result):
            await result

EvaluationSink

Bases: Protocol

Protocol for additional evaluation result destinations.

By default, online evaluation emits gen_ai.evaluation.result OTel events for every evaluator run — no sink registration required. Sinks are the escape hatch for custom handling in addition to OTel emission: in-memory test capture, fan-out to Slack/DB, non-OTel backends, alerting pipelines, etc. See OnlineEvalConfig.default_sink.

To disable the default OTel emission (e.g. in tests that only want to assert on a custom sink), set emit_otel_events=False on the config.

Source code in pydantic_evals/pydantic_evals/_online.py
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
@runtime_checkable
class EvaluationSink(Protocol):
    """Protocol for **additional** evaluation result destinations.

    By default, online evaluation emits `gen_ai.evaluation.result` OTel events
    for every evaluator run — no sink registration required. Sinks are the
    escape hatch for custom handling *in addition to* OTel emission: in-memory
    test capture, fan-out to Slack/DB, non-OTel backends, alerting pipelines,
    etc. See [`OnlineEvalConfig.default_sink`][pydantic_evals.online.OnlineEvalConfig.default_sink].

    To disable the default OTel emission (e.g. in tests that only want to
    assert on a custom sink), set
    [`emit_otel_events=False`][pydantic_evals.online.OnlineEvalConfig.emit_otel_events]
    on the config.
    """

    async def submit(self, payload: SinkPayload) -> None:
        """Submit evaluation results to the sink.

        The payload may include results from one or more evaluators that ran for
        a given function call — when multiple evaluators share this sink, their
        results are batched into a single `submit()` call. Each result carries
        enough metadata (name, evaluator version, source) to be attributed
        downstream; the exact batching behavior is an implementation detail and
        may change.

        Args:
            payload: A [`SinkPayload`][pydantic_evals.online.SinkPayload] bundling
                results, failures, context, span reference, and target. Sinks
                should read only the fields they need; new fields may be added
                in future releases.
        """
        ...

submit async

submit(payload: SinkPayload) -> None

Submit evaluation results to the sink.

The payload may include results from one or more evaluators that ran for a given function call — when multiple evaluators share this sink, their results are batched into a single submit() call. Each result carries enough metadata (name, evaluator version, source) to be attributed downstream; the exact batching behavior is an implementation detail and may change.

Parameters:

Name Type Description Default
payload SinkPayload

A SinkPayload bundling results, failures, context, span reference, and target. Sinks should read only the fields they need; new fields may be added in future releases.

required
Source code in pydantic_evals/pydantic_evals/_online.py
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
async def submit(self, payload: SinkPayload) -> None:
    """Submit evaluation results to the sink.

    The payload may include results from one or more evaluators that ran for
    a given function call — when multiple evaluators share this sink, their
    results are batched into a single `submit()` call. Each result carries
    enough metadata (name, evaluator version, source) to be attributed
    downstream; the exact batching behavior is an implementation detail and
    may change.

    Args:
        payload: A [`SinkPayload`][pydantic_evals.online.SinkPayload] bundling
            results, failures, context, span reference, and target. Sinks
            should read only the fields they need; new fields may be added
            in future releases.
    """
    ...

SinkPayload dataclass

Container passed to EvaluationSink.submit.

Do not instantiate directly

SinkPayload is constructed internally by pydantic-evals. We reserve the right to add fields in any release — if you build your own instances, a future version may break your code. Sink implementations should accept the payload as-is and read only the fields they need.

Source code in pydantic_evals/pydantic_evals/_online.py
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
@dataclass(kw_only=True, frozen=True)
class SinkPayload:
    """Container passed to [`EvaluationSink.submit`][pydantic_evals.online.EvaluationSink.submit].

    !!! warning "Do not instantiate directly"
        `SinkPayload` is constructed internally by pydantic-evals. We reserve the right
        to add fields in any release — if you build your own instances, a future version
        may break your code. Sink implementations should accept the payload as-is and read
        only the fields they need.
    """

    results: Sequence[EvaluationResult]
    """Evaluation results from the evaluator run."""

    failures: Sequence[EvaluatorFailure]
    """Failures from the evaluator run if it raised."""

    context: EvaluatorContext
    """The full evaluator context for the function call."""

    span_reference: SpanReference | None
    """Reference to the OTel span for the function call, if available."""

    target: str
    """Identifies the function/agent being evaluated, supplied by the
    `@evaluate` decorator (defaults resolved at decoration time)."""

results instance-attribute

Evaluation results from the evaluator run.

failures instance-attribute

Failures from the evaluator run if it raised.

context instance-attribute

The full evaluator context for the function call.

span_reference instance-attribute

span_reference: SpanReference | None

Reference to the OTel span for the function call, if available.

target instance-attribute

target: str

Identifies the function/agent being evaluated, supplied by the @evaluate decorator (defaults resolved at decoration time).

OnErrorLocation module-attribute

OnErrorLocation = Literal['sink', 'on_max_concurrency']

The location within the online evaluation pipeline where an error occurred.

  • 'sink' — something went wrong delivering results downstream. This is most often an exception raised by a registered EvaluationSink.submit, but it's also used as a catch-all for failures in the default OTel event emission path (which is rare in practice; the OTel SDK rarely raises during emit()).
  • 'on_max_concurrency' — the evaluator's on_max_concurrency callback itself raised while being notified about a dropped evaluation.

SamplingMode module-attribute

SamplingMode = Literal['independent', 'correlated']

Controls how per-evaluator sample rates interact across evaluators for a single call.

  • 'independent' (default): Each evaluator flips its own coin. With N evaluators each at rate r, the probability of any evaluation overhead is 1 − (1−r)^N.
  • 'correlated': A single random seed is generated per call and shared across evaluators. An evaluator runs when call_seed < rate, so lower-rate evaluators' calls are always a subset of higher-rate ones. The probability of any overhead equals max(rate_i).

SamplingContext dataclass

Context available when deciding whether to sample an evaluator.

Contains the information available before the decorated function runs — the evaluator instance, function inputs, config metadata, and a per-call random seed. The function's output and duration are not yet available at sampling time.

Source code in pydantic_evals/pydantic_evals/online.py
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
@dataclass(kw_only=True)
class SamplingContext:
    """Context available when deciding whether to sample an evaluator.

    Contains the information available *before* the decorated function runs — the evaluator
    instance, function inputs, config metadata, and a per-call random seed. The function's
    output and duration are not yet available at sampling time.
    """

    evaluator: Evaluator
    """The evaluator being sampled."""
    inputs: Any
    """The inputs to the decorated function."""
    metadata: dict[str, Any] | None
    """Metadata from the [`OnlineEvalConfig`][pydantic_evals.online.OnlineEvalConfig], if set."""
    call_seed: float
    """A uniform random value in [0, 1) generated once per decorated function call.

    Shared across all evaluators for the same call. In `'correlated'` sampling mode this is
    used automatically; in `'independent'` mode it is available for custom `sample_rate`
    callables that want to implement their own correlated logic.
    """

evaluator instance-attribute

evaluator: Evaluator

The evaluator being sampled.

inputs instance-attribute

inputs: Any

The inputs to the decorated function.

metadata instance-attribute

metadata: dict[str, Any] | None

Metadata from the OnlineEvalConfig, if set.

call_seed instance-attribute

call_seed: float

A uniform random value in [0, 1) generated once per decorated function call.

Shared across all evaluators for the same call. In 'correlated' sampling mode this is used automatically; in 'independent' mode it is available for custom sample_rate callables that want to implement their own correlated logic.

OnMaxConcurrencyCallback module-attribute

OnMaxConcurrencyCallback = Callable[
    [EvaluatorContext], None | Awaitable[None]
]

Callback invoked when an evaluation is dropped due to concurrency limits.

Receives the EvaluatorContext that would have been evaluated. Can be sync or async.

OnSamplingErrorCallback module-attribute

OnSamplingErrorCallback = Callable[
    [Exception, Evaluator], None
]

Callback invoked when a sample_rate callable raises an exception.

Called synchronously before the decorated function runs. Receives the exception and the evaluator whose sample_rate failed. Must be sync (not async). If set, the evaluator is skipped. If not set, the exception propagates to the caller.

OnErrorCallback module-attribute

OnErrorCallback = Callable[
    [
        Exception,
        EvaluatorContext,
        Evaluator,
        OnErrorLocation,
    ],
    None | Awaitable[None],
]

Callback invoked when an exception occurs in the online evaluation pipeline.

Receives the exception, the evaluator context, the evaluator instance, and a location string indicating where the error occurred. Can be sync or async.

disable_evaluation

disable_evaluation() -> Iterator[None]

Context manager to disable all online evaluation in the current context.

When active, decorated functions still execute normally but no evaluators are dispatched.

Source code in pydantic_evals/pydantic_evals/online.py
155
156
157
158
159
160
161
162
163
164
165
@contextmanager
def disable_evaluation() -> Iterator[None]:
    """Context manager to disable all online evaluation in the current context.

    When active, decorated functions still execute normally but no evaluators are dispatched.
    """
    token = _EVALUATION_DISABLED.set(True)
    try:
        yield
    finally:
        _EVALUATION_DISABLED.reset(token)

SpanReference dataclass

Identifies a span that evaluation results should be associated with.

Used by sinks to associate evaluation results with the original function execution span.

Source code in pydantic_evals/pydantic_evals/online.py
168
169
170
171
172
173
174
175
176
177
178
@dataclass(kw_only=True)
class SpanReference:
    """Identifies a span that evaluation results should be associated with.

    Used by sinks to associate evaluation results with the original function execution span.
    """

    trace_id: str
    """The trace ID of the span."""
    span_id: str
    """The span ID of the span."""

trace_id instance-attribute

trace_id: str

The trace ID of the span.

span_id instance-attribute

span_id: str

The span ID of the span.

SinkCallback module-attribute

Type alias for bare callables accepted wherever an EvaluationSink is expected.

Auto-wrapped in CallbackSink when passed as a sink parameter.

OnlineEvaluator dataclass

Wraps an Evaluator with per-evaluator online configuration.

Different evaluators often need different settings — a cheap heuristic should run on 100% of traffic while an expensive LLM judge might run on only 1%.

Source code in pydantic_evals/pydantic_evals/online.py
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
@dataclass(kw_only=True)
class OnlineEvaluator:
    """Wraps an `Evaluator` with per-evaluator online configuration.

    Different evaluators often need different settings — a cheap heuristic should
    run on 100% of traffic while an expensive LLM judge might run on only 1%.
    """

    evaluator: Evaluator
    """The evaluator to run.

    To version an evaluator, override
    [`get_evaluator_version`][pydantic_evals.evaluators.Evaluator.get_evaluator_version] on the
    `Evaluator` subclass (see `Evaluator` docstring). The framework calls it at dispatch time and
    propagates the value to sinks alongside each result.
    """
    sample_rate: float | Callable[[SamplingContext], float | bool] | None = None
    """Probability of running this evaluator (0.0–1.0), or a callable returning a float or bool.

    When a callable, it receives a [`SamplingContext`][pydantic_evals.online.SamplingContext]
    with the function inputs, config metadata, and evaluator name — but not the output or
    duration (which aren't available yet at sampling time).

    Defaults to `None`, which uses the config's `default_sample_rate` at each call.
    Set explicitly to override.
    """
    max_concurrency: int = 10
    """Maximum number of concurrent evaluations for this evaluator."""

    sink: EvaluationSink | Sequence[EvaluationSink | SinkCallback] | SinkCallback | None = None
    """Override additional sink(s) for this evaluator. If `None`, the config's
    `default_sink` is used.

    Sinks are *additive* to the default OTel event emission — not replacements.
    See [`EvaluationSink`][pydantic_evals.online.EvaluationSink]."""

    on_max_concurrency: OnMaxConcurrencyCallback | None = None
    """Called when an evaluation is dropped because `max_concurrency` was reached.

    Receives the `EvaluatorContext` that would have been evaluated. Can be sync or async.
    If `None` (the default), dropped evaluations are silently ignored.
    """
    on_sampling_error: OnSamplingErrorCallback | None = None
    """Called synchronously when a `sample_rate` callable raises an exception.

    Receives the exception and the evaluator. Must be sync (not async), since sampling
    runs before the decorated function. If set, the evaluator is skipped. If `None`,
    uses the config's `on_sampling_error` default. If neither is set, the exception
    propagates to the caller.
    """
    on_error: OnErrorCallback | None = None
    """Called when an exception occurs in a sink or on_max_concurrency callback.

    Receives the exception, evaluator context, evaluator instance, and a location string
    (see [`OnErrorLocation`][pydantic_evals.online.OnErrorLocation]). Can be sync or async.
    `'sink'` covers both custom sink failures and the rarer default OTel event emission
    failures — the value is intentionally broad.
    If `None`, uses the config's `on_error` default. If neither is set, exceptions are
    silently suppressed.
    """
    run_on_errors: bool = False
    """Whether to run this evaluator when the wrapped function/agent raises.

    When `False` (the default), the evaluator is skipped if the wrapped call raises —
    only successful results reach the evaluator. When `True`, the raised exception is
    passed as `EvaluatorContext.output` so the evaluator can score failure modes
    (e.g. count tool errors, classify exception types). The exception still propagates
    to the caller after dispatch.
    """

    def __post_init__(self) -> None:
        self.semaphore = threading.Semaphore(self.max_concurrency)

evaluator instance-attribute

evaluator: Evaluator

The evaluator to run.

To version an evaluator, override get_evaluator_version on the Evaluator subclass (see Evaluator docstring). The framework calls it at dispatch time and propagates the value to sinks alongside each result.

sample_rate class-attribute instance-attribute

sample_rate: (
    float | Callable[[SamplingContext], float | bool] | None
) = None

Probability of running this evaluator (0.0–1.0), or a callable returning a float or bool.

When a callable, it receives a SamplingContext with the function inputs, config metadata, and evaluator name — but not the output or duration (which aren't available yet at sampling time).

Defaults to None, which uses the config's default_sample_rate at each call. Set explicitly to override.

max_concurrency class-attribute instance-attribute

max_concurrency: int = 10

Maximum number of concurrent evaluations for this evaluator.

sink class-attribute instance-attribute

Override additional sink(s) for this evaluator. If None, the config's default_sink is used.

Sinks are additive to the default OTel event emission — not replacements. See EvaluationSink.

on_max_concurrency class-attribute instance-attribute

on_max_concurrency: OnMaxConcurrencyCallback | None = None

Called when an evaluation is dropped because max_concurrency was reached.

Receives the EvaluatorContext that would have been evaluated. Can be sync or async. If None (the default), dropped evaluations are silently ignored.

on_sampling_error class-attribute instance-attribute

on_sampling_error: OnSamplingErrorCallback | None = None

Called synchronously when a sample_rate callable raises an exception.

Receives the exception and the evaluator. Must be sync (not async), since sampling runs before the decorated function. If set, the evaluator is skipped. If None, uses the config's on_sampling_error default. If neither is set, the exception propagates to the caller.

on_error class-attribute instance-attribute

on_error: OnErrorCallback | None = None

Called when an exception occurs in a sink or on_max_concurrency callback.

Receives the exception, evaluator context, evaluator instance, and a location string (see OnErrorLocation). Can be sync or async. 'sink' covers both custom sink failures and the rarer default OTel event emission failures — the value is intentionally broad. If None, uses the config's on_error default. If neither is set, exceptions are silently suppressed.

run_on_errors class-attribute instance-attribute

run_on_errors: bool = False

Whether to run this evaluator when the wrapped function/agent raises.

When False (the default), the evaluator is skipped if the wrapped call raises — only successful results reach the evaluator. When True, the raised exception is passed as EvaluatorContext.output so the evaluator can score failure modes (e.g. count tool errors, classify exception types). The exception still propagates to the caller after dispatch.

EvaluatorContextSource

Bases: Protocol

Protocol for retrieving stored evaluator contexts.

Implementations reconstruct EvaluatorContext objects from stored traces (e.g., Logfire). The batch method allows fetching contexts for multiple spans in a single call.

Source code in pydantic_evals/pydantic_evals/online.py
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
class EvaluatorContextSource(Protocol):
    """Protocol for retrieving stored evaluator contexts.

    Implementations reconstruct [`EvaluatorContext`][pydantic_evals.evaluators.EvaluatorContext]
    objects from stored traces (e.g., Logfire). The batch method allows fetching contexts
    for multiple spans in a single call.
    """

    async def fetch(self, span: SpanReference) -> EvaluatorContext:
        """Fetch an evaluator context for a single span.

        Args:
            span: Reference to the span to fetch context for.

        Returns:
            The evaluator context for the span.
        """
        return (await self.fetch_many([span]))[0]

    async def fetch_many(self, spans: Sequence[SpanReference]) -> list[EvaluatorContext]:
        """Fetch evaluator contexts for multiple spans in a single batch.

        Args:
            spans: References to the spans to fetch context for.

        Returns:
            Evaluator contexts in the same order as the input spans.
        """
        ...

fetch async

Fetch an evaluator context for a single span.

Parameters:

Name Type Description Default
span SpanReference

Reference to the span to fetch context for.

required

Returns:

Type Description
EvaluatorContext

The evaluator context for the span.

Source code in pydantic_evals/pydantic_evals/online.py
273
274
275
276
277
278
279
280
281
282
async def fetch(self, span: SpanReference) -> EvaluatorContext:
    """Fetch an evaluator context for a single span.

    Args:
        span: Reference to the span to fetch context for.

    Returns:
        The evaluator context for the span.
    """
    return (await self.fetch_many([span]))[0]

fetch_many async

fetch_many(
    spans: Sequence[SpanReference],
) -> list[EvaluatorContext]

Fetch evaluator contexts for multiple spans in a single batch.

Parameters:

Name Type Description Default
spans Sequence[SpanReference]

References to the spans to fetch context for.

required

Returns:

Type Description
list[EvaluatorContext]

Evaluator contexts in the same order as the input spans.

Source code in pydantic_evals/pydantic_evals/online.py
284
285
286
287
288
289
290
291
292
293
async def fetch_many(self, spans: Sequence[SpanReference]) -> list[EvaluatorContext]:
    """Fetch evaluator contexts for multiple spans in a single batch.

    Args:
        spans: References to the spans to fetch context for.

    Returns:
        Evaluator contexts in the same order as the input spans.
    """
    ...

run_evaluators async

run_evaluators(
    evaluators: Sequence[Evaluator],
    context: EvaluatorContext,
) -> tuple[list[EvaluationResult], list[EvaluatorFailure]]

Run evaluators on a context and return results.

Useful for re-running evaluators from stored data.

Parameters:

Name Type Description Default
evaluators Sequence[Evaluator]

The evaluators to run.

required
context EvaluatorContext

The evaluator context to evaluate against.

required

Returns:

Type Description
tuple[list[EvaluationResult], list[EvaluatorFailure]]

A tuple of (results, failures).

Source code in pydantic_evals/pydantic_evals/online.py
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
async def run_evaluators(
    evaluators: Sequence[Evaluator],
    context: EvaluatorContext,
) -> tuple[list[EvaluationResult], list[EvaluatorFailure]]:
    """Run evaluators on a context and return results.

    Useful for re-running evaluators from stored data.

    Args:
        evaluators: The evaluators to run.
        context: The evaluator context to evaluate against.

    Returns:
        A tuple of (results, failures).
    """
    all_results: list[EvaluationResult] = []
    all_failures: list[EvaluatorFailure] = []

    async with anyio.create_task_group() as tg:
        results_by_index: dict[int, list[EvaluationResult] | EvaluatorFailure] = {}

        async def _run(idx: int, evaluator: Evaluator) -> None:
            results_by_index[idx] = await run_evaluator(evaluator, context)

        for i, evaluator in enumerate(evaluators):
            tg.start_soon(_run, i, evaluator)

    for i in range(len(evaluators)):
        result = results_by_index[i]
        if isinstance(result, EvaluatorFailure):
            all_failures.append(result)
        else:
            all_results.extend(result)

    return all_results, all_failures

OnlineEvalConfig dataclass

Holds cross-evaluator defaults for online evaluation.

Create instances for different evaluation configurations, or use the global DEFAULT_CONFIG via the module-level evaluate() and configure() functions.

Source code in pydantic_evals/pydantic_evals/online.py
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
@dataclass(kw_only=True)
class OnlineEvalConfig:
    """Holds cross-evaluator defaults for online evaluation.

    Create instances for different evaluation configurations, or use the global
    `DEFAULT_CONFIG` via the module-level `evaluate()` and `configure()` functions.
    """

    default_sink: EvaluationSink | Sequence[EvaluationSink | SinkCallback] | SinkCallback | None = None
    """Additional sink(s) to receive results, for evaluators that don't specify their own.

    Sinks run *in addition to* the default `gen_ai.evaluation.result` OTel event
    emission — they are the escape hatch for custom destinations (in-memory test
    capture, fan-out to Slack/DB, non-OTel backends). To disable OTel emission
    itself, set [`emit_otel_events=False`][pydantic_evals.online.OnlineEvalConfig.emit_otel_events].
    """
    default_sample_rate: float | Callable[[SamplingContext], float | bool] = 1.0
    """Default sample rate for evaluators that don't specify their own."""
    emit_otel_events: bool = True
    """Whether to emit `gen_ai.evaluation.result` OTel events for every evaluator run.

    When `True` (the default), dispatch emits one OTel log event per `EvaluationResult`
    or `EvaluatorFailure`, following the [OTel GenAI evaluation semconv](https://opentelemetry.io/docs/specs/semconv/gen-ai/gen-ai-events/#event-gen_aievaluationresult).
    If no OTel SDK is configured in the process, emission is a cheap no-op.

    Set to `False` to disable — useful for tests that want to assert on a custom
    sink alone, or in environments where OTel emission is undesirable. Custom
    sinks registered via `default_sink` still run regardless of this flag. With
    `emit_otel_events=False` AND no sinks configured, dispatch short-circuits
    entirely (the evaluator never runs) since results would have nowhere to go.
    """
    include_baggage: bool = True
    """Whether to copy OTel baggage entries onto every emitted evaluation event.

    When `True` (the default), each emitted `gen_ai.evaluation.result` event also
    carries the keys present in the current OTel baggage as attributes — useful
    for propagating tenant/user/request identifiers from the calling context.
    Standard `gen_ai.*` and `error.type` attributes always win on conflict, so
    baggage cannot accidentally overwrite the semantic-convention attributes.

    Set to `False` to skip the baggage snapshot per event.
    """
    sampling_mode: SamplingMode = 'independent'
    """Controls how per-evaluator sample rates interact for a single call.

    - `'independent'` (default): each evaluator decides independently.
    - `'correlated'`: a shared random seed is used so that lower-rate evaluators'
      calls are a subset of higher-rate ones, minimising total overhead.

    See [`SamplingMode`][pydantic_evals.online.SamplingMode] for details.
    """
    enabled: bool = True
    """Whether online evaluation is enabled for this config."""
    metadata: dict[str, Any] | None = None
    """Optional metadata to include in evaluator contexts."""
    on_max_concurrency: OnMaxConcurrencyCallback | None = None
    """Default handler called when an evaluation is dropped because `max_concurrency` was reached.

    Receives the `EvaluatorContext` that would have been evaluated. Can be sync or async.
    If `None` (the default), dropped evaluations are silently ignored.
    Per-evaluator `OnlineEvaluator.on_max_concurrency` overrides this default.
    """
    on_sampling_error: OnSamplingErrorCallback | None = None
    """Default handler called synchronously when a `sample_rate` callable raises.

    Receives the exception and the evaluator. Must be sync (not async).
    If set, the evaluator is skipped. If `None` (the default), the exception
    propagates to the caller.
    Per-evaluator `OnlineEvaluator.on_sampling_error` overrides this default.
    """
    on_error: OnErrorCallback | None = None
    """Default handler called when an exception occurs in a sink or on_max_concurrency callback.

    Receives the exception, evaluator context, evaluator instance, and a location string
    (see [`OnErrorLocation`][pydantic_evals.online.OnErrorLocation]). Can be sync or async.
    `'sink'` covers both custom sink failures and the rarer default OTel event emission
    failures — the value is intentionally broad.
    If `None` (the default), exceptions are silently suppressed.
    Per-evaluator `OnlineEvaluator.on_error` overrides this default.
    """

    def evaluate(
        self,
        *evaluators: Evaluator | OnlineEvaluator,
        target: str | None = None,
        msg_template: LiteralString | None = None,
        span_name: str | None = None,
        extract_args: bool | Iterable[str] = False,
        record_return: bool = False,
    ) -> Callable[[Callable[_P, _R]], Callable[_P, _R]]:
        """Decorator to attach online evaluators to a function.

        Each decorated call opens a dedicated span representing the function
        invocation — evaluator events are parented to this span, and the span
        itself appears in the user's configured OTel/logfire traces.

        Bare `Evaluator` instances are auto-wrapped in `OnlineEvaluator` at decoration time
        (so concurrency semaphores are shared across calls). Their `sample_rate` defaults to
        `None`, which resolves to the config's `default_sample_rate` at each call — so
        changes to the config after decoration take effect.

        To version an evaluator, override
        [`get_evaluator_version`][pydantic_evals.evaluators.Evaluator.get_evaluator_version] on the
        `Evaluator` subclass — the framework calls it at dispatch time and records the value on every
        [`EvaluationResult`][pydantic_evals.evaluators.EvaluationResult] and
        [`EvaluatorFailure`][pydantic_evals.evaluators.EvaluatorFailure] the evaluator emits:

        ```python
        from dataclasses import dataclass

        from pydantic_evals.evaluators import Evaluator, EvaluatorContext
        from pydantic_evals.online import evaluate


        @dataclass
        class Tone(Evaluator):
            def evaluate(self, ctx: EvaluatorContext) -> str:
                return 'neutral'

            def get_evaluator_version(self) -> str | None:
                return 'v2'


        @evaluate(Tone())
        async def summarize(text: str) -> str:
            return text
        ```

        Args:
            *evaluators: Evaluators to attach. Can be `Evaluator` or `OnlineEvaluator` instances.
            target: Name of the thing being evaluated. Written to sinks and emitted
                OTel events as `gen_ai.evaluation.target`. Defaults to the decorated
                function's `__name__` when omitted.
            msg_template: Template for the call span's message. Defaults to
                `"Calling {module}.{qualname}"` like `@logfire.instrument`.
                When logfire is installed, `{arg=}`-style placeholders in the
                template are formatted against the function's arguments.
            span_name: Override for the call span's name. Defaults to `msg_template`.
            extract_args: Whether to record function arguments as span attributes.
                `False` (default) records nothing; `True` records all bound arguments;
                an iterable of names records only those arguments. Requires logfire
                to be installed so arguments are serialised with their JSON schema —
                raises `RuntimeError` at decoration time otherwise.
            record_return: Whether to record the function's return value as a `return`
                span attribute. Requires logfire for the same reason as `extract_args`.

        Returns:
            A decorator that wraps the function with online evaluation.
        """
        online_evals = [e if isinstance(e, OnlineEvaluator) else OnlineEvaluator(evaluator=e) for e in evaluators]

        if (extract_args or record_return) and not _LOGFIRE_INSTALLED:
            raise RuntimeError(
                'extract_args and record_return require logfire to be installed for argument and '
                'return-value serialization. Install `pydantic-evals[logfire]` (or disable both '
                'options) to use them.'
            )

        # These options are intentionally decorator-only for now so they stay close to
        # `@logfire.instrument`'s shape; we can lift them to `OnlineEvalConfig` as
        # defaults if users ask for it.

        def decorator(func: Callable[_P, _R]) -> Callable[_P, _R]:
            resolved_target = target if target is not None else func.__name__
            sig = inspect.signature(func)
            resolved_extract_args = _resolve_extract_args(func, sig, extract_args)
            resolved_msg_template = msg_template if msg_template is not None else _default_call_span_name(func)
            call_span = _CallSpanSpec(
                msg_template=resolved_msg_template,
                span_name=span_name,
                extract_args=resolved_extract_args,
                record_return=record_return,
            )
            if inspect.iscoroutinefunction(func):
                # ParamSpec can't distinguish async from sync return types — _wrap_async returns
                # Callable[_P, Awaitable[_R]] but the decorator signature expects Callable[_P, _R]
                return _wrap_async(func, sig, online_evals, self, resolved_target, call_span)  # pyright: ignore[reportReturnType]
            else:
                return _wrap_sync(func, sig, online_evals, self, resolved_target, call_span)

        return decorator

    def should_evaluate(self) -> bool:
        """Whether evaluators with this config should run, based on the current settings and context."""
        return self.enabled and not _EVALUATION_DISABLED.get() and _task_run.CURRENT_TASK_RUN.get() is None

default_sink class-attribute instance-attribute

default_sink: (
    EvaluationSink
    | Sequence[EvaluationSink | SinkCallback]
    | SinkCallback
    | None
) = None

Additional sink(s) to receive results, for evaluators that don't specify their own.

Sinks run in addition to the default gen_ai.evaluation.result OTel event emission — they are the escape hatch for custom destinations (in-memory test capture, fan-out to Slack/DB, non-OTel backends). To disable OTel emission itself, set emit_otel_events=False.

default_sample_rate class-attribute instance-attribute

default_sample_rate: (
    float | Callable[[SamplingContext], float | bool]
) = 1.0

Default sample rate for evaluators that don't specify their own.

emit_otel_events class-attribute instance-attribute

emit_otel_events: bool = True

Whether to emit gen_ai.evaluation.result OTel events for every evaluator run.

When True (the default), dispatch emits one OTel log event per EvaluationResult or EvaluatorFailure, following the OTel GenAI evaluation semconv. If no OTel SDK is configured in the process, emission is a cheap no-op.

Set to False to disable — useful for tests that want to assert on a custom sink alone, or in environments where OTel emission is undesirable. Custom sinks registered via default_sink still run regardless of this flag. With emit_otel_events=False AND no sinks configured, dispatch short-circuits entirely (the evaluator never runs) since results would have nowhere to go.

include_baggage class-attribute instance-attribute

include_baggage: bool = True

Whether to copy OTel baggage entries onto every emitted evaluation event.

When True (the default), each emitted gen_ai.evaluation.result event also carries the keys present in the current OTel baggage as attributes — useful for propagating tenant/user/request identifiers from the calling context. Standard gen_ai.* and error.type attributes always win on conflict, so baggage cannot accidentally overwrite the semantic-convention attributes.

Set to False to skip the baggage snapshot per event.

sampling_mode class-attribute instance-attribute

sampling_mode: SamplingMode = 'independent'

Controls how per-evaluator sample rates interact for a single call.

  • 'independent' (default): each evaluator decides independently.
  • 'correlated': a shared random seed is used so that lower-rate evaluators' calls are a subset of higher-rate ones, minimising total overhead.

See SamplingMode for details.

enabled class-attribute instance-attribute

enabled: bool = True

Whether online evaluation is enabled for this config.

metadata class-attribute instance-attribute

metadata: dict[str, Any] | None = None

Optional metadata to include in evaluator contexts.

on_max_concurrency class-attribute instance-attribute

on_max_concurrency: OnMaxConcurrencyCallback | None = None

Default handler called when an evaluation is dropped because max_concurrency was reached.

Receives the EvaluatorContext that would have been evaluated. Can be sync or async. If None (the default), dropped evaluations are silently ignored. Per-evaluator OnlineEvaluator.on_max_concurrency overrides this default.

on_sampling_error class-attribute instance-attribute

on_sampling_error: OnSamplingErrorCallback | None = None

Default handler called synchronously when a sample_rate callable raises.

Receives the exception and the evaluator. Must be sync (not async). If set, the evaluator is skipped. If None (the default), the exception propagates to the caller. Per-evaluator OnlineEvaluator.on_sampling_error overrides this default.

on_error class-attribute instance-attribute

on_error: OnErrorCallback | None = None

Default handler called when an exception occurs in a sink or on_max_concurrency callback.

Receives the exception, evaluator context, evaluator instance, and a location string (see OnErrorLocation). Can be sync or async. 'sink' covers both custom sink failures and the rarer default OTel event emission failures — the value is intentionally broad. If None (the default), exceptions are silently suppressed. Per-evaluator OnlineEvaluator.on_error overrides this default.

evaluate

evaluate(
    *evaluators: Evaluator | OnlineEvaluator,
    target: str | None = None,
    msg_template: LiteralString | None = None,
    span_name: str | None = None,
    extract_args: bool | Iterable[str] = False,
    record_return: bool = False
) -> Callable[[Callable[_P, _R]], Callable[_P, _R]]

Decorator to attach online evaluators to a function.

Each decorated call opens a dedicated span representing the function invocation — evaluator events are parented to this span, and the span itself appears in the user's configured OTel/logfire traces.

Bare Evaluator instances are auto-wrapped in OnlineEvaluator at decoration time (so concurrency semaphores are shared across calls). Their sample_rate defaults to None, which resolves to the config's default_sample_rate at each call — so changes to the config after decoration take effect.

To version an evaluator, override get_evaluator_version on the Evaluator subclass — the framework calls it at dispatch time and records the value on every EvaluationResult and EvaluatorFailure the evaluator emits:

from dataclasses import dataclass

from pydantic_evals.evaluators import Evaluator, EvaluatorContext
from pydantic_evals.online import evaluate


@dataclass
class Tone(Evaluator):
    def evaluate(self, ctx: EvaluatorContext) -> str:
        return 'neutral'

    def get_evaluator_version(self) -> str | None:
        return 'v2'


@evaluate(Tone())
async def summarize(text: str) -> str:
    return text

Parameters:

Name Type Description Default
*evaluators Evaluator | OnlineEvaluator

Evaluators to attach. Can be Evaluator or OnlineEvaluator instances.

()
target str | None

Name of the thing being evaluated. Written to sinks and emitted OTel events as gen_ai.evaluation.target. Defaults to the decorated function's __name__ when omitted.

None
msg_template LiteralString | None

Template for the call span's message. Defaults to "Calling {module}.{qualname}" like @logfire.instrument. When logfire is installed, {arg=}-style placeholders in the template are formatted against the function's arguments.

None
span_name str | None

Override for the call span's name. Defaults to msg_template.

None
extract_args bool | Iterable[str]

Whether to record function arguments as span attributes. False (default) records nothing; True records all bound arguments; an iterable of names records only those arguments. Requires logfire to be installed so arguments are serialised with their JSON schema — raises RuntimeError at decoration time otherwise.

False
record_return bool

Whether to record the function's return value as a return span attribute. Requires logfire for the same reason as extract_args.

False

Returns:

Type Description
Callable[[Callable[_P, _R]], Callable[_P, _R]]

A decorator that wraps the function with online evaluation.

Source code in pydantic_evals/pydantic_evals/online.py
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
def evaluate(
    self,
    *evaluators: Evaluator | OnlineEvaluator,
    target: str | None = None,
    msg_template: LiteralString | None = None,
    span_name: str | None = None,
    extract_args: bool | Iterable[str] = False,
    record_return: bool = False,
) -> Callable[[Callable[_P, _R]], Callable[_P, _R]]:
    """Decorator to attach online evaluators to a function.

    Each decorated call opens a dedicated span representing the function
    invocation — evaluator events are parented to this span, and the span
    itself appears in the user's configured OTel/logfire traces.

    Bare `Evaluator` instances are auto-wrapped in `OnlineEvaluator` at decoration time
    (so concurrency semaphores are shared across calls). Their `sample_rate` defaults to
    `None`, which resolves to the config's `default_sample_rate` at each call — so
    changes to the config after decoration take effect.

    To version an evaluator, override
    [`get_evaluator_version`][pydantic_evals.evaluators.Evaluator.get_evaluator_version] on the
    `Evaluator` subclass — the framework calls it at dispatch time and records the value on every
    [`EvaluationResult`][pydantic_evals.evaluators.EvaluationResult] and
    [`EvaluatorFailure`][pydantic_evals.evaluators.EvaluatorFailure] the evaluator emits:

    ```python
    from dataclasses import dataclass

    from pydantic_evals.evaluators import Evaluator, EvaluatorContext
    from pydantic_evals.online import evaluate


    @dataclass
    class Tone(Evaluator):
        def evaluate(self, ctx: EvaluatorContext) -> str:
            return 'neutral'

        def get_evaluator_version(self) -> str | None:
            return 'v2'


    @evaluate(Tone())
    async def summarize(text: str) -> str:
        return text
    ```

    Args:
        *evaluators: Evaluators to attach. Can be `Evaluator` or `OnlineEvaluator` instances.
        target: Name of the thing being evaluated. Written to sinks and emitted
            OTel events as `gen_ai.evaluation.target`. Defaults to the decorated
            function's `__name__` when omitted.
        msg_template: Template for the call span's message. Defaults to
            `"Calling {module}.{qualname}"` like `@logfire.instrument`.
            When logfire is installed, `{arg=}`-style placeholders in the
            template are formatted against the function's arguments.
        span_name: Override for the call span's name. Defaults to `msg_template`.
        extract_args: Whether to record function arguments as span attributes.
            `False` (default) records nothing; `True` records all bound arguments;
            an iterable of names records only those arguments. Requires logfire
            to be installed so arguments are serialised with their JSON schema —
            raises `RuntimeError` at decoration time otherwise.
        record_return: Whether to record the function's return value as a `return`
            span attribute. Requires logfire for the same reason as `extract_args`.

    Returns:
        A decorator that wraps the function with online evaluation.
    """
    online_evals = [e if isinstance(e, OnlineEvaluator) else OnlineEvaluator(evaluator=e) for e in evaluators]

    if (extract_args or record_return) and not _LOGFIRE_INSTALLED:
        raise RuntimeError(
            'extract_args and record_return require logfire to be installed for argument and '
            'return-value serialization. Install `pydantic-evals[logfire]` (or disable both '
            'options) to use them.'
        )

    # These options are intentionally decorator-only for now so they stay close to
    # `@logfire.instrument`'s shape; we can lift them to `OnlineEvalConfig` as
    # defaults if users ask for it.

    def decorator(func: Callable[_P, _R]) -> Callable[_P, _R]:
        resolved_target = target if target is not None else func.__name__
        sig = inspect.signature(func)
        resolved_extract_args = _resolve_extract_args(func, sig, extract_args)
        resolved_msg_template = msg_template if msg_template is not None else _default_call_span_name(func)
        call_span = _CallSpanSpec(
            msg_template=resolved_msg_template,
            span_name=span_name,
            extract_args=resolved_extract_args,
            record_return=record_return,
        )
        if inspect.iscoroutinefunction(func):
            # ParamSpec can't distinguish async from sync return types — _wrap_async returns
            # Callable[_P, Awaitable[_R]] but the decorator signature expects Callable[_P, _R]
            return _wrap_async(func, sig, online_evals, self, resolved_target, call_span)  # pyright: ignore[reportReturnType]
        else:
            return _wrap_sync(func, sig, online_evals, self, resolved_target, call_span)

    return decorator

should_evaluate

should_evaluate() -> bool

Whether evaluators with this config should run, based on the current settings and context.

Source code in pydantic_evals/pydantic_evals/online.py
592
593
594
def should_evaluate(self) -> bool:
    """Whether evaluators with this config should run, based on the current settings and context."""
    return self.enabled and not _EVALUATION_DISABLED.get() and _task_run.CURRENT_TASK_RUN.get() is None

DEFAULT_CONFIG module-attribute

DEFAULT_CONFIG = OnlineEvalConfig()

The global default OnlineEvalConfig instance.

Module-level functions like evaluate() and configure() delegate to this instance.

evaluate

evaluate(
    *evaluators: Evaluator | OnlineEvaluator,
    target: str | None = None,
    msg_template: LiteralString | None = None,
    span_name: str | None = None,
    extract_args: bool | Iterable[str] = False,
    record_return: bool = False
) -> Callable[[Callable[_P, _R]], Callable[_P, _R]]

Decorator to attach online evaluators to a function using the global default config.

Equivalent to DEFAULT_CONFIG.evaluate(...).

Parameters:

Name Type Description Default
*evaluators Evaluator | OnlineEvaluator

Evaluators to attach. Can be Evaluator or OnlineEvaluator instances.

()
target str | None

Name of the thing being evaluated. Written to sinks and emitted OTel events as gen_ai.evaluation.target. Defaults to the decorated function's __name__ when omitted.

None
msg_template LiteralString | None

Template for the call span's message. Defaults to "Calling {module}.{qualname}" like @logfire.instrument.

None
span_name str | None

Override for the call span's name. Defaults to msg_template.

None
extract_args bool | Iterable[str]

Whether to record function arguments as span attributes. False (default) records nothing; True records all bound arguments; an iterable of names records only those arguments. Requires logfire to be installed — raises RuntimeError at decoration time otherwise.

False
record_return bool

Whether to record the function's return value as a return span attribute. Requires logfire for the same reason as extract_args.

False

Returns:

Type Description
Callable[[Callable[_P, _R]], Callable[_P, _R]]

A decorator that wraps the function with online evaluation.

Example:

from dataclasses import dataclass

from pydantic_evals.evaluators import Evaluator, EvaluatorContext
from pydantic_evals.online import evaluate


@dataclass
class IsNonEmpty(Evaluator):
    def evaluate(self, ctx: EvaluatorContext) -> bool:
        return bool(ctx.output)


@evaluate(IsNonEmpty())
async def my_function(x: int) -> int:
    return x

Source code in pydantic_evals/pydantic_evals/online.py
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
def evaluate(
    *evaluators: Evaluator | OnlineEvaluator,
    target: str | None = None,
    msg_template: LiteralString | None = None,
    span_name: str | None = None,
    extract_args: bool | Iterable[str] = False,
    record_return: bool = False,
) -> Callable[[Callable[_P, _R]], Callable[_P, _R]]:
    """Decorator to attach online evaluators to a function using the global default config.

    Equivalent to `DEFAULT_CONFIG.evaluate(...)`.

    Args:
        *evaluators: Evaluators to attach. Can be `Evaluator` or `OnlineEvaluator` instances.
        target: Name of the thing being evaluated. Written to sinks and emitted
            OTel events as `gen_ai.evaluation.target`. Defaults to the decorated
            function's `__name__` when omitted.
        msg_template: Template for the call span's message. Defaults to
            `"Calling {module}.{qualname}"` like `@logfire.instrument`.
        span_name: Override for the call span's name. Defaults to `msg_template`.
        extract_args: Whether to record function arguments as span attributes.
            `False` (default) records nothing; `True` records all bound arguments;
            an iterable of names records only those arguments. Requires logfire
            to be installed — raises `RuntimeError` at decoration time otherwise.
        record_return: Whether to record the function's return value as a `return`
            span attribute. Requires logfire for the same reason as `extract_args`.

    Returns:
        A decorator that wraps the function with online evaluation.

    Example:
    ```python
    from dataclasses import dataclass

    from pydantic_evals.evaluators import Evaluator, EvaluatorContext
    from pydantic_evals.online import evaluate


    @dataclass
    class IsNonEmpty(Evaluator):
        def evaluate(self, ctx: EvaluatorContext) -> bool:
            return bool(ctx.output)


    @evaluate(IsNonEmpty())
    async def my_function(x: int) -> int:
        return x
    ```
    """
    return DEFAULT_CONFIG.evaluate(
        *evaluators,
        target=target,
        msg_template=msg_template,
        span_name=span_name,
        extract_args=extract_args,
        record_return=record_return,
    )

configure

configure(
    *,
    default_sink: (
        EvaluationSink
        | Sequence[EvaluationSink | SinkCallback]
        | SinkCallback
        | None
        | Unset
    ) = UNSET,
    default_sample_rate: (
        float
        | Callable[[SamplingContext], float | bool]
        | Unset
    ) = UNSET,
    sampling_mode: SamplingMode | Unset = UNSET,
    enabled: bool | Unset = UNSET,
    metadata: dict[str, Any] | None | Unset = UNSET,
    on_max_concurrency: (
        OnMaxConcurrencyCallback | None | Unset
    ) = UNSET,
    on_sampling_error: (
        OnSamplingErrorCallback | None | Unset
    ) = UNSET,
    on_error: OnErrorCallback | None | Unset = UNSET,
    emit_otel_events: bool | Unset = UNSET,
    include_baggage: bool | Unset = UNSET
) -> None

Configure the global default OnlineEvalConfig.

Only provided values are updated; unset arguments are ignored. Pass None explicitly to clear default_sink, metadata, on_max_concurrency, on_sampling_error, or on_error.

Parameters:

Name Type Description Default
default_sink EvaluationSink | Sequence[EvaluationSink | SinkCallback] | SinkCallback | None | Unset

Default sink(s) for evaluators. Pass None to clear.

UNSET
default_sample_rate float | Callable[[SamplingContext], float | bool] | Unset

Default sample rate for evaluators.

UNSET
sampling_mode SamplingMode | Unset

Sampling mode ('independent' or 'correlated').

UNSET
enabled bool | Unset

Whether online evaluation is enabled.

UNSET
metadata dict[str, Any] | None | Unset

Metadata to include in evaluator contexts. Pass None to clear.

UNSET
on_max_concurrency OnMaxConcurrencyCallback | None | Unset

Default handler for dropped evaluations. Pass None to clear.

UNSET
on_sampling_error OnSamplingErrorCallback | None | Unset

Default handler for sample_rate exceptions. Pass None to clear.

UNSET
on_error OnErrorCallback | None | Unset

Default handler for pipeline exceptions. Pass None to clear.

UNSET
emit_otel_events bool | Unset

Whether to emit gen_ai.evaluation.result OTel events.

UNSET
include_baggage bool | Unset

Whether to copy current OTel baggage onto every emitted event.

UNSET
Source code in pydantic_evals/pydantic_evals/online.py
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
def configure(
    *,
    default_sink: EvaluationSink | Sequence[EvaluationSink | SinkCallback] | SinkCallback | None | Unset = UNSET,
    default_sample_rate: float | Callable[[SamplingContext], float | bool] | Unset = UNSET,
    sampling_mode: SamplingMode | Unset = UNSET,
    enabled: bool | Unset = UNSET,
    metadata: dict[str, Any] | None | Unset = UNSET,
    on_max_concurrency: OnMaxConcurrencyCallback | None | Unset = UNSET,
    on_sampling_error: OnSamplingErrorCallback | None | Unset = UNSET,
    on_error: OnErrorCallback | None | Unset = UNSET,
    emit_otel_events: bool | Unset = UNSET,
    include_baggage: bool | Unset = UNSET,
) -> None:
    """Configure the global default `OnlineEvalConfig`.

    Only provided values are updated; unset arguments are ignored.
    Pass `None` explicitly to clear `default_sink`, `metadata`, `on_max_concurrency`,
    `on_sampling_error`, or `on_error`.

    Args:
        default_sink: Default sink(s) for evaluators. Pass `None` to clear.
        default_sample_rate: Default sample rate for evaluators.
        sampling_mode: Sampling mode (`'independent'` or `'correlated'`).
        enabled: Whether online evaluation is enabled.
        metadata: Metadata to include in evaluator contexts. Pass `None` to clear.
        on_max_concurrency: Default handler for dropped evaluations. Pass `None` to clear.
        on_sampling_error: Default handler for sample_rate exceptions. Pass `None` to clear.
        on_error: Default handler for pipeline exceptions. Pass `None` to clear.
        emit_otel_events: Whether to emit `gen_ai.evaluation.result` OTel events.
        include_baggage: Whether to copy current OTel baggage onto every emitted event.
    """
    if not isinstance(default_sink, Unset):
        DEFAULT_CONFIG.default_sink = default_sink
    if not isinstance(default_sample_rate, Unset):
        DEFAULT_CONFIG.default_sample_rate = default_sample_rate
    if not isinstance(sampling_mode, Unset):
        DEFAULT_CONFIG.sampling_mode = sampling_mode
    if not isinstance(enabled, Unset):
        DEFAULT_CONFIG.enabled = enabled
    if not isinstance(metadata, Unset):
        DEFAULT_CONFIG.metadata = metadata
    if not isinstance(on_max_concurrency, Unset):
        DEFAULT_CONFIG.on_max_concurrency = on_max_concurrency
    if not isinstance(on_sampling_error, Unset):
        DEFAULT_CONFIG.on_sampling_error = on_sampling_error
    if not isinstance(on_error, Unset):
        DEFAULT_CONFIG.on_error = on_error
    if not isinstance(emit_otel_events, Unset):
        DEFAULT_CONFIG.emit_otel_events = emit_otel_events
    if not isinstance(include_baggage, Unset):
        DEFAULT_CONFIG.include_baggage = include_baggage

wait_for_evaluations async

wait_for_evaluations(*, timeout: float = 30.0) -> None

Wait for all pending background evaluation tasks and threads to complete.

This is useful in tests to deterministically wait for background evaluators to finish instead of relying on timing-based sleeps.

For async decorated functions, evaluators run as tasks on the caller's event loop and are awaited directly. For sync decorated functions, evaluators run in background threads which are joined with the given timeout.

Parameters:

Name Type Description Default
timeout float

Maximum seconds to wait for each background thread. Defaults to 30.

30.0
Source code in pydantic_evals/pydantic_evals/online.py
950
951
952
953
954
955
956
957
958
959
960
961
962
963
async def wait_for_evaluations(*, timeout: float = 30.0) -> None:
    """Wait for all pending background evaluation tasks and threads to complete.

    This is useful in tests to deterministically wait for background evaluators
    to finish instead of relying on timing-based sleeps.

    For async decorated functions, evaluators run as tasks on the caller's event loop
    and are awaited directly. For sync decorated functions, evaluators run in background
    threads which are joined with the given timeout.

    Args:
        timeout: Maximum seconds to wait for each background thread. Defaults to 30.
    """
    await _online_internal.wait_for_evaluations(timeout=timeout)