Distributed Evaluator

Run evaluations at scale with blocking, async, or distributed execution. Choose from ThreadPool, Celery, Ray, Temporal, and Kubernetes backends. Built-in resilience.

📝 TL;DR
  • Three modes: blocking (sync), non-blocking (async), distributed (via backends)
  • Backends: ThreadPool (default), Celery, Ray, Temporal, Kubernetes
  • Built-in resilience: circuit breakers, rate limiting, retries, graceful degradation

The FrameworkEvaluator runs evaluations across execution modes — synchronous for development, async for low-latency production, or distributed across workers for scale. Wrap any evaluation (built-in or custom) and the framework handles orchestration, error recovery, and OpenTelemetry span enrichment.

Note

Requires pip install ai-evaluation. For distributed backends, also install: ai-evaluation[celery], ai-evaluation[ray], or ai-evaluation[temporal].

Quick Example

from fi.evals.framework import blocking_evaluator, custom_eval

@custom_eval(name="length_check", required_fields=["response"])
def check_length(inputs):
    length = len(inputs["response"])
    return {"score": min(length / 100, 1.0), "passed": length > 20}

evaluator = blocking_evaluator(check_length)
result = evaluator.run({"response": "This is a detailed answer with enough content."})

print(result.batch.success_rate)  # 1.0
for r in result.batch.results:
    print(f"{r.eval_name}: score={r.value.score}, passed={r.value.passed}")

Execution Modes

| Mode | Factory | When to use |
| --- | --- | --- |
| BLOCKING | blocking_evaluator() | Development, testing, simple scripts |
| NON_BLOCKING | async_evaluator() | Production APIs where latency matters |
| DISTRIBUTED | distributed_evaluator() | Large-scale batch runs across workers |

Blocking (synchronous)

Runs evaluations and waits for results.

from fi.evals.framework import blocking_evaluator

evaluator = blocking_evaluator(eval1, eval2, eval3, fail_fast=True)
result = evaluator.run({"response": "...", "context": "..."})

print(result.batch.success_rate)
for r in result.batch.results:
    print(f"{r.eval_name}: {r.value}")

Non-blocking (async)

Returns immediately with a future. Results compute in background threads.

from fi.evals.framework import async_evaluator

evaluator = async_evaluator(eval1, eval2, max_workers=8)
result = evaluator.run({"response": "..."})

# Do other work...
batch = result.wait(timeout=30)
print(batch.success_rate)
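The non-blocking mode follows the standard future pattern: submit work, continue, then block only when you need the result. A minimal sketch of the same flow using only the standard library (the evaluation function here is a stand-in, not the framework's internals):

```python
from concurrent.futures import ThreadPoolExecutor

def run_length_eval(inputs):
    # Stand-in for a single evaluation: score response length.
    length = len(inputs["response"])
    return {"score": min(length / 100, 1.0), "passed": length > 20}

executor = ThreadPoolExecutor(max_workers=8)
future = executor.submit(run_length_eval, {"response": "A reasonably detailed answer here."})

# Do other work, then block for the result (analogous to result.wait(timeout=30)).
outcome = future.result(timeout=30)
print(outcome["passed"])
executor.shutdown()
```

`Future.result(timeout=...)` raises `TimeoutError` if the work has not finished, which is the same failure mode to plan for with `result.wait(timeout=...)`.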

Distributed

Sends evaluations to a backend for execution across workers.

from fi.evals.framework import distributed_evaluator
from fi.evals.framework.backends import CeleryBackend, CeleryConfig

backend = CeleryBackend(CeleryConfig(broker_url="redis://localhost:6379"))
evaluator = distributed_evaluator(eval1, eval2, backend=backend)
result = evaluator.run({"response": "..."})

Backends

ThreadPool (default)

Used by async_evaluator(). No extra dependencies.

from fi.evals.framework.backends import ThreadPoolBackend, ThreadPoolConfig

backend = ThreadPoolBackend(ThreadPoolConfig(max_workers=8, timeout_seconds=60))

Celery

Distributed task queue. Requires pip install ai-evaluation[celery].

from fi.evals.framework.backends import CeleryBackend, CeleryConfig

backend = CeleryBackend(CeleryConfig(
    broker_url="redis://localhost:6379",
    max_workers=16,
    timeout_seconds=300,
))

Ray

Distributed computing. Requires pip install ai-evaluation[ray].

from fi.evals.framework.backends import RayBackend, RayConfig

backend = RayBackend(RayConfig(max_workers=32))

Temporal

Durable workflow execution. Requires pip install ai-evaluation[temporal].

from fi.evals.framework.backends import TemporalBackend, TemporalConfig

backend = TemporalBackend(TemporalConfig(
    host="localhost:7233",
    namespace="evaluations",
))

Resilience

Wrap any backend with circuit breakers, rate limiting, retries, and graceful degradation.

from fi.evals.framework import resilient_evaluator
from fi.evals.framework.backends import ThreadPoolBackend
from fi.evals.framework.resilience import (
    ResilienceConfig, CircuitBreakerConfig, RateLimitConfig, RetryConfig,
)

evaluator = resilient_evaluator(
    eval1, eval2,
    resilience=ResilienceConfig(
        circuit_breaker=CircuitBreakerConfig(failure_threshold=5, timeout_seconds=30),
        rate_limit=RateLimitConfig(requests_per_second=10, burst_size=20),
        retry=RetryConfig(max_retries=3, exponential_base=2.0, jitter=True),
    ),
    fallback_backend=ThreadPoolBackend(),
)

result = evaluator.run({"response": "..."})
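Conceptually, the circuit breaker opens after failure_threshold consecutive failures, rejects calls while open, and half-opens once timeout_seconds elapses. A simplified, hypothetical sketch of that state machine (not the library's implementation):

```python
import time

class CircuitBreakerSketch:
    """Opens after N consecutive failures; half-opens after a cooldown."""
    def __init__(self, failure_threshold=5, timeout_seconds=30):
        self.failure_threshold = failure_threshold
        self.timeout_seconds = timeout_seconds
        self.failures = 0
        self.opened_at = None

    def allow(self):
        if self.opened_at is None:
            return True
        # Half-open once the cooldown has passed, letting a probe call through.
        if time.monotonic() - self.opened_at >= self.timeout_seconds:
            self.opened_at = None
            self.failures = 0
            return True
        return False

    def record_success(self):
        self.failures = 0

    def record_failure(self):
        self.failures += 1
        if self.failures >= self.failure_threshold:
            self.opened_at = time.monotonic()

breaker = CircuitBreakerSketch(failure_threshold=3, timeout_seconds=30)
for _ in range(3):
    breaker.record_failure()
print(breaker.allow())  # False: circuit is open, calls are rejected
```

When the breaker is open, a fallback_backend (as configured above) gives degraded-but-available behavior instead of hard failures.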

Presets

config = ResilienceConfig.default()   # balanced defaults
config = ResilienceConfig.minimal()   # retries only
config = ResilienceConfig.strict()    # aggressive circuit breaking
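The rate_limit settings map naturally onto a token-bucket model: requests_per_second is the refill rate and burst_size the bucket capacity. A minimal sketch of that model (illustrative, not the library's implementation):

```python
import time

class TokenBucketSketch:
    def __init__(self, requests_per_second=10, burst_size=20):
        self.rate = requests_per_second
        self.capacity = burst_size
        self.tokens = float(burst_size)
        self.last = time.monotonic()

    def try_acquire(self):
        now = time.monotonic()
        # Refill proportionally to elapsed time, capped at the burst size.
        self.tokens = min(self.capacity, self.tokens + (now - self.last) * self.rate)
        self.last = now
        if self.tokens >= 1:
            self.tokens -= 1
            return True
        return False

bucket = TokenBucketSketch(requests_per_second=10, burst_size=2)
print([bucket.try_acquire() for _ in range(3)])  # burst of 2 allowed, third denied
```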

Custom Evaluations

Build your own scoring logic and run it through the framework.

Decorator

from fi.evals.framework import custom_eval

@custom_eval(name="tone_check", required_fields=["response"], threshold=0.7)
def check_tone(inputs):
    response = inputs["response"]
    is_professional = "dear" in response.lower() or "regards" in response.lower()
    return {"score": 1.0 if is_professional else 0.3, "passed": is_professional}
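The decorator pattern here is ordinary Python: a factory that checks required fields and attaches metadata before calling your scorer. A hypothetical sketch of those mechanics (this is not the framework's actual code; names are illustrative):

```python
import functools

def custom_eval_sketch(name, required_fields=(), threshold=0.5):
    """Hypothetical stand-in for @custom_eval: validates inputs, tags metadata."""
    def decorator(fn):
        @functools.wraps(fn)
        def wrapper(inputs):
            missing = [f for f in required_fields if f not in inputs]
            if missing:
                raise ValueError(f"{name}: missing required fields {missing}")
            return fn(inputs)
        wrapper.eval_name = name
        wrapper.threshold = threshold
        return wrapper
    return decorator

@custom_eval_sketch(name="tone_check", required_fields=["response"], threshold=0.7)
def check_tone_sketch(inputs):
    is_professional = "regards" in inputs["response"].lower()
    return {"score": 1.0 if is_professional else 0.3, "passed": is_professional}

print(check_tone_sketch({"response": "Kind regards, Ada"})["passed"])  # True
```

Because the wrapped function stays a plain callable, it can be passed to any evaluator factory just like a built-in evaluation.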

Simple (one-liner)

from fi.evals.framework import simple_eval

length_check = simple_eval(
    name="min_length",
    scorer=lambda inputs: min(len(inputs["response"]) / 100, 1.0),
    threshold=0.5,
    required_fields=["response"],
)

Builder

from fi.evals.framework import EvalBuilder

my_eval = (
    EvalBuilder("custom_relevance")
    .version("2.0.0")
    .require("response", "query")
    .threshold(0.8)
    .evaluator(lambda inputs: {
        "score": 0.9,
        "passed": True,
        "details": {"method": "keyword_overlap"},
    })
    .build()
)

Mixing custom + built-in

from fi.evals.framework import blocking_evaluator, custom_eval, simple_eval
from fi.evals import evaluate as run_eval

@custom_eval(name="toxicity_wrapper", required_fields=["response"])
def toxicity_check(inputs):
    result = run_eval("toxicity", output=inputs["response"], model="turing_flash")
    return {"score": result.score, "passed": result.passed}

length_check = simple_eval("min_length", lambda i: min(len(i["response"]) / 100, 1.0))

evaluator = blocking_evaluator(toxicity_check, length_check)
result = evaluator.run({"response": "This is a helpful answer."})

Result Types

EvaluatorResult

Returned by evaluator.run().

| Field/Method | Type | Description |
| --- | --- | --- |
| .batch | BatchEvalResult | Results (blocking mode) |
| .future | BatchEvalFuture | Future (non-blocking mode) |
| .is_future | bool | Whether the result is a future |
| .wait(timeout) | BatchEvalResult | Block until done |

BatchEvalResult

| Field/Method | Type | Description |
| --- | --- | --- |
| .success_rate | float | Fraction of evaluations that passed, 0.0 to 1.0 |
| .avg_latency_ms | float | Average per-evaluation time in milliseconds |
| .total_count | int | Number of evaluations |
| .success_count | int | Number of evaluations that passed |
| .failure_count | int | Number of evaluations that failed |
| .get_by_name(name) | list | Results for a specific evaluation |
| .get_failures() | list | Only failed results |
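The aggregate fields are simple derivations over the per-evaluation results. A minimal sketch of how they relate (field names mirror the table above; the classes themselves are illustrative, not the library's types):

```python
from dataclasses import dataclass, field

@dataclass
class ResultSketch:
    eval_name: str
    passed: bool
    latency_ms: float

@dataclass
class BatchSketch:
    results: list = field(default_factory=list)

    @property
    def total_count(self):
        return len(self.results)

    @property
    def success_count(self):
        return sum(1 for r in self.results if r.passed)

    @property
    def success_rate(self):
        return self.success_count / self.total_count if self.results else 0.0

    @property
    def avg_latency_ms(self):
        return (sum(r.latency_ms for r in self.results) / self.total_count
                if self.results else 0.0)

    def get_failures(self):
        return [r for r in self.results if not r.passed]

batch = BatchSketch([
    ResultSketch("min_length", True, 2.0),
    ResultSketch("tone_check", False, 5.0),
])
print(batch.success_rate, len(batch.get_failures()))  # 0.5 1
```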