Streaming Evaluation

Check LLM output token-by-token as it streams. Detect toxic content, PII, or quality drops mid-generation and stop early.

📝
TL;DR
  • Score LLM output as it’s being generated, token by token
  • Stop generation early if toxicity, PII, or quality drops are detected
  • Built-in safety and quality presets, or write your own scorer

Instead of waiting for the full response and scoring it afterwards, streaming checks run on each chunk as the LLM generates it. If something goes wrong mid-response, you can cut it off before the user sees it.

Note

Requires pip install ai-evaluation. The streaming module uses local scorer functions, not the cloud Turing engine.

Quick Example

from fi.evals.streaming import StreamingEvaluator

# Create a safety-focused assessor
assessor = StreamingEvaluator.for_safety(toxicity_threshold=0.5)

# Simulate a token stream (in practice, this comes from your LLM)
tokens = ["Hello", " there", "!", " How", " can", " I", " help", " you", "?"]

for token in tokens:
    result = assessor.process_token(token)
    if result and result.should_stop:
        print(f"Stopped at chunk {result.chunk_index}: {result.stop_reason}")
        break

final = assessor.finalize()
print(final.passed)        # True
print(final.final_text)    # "Hello there! How can I help you?"
print(final.total_chunks)  # number of chunks checked
print(final.summary())     # human-readable summary

StreamingEvaluator

Creating an assessor

from fi.evals.streaming import StreamingEvaluator, StreamingConfig, EarlyStopPolicy

# Default settings
assessor = StreamingEvaluator.with_defaults()

# Safety-optimized (lower thresholds, stops on toxic content)
assessor = StreamingEvaluator.for_safety(toxicity_threshold=0.3)

# Quality-optimized (larger chunks, less frequent checks)
assessor = StreamingEvaluator.for_quality(min_chunk_size=50, eval_interval_ms=500)

# Full custom config
assessor = StreamingEvaluator(
    config=StreamingConfig(
        min_chunk_size=10,
        max_chunk_size=100,
        eval_interval_ms=200,
        enable_early_stop=True,
    ),
    policy=EarlyStopPolicy.default(),
)

Adding scoring functions

Each scorer takes (chunk_text, cumulative_text) and returns a float score.

from fi.evals.streaming import (
    StreamingEvaluator,
    toxicity_scorer,
    pii_scorer,
    coherence_scorer,
)

assessor = StreamingEvaluator.with_defaults()

# Built-in scorers
assessor.add_eval("toxicity", toxicity_scorer, threshold=0.5, pass_above=False)
assessor.add_eval("pii", pii_scorer, threshold=0.5, pass_above=False)
assessor.add_eval("coherence", coherence_scorer, threshold=0.3, pass_above=True)

# Custom scorer
def length_scorer(chunk: str, cumulative: str) -> float:
    """Penalize very long responses."""
    return min(1.0, len(cumulative) / 1000)

assessor.add_eval("length", length_scorer, threshold=0.8, pass_above=False)

Parameters for add_eval():

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| `name` | `str` | required | Name of the check |
| `eval_fn` | callable | required | `(chunk_text, cumulative_text) -> float` |
| `threshold` | `float` | `0.7` | Passing threshold |
| `weight` | `float` | `1.0` | Weight for the final score |
| `pass_above` | `bool` | `True` | If `True`, scores above the threshold pass. If `False`, scores below pass. |
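The `weight` values determine how much each check contributes to the final score. A minimal sketch of that aggregation as a weighted average (illustrative only; the library's exact formula may differ):

```python
def weighted_final_score(scores: dict, weights: dict) -> float:
    """Combine per-check scores into one number via a weighted average."""
    total_weight = sum(weights.get(name, 1.0) for name in scores)
    if total_weight == 0:
        return 0.0
    weighted_sum = sum(score * weights.get(name, 1.0)
                       for name, score in scores.items())
    return weighted_sum / total_weight

# Two checks: coherence weighted twice as heavily as length
scores = {"coherence": 0.9, "length": 0.6}
weights = {"coherence": 2.0, "length": 1.0}
print(round(weighted_final_score(scores, weights), 2))  # 0.8
```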

Processing tokens

# One token at a time
for token in llm_stream:
    result = assessor.process_token(token)
    if result and result.should_stop:
        break

# Or process larger chunks
result = assessor.process_chunk("a larger piece of text")

# Or run an entire stream at once
final = assessor.evaluate_stream(token_iterator)

# Async version
final = await assessor.evaluate_stream_async(async_token_iterator)

Getting results

final = assessor.finalize()

print(final.passed)              # bool
print(final.final_text)          # str — the full accumulated text
print(final.total_chunks)        # int — chunks checked
print(final.final_scores)        # dict — {name: final_score}
print(final.early_stopped)       # bool — was generation stopped early?
print(final.stop_reason)         # EarlyStopReason enum
print(final.stopped_at_chunk)    # int or None
print(final.total_latency_ms)    # float — total checking time
print(final.summary())           # str — human-readable summary

StreamingConfig

Controls how often checks run and when to stop.

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| `min_chunk_size` | `int` | `1` | Minimum characters before triggering a check |
| `max_chunk_size` | `int` | `100` | Maximum characters per chunk before a forced check |
| `eval_interval_ms` | `int` | `100` | Minimum milliseconds between checks |
| `max_tokens` | `int` or `None` | `None` | Stop after this many tokens |
| `max_chars` | `int` or `None` | `None` | Stop after this many characters |
| `chunk_timeout_ms` | `int` | `5000` | Timeout for a single chunk check |
| `total_timeout_ms` | `int` | `60000` | Total timeout for the stream |
| `enable_early_stop` | `bool` | `True` | Whether early stopping is enabled |
| `stop_on_first_failure` | `bool` | `False` | Stop immediately on any failure |
| `eval_every_n_chunks` | `int` | `1` | Run checks every N chunks |
| `eval_on_sentence_end` | `bool` | `True` | Also check at sentence boundaries |
| `on_chunk_callback` | callable or `None` | `None` | Called after each chunk check |
| `on_stop_callback` | callable or `None` | `None` | Called when early stopping triggers |
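The interplay of `min_chunk_size`, `max_chunk_size`, and `eval_on_sentence_end` can be pictured as a trigger function over the buffered text. This is a simplified sketch of the documented behavior (it ignores the time-based `eval_interval_ms` gate), not the library's actual code:

```python
def should_eval(buffer: str, min_chunk_size: int = 1,
                max_chunk_size: int = 100,
                eval_on_sentence_end: bool = True) -> bool:
    """Decide whether the buffered text warrants a check."""
    if len(buffer) < min_chunk_size:
        return False  # not enough text accumulated yet
    if len(buffer) >= max_chunk_size:
        return True   # force a check once the chunk gets too large
    if eval_on_sentence_end and buffer.rstrip().endswith((".", "!", "?")):
        return True   # also check at sentence boundaries
    return False

print(should_eval("Hi", min_chunk_size=10))               # False: below minimum
print(should_eval("A full sentence.", min_chunk_size=5))  # True: sentence end
print(should_eval("x" * 120))                             # True: exceeds max size
```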

EarlyStopPolicy

Defines conditions that trigger early stopping.

from fi.evals.streaming import EarlyStopPolicy

# Presets
policy = EarlyStopPolicy.default()      # toxicity + safety stops
policy = EarlyStopPolicy.strict()       # lower thresholds, stops faster
policy = EarlyStopPolicy.permissive()   # only stops on severe issues

# Custom policy
policy = EarlyStopPolicy()
policy.add_toxicity_stop(threshold=0.7, consecutive=1)
policy.add_safety_stop(threshold=0.3, consecutive=1)
policy.add_quality_stop(threshold=0.3, consecutive=3)

# Custom condition
policy.add_condition(
    name="max_repetition",
    eval_name="repetition",
    threshold=0.8,
    comparison="above",      # stop if score goes ABOVE threshold
    consecutive_chunks=2,    # must fail 2 chunks in a row
)

assessor.set_policy(policy)
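The `consecutive_chunks` requirement means a condition only fires after that many failing chunks in a row; a single passing chunk resets the streak. A minimal sketch of that counter logic (illustrative, not the library's implementation):

```python
class ConsecutiveCondition:
    """Trigger only after `consecutive` chunk scores in a row cross the threshold."""

    def __init__(self, threshold: float, comparison: str = "above",
                 consecutive: int = 2):
        self.threshold = threshold
        self.comparison = comparison
        self.consecutive = consecutive
        self._streak = 0

    def check(self, score: float) -> bool:
        if self.comparison == "above":
            failed = score > self.threshold
        else:
            failed = score < self.threshold
        self._streak = self._streak + 1 if failed else 0  # passing resets the streak
        return self._streak >= self.consecutive

cond = ConsecutiveCondition(threshold=0.8, comparison="above", consecutive=2)
print(cond.check(0.9))   # False: first failure, streak = 1
print(cond.check(0.95))  # True: second consecutive failure
```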

EarlyStopReason values

When result.should_stop is True, result.stop_reason is one of:

| Value | Meaning |
| --- | --- |
| `NONE` | No stop triggered |
| `TOXICITY` | Toxic content detected |
| `SAFETY` | Safety violation |
| `PII` | PII detected |
| `JAILBREAK` | Jailbreak attempt |
| `MAX_TOKENS` | Token limit reached |
| `MAX_CHARS` | Character limit reached |
| `THRESHOLD` | Score dropped below threshold |
| `CUSTOM` | Custom condition triggered |
| `TIMEOUT` | Check timed out |
| `ERROR` | Check errored |

ChunkResult

Returned by process_token() when a check is triggered.

result = assessor.process_token(token)

if result:
    print(result.chunk_index)      # int — which chunk this is
    print(result.chunk_text)       # str — the chunk that was checked
    print(result.cumulative_text)  # str — all text so far
    print(result.scores)           # dict — {name: score}
    print(result.flags)            # dict — {name: passed}
    print(result.should_stop)      # bool — should we stop?
    print(result.stop_reason)      # EarlyStopReason
    print(result.all_passed)       # bool — all checks passed?
    print(result.latency_ms)       # float — time for this chunk
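The `flags` dict maps each check name to whether it passed for this chunk, and `all_passed` is simply their conjunction. A quick illustration with plain dicts (not the actual `ChunkResult` class):

```python
# Per-check pass/fail flags, as ChunkResult.flags would hold them
flags = {"toxicity": True, "pii": True, "coherence": False}

all_passed = all(flags.values())                        # equivalent of result.all_passed
failed = [name for name, ok in flags.items() if not ok]  # which checks failed

print(all_passed)  # False
print(failed)      # ['coherence']
```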

Built-in Scorers

| Scorer | What it checks | Typical threshold | pass_above |
| --- | --- | --- | --- |
| `toxicity_scorer` | Toxic or harmful language | 0.5 | `False` |
| `safety_scorer` | General safety violations | 0.5 | `False` |
| `pii_scorer` | Personally identifiable information | 0.5 | `False` |
| `jailbreak_scorer` | Jailbreak attempts | 0.5 | `False` |
| `coherence_scorer` | Text coherence and readability | 0.3 | `True` |
| `quality_scorer` | Overall output quality | 0.3 | `True` |
| `safety_composite_scorer` | Combined safety score | 0.5 | `False` |
| `quality_composite_scorer` | Combined quality score | 0.3 | `True` |

All scorers are importable from the streaming module:

from fi.evals.streaming import toxicity_scorer, pii_scorer, coherence_scorer

Common Patterns

Guardrails on a streaming chatbot

from fi.evals.streaming import StreamingEvaluator, EarlyStopPolicy, toxicity_scorer, pii_scorer

# Start from defaults and add your own scorers
assessor = StreamingEvaluator.with_defaults()
assessor.add_eval("toxicity", toxicity_scorer, threshold=0.3, pass_above=False)
assessor.add_eval("pii", pii_scorer, threshold=0.3, pass_above=False)
assessor.set_policy(EarlyStopPolicy.strict())

safe_text = ""
for token in llm.stream("Tell me about yourself"):
    result = assessor.process_token(token)
    if result and result.should_stop:
        safe_text = result.cumulative_text
        break
    safe_text += token

final = assessor.finalize()

Callbacks for real-time monitoring

from fi.evals.streaming import StreamingEvaluator, StreamingConfig

def on_chunk(chunk_result):
    for name, score in chunk_result.scores.items():
        print(f"  [{name}] score={score:.2f}")

def on_stop(reason, text):
    print(f"STOPPED: {reason} after {len(text)} chars")

assessor = StreamingEvaluator(
    config=StreamingConfig(
        on_chunk_callback=on_chunk,
        on_stop_callback=on_stop,
    )
)