1. Installation

Install the traceAI and Google ADK packages.

pip install traceai-google-adk

2. Set Environment Variables

Set up your environment variables to authenticate with both FutureAGI and Google.

import os

os.environ["FI_API_KEY"] = "your-futureagi-api-key"
os.environ["FI_SECRET_KEY"] = "your-futureagi-secret-key"
os.environ["GOOGLE_API_KEY"] = "your-google-api-key"

3. Initialize Trace Provider

Set up the trace provider to create a new project in FutureAGI, establish telemetry data pipelines .

from fi_instrumentation import register
from fi_instrumentation.fi_types import ProjectType

trace_provider = register(
    project_type=ProjectType.OBSERVE,
    project_name="google_adk",
)

4. Instrument your Project

Instrument your project to enable automatic tracing.

from traceai_google_adk import GoogleADKInstrumentor

GoogleADKInstrumentor().instrument(tracer_provider=tracer_provider)

5. Interact with Google ADK

Start interacting with Google ADK as you normally would. Our Instrumentor will automatically trace and send the telemetry data to our platform. Here is a sample code using the Google ADK SDK.

def get_weather(city: str) -> dict:
    """Retrieves the current weather report for a specified city.

    Args:
        city (str): The name of the city for which to retrieve the weather report.

    Returns:
        dict: status and result or error msg.
    """
    if city.lower() == "new york":
        return {
            "status": "success",
            "report": (
                "The weather in New York is sunny with a temperature of 25 degrees"
                " Celsius (77 degrees Fahrenheit)."
            ),
        }
    else:
        return {
            "status": "error",
            "error_message": f"Weather information for '{city}' is not available.",
        }

agent = Agent(
   name="test_agent",
   model="gemini-2.5-flash-preview-05-20",
   description="Agent to answer questions using tools.",
   instruction="You must use the available tools to find an answer.",
   tools=[get_weather]
)

async def main():
    app_name = "test_instrumentation"
    user_id = "test_user"
    session_id = "test_session"
    runner = InMemoryRunner(agent=agent, app_name=app_name)
    session_service = runner.session_service
    await session_service.create_session(
        app_name=app_name,
        user_id=user_id,
        session_id=session_id
    )
    async for event in runner.run_async(
        user_id=user_id,
        session_id=session_id,
        new_message=types.Content(role="user", parts=[
            types.Part(text="What is the weather in New York?")]
        )
    ):
        if event.is_final_response():
            print(event.content.parts[0].text.strip())

if __name__ == "__main__":
    asyncio.run(main())