Advanced Tracing (OTEL)

Explore manual context propagation, custom decorators, and sampling techniques for real-world async, multi-service, and high-volume tracing scenarios.

What it is

Advanced Tracing covers OpenTelemetry patterns that go beyond basic span creation. You’ll learn how to propagate trace context manually across async tasks, threads, and microservices; build custom decorators that instrument functions without modifying their logic; and write custom samplers that selectively record or drop spans based on attributes like user ID or span name.

Use cases

  • Async tracing — Manually pass and attach context in Python async/await or JS Promise-based code where automated propagation doesn’t suffice.
  • Multi-service tracing — Inject and extract trace context from HTTP headers to link spans across microservices into a single distributed trace.
  • Concurrent thread tracing — Capture context in the main thread and propagate it to worker threads so all tasks remain associated with the parent trace.
  • Function-level instrumentation — Write a custom decorator that starts a span, records inputs and outputs, and ends the span — without touching the function body.
  • Selective sampling — Drop spans for specific users or conditions to reduce telemetry volume and cost while keeping high-value traces.

How to

Choose the propagation scenario that matches your architecture.

For Python async/await code, capture the current context before entering an async function and attach it inside so the active span is accessible.

import asyncio
from opentelemetry import trace
from opentelemetry.context import attach, detach, get_current

tracer = trace.get_tracer(__name__)

async def async_func(ctx):
    token = attach(ctx)
    try:
        current_span = trace.get_current_span()
        current_span.set_attribute("input.value", "User Input")
        await asyncio.sleep(1)  # Simulate async work
    finally:
        detach(token)

def sync_func():
    with tracer.start_as_current_span("sync_span") as span:
        # Capture the current context
        context = get_current()
        # Run the async function, passing the context
        asyncio.run(async_func(context))

if __name__ == "__main__":
    sync_func()

The same pattern in TypeScript:

import { trace, context, Context } from "@opentelemetry/api";
import { promisify } from "util";

const sleep = promisify(setTimeout);
const tracer = trace.getTracer("my-app-tracer");

async function asyncFunc(ctx: Context): Promise<void> {
    // context.with ensures the passed context is active within this function's scope.
    await context.with(ctx, async () => {
        const currentSpan = trace.getSpan(context.active());
        if (currentSpan) {
            currentSpan.setAttribute("input.value", "User Input from TS");
        }
        await sleep(1000); // Simulate async work
    });
}

async function syncFunc(): Promise<void> {
    // Start a parent span
    await tracer.startActiveSpan("sync_span", async (span) => {
        // Capture the current context (which includes sync_span)
        const currentActiveContext = context.active();
        // Run the async function, passing the captured context
        await asyncFunc(currentActiveContext);
        span.end();
    });
}

// To run the example:
// syncFunc().then(() => console.log("Trace example completed."));
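
The attach/detach pairing used in the Python example can be illustrated with the standard library alone. The sketch below is not the OpenTelemetry implementation; it assumes (as is the case for the default Python runtime context, to the best of my knowledge) that the active context lives in a `contextvars` variable. The names `_ACTIVE_SPAN`, `current_span`, and `run_with_span` are hypothetical and exist only for this illustration.

```python
import contextvars

# Hypothetical stand-in for the slot where the active span is stored.
_ACTIVE_SPAN = contextvars.ContextVar("active_span", default=None)


def attach(span_name):
    """Make span_name active and return a token that remembers the previous value."""
    return _ACTIVE_SPAN.set(span_name)


def detach(token):
    """Restore whatever was active before the matching attach() call."""
    _ACTIVE_SPAN.reset(token)


def current_span():
    """Return the name of the currently active span, or None."""
    return _ACTIVE_SPAN.get()


def run_with_span(span_name, func):
    """Attach, call func, and always detach, even if func raises."""
    token = attach(span_name)
    try:
        return func()
    finally:
        detach(token)
```

Because each token remembers the previous value, nested calls unwind in order: an inner `run_with_span` restores the outer span, and the outer one restores the empty state. Skipping `detach` would leave the inner span active for unrelated work that runs later in the same context.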

When making HTTP calls to another microservice, inject the current trace context into request headers in Service A and extract it in Service B to link spans across services.

Service A — inject context into outgoing request headers:

import requests
from opentelemetry import trace
from opentelemetry.propagate import inject

tracer = trace.get_tracer(__name__)

def make_request_to_service_b():
    # Start a new span for this operation
    with tracer.start_as_current_span("llm_service_a") as span:
        # Prepare headers
        headers = {}
        inject(headers)  # Write the current trace context into the carrier via the global propagator

        # Make the request with the injected headers
        response = requests.get("http://localhost:5001/endpoint", headers=headers) # Assuming Python Service B runs on 5001
        return response.text

# Example usage (ensure Service B is running and OTel SDK is configured for console output):
# if __name__ == "__main__":
#     from opentelemetry.sdk.trace import TracerProvider
#     from opentelemetry.sdk.trace.export import ConsoleSpanExporter, SimpleSpanProcessor
#     trace.set_tracer_provider(TracerProvider())
#     trace.get_tracer_provider().add_span_processor(SimpleSpanProcessor(ConsoleSpanExporter()))
#     print(make_request_to_service_b())

Service A in TypeScript:

import { trace, context, propagation, SpanStatusCode } from "@opentelemetry/api";
import { W3CTraceContextPropagator } from "@opentelemetry/core";
import fetch from "node-fetch"; // yarn add node-fetch @types/node-fetch

const tracer = trace.getTracer("my-service-a-tracer");

// It's common to set this globally once for an application.
propagation.setGlobalPropagator(new W3CTraceContextPropagator());

async function makeRequestToServiceB(): Promise<string> {
    return await tracer.startActiveSpan("typescript_llm_service_a", async (span) => {
        const headers: Record<string, string> = {};
        propagation.inject(context.active(), headers);

        try {
            const response = await fetch("http://localhost:5002/ts-endpoint", { headers }); // Assuming TS Service B on 5002
            if (!response.ok) {
                span.setStatus({ code: SpanStatusCode.ERROR, message: `HTTP error! status: ${response.status}` });
                throw new Error(`HTTP error! status: ${response.status}`);
            }
            const data = await response.text();
            span.setStatus({ code: SpanStatusCode.OK });
            return data;
        } catch (error) {
            span.recordException(error as Error);
            span.setStatus({ code: SpanStatusCode.ERROR, message: (error as Error).message });
            throw error;
        } finally {
            span.end();
        }
    });
}

// Example usage (ensure Service B is running and OTel SDK is configured):
// async function main() {
//     // Minimal OTel SDK setup for console output
//     const { NodeTracerProvider } = await import('@opentelemetry/sdk-trace-node');
//     const { ConsoleSpanExporter, SimpleSpanProcessor } = await import('@opentelemetry/sdk-trace-base');
//     const provider = new NodeTracerProvider();
//     provider.addSpanProcessor(new SimpleSpanProcessor(new ConsoleSpanExporter()));
//     provider.register();
//     try {
//         const response = await makeRequestToServiceB();
//         console.log("Response from Service B:", response);
//     } catch (err) {
//         console.error("Error making request:", err);
//     }
// }
// main();

Service B — extract context from incoming request headers:

from flask import Flask, request
from opentelemetry import trace
from opentelemetry.propagate import extract
# Minimal OTel setup for console output if not already configured globally
# from opentelemetry.sdk.trace import TracerProvider
# from opentelemetry.sdk.trace.export import ConsoleSpanExporter, SimpleSpanProcessor
# trace.set_tracer_provider(TracerProvider())
# trace.get_tracer_provider().add_span_processor(SimpleSpanProcessor(ConsoleSpanExporter()))

app = Flask(__name__)
tracer = trace.get_tracer("my-service-b-tracer")

@app.route("/endpoint")
def endpoint():
    # Extract the context from the incoming request. Flask's header object is
    # case-insensitive, so pass it directly instead of converting it to a dict.
    context_from_propagator = extract(request.headers)

    # Create a new span as child
    with tracer.start_as_current_span("python_service_b_processing", context=context_from_propagator) as span:
        span.add_event("Received request in Python Service B")
        # ... do some processing ...
        return "Hello from Python Service B"

# if __name__ == "__main__":
#     app.run(port=5001) # Assuming Python Service B runs on 5001

Service B in TypeScript:

import { trace, context, propagation, SpanStatusCode } from "@opentelemetry/api";
import { W3CTraceContextPropagator } from "@opentelemetry/core";
import express, { Request, Response } from 'express'; // yarn add express @types/express

const tracer = trace.getTracer("my-ts-service-b-tracer");

// Ensure the same propagator is used as in Service A.
// If it is not already registered globally in this process, set it here:
// propagation.setGlobalPropagator(new W3CTraceContextPropagator());

const app = express();
const port = 5002; // Assuming TS Service B runs on 5002

app.get('/ts-endpoint', (req: Request, res: Response) => {
    const parentContext = propagation.extract(context.active(), req.headers);

    // The parent context is passed as the third argument; SpanOptions has no `context` field.
    tracer.startActiveSpan("typescript_service_b_processing", {}, parentContext, (span) => {
        try {
            span.addEvent("Received request in Typescript Service B");
            // ... do some processing ...
            res.send("Hello from Typescript Service B");
            span.setStatus({ code: SpanStatusCode.OK });
        } catch (error) {
            span.recordException(error as Error);
            span.setStatus({ code: SpanStatusCode.ERROR, message: (error as Error).message });
            res.status(500).send("Error processing request");
        } finally {
            span.end();
        }
    });
});

// Example OTel SDK setup for console output before starting server:
// async function startServer() {
//     // Minimal OTel SDK setup for console output
//     const { NodeTracerProvider } = await import('@opentelemetry/sdk-trace-node');
//     const { ConsoleSpanExporter, SimpleSpanProcessor } = await import('@opentelemetry/sdk-trace-base');
//     const provider = new NodeTracerProvider();
//     provider.addSpanProcessor(new SimpleSpanProcessor(new ConsoleSpanExporter()));
//     provider.register();

//     app.listen(port, () => {
//         console.log(`Typescript Service B listening on http://localhost:${port}`);
//     });
// }
// startServer();
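
To make the inject/extract round trip less opaque, here is a minimal sketch of the header that travels between the two services, assuming the W3C Trace Context propagator is in use. Its `traceparent` header has the form `version-traceid-spanid-flags`. The helpers `format_traceparent` and `parse_traceparent` are hypothetical names for this illustration and handle only version `00`; in real code the propagator does this for you.

```python
import re

# version 00, 32 hex chars of trace ID, 16 hex chars of span ID, 2 hex chars of flags
_TRACEPARENT = re.compile(r"^00-([0-9a-f]{32})-([0-9a-f]{16})-([0-9a-f]{2})$")


def format_traceparent(trace_id: int, span_id: int, sampled: bool) -> str:
    """Roughly what inject() writes into the outgoing headers."""
    flags = 1 if sampled else 0
    return f"00-{trace_id:032x}-{span_id:016x}-{flags:02x}"


def parse_traceparent(headers: dict):
    """Roughly what extract() reads; returns None when no valid header is present."""
    # HTTP header names are case-insensitive, so compare in lower case.
    value = next((v for k, v in headers.items() if k.lower() == "traceparent"), None)
    if value is None:
        return None
    match = _TRACEPARENT.match(value.strip())
    if match is None:
        return None
    trace_hex, span_hex, flags_hex = match.groups()
    trace_id, span_id = int(trace_hex, 16), int(span_hex, 16)
    if trace_id == 0 or span_id == 0:  # all-zero IDs are invalid
        return None
    return {
        "trace_id": trace_id,
        "span_id": span_id,
        "sampled": bool(int(flags_hex, 16) & 1),
    }
```

Service B's span becomes a child of Service A's span because it reuses the extracted trace ID and records the extracted span ID as its parent. If the header is missing or malformed, extraction yields no parent and Service B starts a new trace.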

When tasks run in a ThreadPoolExecutor (Python) or concurrently via Promise.all (JS/TS), capture the context before dispatching the work and activate it inside each task so all tasks remain linked to the parent span.

import concurrent.futures
from opentelemetry import trace
from opentelemetry.context import attach, detach, get_current
from typing import Callable

tracer = trace.get_tracer(__name__)

def func1():
    # Some example work done in a thread.
    current_span = trace.get_current_span()
    current_span.set_attribute("input.value", "User Input from func1")
    return "func1 result"

def func2():
    # Another example function that sets an attribute on the current span.
    current_span = trace.get_current_span()
    current_span.set_attribute("input.value", "User Input from func2")
    return "func2 result"

def wrapped_func(func: Callable, main_context):
    # Wraps the original function to attach/detach the captured context
    # so the worker thread has the correct span context.
    def wrapper():
        token = attach(main_context)  # Attach context to this thread
        try:
            return func()
        finally:
            detach(token)              # Detach after finishing
    return wrapper

# Example main execution logic:
# def main_concurrent_execution():
#     with tracer.start_as_current_span("main_operation") as parent_span:
#         parent_span.set_attribute("orchestrator", "ThreadPoolExecutor")
#         # Capture the context from the current thread (main_operation's context)
#         main_context_to_propagate = get_current()

#         # Create a list of functions to be executed in parallel
#         funcs_to_run = [func1, func2, func1, func2]
#         results = []

#         with concurrent.futures.ThreadPoolExecutor() as executor:
#             # Map each function to its wrapped version, passing the captured context
#             futures = [executor.submit(wrapped_func(f, main_context_to_propagate)) for f in funcs_to_run]
#             for future in concurrent.futures.as_completed(futures):
#                 results.append(future.result())
#         parent_span.set_attribute("results.count", len(results))
#     return results

# if __name__ == "__main__":
#     # Minimal OTel SDK setup for console output
#     from opentelemetry.sdk.trace import TracerProvider
#     from opentelemetry.sdk.trace.export import ConsoleSpanExporter, SimpleSpanProcessor
#     trace.set_tracer_provider(TracerProvider())
#     trace.get_tracer_provider().add_span_processor(SimpleSpanProcessor(ConsoleSpanExporter()))
#     final_results = main_concurrent_execution()
#     print(f"Concurrent execution results: {final_results}")


The equivalent fan-out in TypeScript with Promise.all:

import { trace, context, Context } from "@opentelemetry/api";
import { promisify } from "util";

const sleep = promisify(setTimeout);
const tracer = trace.getTracer("my-app-tracer-concurrent");

async function processItem(itemNumber: number, parentCtx: Context): Promise<string> {
    // Use context.with to ensure operations run within the parentCtx
    return await context.with(parentCtx, async () => {
        // This new span will be a child of the span in parentCtx (e.g., "main_async_operation")
        return await tracer.startActiveSpan(`process_item_${itemNumber}`, async (span) => {
            span.setAttribute("item.number", itemNumber);
            await sleep(Math.random() * 100); // Simulate async work
            const result = `Item ${itemNumber} processed`;
            span.setAttribute("output.value", result);
            span.end();
            return result;
        });
    });
}

async function mainAsyncOrchestration() {
    // Start a main parent span
    return await tracer.startActiveSpan("main_async_operation", async (parentSpan) => {
        parentSpan.setAttribute("orchestrator", "Promise.all");

        // Capture the context of the main_async_operation span
        const contextToPropagate = context.active();

        const itemsToProcess = [1, 2, 3, 4];
        const processingPromises = itemsToProcess.map(item =>
            processItem(item, contextToPropagate) // Pass the captured context to each task
        );

        const results = await Promise.all(processingPromises);
        parentSpan.setAttribute("results.count", results.length);
        parentSpan.end();
        return results;
    });
}

// Example usage:
// async function runExample() {
//     // Minimal OTel SDK setup for console output
//     const { NodeTracerProvider } = await import('@opentelemetry/sdk-trace-node');
//     const { ConsoleSpanExporter, SimpleSpanProcessor } = await import('@opentelemetry/sdk-trace-base');
//     const provider = new NodeTracerProvider();
//     provider.addSpanProcessor(new SimpleSpanProcessor(new ConsoleSpanExporter()));
//     provider.register();

//     const finalResults = await mainAsyncOrchestration();
//     console.log("Async orchestration results:", finalResults);
// }
// runExample();
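
Why the wrapper is needed in the thread case can be shown with the standard library. This is a sketch of the mechanism, not OpenTelemetry code: it assumes the active span is held in a `contextvars` variable, which on standard CPython builds is not inherited by pool worker threads. The names `_ACTIVE_SPAN`, `read_span`, `with_context`, and `run_in_workers` are hypothetical.

```python
import contextvars
from concurrent.futures import ThreadPoolExecutor

# Hypothetical stand-in for the slot where the active span is stored.
_ACTIVE_SPAN = contextvars.ContextVar("active_span", default="no-span")


def read_span():
    """Work item: report which span is active in the thread that runs it."""
    return _ACTIVE_SPAN.get()


def with_context(func, ctx):
    """Return a zero-argument callable that runs func inside a copy of ctx."""
    def wrapper():
        # A Context object cannot be entered by two threads at once,
        # so each call runs in its own copy of the captured context.
        return ctx.copy().run(func)
    return wrapper


def run_in_workers(task_count):
    """Activate a parent span, then fan work out to a thread pool."""
    token = _ACTIVE_SPAN.set("main_operation")
    try:
        captured = contextvars.copy_context()  # snapshot taken in the main thread
        with ThreadPoolExecutor(max_workers=2) as pool:
            futures = [
                pool.submit(with_context(read_span, captured))
                for _ in range(task_count)
            ]
            return [future.result() for future in futures]
    finally:
        _ACTIVE_SPAN.reset(token)
```

Submitting `read_span` directly, without `with_context`, would typically return the default value because the worker thread starts with its own empty context. That is the situation the `wrapped_func` helper above guards against.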

A custom decorator starts a span before the function call, records function arguments and return values as span attributes, and ends the span — without modifying the function body.

import functools

from opentelemetry import trace

def trace_function(span_kind=None, additional_attributes=None):
    def decorator(func):
        @functools.wraps(func) # Preserve function metadata
        def wrapper(*args, **kwargs):
            tracer = trace.get_tracer(__name__, "0.1.0")
            with tracer.start_as_current_span(func.__name__) as span:
                if span_kind:
                    span.set_attribute("fi.span.kind", span_kind)

                # Stringify args and kwargs so they are valid attribute values
                try:
                    span.set_attribute("function.arguments", str(args))
                    span.set_attribute("function.keyword_arguments", str(kwargs))
                except Exception as e:
                    span.set_attribute("function.arguments.error", str(e))

                if additional_attributes:
                    for key, value in additional_attributes.items():
                        span.set_attribute(key, value)

                result = func(*args, **kwargs)
                try:
                    span.set_attribute("function.return_value", str(result))
                except Exception as e:
                    span.set_attribute("function.return_value.error", str(e))
                return result
        return wrapper
    return decorator

# Example Implementation
@trace_function(span_kind="LLM", additional_attributes={"llm.model_name": "gpt-4o"})
def process_text(text: str, verbose: bool = False):
    if verbose:
        print(f"Processing text: {text}")
    return text.upper()

# if __name__ == "__main__":
#     # Minimal OTel SDK setup for console output
#     from opentelemetry.sdk.trace import TracerProvider
#     from opentelemetry.sdk.trace.export import ConsoleSpanExporter, SimpleSpanProcessor
#     trace.set_tracer_provider(TracerProvider())
#     trace.get_tracer_provider().add_span_processor(SimpleSpanProcessor(ConsoleSpanExporter()))
#     print(process_text("hello world", verbose=True))

The same idea in TypeScript, written as a higher-order function:

import { trace, Attributes, SpanStatusCode } from "@opentelemetry/api";

// Define a type for the function that will be decorated
type TraceableFunction<T extends any[], R> = (...args: T) => R;

interface TraceFunctionOptions {
    spanKind?: string;
    additionalAttributes?: Attributes;
}

function traceFunction<T extends any[], R>(
    func: TraceableFunction<T, R>,
    options?: TraceFunctionOptions
): TraceableFunction<T, R> {
    const tracer = trace.getTracer("my-app-tracer-decorator", "0.1.0");
    const funcName = func.name || "anonymous_function";

    return (...args: T): R => {
        return tracer.startActiveSpan(funcName, (span) => {
            if (options?.spanKind) {
                span.setAttribute("fi.span.kind", options.spanKind);
            }
            try {
                span.setAttribute("function.arguments", JSON.stringify(args));
            } catch (e) {
                span.setAttribute("function.arguments.error", String(e));
            }
            if (options?.additionalAttributes) {
                span.setAttributes(options.additionalAttributes);
            }

            try {
                const result = func(...args);
                try {
                    span.setAttribute("function.return_value", JSON.stringify(result));
                } catch (e) {
                    span.setAttribute("function.return_value.error", String(e));
                }
                span.setStatus({ code: SpanStatusCode.OK });
                span.end();
                return result;
            } catch (error) {
                span.recordException(error as Error);
                span.setStatus({ code: SpanStatusCode.ERROR, message: (error as Error).message });
                span.end();
                throw error;
            }
        });
    };
}

// Example Implementation
function processTextTs(text: string, verbose: boolean = false): string {
    if (verbose) {
        console.log(`TS Processing text: ${text}`);
    }
    return text.toUpperCase();
}

const tracedProcessText = traceFunction(processTextTs, {
    spanKind: "LLM",
    additionalAttributes: { "llm.model_name": "gpt-4o-ts" },
});

// Example usage:
// async function runDecoratorExample() {
//     // Minimal OTel SDK setup for console output
//     const { NodeTracerProvider } = await import('@opentelemetry/sdk-trace-node');
//     const { ConsoleSpanExporter, SimpleSpanProcessor } = await import('@opentelemetry/sdk-trace-base');
//     const provider = new NodeTracerProvider();
//     provider.addSpanProcessor(new SimpleSpanProcessor(new ConsoleSpanExporter()));
//     provider.register();

//     console.log(tracedProcessText("hello from typescript", true));
// }
// runDecoratorExample();
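
One limitation of a purely synchronous wrapper is that, applied to an `async def` function, the span would end as soon as the coroutine object is created rather than when it finishes. The sketch below shows one way a decorator could handle both cases. To stay runnable without an SDK it uses `fake_span`, a hypothetical stand-in for `tracer.start_as_current_span`, and a `RECORDED` list in place of an exporter; neither is part of OpenTelemetry.

```python
import asyncio
import functools
import inspect
from contextlib import contextmanager

RECORDED = []  # stand-in for an exporter: finished "spans" land here


@contextmanager
def fake_span(name):
    """Hypothetical stand-in for tracer.start_as_current_span(name)."""
    span = {"name": name, "attributes": {}}
    try:
        yield span
    finally:
        RECORDED.append(span)  # the span "ends" when the with-block exits


def trace_function(func):
    """Trace sync and async callables with the same decorator."""
    if inspect.iscoroutinefunction(func):
        @functools.wraps(func)
        async def async_wrapper(*args, **kwargs):
            with fake_span(func.__name__) as span:
                # The span stays open across the await.
                result = await func(*args, **kwargs)
                span["attributes"]["function.return_value"] = str(result)
                return result
        return async_wrapper

    @functools.wraps(func)
    def sync_wrapper(*args, **kwargs):
        with fake_span(func.__name__) as span:
            result = func(*args, **kwargs)
            span["attributes"]["function.return_value"] = str(result)
            return result
    return sync_wrapper


@trace_function
def shout(text):
    return text.upper()


@trace_function
async def shout_later(text):
    await asyncio.sleep(0)  # simulate async work
    return text.upper()
```

Swapping `fake_span(func.__name__)` for `tracer.start_as_current_span(func.__name__)` and the dictionary writes for `span.set_attribute(...)` should give the real equivalent, under the assumption that a tracer is configured as in the examples above.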

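Create a custom sampler by subclassing Sampler (Python) or implementing the Sampler interface (JS/TS) and defining should_sample() or shouldSample(). Return Decision.DROP (SamplingDecision.NOT_RECORD in JS/TS) for spans you want to discard, and delegate to a root sampler for everything else. The sampler runs when a span is created, so it only sees attributes passed at span start, not attributes set later on the span. Pass the custom sampler to your tracer provider.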

from opentelemetry.context import Context
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import SimpleSpanProcessor, ConsoleSpanExporter
from opentelemetry.sdk.trace.sampling import Sampler, SamplingResult, Decision, ParentBasedTraceIdRatio
from opentelemetry import trace
from opentelemetry.util.types import Attributes # For type hinting

USER_ID_TO_DROP = "user_to_skip_tracing"

class UserBasedSampler(Sampler):
    # A custom sampler that drops any span having a `user.id` attribute matching
    # a specified user ID. For other cases, it delegates to a root sampler.

    def __init__(self, root_sampler: Sampler = ParentBasedTraceIdRatio(0.5)):
        self._root_sampler = root_sampler

    def should_sample(
        self,
        parent_context: Context,
        trace_id: int,
        name: str,
        kind,  # SpanKind of the span being created
        attributes: Attributes,
        links
    ) -> SamplingResult:
        user_id = attributes.get("user.id") if attributes else None
        if user_id == USER_ID_TO_DROP:
            return SamplingResult(
                decision=Decision.DROP,
                attributes={"sampler.reason": f"Dropping span for user.id={user_id}"}
            )
        else:
            return self._root_sampler.should_sample(parent_context, trace_id, name, kind, attributes, links)

    def get_description(self) -> str:
        return f"UserBasedSampler(root_sampler={self._root_sampler.get_description()})"

# Example usage:
# if __name__ == "__main__":
#     custom_sampler = UserBasedSampler(root_sampler=ParentBasedTraceIdRatio(1.0))
#     provider = TracerProvider(sampler=custom_sampler)
#     trace.set_tracer_provider(provider)
#     provider.add_span_processor(SimpleSpanProcessor(ConsoleSpanExporter()))
#     tracer = trace.get_tracer(__name__, "0.1.0")
#     with tracer.start_as_current_span("op_for_dropped_user", attributes={"user.id": USER_ID_TO_DROP}): pass
#     with tracer.start_as_current_span("op_for_sampled_user", attributes={"user.id": "another_user"}): pass
#     with tracer.start_as_current_span("op_without_user_id"): pass

The equivalent sampler in TypeScript:

import { Context, Link, SpanAttributes, SpanKind, trace } from "@opentelemetry/api";
import { Sampler, SamplingDecision, SamplingResult, ParentBasedSampler, TraceIdRatioBasedSampler } from "@opentelemetry/sdk-trace-base";
import { NodeTracerProvider } from "@opentelemetry/sdk-trace-node";
import { SimpleSpanProcessor, ConsoleSpanExporter } from "@opentelemetry/sdk-trace-base";

const USER_ID_TO_DROP_TS = "user_to_skip_tracing_ts";

class UserBasedSamplerTs implements Sampler {
    private _rootSampler: Sampler;

    constructor(rootSampler?: Sampler) {
        // Default to a ParentBased sampler that samples 50% of traces if no root is provided.
        this._rootSampler = rootSampler ?? new ParentBasedSampler({ root: new TraceIdRatioBasedSampler(0.5) });
    }

    shouldSample(
        context: Context,
        traceId: string,
        spanName: string,
        spanKind: SpanKind,
        attributes: SpanAttributes,
        links: Link[]
    ): SamplingResult {
        const userId = attributes["user.id"];
        if (userId === USER_ID_TO_DROP_TS) {
            return {
                decision: SamplingDecision.NOT_RECORD, // JS/TS name for the "drop" decision
                attributes: { ...attributes, "sampler.reason": `Dropping span for user.id=${userId}` }
            };
        }
        return this._rootSampler.shouldSample(context, traceId, spanName, spanKind, attributes, links);
    }

    toString(): string {
        return `UserBasedSamplerTs(rootSampler=${this._rootSampler.toString()})`;
    }
}

// Example usage:
// async function runSamplerExample() {
//     const customSamplerTs = new UserBasedSamplerTs(
//         new ParentBasedSampler({ root: new TraceIdRatioBasedSampler(1.0) }) // Sample all non-dropped
//     );
//     const provider = new NodeTracerProvider({ sampler: customSamplerTs });
//     provider.addSpanProcessor(new SimpleSpanProcessor(new ConsoleSpanExporter()));
//     provider.register();

//     const tracer = trace.getTracer("my-app-sampler-example", "0.1.0");

//     tracer.startActiveSpan("op_for_dropped_user_ts", { attributes: { "user.id": USER_ID_TO_DROP_TS } }, (span) => {
//         console.log("This span (dropped user) should not appear in console.");
//         span.end();
//     });

//     tracer.startActiveSpan("op_for_sampled_user_ts", { attributes: { "user.id": "another_user_ts" } }, (span) => {
//         console.log("This span (sampled user) should appear in console.");
//         span.end();
//     });

//     tracer.startActiveSpan("op_without_user_id_ts", (span) => {
//         console.log("This span (no user) should appear in console.");
//         span.end();
//     });
// }
// runSamplerExample();
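
The decision flow of the sampler (drop a specific user, otherwise fall back to a ratio) can be sketched without the SDK. The ratio check below follows my understanding of how trace-ID ratio sampling works, comparing the low 64 bits of the trace ID against a bound derived from the ratio; treat that detail as an assumption rather than a statement of the SDK's exact code. `ratio_allows` and `decide` are hypothetical names, and decisions are plain strings here.

```python
TRACE_ID_LIMIT = (1 << 64) - 1  # only the low 64 bits of the trace ID are compared


def ratio_allows(trace_id: int, ratio: float) -> bool:
    """Deterministic ratio check: the same trace ID always gets the same answer."""
    bound = round(ratio * (TRACE_ID_LIMIT + 1))
    return (trace_id & TRACE_ID_LIMIT) < bound


def decide(trace_id, attributes, blocked_user, ratio):
    """Return 'DROP' for the blocked user, otherwise defer to the ratio check."""
    if (attributes or {}).get("user.id") == blocked_user:
        return "DROP"
    return "RECORD_AND_SAMPLE" if ratio_allows(trace_id, ratio) else "DROP"
```

Because the ratio check depends only on the trace ID, every service that applies the same ratio reaches the same decision for a given trace, which keeps sampled traces complete rather than partially recorded.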

Key concepts

  • attach() / detach() — Python functions to manually bind a captured context to the current thread or async task. Always call detach(token) in a finally block to avoid context leaks.
  • context.with(ctx, fn) — JS/TS equivalent of attach/detach. Runs fn with the specified context active, then restores the previous context automatically.
  • propagation.inject() / propagation.extract() — Serialize the current trace context into HTTP headers (inject) and deserialize it from incoming headers (extract) to link spans across services.
  • Custom decorators — Wrap functions with span start/end logic so every call is traced automatically. Use functools.wraps in Python to preserve the original function’s metadata.
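  • Sampler interface — Implement should_sample() (Python) or shouldSample() (JS/TS) and return a decision based on span name, kind, or attributes. Python names the decisions DROP, RECORD_ONLY, and RECORD_AND_SAMPLE; JS/TS names them NOT_RECORD, RECORD, and RECORD_AND_SAMPLED.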
  • SamplingResult — The object returned by a sampler. Set decision to control recording and optionally attach additional attributes (e.g., a sampling reason).
