Async Evaluations for Large-Scale Testing

Fire-and-forget async evaluations, poll for results, and run parallel evals across hundreds of items using the Evaluator SDK.

📝 TL;DR

Submit fire-and-forget async evaluations, poll for results, and run 50+ evals in parallel using the Evaluator SDK with ThreadPoolExecutor.

| Time | Difficulty | Package |
| --- | --- | --- |
| 15 min | Intermediate | ai-evaluation |
Prerequisites

Install

pip install ai-evaluation
export FI_API_KEY="your-api-key"
export FI_SECRET_KEY="your-secret-key"

What is async evaluation?

The standalone evaluate() function blocks until the score returns. For large workloads you need two things: non-blocking submission (is_async=True on Evaluator.evaluate()) and client-side parallelism (concurrent.futures) to submit many requests at once.

Tip

For dataset-level batch evaluation — upload a CSV, run evals across every row server-side — see Dataset SDK: Batch Evaluation. This cookbook covers client-side async and parallel patterns for custom pipelines.

Run a synchronous eval as a baseline

A single synchronous call blocks until the result is ready.

from fi.evals import evaluate

result = evaluate(
    "groundedness",
    output="The Eiffel Tower is in Paris, France.",
    context="The Eiffel Tower is a wrought-iron lattice tower on the Champ de Mars in Paris.",
    model="turing_small",
)

print(f"Score: {result.score}  Passed: {result.passed}")
print(f"Reason: {result.reason}")

Expected output:

Score: 1.0  Passed: True
Reason: The output is fully supported by the provided context.

This is fine for single items. For 50+ items it becomes slow because each call waits for the server response before the next one starts.
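
To see why, here is a self-contained timing sketch (no SDK calls; a time.sleep stands in for network latency) contrasting a sequential loop with the ThreadPoolExecutor pattern used later in this cookbook:

```python
import time
from concurrent.futures import ThreadPoolExecutor

def fake_eval(item):
    """Stand-in for a blocking eval call: ~0.1 s of simulated latency."""
    time.sleep(0.1)
    return item * 2

items = list(range(8))

# Sequential: total time is roughly n * latency
start = time.time()
sequential = [fake_eval(i) for i in items]
sequential_time = time.time() - start

# Parallel: total time is roughly one latency (one thread per item)
start = time.time()
with ThreadPoolExecutor(max_workers=8) as executor:
    parallel = list(executor.map(fake_eval, items))
parallel_time = time.time() - start

print(f"Sequential: {sequential_time:.2f}s  Parallel: {parallel_time:.2f}s")
```

With 8 items the sequential loop takes about 0.8 s while the parallel version finishes in roughly one round trip; the gap grows linearly with the number of items.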

Submit an async evaluation (fire and forget)

Use Evaluator.evaluate() with is_async=True. The call returns immediately with an eval_id you can poll later.

import os
from fi.evals import Evaluator

evaluator = Evaluator(
    fi_api_key=os.environ["FI_API_KEY"],
    fi_secret_key=os.environ["FI_SECRET_KEY"],
)

result = evaluator.evaluate(
    eval_templates="groundedness",
    inputs={
        "output": "The Eiffel Tower is in Paris, France.",
        "context": "The Eiffel Tower is a wrought-iron lattice tower on the Champ de Mars in Paris.",
    },
    model_name="turing_small",
    is_async=True,
)

# Extract the eval_id for polling
eval_id = result.eval_results[0].eval_id
print(f"Submitted async eval (eval_id: {eval_id})")

Expected output:

Submitted async eval (eval_id: abc123-def456-...)

Note

is_async=True is only available on Evaluator.evaluate(), not on the standalone evaluate() function.

Poll for results with get_eval_result()

Use get_eval_result(eval_id) to retrieve the result once processing completes.

import time

# Poll until the result is ready
for attempt in range(15):
    poll_result = evaluator.get_eval_result(eval_id)
    inner = poll_result.get("result", {})

    if isinstance(inner, dict) and inner.get("evalStatus") == "completed":
        eval_data = inner["result"]
        print("\n✅ Evaluation complete!")
        print(f"  Metric:  {eval_data['name']}")
        print(f"  Value:   {eval_data['value']}")
        print(f"  Runtime: {eval_data['runtime'] / 1000:.1f}s")
        print(f"  Reason:  {eval_data['reason'][:120]}...")
        break

    print(f"  ⏳ Attempt {attempt + 1}/15: still processing...")
    time.sleep(5)
else:
    print("❌ Timed out waiting for result")

Expected output:

  ⏳ Attempt 1/15: still processing...
  ⏳ Attempt 2/15: still processing...

✅ Evaluation complete!
  Metric:  groundedness
  Value:   Passed
  Runtime: 24.2s
  Reason:  The output is fully supported by the provided context. The Eiffel Tower being in Paris, France is...
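
If completion times vary widely across workloads, a fixed 5-second sleep either polls too often or waits too long. One option is exponential backoff. The sketch below is illustrative (poll_with_backoff and fake_fetch are our own names, not part of the SDK); in practice you would pass lambda: evaluator.get_eval_result(eval_id) as the fetch callable.

```python
import time

def poll_with_backoff(fetch, is_done, max_attempts=10, base_delay=1.0, max_delay=30.0):
    """Call fetch() until is_done(result) is true, doubling the sleep
    between attempts up to max_delay. Returns the final result, or None
    if it never completed."""
    delay = base_delay
    for attempt in range(max_attempts):
        result = fetch()
        if is_done(result):
            return result
        if attempt < max_attempts - 1:
            time.sleep(delay)
            delay = min(delay * 2, max_delay)
    return None

# Fake fetch that reports "completed" on the third call
calls = {"n": 0}
def fake_fetch():
    calls["n"] += 1
    return {"evalStatus": "completed" if calls["n"] >= 3 else "processing"}

final = poll_with_backoff(
    fake_fetch,
    lambda r: r.get("evalStatus") == "completed",
    base_delay=0.01,
)
print(final)  # {'evalStatus': 'completed'}
```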

Evaluate 50+ items in parallel

Use concurrent.futures.ThreadPoolExecutor to submit many evaluations concurrently. Each thread calls Evaluator.evaluate() independently.

import os
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from fi.evals import Evaluator

evaluator = Evaluator(
    fi_api_key=os.environ["FI_API_KEY"],
    fi_secret_key=os.environ["FI_SECRET_KEY"],
)

# Sample dataset — 50 items
test_cases = [
    {
        "output": f"Response {i}: The capital of France is Paris.",
        "context": "Paris is the capital and most populous city of France.",
        "input": f"Question {i}: What is the capital of France?",
    }
    for i in range(50)
]

def evaluate_one(index, test_case):
    """Evaluate a single test case and return the index + result."""
    result = evaluator.evaluate(
        eval_templates="groundedness",
        inputs=test_case,
        model_name="turing_small",
    )
    return index, result

# Run evaluations in parallel
results = [None] * len(test_cases)
completed = 0
start = time.time()

with ThreadPoolExecutor(max_workers=8) as executor:
    futures = {
        executor.submit(evaluate_one, i, tc): i
        for i, tc in enumerate(test_cases)
    }

    for future in as_completed(futures):
        idx, result = future.result()
        results[idx] = result
        completed += 1
        if completed % 10 == 0:
            elapsed = time.time() - start
            print(f"Progress: {completed}/{len(test_cases)} ({elapsed:.1f}s)")

elapsed = time.time() - start
print(f"\nDone: {len(test_cases)} evaluations in {elapsed:.1f}s")

# Summarize results
scored = sum(
    1 for r in results
    if r and r.eval_results and r.eval_results[0].output is not None
)
print(f"Scored: {scored}/{len(test_cases)}")

# Print a sample result
sample = results[0].eval_results[0]
print(f"Sample (name: {sample.name}, output: {sample.output}, reason: {sample.reason})")

Expected output:

Progress: 10/50 (3.2s)
Progress: 20/50 (5.8s)
Progress: 30/50 (8.1s)
Progress: 40/50 (10.5s)
Progress: 50/50 (12.9s)

Done: 50 evaluations in 12.9s
Scored: 50/50
Sample (name: groundedness, output: True, reason: The response is fully grounded...)

Parallel async submissions with batch polling

Combine is_async=True with parallel submission for maximum throughput — submit all items without waiting, then poll results in bulk.

import os
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from fi.evals import Evaluator

evaluator = Evaluator(
    fi_api_key=os.environ["FI_API_KEY"],
    fi_secret_key=os.environ["FI_SECRET_KEY"],
)

test_cases = [
    {
        "output": f"Response {i}: Python is a programming language.",
        "context": "Python is a high-level, general-purpose programming language.",
    }
    for i in range(50)
]

# Phase 1: Submit all evaluations asynchronously
def submit_async(index, test_case):
    result = evaluator.evaluate(
        eval_templates="groundedness",
        inputs=test_case,
        model_name="turing_small",
        is_async=True,
    )
    eval_id = result.eval_results[0].eval_id
    return index, eval_id

eval_ids = {}
with ThreadPoolExecutor(max_workers=8) as executor:
    futures = {
        executor.submit(submit_async, i, tc): i
        for i, tc in enumerate(test_cases)
    }
    for future in as_completed(futures):
        idx, eval_id = future.result()
        eval_ids[idx] = eval_id

print(f"Submitted {len(eval_ids)} async evaluations")

# Phase 2: Poll for results until all complete
results = {}
max_polls = 15

for poll_round in range(max_polls):
    still_pending = {
        idx: eid for idx, eid in eval_ids.items() if idx not in results
    }
    if not still_pending:
        break

    for idx, eid in still_pending.items():
        poll_result = evaluator.get_eval_result(eid)
        inner = poll_result.get("result", {})
        if isinstance(inner, dict) and inner.get("evalStatus") == "completed":
            results[idx] = poll_result

    print(f"  🔄 Poll {poll_round + 1}: {len(results)}/{len(eval_ids)} completed")
    if len(results) < len(eval_ids):
        time.sleep(3)

if len(results) == len(eval_ids):
    print(f"\n✅ All {len(results)} evaluations complete!")
else:
    print(f"\n⚠️ Timed out with {len(results)}/{len(eval_ids)} complete")

# Print first few results
for idx in sorted(results)[:3]:
    eval_data = results[idx]["result"]["result"]
    print(f"  Item {idx}: {eval_data['name']} = {eval_data['value']}")

Expected output:

Submitted 50 async evaluations
  🔄 Poll 1: 12/50 completed
  🔄 Poll 2: 34/50 completed
  🔄 Poll 3: 50/50 completed

✅ All 50 evaluations complete!
  Item 0: groundedness = Passed
  Item 1: groundedness = Passed
  Item 2: groundedness = Passed

Tips for large-scale evaluation

| Tip | Detail |
| --- | --- |
| Tune max_workers | Pass max_workers=16 to the Evaluator() constructor to increase the internal thread pool size beyond the default of 8. Too high risks rate limiting. |
| Use turing_small | Balanced speed and accuracy — best for most async workloads. Use turing_flash for lowest latency or turing_large when accuracy matters more than speed. |
| Add error handling | Wrap future.result() in try/except to catch timeouts and API errors without losing the whole batch. |
| Chunk large batches | For 1000+ items, split into chunks of 100 and add a short sleep between chunks to avoid rate limits. |
| Async for fire-and-forget | Use is_async=True when you do not need results immediately — for example, logging eval scores to a database in the background. |
| Sync parallel for immediate results | Use ThreadPoolExecutor without is_async when you need all scores before proceeding (e.g., CI gates). |
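
The chunking tip can be sketched with a small helper. This is a generic pattern, not an SDK feature (chunked and run_in_chunks are illustrative names); you would pass your ThreadPoolExecutor-based batch runner as run_chunk.

```python
import time

def chunked(items, size):
    """Yield successive slices of items with at most size elements each."""
    for start in range(0, len(items), size):
        yield items[start:start + size]

def run_in_chunks(items, run_chunk, chunk_size=100, pause=2.0):
    """Run run_chunk on each chunk, sleeping between chunks to stay
    under rate limits. Returns results in the original order."""
    results = []
    for i, chunk in enumerate(chunked(items, chunk_size)):
        results.extend(run_chunk(chunk))
        if (i + 1) * chunk_size < len(items):  # more chunks remain
            time.sleep(pause)
    return results

# Example: 250 items in chunks of 100 -> sizes 100, 100, 50
sizes = [len(c) for c in chunked(list(range(250)), 100)]
print(sizes)  # [100, 100, 50]
```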

Error handling pattern

from concurrent.futures import ThreadPoolExecutor, as_completed

results = {}
errors = {}

with ThreadPoolExecutor(max_workers=8) as executor:
    futures = {
        executor.submit(evaluate_one, i, tc): i
        for i, tc in enumerate(test_cases)
    }

    for future in as_completed(futures):
        idx = futures[future]
        try:
            _, result = future.result(timeout=60)
            results[idx] = result
        except Exception as exc:
            errors[idx] = str(exc)
            print(f"Item {idx} failed: {exc}")

print(f"Succeeded: {len(results)}, Failed: {len(errors)}")
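
The errors dict also makes it easy to retry transient failures without re-running the whole batch. A minimal retry sketch (retry_failed is an illustrative helper, not part of the SDK; the flaky function below simulates a one-off API error):

```python
import time

def retry_failed(failed_indices, run_one, max_retries=2, pause=1.0):
    """Re-run run_one(index) for each failed index, making up to
    max_retries extra passes. Returns (recovered, still_failed)."""
    recovered = {}
    pending = list(failed_indices)
    for _ in range(max_retries):
        if not pending:
            break
        next_pending = []
        for idx in pending:
            try:
                recovered[idx] = run_one(idx)
            except Exception:
                next_pending.append(idx)
        pending = next_pending
        if pending:
            time.sleep(pause)
    return recovered, pending

# Simulated runner that fails once for index 1, then succeeds
attempts = {}
def flaky(idx):
    attempts[idx] = attempts.get(idx, 0) + 1
    if idx == 1 and attempts[idx] == 1:
        raise RuntimeError("transient error")
    return f"ok-{idx}"

recovered, still_failed = retry_failed([0, 1], flaky, pause=0)
print(recovered, still_failed)  # {0: 'ok-0', 1: 'ok-1'} []
```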

What you built

You can now submit async evaluations, poll for results, and run large-scale parallel evals using the Evaluator SDK.

  • Ran a synchronous eval as a baseline with the standalone evaluate() function
  • Submitted a non-blocking async eval with Evaluator.evaluate(is_async=True)
  • Polled async results with evaluator.get_eval_result(eval_id)
  • Evaluated 50 items in parallel with ThreadPoolExecutor and progress tracking
  • Combined async submission with batch polling for maximum throughput
