# Distributed Evaluator

Run evaluations at scale with blocking, async, or distributed execution. Backends for ThreadPool, Celery, Ray, Temporal, and Kubernetes. Built-in resilience.

- Three modes: blocking (sync), non-blocking (async), distributed (via backends)
- Backends: ThreadPool (default), Celery, Ray, Temporal, Kubernetes
- Built-in resilience: circuit breakers, rate limiting, retries, graceful degradation
The `FrameworkEvaluator` runs evaluations across execution modes: synchronous for development, async for low-latency production, or distributed across workers for scale. Wrap any evaluation (built-in or custom) and the framework handles orchestration, error recovery, and OpenTelemetry span enrichment.
> **Note:** Requires `pip install ai-evaluation`. For distributed backends, also install: `ai-evaluation[celery]`, `ai-evaluation[ray]`, or `ai-evaluation[temporal]`.
## Quick Example

```python
from fi.evals.framework import blocking_evaluator, custom_eval

@custom_eval(name="length_check", required_fields=["response"])
def check_length(inputs):
    length = len(inputs["response"])
    return {"score": min(length / 100, 1.0), "passed": length > 20}

evaluator = blocking_evaluator(check_length)
result = evaluator.run({"response": "This is a detailed answer with enough content."})

print(result.batch.success_rate)  # 1.0
for r in result.batch.results:
    print(f"{r.eval_name}: score={r.value.score}, passed={r.value.passed}")
```
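The scoring rule itself is plain Python, so it can be sanity-checked on its own before wiring it into an evaluator. A standalone sketch of the same logic (not a framework API):

```python
def length_score(response: str) -> dict:
    # Mirrors the length_check rule above: one point per 100 characters,
    # capped at 1.0; anything longer than 20 characters counts as a pass.
    length = len(response)
    return {"score": min(length / 100, 1.0), "passed": length > 20}

print(length_score("This is a detailed answer with enough content."))
# {'score': 0.46, 'passed': True}
```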
## Execution Modes

| Mode | Factory | When to use |
|---|---|---|
| `BLOCKING` | `blocking_evaluator()` | Development, testing, simple scripts |
| `NON_BLOCKING` | `async_evaluator()` | Production APIs where latency matters |
| `DISTRIBUTED` | `distributed_evaluator()` | Large-scale batch runs across workers |
### Blocking (synchronous)

Runs evaluations and waits for all results before returning.

```python
from fi.evals.framework import blocking_evaluator

evaluator = blocking_evaluator(eval1, eval2, eval3, fail_fast=True)
result = evaluator.run({"response": "...", "context": "..."})

print(result.batch.success_rate)
for r in result.batch.results:
    print(f"{r.eval_name}: {r.value}")
```
### Non-blocking (async)

Returns immediately with a future; results are computed in background threads.

```python
from fi.evals.framework import async_evaluator

evaluator = async_evaluator(eval1, eval2, max_workers=8)
result = evaluator.run({"response": "..."})

# Do other work...
batch = result.wait(timeout=30)
print(batch.success_rate)
```
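Conceptually this is the standard future pattern. A minimal sketch using only the Python standard library, as an illustration of the idea rather than the framework's actual implementation:

```python
from concurrent.futures import ThreadPoolExecutor

def evaluate(inputs: dict) -> dict:
    # Stand-in for a real evaluation; scores like the length check above.
    return {"score": min(len(inputs["response"]) / 100, 1.0)}

executor = ThreadPoolExecutor(max_workers=8)
future = executor.submit(evaluate, {"response": "some model output"})
# ... do other work while the evaluation runs in a background thread ...
batch = future.result(timeout=30)  # analogous to result.wait(timeout=30)
print(batch["score"])
executor.shutdown()
```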
### Distributed

Sends evaluations to a backend for execution across workers.

```python
from fi.evals.framework import distributed_evaluator
from fi.evals.framework.backends import CeleryBackend, CeleryConfig

backend = CeleryBackend(CeleryConfig(broker_url="redis://localhost:6379"))
evaluator = distributed_evaluator(eval1, eval2, backend=backend)
result = evaluator.run({"response": "..."})
```
## Backends

### ThreadPool (default)

Used by `async_evaluator()`. No extra dependencies.

```python
from fi.evals.framework.backends import ThreadPoolBackend, ThreadPoolConfig

backend = ThreadPoolBackend(ThreadPoolConfig(max_workers=8, timeout_seconds=60))
```

### Celery

Distributed task queue. Requires `pip install ai-evaluation[celery]`.

```python
from fi.evals.framework.backends import CeleryBackend, CeleryConfig

backend = CeleryBackend(CeleryConfig(
    broker_url="redis://localhost:6379",
    max_workers=16,
    timeout_seconds=300,
))
```

### Ray

Distributed computing. Requires `pip install ai-evaluation[ray]`.

```python
from fi.evals.framework.backends import RayBackend, RayConfig

backend = RayBackend(RayConfig(max_workers=32))
```

### Temporal

Durable workflow execution. Requires `pip install ai-evaluation[temporal]`.

```python
from fi.evals.framework.backends import TemporalBackend, TemporalConfig

backend = TemporalBackend(TemporalConfig(
    host="localhost:7233",
    namespace="evaluations",
))
```
## Resilience

Wrap any backend with circuit breakers, rate limiting, retries, and graceful degradation.

```python
from fi.evals.framework import resilient_evaluator
from fi.evals.framework.backends import ThreadPoolBackend
from fi.evals.framework.resilience import (
    ResilienceConfig, CircuitBreakerConfig, RateLimitConfig, RetryConfig,
)

evaluator = resilient_evaluator(
    eval1, eval2,
    resilience=ResilienceConfig(
        circuit_breaker=CircuitBreakerConfig(failure_threshold=5, timeout_seconds=30),
        rate_limit=RateLimitConfig(requests_per_second=10, burst_size=20),
        retry=RetryConfig(max_retries=3, exponential_base=2.0, jitter=True),
    ),
    fallback_backend=ThreadPoolBackend(),
)
result = evaluator.run({"response": "..."})
```
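For intuition on the `RetryConfig` parameters, here is a sketch of exponential backoff with jitter. This is illustrative only: the 1-second base delay and the 0.5–1.5× jitter window are assumptions, not documented defaults.

```python
import random

def backoff_delays(max_retries: int, exponential_base: float, jitter: bool,
                   base_delay: float = 1.0) -> list:
    # Delay before retry n grows as base_delay * exponential_base**n;
    # jitter randomizes each delay to avoid synchronized retry storms.
    delays = []
    for attempt in range(max_retries):
        delay = base_delay * (exponential_base ** attempt)
        if jitter:
            delay *= random.uniform(0.5, 1.5)
        delays.append(delay)
    return delays

print(backoff_delays(max_retries=3, exponential_base=2.0, jitter=False))
# [1.0, 2.0, 4.0]
```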
### Presets

```python
config = ResilienceConfig.default()  # balanced defaults
config = ResilienceConfig.minimal()  # retries only
config = ResilienceConfig.strict()   # aggressive circuit breaking
```
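The circuit breaker in `ResilienceConfig` can be pictured as a small state machine. The sketch below assumes standard closed/open semantics (`failure_threshold` consecutive failures open the circuit; after `timeout_seconds` a trial request is allowed through again). It is a conceptual illustration, not the framework's implementation:

```python
import time

class CircuitBreaker:
    def __init__(self, failure_threshold: int, timeout_seconds: float):
        self.failure_threshold = failure_threshold
        self.timeout_seconds = timeout_seconds
        self.failures = 0
        self.opened_at = None  # None means the circuit is closed

    def allow_request(self) -> bool:
        if self.opened_at is None:
            return True
        # After the timeout, let a trial request through (half-open).
        return time.monotonic() - self.opened_at >= self.timeout_seconds

    def record_success(self):
        self.failures = 0
        self.opened_at = None

    def record_failure(self):
        self.failures += 1
        if self.failures >= self.failure_threshold:
            self.opened_at = time.monotonic()

breaker = CircuitBreaker(failure_threshold=5, timeout_seconds=30)
for _ in range(5):
    breaker.record_failure()
print(breaker.allow_request())  # False: circuit is open
```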
## Custom Evaluations

Build your own scoring logic and run it through the framework.

### Decorator

```python
from fi.evals.framework import custom_eval

@custom_eval(name="tone_check", required_fields=["response"], threshold=0.7)
def check_tone(inputs):
    response = inputs["response"]
    is_professional = "dear" in response.lower() or "regards" in response.lower()
    return {"score": 1.0 if is_professional else 0.3, "passed": is_professional}
```
### Simple (one-liner)

```python
from fi.evals.framework import simple_eval

length_check = simple_eval(
    name="min_length",
    scorer=lambda inputs: min(len(inputs["response"]) / 100, 1.0),
    threshold=0.5,
    required_fields=["response"],
)
```
### Builder

```python
from fi.evals.framework import EvalBuilder

my_eval = (
    EvalBuilder("custom_relevance")
    .version("2.0.0")
    .require("response", "query")
    .threshold(0.8)
    .evaluator(lambda inputs: {
        "score": 0.9,
        "passed": True,
        "details": {"method": "keyword_overlap"},
    })
    .build()
)
```
### Mixing custom + built-in

```python
from fi.evals.framework import blocking_evaluator, custom_eval, simple_eval
from fi.evals import evaluate as run_eval

@custom_eval(name="toxicity_wrapper", required_fields=["response"])
def toxicity_check(inputs):
    result = run_eval("toxicity", output=inputs["response"], model="turing_flash")
    return {"score": result.score, "passed": result.passed}

length_check = simple_eval("min_length", lambda i: min(len(i["response"]) / 100, 1.0))

evaluator = blocking_evaluator(toxicity_check, length_check)
result = evaluator.run({"response": "This is a helpful answer."})
```
## Result Types

### EvaluatorResult

Returned by `evaluator.run()`.

| Field/Method | Type | Description |
|---|---|---|
| `.batch` | `BatchEvalResult` | Results (blocking mode) |
| `.future` | `BatchEvalFuture` | Future (non-blocking mode) |
| `.is_future` | `bool` | Whether the result is a future |
| `.wait(timeout)` | `BatchEvalResult` | Block until done |
### BatchEvalResult

| Field/Method | Type | Description |
|---|---|---|
| `.success_rate` | `float` | Fraction of evaluations that passed (0.0 to 1.0) |
| `.avg_latency_ms` | `float` | Average per-evaluation time |
| `.total_count` | `int` | Number of evaluations |
| `.success_count` | `int` | Passed evaluations |
| `.failure_count` | `int` | Failed evaluations |
| `.get_by_name(name)` | `list` | Results for a specific evaluation |
| `.get_failures()` | `list` | Only failed results |
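These aggregates follow directly from the per-result records. A pure-Python sketch of how they are derived, using illustrative record fields rather than the framework's internal types:

```python
from dataclasses import dataclass

@dataclass
class EvalRecord:
    # Hypothetical per-result record: name, pass/fail, and timing.
    eval_name: str
    passed: bool
    latency_ms: float

results = [
    EvalRecord("length_check", True, 12.0),
    EvalRecord("tone_check", False, 20.0),
    EvalRecord("toxicity", True, 40.0),
]

success_rate = sum(r.passed for r in results) / len(results)
avg_latency_ms = sum(r.latency_ms for r in results) / len(results)
failures = [r for r in results if not r.passed]

print(success_rate)                     # 0.6666666666666666
print(avg_latency_ms)                   # 24.0
print([r.eval_name for r in failures])  # ['tone_check']
```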