Creating Custom Tools
Flo AI provides a powerful tool framework that makes it easy to create custom tools for your agents.Basic Tool Creation
Using the @flo_tool Decorator
Copy
from flo_ai.tool import flo_tool
@flo_tool(description="Get current weather for a city")
async def get_weather(city: str, country: str = None) -> str:
"""Get weather information for a specific city."""
# Your weather API implementation
return f"Weather in {city}: sunny, 25°C"
# Use in agent
agent = (
AgentBuilder()
.with_name('Weather Assistant')
.with_llm(OpenAI(model='gpt-4o-mini'))
.with_tools([get_weather.tool])
.build()
)
Advanced Tool Configuration
Copy
@flo_tool(
name="advanced_calculator",
description="Perform complex mathematical calculations",
parameter_descriptions={
"operation": "Mathematical operation to perform",
"numbers": "List of numbers to operate on"
}
)
async def advanced_calculate(operation: str, numbers: list[float]) -> dict:
"""Perform advanced mathematical operations."""
result = 0
if operation == "sum":
result = sum(numbers)
elif operation == "product":
result = 1
for num in numbers:
result *= num
elif operation == "average":
result = sum(numbers) / len(numbers)
return {
"operation": operation,
"numbers": numbers,
"result": result
}
Tool Types
Synchronous Tools
Copy
@flo_tool(description="Simple synchronous calculation")
def simple_add(x: float, y: float) -> float:
"""Add two numbers."""
return x + y
# Convert to async for agent use
async def async_add(x: float, y: float) -> float:
return simple_add(x, y)
agent = AgentBuilder().with_tools([async_add]).build()
Asynchronous Tools
Copy
import aiohttp
@flo_tool(description="Fetch data from external API")
async def fetch_data(url: str, timeout: int = 30) -> dict:
"""Fetch data from a REST API."""
async with aiohttp.ClientSession() as session:
async with session.get(url, timeout=timeout) as response:
return await response.json()
File Processing Tools
Copy
import os
from pathlib import Path
@flo_tool(description="Read file contents")
async def read_file(file_path: str) -> str:
"""Read contents of a text file."""
try:
with open(file_path, 'r', encoding='utf-8') as f:
return f.read()
except Exception as e:
return f"Error reading file: {e}"
@flo_tool(description="Write text to file")
async def write_file(file_path: str, content: str) -> str:
"""Write content to a file."""
try:
Path(file_path).parent.mkdir(parents=True, exist_ok=True)
with open(file_path, 'w', encoding='utf-8') as f:
f.write(content)
return f"Successfully wrote to {file_path}"
except Exception as e:
return f"Error writing file: {e}"
Database Integration
SQL Database Tools
Copy
import sqlite3
import asyncio
from typing import List, Dict
@flo_tool(description="Query database")
async def query_database(query: str, db_path: str) -> List[Dict]:
"""Execute SQL query and return results."""
def sync_query():
conn = sqlite3.connect(db_path)
conn.row_factory = sqlite3.Row
cursor = conn.cursor()
cursor.execute(query)
results = [dict(row) for row in cursor.fetchall()]
conn.close()
return results
return await asyncio.get_event_loop().run_in_executor(None, sync_query)
@flo_tool(description="Insert data into database")
async def insert_data(table: str, data: Dict, db_path: str) -> str:
"""Insert data into database table."""
def sync_insert():
conn = sqlite3.connect(db_path)
cursor = conn.cursor()
columns = ', '.join(data.keys())
placeholders = ', '.join(['?' for _ in data.keys()])
query = f"INSERT INTO {table} ({columns}) VALUES ({placeholders})"
cursor.execute(query, list(data.values()))
conn.commit()
conn.close()
return f"Inserted data into {table}"
return await asyncio.get_event_loop().run_in_executor(None, sync_insert)
MongoDB Tools
Copy
from motor.motor_asyncio import AsyncIOMotorClient
@flo_tool(description="Query MongoDB collection")
async def mongo_query(collection: str, query: dict, database: str = "mydb") -> List[dict]:
"""Query MongoDB collection."""
client = AsyncIOMotorClient("mongodb://localhost:27017")
db = client[database]
coll = db[collection]
cursor = coll.find(query)
results = await cursor.to_list(length=100)
return results
Web Scraping Tools
Copy
import aiohttp
from bs4 import BeautifulSoup
@flo_tool(description="Scrape webpage content")
async def scrape_webpage(url: str, selector: str = None) -> str:
"""Scrape content from a webpage."""
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
html = await response.text()
soup = BeautifulSoup(html, 'html.parser')
if selector:
elements = soup.select(selector)
return '\n'.join([elem.get_text() for elem in elements])
else:
return soup.get_text()
API Integration Tools
REST API Tools
Copy
import aiohttp
import json
@flo_tool(description="Call REST API endpoint")
async def call_api(
url: str,
method: str = "GET",
headers: dict = None,
data: dict = None
) -> dict:
"""Make HTTP request to REST API."""
async with aiohttp.ClientSession() as session:
async with session.request(
method,
url,
headers=headers,
json=data
) as response:
return {
"status": response.status,
"data": await response.json()
}
# Example: GitHub API
@flo_tool(description="Get GitHub repository information")
async def get_github_repo(owner: str, repo: str) -> dict:
"""Get information about a GitHub repository."""
url = f"https://api.github.com/repos/{owner}/{repo}"
return await call_api(url)
GraphQL Tools
Copy
@flo_tool(description="Execute GraphQL query")
async def graphql_query(endpoint: str, query: str, variables: dict = None) -> dict:
"""Execute GraphQL query."""
payload = {
"query": query,
"variables": variables or {}
}
return await call_api(
endpoint,
method="POST",
headers={"Content-Type": "application/json"},
data=payload
)
Image Processing Tools
Copy
from PIL import Image
import io
import base64
@flo_tool(description="Resize image")
async def resize_image(image_data: str, width: int, height: int) -> str:
"""Resize image to specified dimensions."""
# Decode base64 image
image_bytes = base64.b64decode(image_data)
image = Image.open(io.BytesIO(image_bytes))
# Resize image
resized = image.resize((width, height))
# Convert back to base64
buffer = io.BytesIO()
resized.save(buffer, format='PNG')
return base64.b64encode(buffer.getvalue()).decode()
@flo_tool(description="Extract text from image")
async def ocr_image(image_data: str) -> str:
"""Extract text from image using OCR."""
# Implementation would use pytesseract or similar
return "Extracted text from image"
Email Tools
Copy
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
@flo_tool(description="Send email")
async def send_email(
to_email: str,
subject: str,
body: str,
smtp_server: str = "smtp.gmail.com",
smtp_port: int = 587
) -> str:
"""Send email using SMTP."""
def sync_send():
msg = MIMEMultipart()
msg['From'] = os.getenv('EMAIL_USER')
msg['To'] = to_email
msg['Subject'] = subject
msg.attach(MIMEText(body, 'plain'))
server = smtplib.SMTP(smtp_server, smtp_port)
server.starttls()
server.login(os.getenv('EMAIL_USER'), os.getenv('EMAIL_PASS'))
server.send_message(msg)
server.quit()
return f"Email sent to {to_email}"
return await asyncio.get_event_loop().run_in_executor(None, sync_send)
Tool Composition
Chaining Tools
Copy
@flo_tool(description="Complete data processing pipeline")
async def process_data_pipeline(
url: str,
output_file: str
) -> str:
"""Complete pipeline: fetch data, process, save to file."""
# Step 1: Fetch data
data = await fetch_data(url)
# Step 2: Process data (example transformation)
processed = {
"timestamp": data.get("timestamp"),
"processed_count": len(data.get("items", [])),
"summary": f"Processed {len(data.get('items', []))} items"
}
# Step 3: Save to file
content = json.dumps(processed, indent=2)
result = await write_file(output_file, content)
return f"Pipeline completed: {result}"
Tool Dependencies
Copy
# Base tool
@flo_tool(description="Get user profile")
async def get_user_profile(user_id: str) -> dict:
"""Get user profile from database."""
return {"id": user_id, "name": "John Doe", "email": "john@example.com"}
# Dependent tool
@flo_tool(description="Send personalized email")
async def send_personalized_email(user_id: str, message: str) -> str:
"""Send email to user with their profile information."""
profile = await get_user_profile(user_id)
personalized_message = f"Hello {profile['name']},\n\n{message}"
return await send_email(
to_email=profile['email'],
subject="Personalized Message",
body=personalized_message
)
Error Handling
Tool Error Handling
Copy
@flo_tool(description="Robust API call with error handling")
async def robust_api_call(url: str, max_retries: int = 3) -> dict:
"""Make API call with retry logic and error handling."""
for attempt in range(max_retries):
try:
async with aiohttp.ClientSession() as session:
async with session.get(url, timeout=10) as response:
if response.status == 200:
return await response.json()
else:
raise Exception(f"HTTP {response.status}")
except Exception as e:
if attempt == max_retries - 1:
return {"error": f"Failed after {max_retries} attempts: {e}"}
await asyncio.sleep(2 ** attempt) # Exponential backoff
Validation Tools
Copy
@flo_tool(description="Validate email address")
async def validate_email(email: str) -> dict:
"""Validate email address format."""
import re
pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
if re.match(pattern, email):
return {"valid": True, "email": email}
else:
return {"valid": False, "error": "Invalid email format"}
@flo_tool(description="Validate and process data")
async def validate_and_process(data: dict) -> dict:
"""Validate input data and process if valid."""
required_fields = ['name', 'email', 'age']
for field in required_fields:
if field not in data:
return {"error": f"Missing required field: {field}"}
# Additional validation
if not isinstance(data['age'], int) or data['age'] < 0:
return {"error": "Age must be a positive integer"}
# Process valid data
return {
"status": "processed",
"data": {
"name": data['name'].title(),
"email": data['email'].lower(),
"age": data['age']
}
}
Best Practices
Tool Design
- Single Responsibility: Each tool should have one clear purpose
- Error Handling: Always handle errors gracefully
- Documentation: Provide clear descriptions and parameter info
- Validation: Validate inputs before processing
- Async by Default: Use async functions for better performance
Performance Optimization
Copy
# Use connection pooling for database tools
import asyncpg
class DatabaseTool:
def __init__(self):
self.pool = None
async def init_pool(self, connection_string: str):
self.pool = await asyncpg.create_pool(connection_string)
@flo_tool(description="Query database with connection pooling")
async def query_with_pool(self, query: str) -> list:
"""Query database using connection pool."""
async with self.pool.acquire() as conn:
return await conn.fetch(query)
Security Considerations
Copy
@flo_tool(description="Safe file operations")
async def safe_file_operation(file_path: str, operation: str) -> str:
"""Perform safe file operations with path validation."""
import os
from pathlib import Path
# Validate path to prevent directory traversal
safe_path = Path(file_path).resolve()
if not str(safe_path).startswith('/allowed/directory'):
return "Error: Path not allowed"
# Perform operation
if operation == "read":
with open(safe_path, 'r') as f:
return f.read()
elif operation == "write":
# Implementation for safe writing
pass

