Pipecat
Integrate Pipecat with Future AGI for voice application observability. Trace and monitor voice pipelines with OpenTelemetry-based traceAI-pipecat.
Overview
This integration provides support for using OpenTelemetry with Pipecat applications. It enables tracing and monitoring of voice applications built with Pipecat, with automatic attribute mapping to Future AGI conventions.
1. Installation
Install the traceAI Pipecat package:
pip install traceAI-pipecat pipecat-ai[tracing]
2. Set Environment Variables
Set up your environment variables to authenticate with FutureAGI and Pipecat:
import os
os.environ["FI_API_KEY"] = FI_API_KEY
os.environ["FI_SECRET_KEY"] = FI_SECRET_KEY
3. Initialize Trace Provider
Set up the trace provider to establish the observability pipeline:
from fi_instrumentation.otel import register, Transport, ProjectType
trace_provider = register(
project_type=ProjectType.OBSERVE,
project_name="Pipecat Voice App",
set_global_tracer_provider=True,
)
4. Enable Attribute Mapping
Enable attribute mapping to convert Pipecat attributes to Future AGI conventions. This method automatically updates your existing span exporters:
from traceai_pipecat import enable_http_attribute_mapping
# For HTTP transport
success = enable_http_attribute_mapping()from traceai_pipecat import enable_grpc_attribute_mapping
# For gRPC transport
success = enable_grpc_attribute_mapping()from traceai_pipecat import enable_fi_attribute_mapping
from fi_instrumentation.otel import Transport
# Or specify transport explicitly via enum
success = enable_fi_attribute_mapping(transport=Transport.HTTP) # or Transport.GRPC 5. Initialize The Pipecat Application
Initialize the Pipecat application with the trace provider:
Note
Enabling Tracing in Pipecat requires you to set the enable_tracing flag to True in the PipelineParams object.
refer to this link for more details.
import os
from loguru import logger
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.processors.frameworks.rtvi import RTVIConfig, RTVIObserver, RTVIProcessor
from pipecat.runner.types import RunnerArguments
from pipecat.services.cartesia.tts import CartesiaTTSService
from pipecat.services.deepgram.stt import DeepgramSTTService
from pipecat.services.openai.llm import OpenAILLMService
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.network.small_webrtc import SmallWebRTCTransport
async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
logger.info(f"Starting bot")
stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))
tts = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
voice_id="71a7ad14-091c-4e8e-a314-022ece01c121", # British Reading Lady
)
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"))
messages = [
{
"role": "system",
"content": "You are a friendly AI assistant. Respond naturally and keep your answers conversational.",
},
]
context = OpenAILLMContext(messages)
context_aggregator = llm.create_context_aggregator(context)
rtvi = RTVIProcessor(config=RTVIConfig(config=[]))
pipeline = Pipeline(
[
transport.input(), # Transport user input
rtvi, # RTVI processor
stt,
context_aggregator.user(), # User responses
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
context_aggregator.assistant(), # Assistant spoken responses
]
)
task = PipelineTask(
pipeline,
params=PipelineParams(
enable_metrics=True,
enable_usage_metrics=True,
),
enable_tracing=True,
enable_turn_tracking=True,
conversation_id="customer-123",
additional_span_attributes={"session.id": "abc-123"},
observers=[RTVIObserver(rtvi)],
)
@transport.event_handler("on_client_connected")
async def on_client_connected(transport, client):
logger.info(f"Client connected")
# Kick off the conversation.
messages.append(
{"role": "system", "content": "Say hello and briefly introduce yourself."}
)
await task.queue_frames([context_aggregator.user().get_context_frame()])
@transport.event_handler("on_client_disconnected")
async def on_client_disconnected(transport, client):
logger.info(f"Client disconnected")
await task.cancel()
runner = PipelineRunner(handle_sigint=runner_args.handle_sigint)
await runner.run(task)
async def bot(runner_args: RunnerArguments):
"""Main bot entry point for the bot starter."""
transport = SmallWebRTCTransport(
params=TransportParams(
audio_in_enabled=True,
audio_out_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
),
webrtc_connection=runner_args.webrtc_connection,
)
await run_bot(transport, runner_args)
if __name__ == "__main__":
from pipecat.runner.run import main
main()
Features
Automatic Attribute Mapping
The integration automatically maps Pipecat-specific attributes to Future AGI conventions:
- LLM Operations: Maps
gen_ai.system,gen_ai.request.modeltollm.provider,llm.model_name - Input/Output: Maps
input,output,transcriptto structured Future AGI format - Token Usage: Maps
gen_ai.usage.*tollm.token_count.* - Tools: Maps tool-related attributes to Future AGI tool conventions
- Session Data: Maps conversation and session information
- Metadata: Consolidates miscellaneous attributes into structured metadata
Transport Support
- HTTP: Full support for HTTP transport with automatic endpoint detection
- gRPC: Support for gRPC transport (requires
fi-instrumentation[grpc])
Span Kind Detection
Automatically determines the appropriate fi.span.kind based on span attributes:
LLM: For LLM, STT, and TTS operationsTOOL: For tool calls and resultsAGENT: For setup and configuration spansCHAIN: For turn and conversation spans
API Reference
Integration Functions
enable_fi_attribute_mapping(transport: Transport = Transport.HTTP) -> bool
Install attribute mapping by replacing existing span exporters.
Parameters:
transport: Transport protocol enum (Transport.HTTPorTransport.GRPC)
Returns:
bool: True if at least one exporter was replaced
enable_http_attribute_mapping() -> bool
Convenience function for HTTP transport.
enable_grpc_attribute_mapping() -> bool
Convenience function for gRPC transport.
Exporter Creation Functions
create_mapped_http_exporter(endpoint: Optional[str] = None, headers: Optional[dict] = None)
Create a new HTTP exporter with Pipecat attribute mapping.
create_mapped_grpc_exporter(endpoint: Optional[str] = None, headers: Optional[dict] = None)
Create a new gRPC exporter with Pipecat attribute mapping.
Exporter Classes
MappedHTTPSpanExporter
HTTP span exporter that maps Pipecat attributes to Future AGI conventions.
MappedGRPCSpanExporter
gRPC span exporter that maps Pipecat attributes to Future AGI conventions.
BaseMappedSpanExporter
Base class for mapped span exporters.
Troubleshooting
Common Issues
-
No exporters found to replace
- Ensure you’ve called
register()before installing attribute mapping - Check that the transport type matches your tracer provider configuration
- Ensure you’ve called
-
Import errors for gRPC
- Install gRPC dependencies:
pip install "fi-instrumentation[grpc]"
- Install gRPC dependencies:
-
Data not being sent to FutureAGI
- Ensure that you have set the
FI_API_KEYandFI_SECRET_KEYenvironment variables - Ensure that the
set_global_tracer_providerin theregisterfunction is set toTrue
- Ensure that you have set the