OpenTelemetry Integration

Flo AI includes built-in OpenTelemetry support for comprehensive observability of your AI agents and workflows.

Basic Setup

Configuration

from flo_ai import configure_telemetry, shutdown_telemetry

# Configure telemetry at startup
configure_telemetry(
    service_name="my_ai_app",
    service_version="1.0.0",
    console_export=True,  # For debugging
    otlp_endpoint="http://localhost:4317"  # OTLP collector
)

# Your application code here...

# Shutdown to flush data
shutdown_telemetry()

Environment Variables

# Telemetry configuration
export OTEL_SERVICE_NAME="flo-ai-app"
export OTEL_SERVICE_VERSION="1.0.0"
export OTEL_EXPORTER_OTLP_ENDPOINT="http://localhost:4317"
export OTEL_EXPORTER_OTLP_HEADERS="x-api-key=your-key"

# Jaeger (alternative to OTLP)
export JAEGER_AGENT_HOST="localhost"
export JAEGER_AGENT_PORT="6831"  # Jaeger agent UDP port (14268 is the collector HTTP port)
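
If you prefer to keep configuration in Python, the same variables can be set programmatically before telemetry is configured. This is a sketch; it assumes configure_telemetry and the underlying OpenTelemetry SDK honor these standard variables:

import os

from flo_ai import configure_telemetry

# Assumption: the OTel SDK / configure_telemetry read these standard variables
os.environ.setdefault("OTEL_SERVICE_NAME", "flo-ai-app")
os.environ.setdefault("OTEL_SERVICE_VERSION", "1.0.0")
os.environ.setdefault("OTEL_EXPORTER_OTLP_ENDPOINT", "http://localhost:4317")

configure_telemetry(
    service_name=os.environ["OTEL_SERVICE_NAME"],
    otlp_endpoint=os.environ["OTEL_EXPORTER_OTLP_ENDPOINT"],
)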

Agent Instrumentation

Automatic Instrumentation

from flo_ai.builder.agent_builder import AgentBuilder
from flo_ai.llm import OpenAI

# Agents are automatically instrumented
agent = (
    AgentBuilder()
    .with_name('Monitored Agent')
    .with_prompt('You are a helpful assistant.')
    .with_llm(OpenAI(model='gpt-4o-mini'))
    .build()
)

# All agent interactions are automatically traced
response = await agent.run('Hello, world!')

Custom Spans

from flo_ai.telemetry import get_tracer

tracer = get_tracer(__name__)

async def custom_agent_operation():
    with tracer.start_as_current_span("custom_operation") as span:
        span.set_attribute("operation.type", "data_processing")
        span.set_attribute("operation.input_size", 1000)
        
        # Your custom logic here
        result = await process_data()
        
        span.set_attribute("operation.output_size", len(result))
        span.set_attribute("operation.success", True)
        
        return result

Workflow Observability

Arium Workflow Tracing

from flo_ai.arium import AriumBuilder

# Workflows are automatically traced
workflow = (
    AriumBuilder()
    .add_agents([agent1, agent2, agent3])
    .start_with(agent1)
    .connect(agent1, agent2)
    .connect(agent2, agent3)
    .end_with(agent3)
)

# Each step in the workflow is traced
result = await workflow.build_and_run(["Input data"])

Custom Workflow Metrics

import time

from flo_ai.telemetry import get_meter

meter = get_meter(__name__)

# Create custom metrics
workflow_counter = meter.create_counter(
    "workflow_executions_total",
    description="Total number of workflow executions"
)

workflow_duration = meter.create_histogram(
    "workflow_duration_seconds",
    description="Duration of workflow executions"
)

# Record metrics
async def execute_workflow():
    start_time = time.time()
    
    workflow_counter.add(1, {"workflow_type": "analysis"})
    
    # Execute workflow
    result = await workflow.build_and_run(["data"])
    
    duration = time.time() - start_time
    workflow_duration.record(duration, {"workflow_type": "analysis"})
    
    return result

Metrics Collection

Agent Metrics

import time

from flo_ai.telemetry import FloTelemetry, get_meter

# Get a telemetry instance and a meter for custom metrics
telemetry = FloTelemetry()
meter = get_meter(__name__)

# Agent execution metrics
agent_metrics = {
    "agent_executions_total": meter.create_counter(
        "agent_executions_total",
        description="Total agent executions"
    ),
    "agent_duration_seconds": meter.create_histogram(
        "agent_duration_seconds",
        description="Agent execution duration"
    ),
    "agent_tokens_used": meter.create_counter(
        "agent_tokens_used",
        description="Total tokens used by agents"
    )
}

# Record agent metrics
async def execute_agent_with_metrics(agent, input_data):
    start_time = time.time()
    
    agent_metrics["agent_executions_total"].add(1, {
        "agent_name": agent.name,
        "model": agent.llm.model
    })
    
    response = await agent.run(input_data)
    
    duration = time.time() - start_time
    agent_metrics["agent_duration_seconds"].record(duration, {
        "agent_name": agent.name
    })
    
    # Record token usage if available
    if hasattr(response, 'usage'):
        agent_metrics["agent_tokens_used"].add(
            response.usage.total_tokens,
            {"agent_name": agent.name}
        )
    
    return response

LLM Provider Metrics

# LLM-specific metrics
llm_metrics = {
    "llm_requests_total": meter.create_counter(
        "llm_requests_total",
        description="Total LLM requests"
    ),
    "llm_request_duration": meter.create_histogram(
        "llm_request_duration_seconds",
        description="LLM request duration"
    ),
    "llm_tokens_total": meter.create_counter(
        "llm_tokens_total",
        description="Total tokens processed"
    ),
    "llm_errors_total": meter.create_counter(
        "llm_errors_total",
        description="Total LLM errors"
    )
}

# Record LLM metrics
async def llm_request_with_metrics(llm, prompt):
    start_time = time.time()
    
    llm_metrics["llm_requests_total"].add(1, {
        "provider": llm.provider,
        "model": llm.model
    })
    
    try:
        response = await llm.generate(prompt)
        
        duration = time.time() - start_time
        llm_metrics["llm_request_duration"].record(duration, {
            "provider": llm.provider,
            "model": llm.model
        })
        
        # Record token usage
        if hasattr(response, 'usage'):
            llm_metrics["llm_tokens_total"].add(
                response.usage.total_tokens,
                {"provider": llm.provider, "model": llm.model}
            )
        
        return response
        
    except Exception as e:
        llm_metrics["llm_errors_total"].add(1, {
            "provider": llm.provider,
            "error_type": type(e).__name__
        })
        raise

Distributed Tracing

Trace Context Propagation

from opentelemetry import trace
from opentelemetry.trace import get_current_span

async def distributed_workflow():
    # Get current trace context
    current_span = get_current_span()
    
    # get_current_span() always returns a span object; only annotate it if it is recording
    if current_span.is_recording():
        # Add custom attributes
        current_span.set_attribute("workflow.id", "analysis_workflow")
        current_span.set_attribute("workflow.version", "1.0.0")
    
    # Execute workflow - trace context is automatically propagated
    result = await workflow.build_and_run(["data"])
    
    return result

Cross-Service Tracing

import httpx
from opentelemetry.instrumentation.httpx import HTTPXClientInstrumentor

# Instrument HTTP client for distributed tracing
HTTPXClientInstrumentor().instrument()

async def call_external_service():
    async with httpx.AsyncClient() as client:
        # Trace context is automatically propagated
        response = await client.get("https://api.example.com/data")
        return response.json()
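
If the client library is not auto-instrumented, trace context can also be injected into outgoing request headers manually. A minimal sketch using the OpenTelemetry propagation API:

import httpx
from opentelemetry import propagate

async def call_external_service_manual():
    # Inject the current trace context into the outgoing headers
    headers = {}
    propagate.inject(headers)
    
    async with httpx.AsyncClient() as client:
        response = await client.get("https://api.example.com/data", headers=headers)
        return response.json()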

Custom Instrumentation

Custom Spans

import time

from opentelemetry import trace
from flo_ai.telemetry import get_tracer

tracer = get_tracer(__name__)

async def custom_operation():
    with tracer.start_as_current_span("custom_operation") as span:
        start_time = time.time()
        
        # Set span attributes
        span.set_attribute("operation.name", "data_processing")
        span.set_attribute("operation.input_size", 1000)
        
        # Add events
        span.add_event("Processing started")
        
        # Your custom logic
        result = await process_data()
        
        span.add_event("Processing completed", {
            "output_size": len(result),
            "processing_time": time.time() - start_time
        })
        
        # Set status
        span.set_status(trace.Status(trace.StatusCode.OK))
        
        return result

Custom Metrics

import time

from flo_ai.telemetry import get_meter

meter = get_meter(__name__)

# Create custom metrics
custom_counter = meter.create_counter(
    "custom_operations_total",
    description="Total custom operations"
)

custom_histogram = meter.create_histogram(
    "custom_operation_duration",
    description="Custom operation duration"
)

# Record custom metrics
async def custom_operation_with_metrics():
    start_time = time.time()
    
    custom_counter.add(1, {"operation_type": "data_processing"})
    
    # Your operation
    result = await custom_operation()
    
    duration = time.time() - start_time
    custom_histogram.record(duration, {"operation_type": "data_processing"})
    
    return result

Monitoring Dashboards

Prometheus Metrics

from prometheus_client import start_http_server, Counter, Histogram

# Prometheus metrics
agent_executions = Counter(
    'flo_ai_agent_executions_total',
    'Total agent executions',
    ['agent_name', 'model']
)

agent_duration = Histogram(
    'flo_ai_agent_duration_seconds',
    'Agent execution duration',
    ['agent_name']
)

# Start Prometheus metrics server
start_http_server(8000)
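
These Prometheus metrics are declared but not recorded anywhere yet. A hypothetical helper showing one way to record them around an agent run (agent.name and the model label value are assumptions about your agent object):

async def run_agent_with_prometheus(agent, prompt):
    # Count the execution, then time it with the Prometheus histogram
    agent_executions.labels(agent_name=agent.name, model='gpt-4o-mini').inc()
    with agent_duration.labels(agent_name=agent.name).time():
        return await agent.run(prompt)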

Grafana Dashboard

{
  "dashboard": {
    "title": "Flo AI Monitoring",
    "panels": [
      {
        "title": "Agent Executions",
        "type": "graph",
        "targets": [
          {
            "expr": "rate(flo_ai_agent_executions_total[5m])",
            "legendFormat": "{{agent_name}}"
          }
        ]
      },
      {
        "title": "Agent Duration",
        "type": "graph",
        "targets": [
          {
            "expr": "histogram_quantile(0.95, flo_ai_agent_duration_seconds)",
            "legendFormat": "95th percentile"
          }
        ]
      }
    ]
  }
}

Error Tracking

Exception Handling

from opentelemetry import trace
from flo_ai.telemetry import get_tracer

tracer = get_tracer(__name__)

async def monitored_operation():
    with tracer.start_as_current_span("monitored_operation") as span:
        try:
            # Your operation
            result = await risky_operation()
            span.set_status(trace.Status(trace.StatusCode.OK))
            return result
            
        except Exception as e:
            # Record exception
            span.record_exception(e)
            span.set_status(trace.Status(
                trace.StatusCode.ERROR,
                str(e)
            ))
            raise

Error Metrics

error_counter = meter.create_counter(
    "flo_ai_errors_total",
    description="Total errors by type"
)

async def operation_with_error_tracking():
    try:
        return await operation()
    except Exception as e:
        error_counter.add(1, {
            "error_type": type(e).__name__,
            "operation": "data_processing"
        })
        raise

Performance Monitoring

Latency Monitoring

latency_histogram = meter.create_histogram(
    "flo_ai_operation_latency",
    description="Operation latency",
    unit="seconds"
)

async def monitored_operation():
    start_time = time.time()
    
    try:
        result = await operation()
        return result
    finally:
        duration = time.time() - start_time
        latency_histogram.record(duration)
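
The same pattern can be factored into a reusable decorator. A minimal sketch that reuses the latency_histogram defined above:

import functools
import time

def measure_latency(operation_type: str):
    """Hypothetical decorator that records latency for any async operation."""
    def decorator(func):
        @functools.wraps(func)
        async def wrapper(*args, **kwargs):
            start_time = time.time()
            try:
                return await func(*args, **kwargs)
            finally:
                latency_histogram.record(
                    time.time() - start_time,
                    {"operation_type": operation_type},
                )
        return wrapper
    return decorator

@measure_latency("data_processing")
async def process_batch(items):
    ...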

Throughput Monitoring

throughput_counter = meter.create_counter(
    "flo_ai_operations_total",
    description="Total operations"
)

async def operation():
    throughput_counter.add(1, {"operation_type": "processing"})
    return await actual_operation()

Best Practices

Telemetry Configuration

  1. Use structured logging: Include relevant context in logs (see the trace-correlation sketch after this list)
  2. Set appropriate sampling rates: Balance observability with performance
  3. Monitor resource usage: Track memory and CPU usage
  4. Implement alerting: Set up alerts for critical metrics
  5. Regular cleanup: Clean up old traces and metrics
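
For the first point, a minimal sketch of structured logging correlated with the active trace (the logger name and field layout are illustrative):

import logging
from opentelemetry.trace import get_current_span

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("flo_ai.app")

def log_with_trace_context(message: str, **fields):
    # Attach the current trace/span IDs so logs can be correlated with traces
    ctx = get_current_span().get_span_context()
    fields.update(
        trace_id=format(ctx.trace_id, '032x'),
        span_id=format(ctx.span_id, '016x'),
    )
    logger.info(message, extra={"context": fields})

log_with_trace_context("workflow started", workflow_id="analysis_workflow")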

Performance Optimization

# Optimize telemetry for production
configure_telemetry(
    service_name="production_ai_app",
    service_version="1.0.0",
    console_export=False,  # Disable console export in production
    sampling_rate=0.1,     # Sample 10% of traces
    batch_size=100,        # Batch telemetry data
    export_timeout=30      # Export timeout
)

Security Considerations

# Sanitize sensitive data
def sanitize_span_attributes(attributes):
    sensitive_keys = ['password', 'token', 'key', 'secret']
    sanitized = {}
    
    for key, value in attributes.items():
        if any(sensitive in key.lower() for sensitive in sensitive_keys):
            sanitized[key] = "[REDACTED]"
        else:
            sanitized[key] = value
    
    return sanitized
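
For example, applying the sanitizer before attaching attributes to a span (the attribute values are illustrative):

attributes = sanitize_span_attributes({
    "user.id": "user_123",
    "api_key": "sk-very-secret",  # will be redacted
})

with tracer.start_as_current_span("secure_operation") as span:
    for key, value in attributes.items():
        span.set_attribute(key, value)
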
This telemetry support gives you full observability into your AI agents and workflows, enabling you to monitor performance, debug issues, and optimize your systems.