Streaming Evaluation
Check LLM output token-by-token as it streams. Detect toxic content, PII, or quality drops mid-generation and stop early.
- Score LLM output as it’s being generated, token by token
- Stop generation early if toxicity, PII, or quality drops are detected
- Built-in safety and quality presets, or write your own scorer
Instead of waiting for the full response and scoring it afterwards, streaming checks run on each chunk as the LLM generates it. If something goes wrong mid-response, you can cut it off before the user sees it.
Note
Requires pip install ai-evaluation. The streaming module uses local scorer functions, not the cloud Turing engine.
Quick Example
from fi.evals.streaming import StreamingEvaluator, EarlyStopPolicy
# Create a safety-focused assessor
assessor = StreamingEvaluator.for_safety(toxicity_threshold=0.5)
# Simulate a token stream (in practice, this comes from your LLM)
tokens = ["Hello", " there", "!", " How", " can", " I", " help", " you", "?"]
for token in tokens:
result = assessor.process_token(token)
if result and result.should_stop:
print(f"Stopped at chunk {result.chunk_index}: {result.stop_reason}")
break
final = assessor.finalize()
print(final.passed) # True
print(final.final_text) # "Hello there! How can I help you?"
print(final.total_chunks) # number of chunks checked
print(final.summary()) # human-readable summary
StreamingEvaluator
Creating an assessor
from fi.evals.streaming import StreamingEvaluator, StreamingConfig, EarlyStopPolicy
# Default settings
assessor = StreamingEvaluator.with_defaults()
# Safety-optimized (lower thresholds, stops on toxic content)
assessor = StreamingEvaluator.for_safety(toxicity_threshold=0.3)
# Quality-optimized (larger chunks, less frequent checks)
assessor = StreamingEvaluator.for_quality(min_chunk_size=50, eval_interval_ms=500)
# Full custom config
assessor = StreamingEvaluator(
config=StreamingConfig(
min_chunk_size=10,
max_chunk_size=100,
eval_interval_ms=200,
enable_early_stop=True,
),
policy=EarlyStopPolicy.default(),
)
Adding scoring functions
Each scorer takes (chunk_text, cumulative_text) and returns a float score.
from fi.evals.streaming import (
StreamingEvaluator,
toxicity_scorer,
pii_scorer,
coherence_scorer,
)
assessor = StreamingEvaluator.with_defaults()
# Built-in scorers
assessor.add_eval("toxicity", toxicity_scorer, threshold=0.5, pass_above=False)
assessor.add_eval("pii", pii_scorer, threshold=0.5, pass_above=False)
assessor.add_eval("coherence", coherence_scorer, threshold=0.3, pass_above=True)
# Custom scorer
def length_scorer(chunk: str, cumulative: str) -> float:
"""Penalize very long responses."""
return min(1.0, len(cumulative) / 1000)
assessor.add_eval("length", length_scorer, threshold=0.8, pass_above=False)
Parameters for add_eval():
| Parameter | Type | Default | Description |
|---|---|---|---|
name | str | required | Name of the check |
eval_fn | callable | required | (chunk_text, cumulative_text) -> float |
threshold | float | 0.7 | Passing threshold |
weight | float | 1.0 | Weight for final score |
pass_above | bool | True | If True, scores above threshold pass. If False, scores below pass. |
Processing tokens
# One token at a time
for token in llm_stream:
result = assessor.process_token(token)
if result and result.should_stop:
break
# Or process larger chunks
result = assessor.process_chunk("a larger piece of text")
# Or run an entire stream at once
final = assessor.evaluate_stream(token_iterator)
# Async version
final = await assessor.evaluate_stream_async(async_token_iterator)
Getting results
final = assessor.finalize()
print(final.passed) # bool
print(final.final_text) # str — the full accumulated text
print(final.total_chunks) # int — chunks checked
print(final.final_scores) # dict — {name: final_score}
print(final.early_stopped) # bool — was generation stopped early?
print(final.stop_reason) # EarlyStopReason enum
print(final.stopped_at_chunk) # int or None
print(final.total_latency_ms) # float — total checking time
print(final.summary()) # str — human-readable summary
StreamingConfig
Controls how often checks run and when to stop.
| Parameter | Type | Default | Description |
|---|---|---|---|
min_chunk_size | int | 1 | Minimum characters before triggering a check |
max_chunk_size | int | 100 | Maximum characters per chunk before forced check |
eval_interval_ms | int | 100 | Minimum milliseconds between checks |
max_tokens | int or None | None | Stop after this many tokens |
max_chars | int or None | None | Stop after this many characters |
chunk_timeout_ms | int | 5000 | Timeout for a single chunk check |
total_timeout_ms | int | 60000 | Total timeout for the stream |
enable_early_stop | bool | True | Whether early stopping is enabled |
stop_on_first_failure | bool | False | Stop immediately on any failure |
eval_every_n_chunks | int | 1 | Run checks every N chunks |
eval_on_sentence_end | bool | True | Also check at sentence boundaries |
on_chunk_callback | callable or None | None | Called after each chunk check |
on_stop_callback | callable or None | None | Called when early stopping triggers |
EarlyStopPolicy
Defines conditions that trigger early stopping.
from fi.evals.streaming import EarlyStopPolicy
# Presets
policy = EarlyStopPolicy.default() # toxicity + safety stops
policy = EarlyStopPolicy.strict() # lower thresholds, stops faster
policy = EarlyStopPolicy.permissive() # only stops on severe issues
# Custom policy
policy = EarlyStopPolicy()
policy.add_toxicity_stop(threshold=0.7, consecutive=1)
policy.add_safety_stop(threshold=0.3, consecutive=1)
policy.add_quality_stop(threshold=0.3, consecutive=3)
# Custom condition
policy.add_condition(
name="max_repetition",
eval_name="repetition",
threshold=0.8,
comparison="above", # stop if score goes ABOVE threshold
consecutive_chunks=2, # must fail 2 chunks in a row
)
assessor.set_policy(policy)
EarlyStopReason values
When result.should_stop is True, result.stop_reason is one of:
| Value | Meaning |
|---|---|
NONE | No stop triggered |
TOXICITY | Toxic content detected |
SAFETY | Safety violation |
PII | PII detected |
JAILBREAK | Jailbreak attempt |
MAX_TOKENS | Token limit reached |
MAX_CHARS | Character limit reached |
THRESHOLD | A score crossed its configured threshold (above or below, depending on the condition)
CUSTOM | Custom condition triggered |
TIMEOUT | Check timed out |
ERROR | Check errored |
ChunkResult
Returned by process_token() when a check is triggered; process_token() returns None when no check ran for that token (which is why the examples guard with if result:).
result = assessor.process_token(token)
if result:
print(result.chunk_index) # int — which chunk this is
print(result.chunk_text) # str — the chunk that was checked
print(result.cumulative_text) # str — all text so far
print(result.scores) # dict — {name: score}
print(result.flags) # dict — {name: passed}
print(result.should_stop) # bool — should we stop?
print(result.stop_reason) # EarlyStopReason
print(result.all_passed) # bool — all checks passed?
print(result.latency_ms) # float — time for this chunk
Built-in Scorers
| Scorer | What it checks | Typical threshold | pass_above |
|---|---|---|---|
toxicity_scorer | Toxic or harmful language | 0.5 | False |
safety_scorer | General safety violations | 0.5 | False |
pii_scorer | Personally identifiable information | 0.5 | False |
jailbreak_scorer | Jailbreak attempts | 0.5 | False |
coherence_scorer | Text coherence and readability | 0.3 | True |
quality_scorer | Overall output quality | 0.3 | True |
safety_composite_scorer | Combined safety score | 0.5 | False |
quality_composite_scorer | Combined quality score | 0.3 | True |
from fi.evals.streaming import toxicity_scorer, pii_scorer, coherence_scorer
Common Patterns
Guardrails on a streaming chatbot
from fi.evals.streaming import StreamingEvaluator, EarlyStopPolicy, toxicity_scorer, pii_scorer
# Start from defaults and add your own scorers
assessor = StreamingEvaluator.with_defaults()
assessor.add_eval("toxicity", toxicity_scorer, threshold=0.3, pass_above=False)
assessor.add_eval("pii", pii_scorer, threshold=0.3, pass_above=False)
assessor.set_policy(EarlyStopPolicy.strict())
safe_text = ""
for token in llm.stream("Tell me about yourself"):
    result = assessor.process_token(token)
    if result and result.should_stop:
        # Stop without appending: result.cumulative_text would include the
        # flagged content, so keep only the text accumulated before the stop.
        break
    safe_text += token
final = assessor.finalize()
Callbacks for real-time monitoring
from fi.evals.streaming import StreamingEvaluator, StreamingConfig
def on_chunk(chunk_result):
for name, score in chunk_result.scores.items():
print(f" [{name}] score={score:.2f}")
def on_stop(reason, text):
print(f"STOPPED: {reason} after {len(text)} chars")
assessor = StreamingEvaluator(
config=StreamingConfig(
on_chunk_callback=on_chunk,
on_stop_callback=on_stop,
)
)