Async Evaluations for Large-Scale Testing
Submit fire-and-forget async evaluations, poll for results, and run 50+ evals in parallel using the Evaluator SDK with ThreadPoolExecutor.
| Time | Difficulty | Package |
|---|---|---|
| 15 min | Intermediate | ai-evaluation |
- FutureAGI account → app.futureagi.com
- API keys: `FI_API_KEY` and `FI_SECRET_KEY` (see Get your API keys)
- Python 3.9+
Install
pip install ai-evaluation
export FI_API_KEY="your-api-key"
export FI_SECRET_KEY="your-secret-key"
What is async evaluation?
The standalone evaluate() function blocks until the score returns. For large workloads you need two things: non-blocking submission (is_async=True on Evaluator.evaluate()) and client-side parallelism (concurrent.futures) to submit many requests at once.
Tip
For dataset-level batch evaluation — upload a CSV, run evals across every row server-side — see Dataset SDK: Batch Evaluation. This cookbook covers client-side async and parallel patterns for custom pipelines.
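The two ingredients can be sketched without the SDK at all. In the snippet below, `fake_eval` is a hypothetical stand-in scorer (not part of ai-evaluation): `submit()` returns a `Future` immediately (non-blocking submission), and `result()` blocks only when you actually need the score.

```python
from concurrent.futures import ThreadPoolExecutor

# Hypothetical stand-in for an evaluation call; not part of ai-evaluation.
def fake_eval(item: str) -> float:
    return 1.0 if "Paris" in item else 0.0

items = [f"Answer {i}: Paris" for i in range(4)]

with ThreadPoolExecutor(max_workers=2) as pool:
    # submit() returns a Future immediately -- non-blocking submission
    futures = [pool.submit(fake_eval, it) for it in items]
    # result() blocks only at the point where the score is needed
    scores = [f.result() for f in futures]

print(scores)  # [1.0, 1.0, 1.0, 1.0]
```

The sections below apply the same pattern with the real `Evaluator` API.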
Run a synchronous eval as a baseline
A single synchronous call blocks until the result is ready.
from fi.evals import evaluate
result = evaluate(
"groundedness",
output="The Eiffel Tower is in Paris, France.",
context="The Eiffel Tower is a wrought-iron lattice tower on the Champ de Mars in Paris.",
model="turing_small",
)
print(f"Score: {result.score} Passed: {result.passed}")
print(f"Reason: {result.reason}")

Expected output:
Score: 1.0 Passed: True
Reason: The output is fully supported by the provided context.

This is fine for single items. For 50+ items it becomes slow because each call waits for the server response before the next one starts.
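A rough back-of-envelope calculation shows why. The ~2 s per-call latency below is an assumed, illustrative figure, not a measured one:

```python
# Why sequential calls are slow for 50+ items (illustrative latency figure).
per_call_latency = 2.0   # seconds per evaluation (assumption)
n_items = 50
workers = 8

sequential = n_items * per_call_latency            # every call waits its turn
parallel = (n_items / workers) * per_call_latency  # ideal overlap across workers

print(f"Sequential: ~{sequential:.0f}s; with {workers} workers: ~{parallel:.1f}s")
```

Under these assumptions, parallelism turns ~100 s of waiting into ~12.5 s, which matches the order of magnitude seen in the parallel example later in this cookbook.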
Submit an async evaluation (fire and forget)
Use Evaluator.evaluate() with is_async=True. The call returns immediately with an eval_id you can poll later.
import os
from fi.evals import Evaluator
evaluator = Evaluator(
fi_api_key=os.environ["FI_API_KEY"],
fi_secret_key=os.environ["FI_SECRET_KEY"],
)
result = evaluator.evaluate(
eval_templates="groundedness",
inputs={
"output": "The Eiffel Tower is in Paris, France.",
"context": "The Eiffel Tower is a wrought-iron lattice tower on the Champ de Mars in Paris.",
},
model_name="turing_small",
is_async=True,
)
# Extract the eval_id for polling
eval_id = result.eval_results[0].eval_id
print(f"Submitted async eval (eval_id: {eval_id})")

Expected output:
Submitted async eval (eval_id: abc123-def456-...)

Note
is_async=True is only available on Evaluator.evaluate(), not on the standalone evaluate() function.
Poll for results with get_eval_result()
Use get_eval_result(eval_id) to retrieve the result once processing completes.
import time
# Poll until the result is ready
for attempt in range(15):
poll_result = evaluator.get_eval_result(eval_id)
inner = poll_result.get("result", {})
if isinstance(inner, dict) and inner.get("evalStatus") == "completed":
eval_data = inner["result"]
print(f"\n✅ Evaluation complete!")
print(f" Metric: {eval_data['name']}")
print(f" Value: {eval_data['value']}")
print(f" Runtime: {eval_data['runtime'] / 1000:.1f}s")
print(f" Reason: {eval_data['reason'][:120]}...")
break
print(f" ⏳ Attempt {attempt + 1}/15: still processing...")
time.sleep(5)
else:
print("❌ Timed out waiting for result")

Expected output:
⏳ Attempt 1/15: still processing...
⏳ Attempt 2/15: still processing...
✅ Evaluation complete!
Metric: groundedness
Value: Passed
Runtime: 24.2s
Reason: The output is fully supported by the provided context. The Eiffel Tower being in Paris, France is...

Evaluate 50+ items in parallel
Use concurrent.futures.ThreadPoolExecutor to submit many evaluations concurrently. Each thread calls Evaluator.evaluate() independently.
import os
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from fi.evals import Evaluator
evaluator = Evaluator(
fi_api_key=os.environ["FI_API_KEY"],
fi_secret_key=os.environ["FI_SECRET_KEY"],
)
# Sample dataset — 50 items
test_cases = [
{
"output": f"Response {i}: The capital of France is Paris.",
"context": "Paris is the capital and most populous city of France.",
"input": f"Question {i}: What is the capital of France?",
}
for i in range(50)
]
def evaluate_one(index, test_case):
"""Evaluate a single test case and return the index + result."""
result = evaluator.evaluate(
eval_templates="groundedness",
inputs=test_case,
model_name="turing_small",
)
return index, result
# Run evaluations in parallel
results = [None] * len(test_cases)
completed = 0
start = time.time()
with ThreadPoolExecutor(max_workers=8) as executor:
futures = {
executor.submit(evaluate_one, i, tc): i
for i, tc in enumerate(test_cases)
}
for future in as_completed(futures):
idx, result = future.result()
results[idx] = result
completed += 1
if completed % 10 == 0:
elapsed = time.time() - start
print(f"Progress: {completed}/{len(test_cases)} ({elapsed:.1f}s)")
elapsed = time.time() - start
print(f"\nDone: {len(test_cases)} evaluations in {elapsed:.1f}s")
# Summarize results
scored = sum(
1 for r in results
if r and r.eval_results and r.eval_results[0].output is not None
)
print(f"Scored: {scored}/{len(test_cases)}")
# Print a sample result
sample = results[0].eval_results[0]
print(f"Sample (name: {sample.name}, output: {sample.output}, reason: {sample.reason})")

Expected output:
Progress: 10/50 (3.2s)
Progress: 20/50 (5.8s)
Progress: 30/50 (8.1s)
Progress: 40/50 (10.5s)
Progress: 50/50 (12.9s)
Done: 50 evaluations in 12.9s
Scored: 50/50
Sample (name: groundedness, output: True, reason: The response is fully grounded...)

Parallel async submissions with batch polling
Combine is_async=True with parallel submission for maximum throughput — submit all items without waiting, then poll results in bulk.
import os
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from fi.evals import Evaluator
evaluator = Evaluator(
fi_api_key=os.environ["FI_API_KEY"],
fi_secret_key=os.environ["FI_SECRET_KEY"],
)
test_cases = [
{
"output": f"Response {i}: Python is a programming language.",
"context": "Python is a high-level, general-purpose programming language.",
}
for i in range(50)
]
# Phase 1: Submit all evaluations asynchronously
def submit_async(index, test_case):
result = evaluator.evaluate(
eval_templates="groundedness",
inputs=test_case,
model_name="turing_small",
is_async=True,
)
eval_id = result.eval_results[0].eval_id
return index, eval_id
eval_ids = {}
with ThreadPoolExecutor(max_workers=8) as executor:
futures = {
executor.submit(submit_async, i, tc): i
for i, tc in enumerate(test_cases)
}
for future in as_completed(futures):
idx, eval_id = future.result()
eval_ids[idx] = eval_id
print(f"Submitted {len(eval_ids)} async evaluations")
# Phase 2: Poll for results until all complete
results = {}
max_polls = 15
for poll_round in range(max_polls):
still_pending = {
idx: eid for idx, eid in eval_ids.items() if idx not in results
}
if not still_pending:
break
for idx, eid in still_pending.items():
poll_result = evaluator.get_eval_result(eid)
inner = poll_result.get("result", {})
if isinstance(inner, dict) and inner.get("evalStatus") == "completed":
results[idx] = poll_result
print(f" 🔄 Poll {poll_round + 1}: {len(results)}/{len(eval_ids)} completed")
if len(results) < len(eval_ids):
time.sleep(3)
print(f"\n✅ All {len(results)} evaluations complete!")
# Print first few results
for idx in sorted(results)[:3]:
eval_data = results[idx]["result"]["result"]
print(f" Item {idx}: {eval_data['name']} = {eval_data['value']}")

Expected output:
Submitted 50 async evaluations
🔄 Poll 1: 12/50 completed
🔄 Poll 2: 34/50 completed
🔄 Poll 3: 50/50 completed
✅ All 50 evaluations complete!
Item 0: groundedness = Passed
Item 1: groundedness = Passed
Item 2: groundedness = Passed

Tips for large-scale evaluation
| Tip | Detail |
|---|---|
| Tune max_workers | Pass max_workers=16 to the Evaluator() constructor to increase the internal thread pool size beyond the default of 8. Setting it too high risks rate limiting. |
| Use turing_small | Balanced speed and accuracy; best for most async workloads. Use turing_flash for lowest latency, or turing_large when accuracy matters more than speed. |
| Add error handling | Wrap future.result() in try/except to catch timeouts and API errors without losing the whole batch. |
| Chunk large batches | For 1000+ items, split into chunks of 100 and add a short sleep between chunks to avoid rate limits. |
| Async for fire-and-forget | Use is_async=True when you do not need results immediately — for example, logging eval scores to a database in the background. |
| Sync parallel for immediate results | Use ThreadPoolExecutor without is_async when you need all scores before proceeding (e.g., CI gates). |
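The chunking tip can be sketched as follows. `submit_chunk` is a hypothetical placeholder for your actual evaluation call, and the pause between chunks is shortened here; in production, sleep for a few seconds instead.

```python
import time

# Hypothetical placeholder for an evaluation call over one chunk of items.
def submit_chunk(chunk):
    return [len(item) for item in chunk]  # placeholder "scores"

items = [f"response {i}" for i in range(250)]
chunk_size = 100
all_results = []

# Split into chunks of 100 and pause between chunks to stay under rate limits.
for start in range(0, len(items), chunk_size):
    chunk = items[start:start + chunk_size]
    all_results.extend(submit_chunk(chunk))
    if start + chunk_size < len(items):
        time.sleep(0.01)  # use a few seconds in production

print(f"Processed {len(all_results)} items in chunks of {chunk_size}")
```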
Error handling pattern
from concurrent.futures import ThreadPoolExecutor, as_completed
results = {}
errors = {}
with ThreadPoolExecutor(max_workers=8) as executor:
futures = {
executor.submit(evaluate_one, i, tc): i
for i, tc in enumerate(test_cases)
}
for future in as_completed(futures):
idx = futures[future]
try:
_, result = future.result(timeout=60)
results[idx] = result
except Exception as exc:
errors[idx] = str(exc)
print(f"Item {idx} failed: {exc}")
print(f"Succeeded: {len(results)}, Failed: {len(errors)}")
What you built
You can now submit async evaluations, poll for results, and run large-scale parallel evals using the Evaluator SDK.
- Ran a synchronous eval as a baseline with the standalone evaluate() function
- Submitted a non-blocking async eval with Evaluator.evaluate(is_async=True)
- Polled async results with evaluator.get_eval_result(eval_id)
- Evaluated 50 items in parallel with ThreadPoolExecutor and progress tracking
- Combined async submission with batch polling for maximum throughput