Advanced Tracing (OTEL)
Exploring Manual Context Propagation, Custom Decorators, and Sampling Techniques
This documentation provides advanced use cases and examples, such as manual context propagation, custom decorators, and sampling and filtering methods. These examples cater to real-world requirements like asynchronous execution, multi-service interactions, and specialized exporters or decorators for observability platforms like Future AGI.
1. Manual Context Propagation
In OpenTelemetry, context propagation ensures that the current tracing context (i.e., the active span and its metadata) is maintained across thread, task, or process switches. This is crucial when your code involves asynchronous tasks or spans multiple microservices.
Typically, OTEL instrumentation libraries manage context propagation automatically. However, manual intervention is sometimes necessary, especially in asynchronous workflows or custom instrumentation scenarios.
Handling Propagation in Async Functions
For Python async/await code, you might need to manually pass context if automated instrumentation doesn’t suffice or if custom logic is involved. The process involves:
- Extracting the current context (e.g., from an incoming HTTP request)
- Creating a new span as a child of that context
- Passing the context into the async function and attaching it there
import asyncio
from opentelemetry import trace
from opentelemetry.context import attach, detach, get_current

tracer = trace.get_tracer(__name__)

async def async_func(ctx):
    # Make the captured context current while this coroutine runs
    token = attach(ctx)
    try:
        current_span = trace.get_current_span()
        current_span.set_attribute("input.value", "User Input")
        await asyncio.sleep(1)  # Simulate async work
    finally:
        detach(token)

def sync_func():
    with tracer.start_as_current_span("sync_span") as span:
        # Capture the current context
        context = get_current()
        # Run the async function, passing the context
        asyncio.run(async_func(context))

if __name__ == "__main__":
    sync_func()
import { trace, context, Context } from "@opentelemetry/api";
import { promisify } from "util";

const sleep = promisify(setTimeout);
const tracer = trace.getTracer("my-app-tracer");

async function asyncFunc(ctx: Context): Promise<void> {
  // context.with ensures the passed context is active within this function's scope.
  await context.with(ctx, async () => {
    const currentSpan = trace.getSpan(context.active());
    if (currentSpan) {
      currentSpan.setAttribute("input.value", "User Input from TS");
    }
    await sleep(1000); // Simulate async work
  });
}

async function syncFunc(): Promise<void> {
  // Start a parent span
  await tracer.startActiveSpan("sync_span", async (span) => {
    // Capture the current context (which includes sync_span)
    const currentActiveContext = context.active();
    // Run the async function, passing the captured context
    await asyncFunc(currentActiveContext);
    span.end();
  });
}

// To run the example:
// syncFunc().then(() => console.log("Trace example completed."));
Propagation Between Microservices
When making HTTP or gRPC calls to another microservice, the current tracing context is usually propagated through HTTP headers. Built-in instrumentation (like opentelemetry-instrumentation-requests or opentelemetry-instrumentation-httpx) handles this automatically. For a custom approach, follow these steps:
- Inject the current span context into HTTP headers before sending the request
- Extract the context from incoming headers on the receiving microservice
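To make the inject and extract steps concrete, here is a minimal stdlib-only sketch of the W3C Trace Context `traceparent` header, which is the format OpenTelemetry propagators commonly write into the carrier. The helper names `format_traceparent` and `parse_traceparent` are hypothetical and not part of any OpenTelemetry package, and the parser handles only the version `00` layout; in practice the propagator does this work for you.

```python
import re
import secrets

# Version "00" layout: version - 16-byte trace id - 8-byte parent span id - 1-byte flags,
# all lowercase hex.
_TRACEPARENT_RE = re.compile(r"^([0-9a-f]{2})-([0-9a-f]{32})-([0-9a-f]{16})-([0-9a-f]{2})$")


def format_traceparent(trace_id: int, span_id: int, sampled: bool) -> str:
    """Render the header value that would be injected (hypothetical helper)."""
    flags = 0x01 if sampled else 0x00
    return f"00-{trace_id:032x}-{span_id:016x}-{flags:02x}"


def parse_traceparent(value: str):
    """Return (trace_id, span_id, sampled), or None if the header is malformed."""
    match = _TRACEPARENT_RE.match(value.strip())
    if match is None:
        return None
    version, trace_hex, span_hex, flags_hex = match.groups()
    trace_id, span_id = int(trace_hex, 16), int(span_hex, 16)
    # Version "ff" and all-zero identifiers are invalid.
    if version == "ff" or trace_id == 0 or span_id == 0:
        return None
    return trace_id, span_id, bool(int(flags_hex, 16) & 0x01)


# Sending side: write the header into the outgoing carrier.
headers = {}
trace_id = secrets.randbits(128) or 1
span_id = secrets.randbits(64) or 1
headers["traceparent"] = format_traceparent(trace_id, span_id, sampled=True)

# Receiving side: read it back to recover the parent identifiers.
parsed = parse_traceparent(headers["traceparent"])
print(headers["traceparent"], parsed == (trace_id, span_id, True))
```

The receiving service uses the recovered trace ID and parent span ID to attach its own spans to the caller's trace, which is why both services must agree on the propagation format.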
Example: Service A sends a request to Service B.
Service A:
import requests
from opentelemetry import trace
from opentelemetry.propagate import inject

tracer = trace.get_tracer(__name__)

def make_request_to_service_b():
    # Start a new span for this operation
    with tracer.start_as_current_span("llm_service_a") as span:
        # Prepare headers
        headers = {}
        # Inject the current context using the globally configured propagator
        inject(headers)
        # Make the request with the injected headers
        response = requests.get("http://localhost:5001/endpoint", headers=headers)  # Assuming Python Service B runs on 5001
        return response.text

# Example usage (ensure Service B is running and OTel SDK is configured for console output):
# if __name__ == "__main__":
#     from opentelemetry.sdk.trace import TracerProvider
#     from opentelemetry.sdk.trace.export import ConsoleSpanExporter, SimpleSpanProcessor
#     trace.set_tracer_provider(TracerProvider())
#     trace.get_tracer_provider().add_span_processor(SimpleSpanProcessor(ConsoleSpanExporter()))
#     print(make_request_to_service_b())
import { trace, context, propagation, SpanStatusCode } from "@opentelemetry/api";
import { W3CTraceContextPropagator } from "@opentelemetry/core";
import fetch from "node-fetch"; // yarn add node-fetch @types/node-fetch

const tracer = trace.getTracer("my-service-a-tracer");

// It's common to set this globally once for an application.
propagation.setGlobalPropagator(new W3CTraceContextPropagator());

async function makeRequestToServiceB(): Promise<string> {
  return await tracer.startActiveSpan("typescript_llm_service_a", async (span) => {
    const headers: Record<string, string> = {};
    propagation.inject(context.active(), headers);
    try {
      const response = await fetch("http://localhost:5002/ts-endpoint", { headers }); // Assuming TS Service B on 5002
      if (!response.ok) {
        span.setStatus({ code: SpanStatusCode.ERROR, message: `HTTP error! status: ${response.status}` });
        throw new Error(`HTTP error! status: ${response.status}`);
      }
      const data = await response.text();
      span.setStatus({ code: SpanStatusCode.OK });
      return data;
    } catch (error) {
      span.recordException(error as Error);
      span.setStatus({ code: SpanStatusCode.ERROR, message: (error as Error).message });
      throw error;
    } finally {
      span.end();
    }
  });
}

// Example usage (ensure Service B is running and OTel SDK is configured):
// async function main() {
//   // Minimal OTel SDK setup for console output
//   const { NodeTracerProvider } = await import('@opentelemetry/sdk-trace-node');
//   const { ConsoleSpanExporter, SimpleSpanProcessor } = await import('@opentelemetry/sdk-trace-base');
//   const provider = new NodeTracerProvider();
//   provider.addSpanProcessor(new SimpleSpanProcessor(new ConsoleSpanExporter()));
//   provider.register();
//   try {
//     const response = await makeRequestToServiceB();
//     console.log("Response from Service B:", response);
//   } catch (err) {
//     console.error("Error making request:", err);
//   }
// }
// main();
Service B:
from flask import Flask, request
from opentelemetry import trace
from opentelemetry.propagate import extract

# Minimal OTel setup for console output if not already configured globally
# from opentelemetry.sdk.trace import TracerProvider
# from opentelemetry.sdk.trace.export import ConsoleSpanExporter, SimpleSpanProcessor
# trace.set_tracer_provider(TracerProvider())
# trace.get_tracer_provider().add_span_processor(SimpleSpanProcessor(ConsoleSpanExporter()))

app = Flask(__name__)
tracer = trace.get_tracer("my-service-b-tracer")

@app.route("/endpoint")
def endpoint():
    # Extract the context from the incoming request.
    # request.headers is passed as-is because its lookups are case-insensitive.
    context_from_propagator = extract(request.headers)
    # Create a new span as a child of the extracted context
    with tracer.start_as_current_span("python_service_b_processing", context=context_from_propagator) as span:
        span.add_event("Received request in Python Service B")
        # ... do some processing ...
        return "Hello from Python Service B"

# if __name__ == "__main__":
#     app.run(port=5001)  # Assuming Python Service B runs on 5001
import { trace, context, propagation, SpanStatusCode } from "@opentelemetry/api";
import { W3CTraceContextPropagator } from "@opentelemetry/core";
import express, { Request, Response } from 'express'; // yarn add express @types/express

const tracer = trace.getTracer("my-ts-service-b-tracer");

// Ensure the same propagator is used as in Service A.
// If not set globally in Service A, ensure it's configured here or use a globally set one.
// propagation.setGlobalPropagator(new W3CTraceContextPropagator()); // Usually set globally once.

const app = express();
const port = 5002; // Assuming TS Service B runs on 5002

app.get('/ts-endpoint', (req: Request, res: Response) => {
  const parentContext = propagation.extract(context.active(), req.headers);
  // The parent context is the third argument; span options do not carry a context.
  tracer.startActiveSpan("typescript_service_b_processing", {}, parentContext, (span) => {
    try {
      span.addEvent("Received request in Typescript Service B");
      // ... do some processing ...
      res.send("Hello from Typescript Service B");
      span.setStatus({ code: SpanStatusCode.OK });
    } catch (error) {
      span.recordException(error as Error);
      span.setStatus({ code: SpanStatusCode.ERROR, message: (error as Error).message });
      res.status(500).send("Error processing request");
    } finally {
      span.end();
    }
  });
});

// Example OTel SDK setup for console output before starting server:
// async function startServer() {
//   // Minimal OTel SDK setup for console output
//   const { NodeTracerProvider } = await import('@opentelemetry/sdk-trace-node');
//   const { ConsoleSpanExporter, SimpleSpanProcessor } = await import('@opentelemetry/sdk-trace-base');
//   const provider = new NodeTracerProvider();
//   provider.addSpanProcessor(new SimpleSpanProcessor(new ConsoleSpanExporter()));
//   provider.register();
//   app.listen(port, () => {
//     console.log(`Typescript Service B listening on http://localhost:${port}`);
//   });
// }
// startServer();
Propagation with Concurrent Threads
When tasks are submitted to a ThreadPoolExecutor or any other concurrency mechanism, each task runs in a separate thread. The tracer’s current context (which stores the active span or baggage) doesn’t automatically follow tasks to worker threads. By capturing the context in the main thread and attaching it in each worker thread, you maintain the association between tasks and the original trace context.
Example: Below is a detailed, annotated example to show how you can:
- Capture the current context before submitting tasks to the executor.
- Attach that context within each worker thread (using attach).
- Run your task logic (e.g., processing questions).
- Detach the context when the task is complete (using detach).
import concurrent.futures
from opentelemetry import trace
from opentelemetry.context import attach, detach, get_current
from typing import Callable

tracer = trace.get_tracer(__name__)

def func1():
    # Some example work done in a thread.
    current_span = trace.get_current_span()
    current_span.set_attribute("input.value", "User Input from func1")
    return "func1 result"

def func2():
    # Another example function that sets an attribute on the current span.
    current_span = trace.get_current_span()
    current_span.set_attribute("input.value", "User Input from func2")
    return "func2 result"

def wrapped_func(func: Callable, main_context):
    # Wraps the original function to attach/detach the captured context
    # so the worker thread has the correct span context.
    def wrapper():
        token = attach(main_context)  # Attach context to this thread
        try:
            return func()
        finally:
            detach(token)  # Detach after finishing
    return wrapper

# Example main execution logic:
# def main_concurrent_execution():
#     with tracer.start_as_current_span("main_operation") as parent_span:
#         parent_span.set_attribute("orchestrator", "ThreadPoolExecutor")
#         # Capture the context from the current thread (main_operation's context)
#         main_context_to_propagate = get_current()
#         # Create a list of functions to be executed in parallel
#         funcs_to_run = [func1, func2, func1, func2]
#         results = []
#         with concurrent.futures.ThreadPoolExecutor() as executor:
#             # Map each function to its wrapped version, passing the captured context
#             futures = [executor.submit(wrapped_func(f, main_context_to_propagate)) for f in funcs_to_run]
#             for future in concurrent.futures.as_completed(futures):
#                 results.append(future.result())
#         parent_span.set_attribute("results.count", len(results))
#         return results

# if __name__ == "__main__":
#     # Minimal OTel SDK setup for console output
#     from opentelemetry.sdk.trace import TracerProvider
#     from opentelemetry.sdk.trace.export import ConsoleSpanExporter, SimpleSpanProcessor
#     trace.set_tracer_provider(TracerProvider())
#     trace.get_tracer_provider().add_span_processor(SimpleSpanProcessor(ConsoleSpanExporter()))
#     final_results = main_concurrent_execution()
#     print(f"Concurrent execution results: {final_results}")
import { trace, context, Context } from "@opentelemetry/api";
import { promisify } from "util";

const sleep = promisify(setTimeout);
const tracer = trace.getTracer("my-app-tracer-concurrent");

async function processItem(itemNumber: number, parentCtx: Context): Promise<string> {
  // Use context.with to ensure operations run within the parentCtx
  return await context.with(parentCtx, async () => {
    // This new span will be a child of the span in parentCtx (e.g., "main_async_operation")
    return await tracer.startActiveSpan(`process_item_${itemNumber}`, async (span) => {
      span.setAttribute("item.number", itemNumber);
      await sleep(Math.random() * 100); // Simulate async work
      const result = `Item ${itemNumber} processed`;
      span.setAttribute("output.value", result);
      span.end();
      return result;
    });
  });
}

async function mainAsyncOrchestration() {
  // Start a main parent span
  return await tracer.startActiveSpan("main_async_operation", async (parentSpan) => {
    parentSpan.setAttribute("orchestrator", "Promise.all");
    // Capture the context of the main_async_operation span
    const contextToPropagate = context.active();
    const itemsToProcess = [1, 2, 3, 4];
    const processingPromises = itemsToProcess.map(item =>
      processItem(item, contextToPropagate) // Pass the captured context to each task
    );
    const results = await Promise.all(processingPromises);
    parentSpan.setAttribute("results.count", results.length);
    parentSpan.end();
    return results;
  });
}

// Example usage:
// async function runExample() {
//   // Minimal OTel SDK setup for console output
//   const { NodeTracerProvider } = await import('@opentelemetry/sdk-trace-node');
//   const { ConsoleSpanExporter, SimpleSpanProcessor } = await import('@opentelemetry/sdk-trace-base');
//   const provider = new NodeTracerProvider();
//   provider.addSpanProcessor(new SimpleSpanProcessor(new ConsoleSpanExporter()));
//   provider.register();
//   const finalResults = await mainAsyncOrchestration();
//   console.log("Async orchestration results:", finalResults);
// }
// runExample();
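The reason worker threads lose the trace context can be reproduced without any tracing library. The OpenTelemetry Python API keeps its current context in a `contextvars` variable by default, and on standard CPython builds a pool thread starts with its own empty context. The sketch below is a stdlib-only illustration of that behavior; `current_span_name` is a hypothetical stand-in for the tracer's "current span" slot, not an OpenTelemetry name.

```python
import contextvars
from concurrent.futures import ThreadPoolExecutor

# Hypothetical stand-in for the tracer's "current span" slot.
current_span_name = contextvars.ContextVar("current_span_name", default=None)


def read_span_name():
    # Runs in a worker thread and reports what that thread sees.
    return current_span_name.get()


def main():
    current_span_name.set("main_operation")
    # Capture a snapshot of the main thread's context before submitting work.
    captured = contextvars.copy_context()
    with ThreadPoolExecutor(max_workers=1) as executor:
        # Without propagation, the worker does not see the main thread's value.
        without = executor.submit(read_span_name).result()
        # Running the task inside the captured snapshot restores the value.
        with_ctx = executor.submit(captured.run, read_span_name).result()
    return without, with_ctx


result = main()
print(result)
```

A single `Context` object cannot be entered by two threads at once, so when several tasks run in parallel each one needs its own copy. The attach/detach pattern used with OpenTelemetry avoids this because every worker attaches the captured context to its own thread.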
2. Creating Custom Decorators
Decorators offer a convenient way to instrument functions and methods across your codebase without repeatedly inserting tracing calls. A custom decorator can:
- Initiate a new span before the function call
- Add attributes/events with function arguments (inputs)
- Return the function’s result (outputs) and annotate or log it in the span
- Conclude the span
Example Decorator Implementation:
from opentelemetry import trace
import functools

def trace_function(span_kind=None, additional_attributes=None):
    def decorator(func):
        @functools.wraps(func)  # Preserve function metadata
        def wrapper(*args, **kwargs):
            tracer = trace.get_tracer(__name__, "0.1.0")
            with tracer.start_as_current_span(func.__name__) as span:
                if span_kind:
                    span.set_attribute("fi.span.kind", span_kind)
                # Defensively convert args and kwargs to strings for attributes
                try:
                    span.set_attribute("function.arguments", str(args))
                    span.set_attribute("function.keyword_arguments", str(kwargs))
                except Exception as e:
                    span.set_attribute("function.arguments.error", str(e))
                if additional_attributes:
                    for key, value in additional_attributes.items():
                        span.set_attribute(key, value)
                result = func(*args, **kwargs)
                try:
                    span.set_attribute("function.return_value", str(result))
                except Exception as e:
                    span.set_attribute("function.return_value.error", str(e))
                return result
        return wrapper
    return decorator

# Example Implementation
@trace_function(span_kind="LLM", additional_attributes={"llm.model_name": "gpt-4o"})
def process_text(text: str, verbose: bool = False):
    if verbose:
        print(f"Processing text: {text}")
    return text.upper()

# if __name__ == "__main__":
#     # Minimal OTel SDK setup for console output
#     from opentelemetry.sdk.trace import TracerProvider
#     from opentelemetry.sdk.trace.export import ConsoleSpanExporter, SimpleSpanProcessor
#     trace.set_tracer_provider(TracerProvider())
#     trace.get_tracer_provider().add_span_processor(SimpleSpanProcessor(ConsoleSpanExporter()))
#     print(process_text("hello world", verbose=True))
import { trace, Attributes, SpanStatusCode } from "@opentelemetry/api";

// Define a type for the function that will be decorated
type TraceableFunction<T extends any[], R> = (...args: T) => R;

interface TraceFunctionOptions {
  spanKind?: string;
  additionalAttributes?: Attributes;
}

function traceFunction<T extends any[], R>(
  func: TraceableFunction<T, R>,
  options?: TraceFunctionOptions
): TraceableFunction<T, R> {
  const tracer = trace.getTracer("my-app-tracer-decorator", "0.1.0");
  const funcName = func.name || "anonymous_function";
  return (...args: T): R => {
    return tracer.startActiveSpan(funcName, (span) => {
      if (options?.spanKind) {
        span.setAttribute("fi.span.kind", options.spanKind);
      }
      try {
        span.setAttribute("function.arguments", JSON.stringify(args));
      } catch (e) {
        span.setAttribute("function.arguments.error", String(e));
      }
      if (options?.additionalAttributes) {
        span.setAttributes(options.additionalAttributes);
      }
      try {
        const result = func(...args);
        try {
          span.setAttribute("function.return_value", JSON.stringify(result));
        } catch (e) {
          span.setAttribute("function.return_value.error", String(e));
        }
        span.setStatus({ code: SpanStatusCode.OK });
        span.end();
        return result;
      } catch (error) {
        span.recordException(error as Error);
        span.setStatus({ code: SpanStatusCode.ERROR, message: (error as Error).message });
        span.end();
        throw error;
      }
    });
  };
}

// Example Implementation
function processTextTs(text: string, verbose: boolean = false): string {
  if (verbose) {
    console.log(`TS Processing text: ${text}`);
  }
  return text.toUpperCase();
}

const tracedProcessText = traceFunction(processTextTs, {
  spanKind: "LLM",
  additionalAttributes: { "llm.model_name": "gpt-4o-ts" },
});

// Example usage:
// async function runDecoratorExample() {
//   // Minimal OTel SDK setup for console output
//   const { NodeTracerProvider } = await import('@opentelemetry/sdk-trace-node');
//   const { ConsoleSpanExporter, SimpleSpanProcessor } = await import('@opentelemetry/sdk-trace-base');
//   const provider = new NodeTracerProvider();
//   provider.addSpanProcessor(new SimpleSpanProcessor(new ConsoleSpanExporter()));
//   provider.register();
//   console.log(tracedProcessText("hello from typescript", true));
// }
// runDecoratorExample();
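The decorators above record all arguments as one stringified tuple. If you would rather have one attribute per parameter, a possible variation is to bind the call to the function's signature first. The sketch below is stdlib-only; the helper `describe_call`, the `function.arg.<name>` key scheme, and the `max_len` truncation limit are all illustrative assumptions, not an established convention.

```python
import inspect


def describe_call(func, args, kwargs, max_len=200):
    """Map a call's arguments to flat, string-valued attributes (hypothetical helper)."""
    attributes = {}
    try:
        bound = inspect.signature(func).bind(*args, **kwargs)
        bound.apply_defaults()  # include parameters the caller left at their defaults
        items = list(bound.arguments.items())
    except (TypeError, ValueError):
        # The call does not match the signature, or the callable cannot be
        # introspected: fall back to positional indexes and raw keyword names.
        items = [(str(i), a) for i, a in enumerate(args)] + list(kwargs.items())
    for name, value in items:
        text = repr(value)
        if len(text) > max_len:
            text = text[:max_len] + "..."  # keep attribute values bounded
        attributes[f"function.arg.{name}"] = text
    return attributes


def process_text(text: str, verbose: bool = False):
    return text.upper()


print(describe_call(process_text, ("hello world",), {}))
```

Inside a tracing decorator, the returned dictionary could be passed to the span in a loop of `set_attribute` calls before the wrapped function runs. Truncating values keeps large inputs such as long prompts from inflating span size.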
3. Selective Span Filtering Based on Attributes
In large-scale applications, recording every span may not be necessary. Instead, you might want to selectively sample:
- Spans from a specific service or component
- Spans meeting certain business criteria (e.g., user.id in a specific subset)
- Only error or slow spans
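Creating a custom sampler allows you to dynamically control which spans are recorded/exported based on their attributes or names. Keep in mind that a sampler runs when a span is created, so it sees only the name, kind, links, and attributes supplied at that moment; criteria that are known only once a span has finished, such as errors or latency, cannot be evaluated in a sampler and are typically handled later in the pipeline (for example, at export time or in a collector). This approach helps manage telemetry volume and cost while ensuring you capture the most relevant traces for debugging or analysis.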
Basics of Custom Sampling
Sampler Interface
In OTEL Python, create a custom sampler by subclassing the Sampler interface from opentelemetry.sdk.trace.sampling. Implement:
should_sample(...)
- Determines whether the span is recorded (Sampled) or dropped (NotSampled)
- You can examine attributes, span name, span kind, parent context, etc.
Sampling Result
When implementing should_sample, you must return a SamplingResult, which indicates:
- Sampling Decision: Decision.RECORD_AND_SAMPLE, Decision.RECORD_ONLY, or Decision.DROP
- Attributes: Optionally modify or add attributes in the returned SamplingResult (e.g., a reason for sampling)
Example:
from opentelemetry.context import Context
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import SimpleSpanProcessor, ConsoleSpanExporter
from opentelemetry.sdk.trace.sampling import Sampler, SamplingResult, Decision, ParentBasedTraceIdRatio
from opentelemetry import trace
from opentelemetry.trace import SpanKind
from opentelemetry.util.types import Attributes  # For type hinting

USER_ID_TO_DROP = "user_to_skip_tracing"

class UserBasedSampler(Sampler):
    # A custom sampler that drops any span having a `user.id` attribute matching
    # a specified user ID. For other cases, it delegates to a root sampler.
    def __init__(self, root_sampler: Sampler = ParentBasedTraceIdRatio(0.5)):
        self._root_sampler = root_sampler

    def should_sample(
        self,
        parent_context: Context,
        trace_id: int,
        name: str,
        kind: SpanKind = None,
        attributes: Attributes = None,
        links=None,
    ) -> SamplingResult:
        # Only attributes passed when the span is started are visible here
        user_id = attributes.get("user.id") if attributes else None
        if user_id == USER_ID_TO_DROP:
            return SamplingResult(
                decision=Decision.DROP,
                attributes={"sampler.reason": f"Dropping span for user.id={user_id}"}
            )
        else:
            return self._root_sampler.should_sample(parent_context, trace_id, name, kind, attributes, links)

    def get_description(self) -> str:
        return f"UserBasedSampler(root_sampler={self._root_sampler.get_description()})"

# Example usage:
# if __name__ == "__main__":
#     custom_sampler = UserBasedSampler(root_sampler=ParentBasedTraceIdRatio(1.0))
#     provider = TracerProvider(sampler=custom_sampler)
#     trace.set_tracer_provider(provider)
#     provider.add_span_processor(SimpleSpanProcessor(ConsoleSpanExporter()))
#     tracer = trace.get_tracer(__name__, "0.1.0")
#     with tracer.start_as_current_span("op_for_dropped_user", attributes={"user.id": USER_ID_TO_DROP}): pass
#     with tracer.start_as_current_span("op_for_sampled_user", attributes={"user.id": "another_user"}): pass
#     with tracer.start_as_current_span("op_without_user_id"): pass
import { Context, Link, SpanAttributes, SpanKind, trace } from "@opentelemetry/api";
import { Sampler, SamplingDecision, SamplingResult, ParentBasedSampler, TraceIdRatioBasedSampler } from "@opentelemetry/sdk-trace-base";
import { NodeTracerProvider } from "@opentelemetry/sdk-trace-node";
import { SimpleSpanProcessor, ConsoleSpanExporter } from "@opentelemetry/sdk-trace-base";

const USER_ID_TO_DROP_TS = "user_to_skip_tracing_ts";

class UserBasedSamplerTs implements Sampler {
  private _rootSampler: Sampler;

  constructor(rootSampler?: Sampler) {
    // Default to a ParentBased sampler that samples 50% of traces if no root is provided.
    this._rootSampler = rootSampler ?? new ParentBasedSampler({ root: new TraceIdRatioBasedSampler(0.5) });
  }

  shouldSample(
    context: Context,
    traceId: string,
    spanName: string,
    spanKind: SpanKind,
    attributes: SpanAttributes,
    links: Link[]
  ): SamplingResult {
    const userId = attributes["user.id"];
    if (userId === USER_ID_TO_DROP_TS) {
      return {
        decision: SamplingDecision.DROP,
        attributes: { ...attributes, "sampler.reason": `Dropping span for user.id=${userId}` }
      };
    }
    return this._rootSampler.shouldSample(context, traceId, spanName, spanKind, attributes, links);
  }

  toString(): string {
    return `UserBasedSamplerTs(rootSampler=${this._rootSampler.toString()})`;
  }
}

// Example usage:
// async function runSamplerExample() {
//   const customSamplerTs = new UserBasedSamplerTs(
//     new ParentBasedSampler({ root: new TraceIdRatioBasedSampler(1.0) }) // Sample all non-dropped
//   );
//   const provider = new NodeTracerProvider({ sampler: customSamplerTs });
//   provider.addSpanProcessor(new SimpleSpanProcessor(new ConsoleSpanExporter()));
//   provider.register();
//   const tracer = trace.getTracer("my-app-sampler-example", "0.1.0");
//   tracer.startActiveSpan("op_for_dropped_user_ts", { attributes: { "user.id": USER_ID_TO_DROP_TS } }, (span) => {
//     console.log("This span (dropped user) should not appear in console.");
//     span.end();
//   });
//   tracer.startActiveSpan("op_for_sampled_user_ts", { attributes: { "user.id": "another_user_ts" } }, (span) => {
//     console.log("This span (sampled user) should appear in console.");
//     span.end();
//   });
//   tracer.startActiveSpan("op_without_user_id_ts", (span) => {
//     console.log("This span (no user) should appear in console.");
//     span.end();
//   });
// }
// runSamplerExample();
You then pass your custom sampler into your tracer provider.
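The custom samplers on this page delegate to a ratio-based root sampler for every span they do not drop. As a rough mental model of how such a sampler can decide deterministically, the sketch below compares the low 64 bits of the trace ID against a bound derived from the sampling rate. It is a simplified stdlib-only illustration; the names `ratio_bound` and `is_sampled` are hypothetical, and the SDK's exact arithmetic may differ.

```python
import secrets

TRACE_ID_LIMIT = (1 << 64) - 1  # only the low 64 bits of the trace ID are compared


def ratio_bound(rate: float) -> int:
    """Upper bound below which a trace ID counts as sampled."""
    if not 0.0 <= rate <= 1.0:
        raise ValueError("rate must be between 0.0 and 1.0")
    return round(rate * (TRACE_ID_LIMIT + 1))


def is_sampled(trace_id: int, rate: float) -> bool:
    """Deterministic: the same trace ID always yields the same decision."""
    return (trace_id & TRACE_ID_LIMIT) < ratio_bound(rate)


# Roughly half of randomly generated trace IDs should pass at rate 0.5.
kept = sum(is_sampled(secrets.randbits(128), 0.5) for _ in range(10_000))
print(f"kept {kept} of 10000 trace IDs at rate 0.5")
```

Because the decision depends only on the trace ID and the rate, every service that applies the same rate reaches the same verdict for a given trace, so traces tend to be kept or dropped as a whole rather than span by span.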