Overview

This integration provides OpenTelemetry support for Pipecat applications. It enables tracing and monitoring of voice apps built with Pipecat, automatically mapping span attributes to Future AGI conventions.

1. Installation

Install the traceAI Pipecat package:
pip install traceAI-pipecat "pipecat-ai[tracing]"

2. Set Environment Variables

Set up your environment variables to authenticate with Future AGI (the bot example in step 5 also reads DEEPGRAM_API_KEY, CARTESIA_API_KEY, and OPENAI_API_KEY for its speech and LLM services):
import os

os.environ["FI_API_KEY"] = "your-futureagi-api-key"
os.environ["FI_SECRET_KEY"] = "your-futureagi-secret-key"

3. Initialize Trace Provider

Set up the trace provider to establish the observability pipeline:
from fi_instrumentation.otel import register, ProjectType

trace_provider = register(
    project_type=ProjectType.OBSERVE,
    project_name="Pipecat Voice App",
    set_global_tracer_provider=True,
)
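
Assuming the provider returned by register() behaves like a standard OpenTelemetry TracerProvider, you can sanity-check the pipeline with a manual span before wiring up Pipecat (the tracer and span names below are arbitrary):
tracer = trace_provider.get_tracer("pipecat-smoke-test")

with tracer.start_as_current_span("setup-check") as span:
    span.set_attribute("setup.ok", True)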

4. Enable Attribute Mapping

Enable attribute mapping to convert Pipecat attributes to Future AGI conventions. This method automatically updates your existing span exporters:
from traceai_pipecat import enable_http_attribute_mapping

# For HTTP transport
success = enable_http_attribute_mapping()
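
If your provider exports over gRPC instead of HTTP, use the gRPC convenience function (see the API Reference below):
from traceai_pipecat import enable_grpc_attribute_mapping

# For gRPC transport (requires fi-instrumentation[grpc])
success = enable_grpc_attribute_mapping()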

5. Initialize The Pipecat Application

Initialize the Pipecat application with the trace provider.
Enabling tracing in Pipecat requires setting enable_tracing=True when constructing the PipelineTask, as shown below; refer to the Pipecat tracing documentation for more details.
import os

from loguru import logger
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.processors.frameworks.rtvi import RTVIConfig, RTVIObserver, RTVIProcessor
from pipecat.runner.types import RunnerArguments
from pipecat.services.cartesia.tts import CartesiaTTSService
from pipecat.services.deepgram.stt import DeepgramSTTService
from pipecat.services.openai.llm import OpenAILLMService
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.network.small_webrtc import SmallWebRTCTransport


async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
    logger.info(f"Starting bot")

    stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))

    tts = CartesiaTTSService(
        api_key=os.getenv("CARTESIA_API_KEY"),
        voice_id="71a7ad14-091c-4e8e-a314-022ece01c121",  # British Reading Lady
    )

    llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"))

    messages = [
        {
            "role": "system",
            "content": "You are a friendly AI assistant. Respond naturally and keep your answers conversational.",
        },
    ]

    context = OpenAILLMContext(messages)
    context_aggregator = llm.create_context_aggregator(context)

    rtvi = RTVIProcessor(config=RTVIConfig(config=[]))

    pipeline = Pipeline(
        [
            transport.input(),  # Transport user input
            rtvi,  # RTVI processor
            stt,
            context_aggregator.user(),  # User responses
            llm,  # LLM
            tts,  # TTS
            transport.output(),  # Transport bot output
            context_aggregator.assistant(),  # Assistant spoken responses
        ]
    )

    task = PipelineTask(
        pipeline,
        params=PipelineParams(
            enable_metrics=True,
            enable_usage_metrics=True,
        ),
        enable_tracing=True,
        enable_turn_tracking=True,
        conversation_id="customer-123",
        additional_span_attributes={"session.id": "abc-123"},
        observers=[RTVIObserver(rtvi)],
    )

    @transport.event_handler("on_client_connected")
    async def on_client_connected(transport, client):
        logger.info(f"Client connected")
        # Kick off the conversation.
        messages.append(
            {"role": "system", "content": "Say hello and briefly introduce yourself."}
        )
        await task.queue_frames([context_aggregator.user().get_context_frame()])

    @transport.event_handler("on_client_disconnected")
    async def on_client_disconnected(transport, client):
        logger.info(f"Client disconnected")
        await task.cancel()

    runner = PipelineRunner(handle_sigint=runner_args.handle_sigint)

    await runner.run(task)


async def bot(runner_args: RunnerArguments):
    """Main bot entry point for the bot starter."""

    transport = SmallWebRTCTransport(
        params=TransportParams(
            audio_in_enabled=True,
            audio_out_enabled=True,
            vad_analyzer=SileroVADAnalyzer(),
        ),
        webrtc_connection=runner_args.webrtc_connection,
    )

    await run_bot(transport, runner_args)


if __name__ == "__main__":
    from pipecat.runner.run import main

    main()
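
Assuming the file is saved as bot.py, start it with the runner entry point defined above:

python bot.py

Once a client connects, spans for turns and for the STT, LLM, and TTS services are exported through the mapped exporters.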


Features

Automatic Attribute Mapping

The integration automatically maps Pipecat-specific attributes to Future AGI conventions (a brief illustration follows this list):
  • LLM Operations: Maps gen_ai.system, gen_ai.request.model to llm.provider, llm.model_name
  • Input/Output: Maps input, output, transcript to structured Future AGI format
  • Token Usage: Maps gen_ai.usage.* to llm.token_count.*
  • Tools: Maps tool-related attributes to Future AGI tool conventions
  • Session Data: Maps conversation and session information
  • Metadata: Consolidates miscellaneous attributes into structured metadata
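
For illustration only, here is roughly how the attributes of a Pipecat LLM span might be rewritten; the exact attribute names on the Future AGI side (e.g. the token-count sub-keys) are assumptions based on the mapping above:
# Hypothetical raw attributes on a Pipecat LLM span
raw_attributes = {
    "gen_ai.system": "openai",
    "gen_ai.request.model": "gpt-4o-mini",
    "gen_ai.usage.input_tokens": 42,
    "gen_ai.usage.output_tokens": 17,
}

# Approximate result after mapping to Future AGI conventions
mapped_attributes = {
    "llm.provider": "openai",
    "llm.model_name": "gpt-4o-mini",
    "llm.token_count.prompt": 42,       # assumed sub-key under llm.token_count.*
    "llm.token_count.completion": 17,   # assumed sub-key under llm.token_count.*
}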

Transport Support

  • HTTP: Full support for HTTP transport with automatic endpoint detection
  • gRPC: Support for gRPC transport (requires fi-instrumentation[grpc])

Span Kind Detection

Automatically determines the appropriate fi.span.kind based on span attributes:
  • LLM: For LLM, STT, and TTS operations
  • TOOL: For tool calls and results
  • AGENT: For setup and configuration spans
  • CHAIN: For turn and conversation spans

API Reference

Integration Functions

enable_fi_attribute_mapping(transport: Transport = Transport.HTTP) -> bool

Install attribute mapping by replacing existing span exporters. Parameters:
  • transport: Transport protocol enum (Transport.HTTP or Transport.GRPC)
Returns:
  • bool: True if at least one exporter was replaced
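
For example, to install the mapping explicitly for a gRPC-based provider:
from fi_instrumentation.otel import Transport
from traceai_pipecat import enable_fi_attribute_mapping

# Returns True if at least one existing exporter was replaced
success = enable_fi_attribute_mapping(transport=Transport.GRPC)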

enable_http_attribute_mapping() -> bool

Convenience function for HTTP transport.

enable_grpc_attribute_mapping() -> bool

Convenience function for gRPC transport.

Exporter Creation Functions

create_mapped_http_exporter(endpoint: Optional[str] = None, headers: Optional[dict] = None)

Create a new HTTP exporter with Pipecat attribute mapping.

create_mapped_grpc_exporter(endpoint: Optional[str] = None, headers: Optional[dict] = None)

Create a new gRPC exporter with Pipecat attribute mapping.
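
A minimal sketch of wiring a freshly created mapped exporter into the provider from step 3, assuming the returned object is a standard OpenTelemetry SpanExporter:
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from traceai_pipecat import create_mapped_http_exporter

# trace_provider is the provider returned by register() in step 3
exporter = create_mapped_http_exporter()
trace_provider.add_span_processor(BatchSpanProcessor(exporter))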

Exporter Classes

MappedHTTPSpanExporter

HTTP span exporter that maps Pipecat attributes to Future AGI conventions.

MappedGRPCSpanExporter

gRPC span exporter that maps Pipecat attributes to Future AGI conventions.

BaseMappedSpanExporter

Base class for mapped span exporters.

Troubleshooting

Common Issues

  1. No exporters found to replace
    • Ensure you’ve called register() before installing attribute mapping
    • Check that the transport type matches your tracer provider configuration
  2. Import errors for gRPC
    • Install gRPC dependencies: pip install "fi-instrumentation[grpc]"
  3. Data not being sent to FutureAGI
    • Ensure that you have set the FI_API_KEY and FI_SECRET_KEY environment variables
    • Ensure register() is called with set_global_tracer_provider=True
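
A quick diagnostic sketch for the last issue, using only the standard OpenTelemetry API (the printed provider type will vary with your setup):
import os

from opentelemetry import trace

# Check that the Future AGI credentials are present in the environment
for key in ("FI_API_KEY", "FI_SECRET_KEY"):
    print(key, "set" if os.environ.get(key) else "MISSING")

# A ProxyTracerProvider here usually means no global provider was registered,
# i.e. set_global_tracer_provider was not enabled in register()
print(type(trace.get_tracer_provider()).__name__)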