# OpenTelemetry Integration

Flo AI includes built-in OpenTelemetry support for comprehensive observability of your AI agents and workflows.

## Basic Setup

### Configuration
```python
from flo_ai import configure_telemetry, shutdown_telemetry

# Configure telemetry at startup
configure_telemetry(
    service_name="my_ai_app",
    service_version="1.0.0",
    console_export=True,                    # For debugging
    otlp_endpoint="http://localhost:4317",  # OTLP collector
)

# Your application code here...

# Shutdown to flush any buffered spans and metrics
shutdown_telemetry()
```
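If your process can exit along several paths, one option is to register the shutdown hook once at startup so buffered telemetry is always flushed; a minimal sketch using the standard-library `atexit` module:

```python
import atexit

from flo_ai import configure_telemetry, shutdown_telemetry

configure_telemetry(service_name="my_ai_app", service_version="1.0.0")

# Flush and shut down exporters when the interpreter exits.
atexit.register(shutdown_telemetry)
```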
### Environment Variables

```bash
# Telemetry configuration
export OTEL_SERVICE_NAME="flo-ai-app"
export OTEL_SERVICE_VERSION="1.0.0"
export OTEL_EXPORTER_OTLP_ENDPOINT="http://localhost:4317"
export OTEL_EXPORTER_OTLP_HEADERS="x-api-key=your-key"

# Jaeger (alternative to OTLP); 6831 is the agent's default UDP port
# (14268 is the collector's HTTP port, not the agent's)
export JAEGER_AGENT_HOST="localhost"
export JAEGER_AGENT_PORT="6831"
```
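Whether `configure_telemetry` reads these variables on its own depends on your Flo AI version; if it does not, here is a sketch of forwarding them by hand (the fallback values are illustrative):

```python
import os

from flo_ai import configure_telemetry

# Forward the standard OTEL_* variables to configure_telemetry explicitly.
configure_telemetry(
    service_name=os.environ.get("OTEL_SERVICE_NAME", "flo-ai-app"),
    service_version=os.environ.get("OTEL_SERVICE_VERSION", "1.0.0"),
    otlp_endpoint=os.environ.get("OTEL_EXPORTER_OTLP_ENDPOINT"),
)
```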
## Agent Instrumentation

### Automatic Instrumentation

```python
from flo_ai.builder.agent_builder import AgentBuilder
from flo_ai.llm import OpenAI

# Agents are automatically instrumented
agent = (
    AgentBuilder()
    .with_name('Monitored Agent')
    .with_prompt('You are a helpful assistant.')
    .with_llm(OpenAI(model='gpt-4o-mini'))
    .build()
)

# All agent interactions are automatically traced
response = await agent.run('Hello, world!')
```
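Because `agent.run` is a coroutine, the `await` above must run inside an event loop; for example:

```python
import asyncio

async def main():
    response = await agent.run('Hello, world!')
    print(response)

asyncio.run(main())
```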
### Custom Spans

```python
from flo_ai.telemetry import get_tracer

tracer = get_tracer(__name__)

async def custom_agent_operation():
    with tracer.start_as_current_span("custom_operation") as span:
        span.set_attribute("operation.type", "data_processing")
        span.set_attribute("operation.input_size", 1000)

        # Your custom logic here
        result = await process_data()

        span.set_attribute("operation.output_size", len(result))
        span.set_attribute("operation.success", True)
        return result
```
## Workflow Observability

### Arium Workflow Tracing

```python
from flo_ai.arium import AriumBuilder

# Workflows are automatically traced
workflow = (
    AriumBuilder()
    .add_agents([agent1, agent2, agent3])
    .start_with(agent1)
    .connect(agent1, agent2)
    .connect(agent2, agent3)
    .end_with(agent3)
)

# Each step in the workflow is traced
result = await workflow.build_and_run(["Input data"])
```
### Custom Workflow Metrics

```python
import time

from flo_ai.telemetry import get_meter

meter = get_meter(__name__)

# Create custom metrics
workflow_counter = meter.create_counter(
    "workflow_executions_total",
    description="Total number of workflow executions",
)
workflow_duration = meter.create_histogram(
    "workflow_duration_seconds",
    description="Duration of workflow executions",
)

# Record metrics
async def execute_workflow():
    start_time = time.time()
    workflow_counter.add(1, {"workflow_type": "analysis"})

    # Execute workflow
    result = await workflow.build_and_run(["data"])

    duration = time.time() - start_time
    workflow_duration.record(duration, {"workflow_type": "analysis"})
    return result
```
## Metrics Collection

### Agent Metrics

```python
import time

from flo_ai.telemetry import FloTelemetry, get_meter

# Get a telemetry instance and a meter for custom metrics
telemetry = FloTelemetry()
meter = get_meter(__name__)

# Agent execution metrics
agent_metrics = {
    "agent_executions_total": meter.create_counter(
        "agent_executions_total",
        description="Total agent executions",
    ),
    "agent_duration_seconds": meter.create_histogram(
        "agent_duration_seconds",
        description="Agent execution duration",
    ),
    "agent_tokens_used": meter.create_counter(
        "agent_tokens_used",
        description="Total tokens used by agents",
    ),
}

# Record agent metrics
async def execute_agent_with_metrics(agent, input_data):
    start_time = time.time()
    agent_metrics["agent_executions_total"].add(1, {
        "agent_name": agent.name,
        "model": agent.llm.model,
    })

    response = await agent.run(input_data)

    duration = time.time() - start_time
    agent_metrics["agent_duration_seconds"].record(duration, {
        "agent_name": agent.name,
    })

    # Record token usage if available
    if hasattr(response, 'usage'):
        agent_metrics["agent_tokens_used"].add(
            response.usage.total_tokens,
            {"agent_name": agent.name},
        )
    return response
```
### LLM Provider Metrics

```python
# LLM-specific metrics (reuses the meter and time import from above)
llm_metrics = {
    "llm_requests_total": meter.create_counter(
        "llm_requests_total",
        description="Total LLM requests",
    ),
    "llm_request_duration": meter.create_histogram(
        "llm_request_duration_seconds",
        description="LLM request duration",
    ),
    "llm_tokens_total": meter.create_counter(
        "llm_tokens_total",
        description="Total tokens processed",
    ),
    "llm_errors_total": meter.create_counter(
        "llm_errors_total",
        description="Total LLM errors",
    ),
}

# Record LLM metrics
async def llm_request_with_metrics(llm, prompt):
    start_time = time.time()
    llm_metrics["llm_requests_total"].add(1, {
        "provider": llm.provider,
        "model": llm.model,
    })
    try:
        response = await llm.generate(prompt)
        duration = time.time() - start_time
        llm_metrics["llm_request_duration"].record(duration, {
            "provider": llm.provider,
            "model": llm.model,
        })

        # Record token usage
        if hasattr(response, 'usage'):
            llm_metrics["llm_tokens_total"].add(
                response.usage.total_tokens,
                {"provider": llm.provider, "model": llm.model},
            )
        return response
    except Exception as e:
        llm_metrics["llm_errors_total"].add(1, {
            "provider": llm.provider,
            "error_type": type(e).__name__,
        })
        raise
```
## Distributed Tracing

### Trace Context Propagation

```python
from opentelemetry.trace import get_current_span

async def distributed_workflow():
    # Get the current span (always returns a span object, so check
    # is_recording() rather than truthiness)
    current_span = get_current_span()
    if current_span.is_recording():
        # Add custom attributes
        current_span.set_attribute("workflow.id", "analysis_workflow")
        current_span.set_attribute("workflow.version", "1.0.0")

    # Execute workflow - trace context is automatically propagated
    result = await workflow.build_and_run(["data"])
    return result
```
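When a step crosses a process boundary that Flo AI does not instrument for you (a message queue, for instance), the generic OpenTelemetry propagation API can carry the trace context in a plain dict of headers; a sketch, reusing the `tracer` from earlier:

```python
from opentelemetry.propagate import extract, inject

# Producer side: serialize the current trace context into a carrier dict.
headers = {}
inject(headers)
# ...send `headers` along with the message...

# Consumer side: restore the context so new spans join the same trace.
ctx = extract(headers)
with tracer.start_as_current_span("remote_step", context=ctx):
    ...  # handle the message
```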
### Cross-Service Tracing

```python
import httpx
from opentelemetry.instrumentation.httpx import HTTPXClientInstrumentor

# Instrument the HTTP client for distributed tracing
HTTPXClientInstrumentor().instrument()

async def call_external_service():
    async with httpx.AsyncClient() as client:
        # Trace context is automatically propagated via HTTP headers
        response = await client.get("https://api.example.com/data")
        return response.json()
```
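Note that `HTTPXClientInstrumentor` ships in the separate `opentelemetry-instrumentation-httpx` package, so install it alongside `httpx` before running this snippet.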
## Custom Instrumentation

### Custom Spans

```python
import time

from opentelemetry import trace
from flo_ai.telemetry import get_tracer

tracer = get_tracer(__name__)

async def custom_operation():
    with tracer.start_as_current_span("custom_operation") as span:
        start_time = time.time()

        # Set span attributes
        span.set_attribute("operation.name", "data_processing")
        span.set_attribute("operation.input_size", 1000)

        # Add events
        span.add_event("Processing started")

        # Your custom logic
        result = await process_data()

        span.add_event("Processing completed", {
            "output_size": len(result),
            "processing_time": time.time() - start_time,
        })

        # Set status
        span.set_status(trace.Status(trace.StatusCode.OK))
        return result
```
### Custom Metrics

```python
import time

from flo_ai.telemetry import get_meter

meter = get_meter(__name__)

# Create custom metrics
custom_counter = meter.create_counter(
    "custom_operations_total",
    description="Total custom operations",
)
custom_histogram = meter.create_histogram(
    "custom_operation_duration",
    description="Custom operation duration",
)

# Record custom metrics
async def custom_operation_with_metrics():
    start_time = time.time()
    custom_counter.add(1, {"operation_type": "data_processing"})

    # Your operation
    result = await custom_operation()

    duration = time.time() - start_time
    custom_histogram.record(duration, {"operation_type": "data_processing"})
    return result
```
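The counter-plus-histogram pattern repeats quickly; one way to factor it out is a small decorator built on the metrics defined above (a sketch, not a Flo AI API):

```python
import functools
import time

def measured(operation_type: str):
    """Wrap an async callable so each call increments the counter
    and records its duration in the histogram."""
    def decorator(func):
        @functools.wraps(func)
        async def wrapper(*args, **kwargs):
            custom_counter.add(1, {"operation_type": operation_type})
            start = time.time()
            try:
                return await func(*args, **kwargs)
            finally:
                custom_histogram.record(
                    time.time() - start, {"operation_type": operation_type}
                )
        return wrapper
    return decorator

@measured("data_processing")
async def decorated_operation():
    return await process_data()
```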
## Monitoring Dashboards

### Prometheus Metrics

```python
from prometheus_client import start_http_server, Counter, Histogram

# Prometheus metrics
agent_executions = Counter(
    'flo_ai_agent_executions_total',
    'Total agent executions',
    ['agent_name', 'model'],
)
agent_duration = Histogram(
    'flo_ai_agent_duration_seconds',
    'Agent execution duration',
    ['agent_name'],
)

# Start the Prometheus metrics server on port 8000
start_http_server(8000)
```
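If you would rather keep a single instrumentation API, the OpenTelemetry SDK can expose its own metrics in Prometheus format via the `opentelemetry-exporter-prometheus` package; a sketch (note that `configure_telemetry` may already install a meter provider, so treat this as an alternative setup rather than an addition):

```python
from prometheus_client import start_http_server

from opentelemetry import metrics
from opentelemetry.exporter.prometheus import PrometheusMetricReader
from opentelemetry.sdk.metrics import MeterProvider

# Serve all OpenTelemetry metrics at http://localhost:8000/metrics.
reader = PrometheusMetricReader()
metrics.set_meter_provider(MeterProvider(metric_readers=[reader]))
start_http_server(8000)
```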
### Grafana Dashboard

```json
{
  "dashboard": {
    "title": "Flo AI Monitoring",
    "panels": [
      {
        "title": "Agent Executions",
        "type": "graph",
        "targets": [
          {
            "expr": "rate(flo_ai_agent_executions_total[5m])",
            "legendFormat": "{{agent_name}}"
          }
        ]
      },
      {
        "title": "Agent Duration",
        "type": "graph",
        "targets": [
          {
            "expr": "histogram_quantile(0.95, rate(flo_ai_agent_duration_seconds_bucket[5m]))",
            "legendFormat": "95th percentile"
          }
        ]
      }
    ]
  }
}
```
## Error Tracking

### Exception Handling

```python
from opentelemetry import trace
from flo_ai.telemetry import get_tracer

tracer = get_tracer(__name__)

async def monitored_operation():
    with tracer.start_as_current_span("monitored_operation") as span:
        try:
            # Your operation
            result = await risky_operation()
            span.set_status(trace.Status(trace.StatusCode.OK))
            return result
        except Exception as e:
            # Record the exception on the span and mark it as errored
            span.record_exception(e)
            span.set_status(trace.Status(
                trace.StatusCode.ERROR,
                str(e),
            ))
            raise
```
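Note that `start_as_current_span` used as a context manager already records unhandled exceptions and marks the span as errored by default (`record_exception=True`, `set_status_on_exception=True`), so the explicit handler above is mainly useful when you want to customize the status description or attach extra attributes before re-raising.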
### Error Metrics

```python
# Uses the meter created earlier
error_counter = meter.create_counter(
    "flo_ai_errors_total",
    description="Total errors by type",
)

async def operation_with_error_tracking():
    try:
        return await operation()
    except Exception as e:
        error_counter.add(1, {
            "error_type": type(e).__name__,
            "operation": "data_processing",
        })
        raise
```
## Performance Monitoring

### Latency Monitoring

```python
import time

latency_histogram = meter.create_histogram(
    "flo_ai_operation_latency",
    description="Operation latency",
    unit="s",  # UCUM symbol for seconds
)

async def monitored_operation():
    start_time = time.time()
    try:
        return await operation()
    finally:
        duration = time.time() - start_time
        latency_histogram.record(duration)
```
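Latency histograms are only as useful as their bucket boundaries. If you construct the `MeterProvider` yourself rather than going through `configure_telemetry`, the OpenTelemetry SDK lets you set explicit buckets with a `View`; a sketch, with boundaries guessed for LLM-scale latencies:

```python
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics.view import (
    ExplicitBucketHistogramAggregation,
    View,
)

# Tighter buckets for sub-second to ~30 s operations (assumed range).
latency_view = View(
    instrument_name="flo_ai_operation_latency",
    aggregation=ExplicitBucketHistogramAggregation(
        (0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0, 30.0)
    ),
)
provider = MeterProvider(views=[latency_view])
```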
### Throughput Monitoring

```python
throughput_counter = meter.create_counter(
    "flo_ai_operations_total",
    description="Total operations",
)

async def operation():
    throughput_counter.add(1, {"operation_type": "processing"})
    return await actual_operation()
```
## Best Practices

### Telemetry Configuration

- Use structured logging: Include relevant context in logs
- Set appropriate sampling rates: Balance observability with performance (see the sampler sketch after this list)
- Monitor resource usage: Track memory and CPU usage
- Implement alerting: Set up alerts for critical metrics
- Regular cleanup: Clean up old traces and metrics
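If you manage the tracer provider yourself instead of going through `configure_telemetry`, the equivalent sampling setup in the raw OpenTelemetry SDK looks like this (a sketch):

```python
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.sampling import ParentBased, TraceIdRatioBased

# Sample 10% of new traces, but always follow the parent span's decision
# so distributed traces stay complete.
provider = TracerProvider(sampler=ParentBased(TraceIdRatioBased(0.1)))
```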
### Performance Optimization

```python
# Optimize telemetry for production
configure_telemetry(
    service_name="production_ai_app",
    service_version="1.0.0",
    console_export=False,  # Disable console export in production
    sampling_rate=0.1,     # Sample 10% of traces
    batch_size=100,        # Batch telemetry data before export
    export_timeout=30,     # Export timeout in seconds
)
```
### Security Considerations

```python
# Sanitize sensitive data before it leaves the process
def sanitize_span_attributes(attributes):
    sensitive_keys = ['password', 'token', 'key', 'secret']
    sanitized = {}
    for key, value in attributes.items():
        if any(sensitive in key.lower() for sensitive in sensitive_keys):
            sanitized[key] = "[REDACTED]"
        else:
            sanitized[key] = value
    return sanitized
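```

For example, run attributes through the sanitizer before attaching them to a span:

```python
# Inside an active span:
attributes = {
    "user.id": "u-123",
    "auth.token": "sk-abc123",  # will be redacted
}
for key, value in sanitize_span_attributes(attributes).items():
    span.set_attribute(key, value)
```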

