Exploring Manual Context Propagation, Custom Decorators, and Sampling Techniques

This documentation provides advanced use cases and examples, such as manual context propagation, custom decorators, and sampling and filtering methods. These examples address real-world requirements such as asynchronous execution, multi-service interactions, and specialized exporters or decorators for observability platforms like Future AGI.

1. Manual Context Propagation

In OpenTelemetry, context propagation ensures that the current tracing context (i.e., the active span and its metadata) is maintained across thread, task, or process switches. This is crucial when your code involves asynchronous tasks or spans multiple microservices.

Typically, OTEL instrumentation libraries manage context propagation automatically. However, manual intervention is sometimes necessary, especially in asynchronous workflows or custom instrumentation scenarios.

Handling Propagation in Async Functions

For Python async/await code, you might need to manually pass context if automated instrumentation doesn’t suffice or if custom logic is involved. The process involves:

  1. Extracting the current context (e.g., from an incoming HTTP request)
  2. Creating a new span as a child of that context
  3. Embedding the context into the async function for reuse

import asyncio
from opentelemetry import trace
from opentelemetry.context import attach, detach, get_current

tracer = trace.get_tracer(__name__)

async def async_func(ctx):
    token = attach(ctx)
    try:
        current_span = trace.get_current_span()
        current_span.set_attribute("input.value", "User Input")
        await asyncio.sleep(1)  # Simulate async work
    finally:
        detach(token)

def sync_func():
    with tracer.start_as_current_span("sync_span") as span:
        # Capture the current context 
        context = get_current()
        # Run the async function, passing the context
        asyncio.run(async_func(context))

if __name__ == "__main__":
    sync_func()

Propagation Between Microservices

When making HTTP or gRPC calls to another microservice, the current tracing context is usually propagated through HTTP headers. Built-in instrumentation (like opentelemetry-instrumentation-requests or opentelemetry-instrumentation-httpx) handles this automatically. For a custom approach, follow these steps:

  1. Inject the current span context into HTTP headers before sending the request
  2. Extract the context from incoming headers on the receiving microservice
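
For reference, the built-in instrumentation mentioned above typically only needs to be enabled once at startup. A minimal sketch, assuming the opentelemetry-instrumentation-requests package is installed:

from opentelemetry.instrumentation.requests import RequestsInstrumentor

# After this call, outgoing requests calls automatically carry the current
# trace context in their HTTP headers
RequestsInstrumentor().instrument()

The rest of this section walks through the custom (manual) approach.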

Example: Service A sends a request to Service B.

Service A:

import requests
from opentelemetry import trace
from opentelemetry.propagate import inject

tracer = trace.get_tracer(__name__)

def make_request_to_service_b():
    # Start a new span for this operation
    with tracer.start_as_current_span("llm_service_a") as span:
        # Prepare headers and inject the current trace context into them
        headers = {}
        inject(headers)

        # Make the request with the injected headers
        response = requests.get("http://service-b:5000/endpoint", headers=headers)
        return response.text

Service B:

from flask import Flask, request
from opentelemetry import trace
from opentelemetry.propagate import extract

app = Flask(__name__)
tracer = trace.get_tracer(__name__)

@app.route("/endpoint")
def endpoint():
    # Extract the context from the incoming request headers
    context = extract(request.headers)

    # Create a new span as a child of the extracted context
    with tracer.start_as_current_span("service_b_processing", context=context) as span:
        span.add_event("Received request in service B")
        # ... do some processing ...
        return "Hello from Service B"

Propagation with Concurrent Threads

When tasks are submitted to a ThreadPoolExecutor or any concurrency mechanism, each task runs in a separate thread. The tracer’s current context (which stores the active span or baggage) doesn’t automatically follow tasks to worker threads. By capturing the context in the main thread and attaching it in each worker thread, you maintain the association between tasks and the original trace context.

Example: The annotated code below shows how to:

  1. Capture the current context before submitting tasks to the executor.

  2. Attach that context within each worker thread (using attach).

  3. Run your task logic (e.g., processing questions).

  4. Detach the context when the task is complete (using detach).

import concurrent.futures
from typing import Callable
from opentelemetry import trace
from opentelemetry.context import attach, detach, get_current

tracer = trace.get_tracer(__name__)

def func1():
    """
    Some example work done in a thread.
    """
    current_span = trace.get_current_span()
    current_span.set_attribute("input.value", "User Input")
    return "func1 result"

def func2():
    """
    Another example function that logs an event to the current span.
    """
    current_span = trace.get_current_span()
    current_span.set_attribute("input.value", "User Input2")
    return "func2 result"
    
def wrapped_func(func: Callable):
    """
    Demonstrates how to capture the current context in the main thread,
    and attach/detach it within each worker thread. 
    Wraps the original function to attach/detach the current context
    so the worker thread has the correct span context.
    """
    # Capture the context from the current thread
    main_context = get_current()
    def wrapper():
        token = attach(main_context)  # Attach context to this thread
        try:
            return func()
        finally:
            detach(token)              # Detach after finishing
    return wrapper

# Create a list of functions to be executed in parallel
funcs = [func1, func2, func1, func2]

with concurrent.futures.ThreadPoolExecutor() as executor:
    # Wrap each function in the main thread so the main-thread context is captured,
    # then run the wrapped versions in the worker threads
    wrapped_funcs = [wrapped_func(f) for f in funcs]
    results = list(executor.map(lambda wf: wf(), wrapped_funcs))

print(results)

2. Creating Custom Decorators

Decorators offer a convenient way to instrument functions and methods across your codebase without repeatedly inserting tracing calls. A custom decorator can:

  • Initiate a new span before the function call
  • Add attributes/events with function arguments (inputs)
  • Return the function’s result (outputs) and annotate or log it in the span
  • Conclude the span

Example Decorator Implementation:

import functools

from opentelemetry import trace

def trace_function(span_kind=None, additional_attributes=None):
    tracer = trace.get_tracer(__name__)

    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            with tracer.start_as_current_span(func.__name__) as span:
                if span_kind:
                    span.set_attribute("fi.span.kind", span_kind)

                span.set_attribute("input.value", str(args))

                if additional_attributes:
                    for key, value in additional_attributes.items():
                        span.set_attribute(key, value)

                result = func(*args, **kwargs)
                span.set_attribute("output.value", str(result))
                return result
        return wrapper
    return decorator

# Example Implementation
@trace_function(span_kind="LLM", additional_attributes={"llm.model_name": "gpt-4o"})
def process_text(text):
    return text.upper()
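
With the decorator applied, calling process_text("hello") produces a span named process_text with fi.span.kind="LLM", llm.model_name="gpt-4o", input.value="('hello',)", and output.value="HELLO" recorded as attributes.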

3. Selective Span Filtering Based on Attributes

In large-scale applications, recording every span may not be necessary. Instead, you might want to selectively sample:

  • Spans from a specific service or component
  • Spans meeting certain business criteria (e.g., user.id in a specific subset)
  • Only error or slow spans

Creating a custom sampler allows you to dynamically control which spans are recorded/exported based on their attributes or names. This approach helps manage telemetry volume and cost while ensuring you capture the most relevant traces for debugging or analysis.

Basics of Custom Sampling

Sampler Interface

In OTEL Python, create a custom sampler by subclassing the Sampler interface from opentelemetry.sdk.trace.sampling. Implement:

  • should_sample(...)
    • Determines whether the span is recorded and sampled, recorded only, or dropped
    • You can examine attributes, span name, span kind, parent context, etc.
  • get_description()
    • Returns a short, human-readable description of the sampler

Sampling Result

When implementing should_sample, you must return a SamplingResult, which indicates:

  • Sampling Decision: Decision.RECORD_AND_SAMPLE, Decision.RECORD_ONLY, or Decision.DROP
  • Attributes: Optionally modify or add attributes in the returned SamplingResult (e.g., a reason for sampling)

Example:

from opentelemetry.context import Context
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import SimpleSpanProcessor, ConsoleSpanExporter
from opentelemetry.sdk.trace.sampling import Sampler, SamplingResult, Decision
from opentelemetry import trace

# Example user ID whose spans should be dropped
USER_ID_TO_DROP = "user-123"

class UserBasedSampler(Sampler):
    """
    A custom sampler that drops any span having a `user.id` attribute matching
    a specified user ID. Otherwise, spans are recorded and sampled.
    """
    def should_sample(
        self,
        parent_context: Context,
        trace_id: int,
        name: str,
        kind=None,
        attributes=None,
        links=None,
        trace_state=None,
    ) -> SamplingResult:
        user_id = (attributes or {}).get("user.id")
        if user_id == USER_ID_TO_DROP:
            # If this user matches the one we want to drop, do not sample
            return SamplingResult(
                decision=Decision.DROP,
                attributes={"sampler.reason": f"Dropping span for user.id={user_id}"}
            )
        # Otherwise, record and sample normally, passing the original
        # attributes through so they are kept on the span
        return SamplingResult(
            decision=Decision.RECORD_AND_SAMPLE,
            attributes=attributes
        )

    def get_description(self) -> str:
        return "UserBasedSampler"

You then pass your custom sampler into your tracer provider.

tracer_provider = TracerProvider(sampler=UserBasedSampler())
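
Continuing the example, you can register the provider globally and attach an exporter so that sampled spans are actually emitted. A minimal sketch, using ConsoleSpanExporter purely as a stand-in for whichever exporter you actually use:

tracer_provider.add_span_processor(SimpleSpanProcessor(ConsoleSpanExporter()))
trace.set_tracer_provider(tracer_provider)

tracer = trace.get_tracer(__name__)

# This span carries the dropped user ID, so UserBasedSampler discards it
with tracer.start_as_current_span("handle_request", attributes={"user.id": USER_ID_TO_DROP}):
    pass

# This span has a different user.id, so it is recorded, sampled, and exported
with tracer.start_as_current_span("handle_request", attributes={"user.id": "another-user"}):
    pass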