Skip to main content

Creating Custom Tools

Flo AI provides a powerful tool framework that makes it easy to create custom tools for your agents.

Basic Tool Creation

Using the @flo_tool Decorator

from flo_ai.tool import flo_tool

@flo_tool(description="Get current weather for a city")
async def get_weather(city: str, country: str = None) -> str:
    """Return a short weather report for ``city``.

    ``country`` is accepted but not used by this placeholder implementation.
    """
    # Placeholder: swap in a real weather API lookup here.
    report = f"Weather in {city}: sunny, 25°C"
    return report

# Use in agent
# NOTE(review): AgentBuilder and OpenAI are not imported in this snippet —
# presumably they come from the flo_ai package; confirm the module paths.
# The decorated function's tool object is passed via `get_weather.tool`.
agent = (
    AgentBuilder()
    .with_name('Weather Assistant')
    .with_llm(OpenAI(model='gpt-4o-mini'))
    .with_tools([get_weather.tool])
    .build()
)

Advanced Tool Configuration

@flo_tool(
    name="advanced_calculator",
    description="Perform complex mathematical calculations",
    parameter_descriptions={
        "operation": "Mathematical operation to perform",
        "numbers": "List of numbers to operate on"
    }
)
async def advanced_calculate(operation: str, numbers: list[float]) -> dict:
    """Apply ``operation`` to ``numbers`` and report the outcome.

    Args:
        operation: One of ``"sum"``, ``"product"`` or ``"average"``.
        numbers: Values to combine.

    Returns:
        A dict with the keys ``"operation"``, ``"numbers"`` and ``"result"``.
        When the request cannot be computed (unknown operation, or an
        average over an empty list) ``"result"`` is ``None`` and an extra
        ``"error"`` key explains why.
    """
    import math

    response = {
        "operation": operation,
        "numbers": numbers,
        "result": None
    }

    if operation == "sum":
        response["result"] = sum(numbers)
    elif operation == "product":
        response["result"] = math.prod(numbers)
    elif operation == "average":
        if numbers:
            response["result"] = sum(numbers) / len(numbers)
        else:
            # Avoid ZeroDivisionError: an empty list has no average.
            response["error"] = "Cannot average an empty list"
    else:
        # Report unknown operations instead of silently returning 0.
        response["error"] = f"Unsupported operation: {operation}"

    return response

Tool Types

Synchronous Tools

@flo_tool(description="Simple synchronous calculation")
def simple_add(x: float, y: float) -> float:
    """Return the sum of ``x`` and ``y``."""
    total = x + y
    return total

# Convert to async for agent use
@flo_tool(description="Add two numbers")
async def async_add(x: float, y: float) -> float:
    """Add two numbers by delegating to ``simple_add``."""
    # NOTE(review): assumes the decorated ``simple_add`` is still directly
    # callable and returns its result synchronously — confirm against flo_tool.
    return simple_add(x, y)

# Register the tool object, following the `.tool` pattern used for get_weather
agent = AgentBuilder().with_tools([async_add.tool]).build()

Asynchronous Tools

import aiohttp

@flo_tool(description="Fetch data from external API")
async def fetch_data(url: str, timeout: int = 30) -> dict:
    """Fetch a JSON document from a REST endpoint.

    Args:
        url: Address the GET request is sent to.
        timeout: Total time budget for the request, in seconds.

    Returns:
        The response body decoded as JSON.
    """
    # aiohttp expects a ClientTimeout object; a bare number is deprecated.
    request_timeout = aiohttp.ClientTimeout(total=timeout)
    async with aiohttp.ClientSession() as session:
        async with session.get(url, timeout=request_timeout) as response:
            return await response.json()

File Processing Tools

import os
from pathlib import Path

@flo_tool(description="Read file contents")
async def read_file(file_path: str) -> str:
    """Return the UTF-8 text stored at ``file_path``.

    Any failure is reported as an ``"Error reading file: ..."`` string
    rather than raised.
    """
    try:
        with open(file_path, mode='r', encoding='utf-8') as handle:
            text = handle.read()
    except Exception as exc:
        return f"Error reading file: {exc}"
    return text

@flo_tool(description="Write text to file")
async def write_file(file_path: str, content: str) -> str:
    """Write ``content`` to ``file_path`` as UTF-8, creating parent folders.

    Returns a success message, or an ``"Error writing file: ..."`` string
    if anything goes wrong.
    """
    try:
        # Make sure the destination directory exists before opening the file.
        target_dir = Path(file_path).parent
        target_dir.mkdir(parents=True, exist_ok=True)
        with open(file_path, mode='w', encoding='utf-8') as handle:
            handle.write(content)
        return f"Successfully wrote to {file_path}"
    except Exception as exc:
        return f"Error writing file: {exc}"

Database Integration

SQL Database Tools

import sqlite3
import asyncio
from typing import List, Dict

@flo_tool(description="Query database")
async def query_database(query: str, db_path: str) -> List[Dict]:
    """Run ``query`` against the SQLite database at ``db_path``.

    The blocking sqlite3 work runs in the event loop's default executor so
    the loop itself is not blocked.

    Args:
        query: SQL statement to execute. It is executed verbatim, so only
            expose this tool to callers trusted to run arbitrary SQL.
        db_path: Path of the SQLite database file.

    Returns:
        One dict per fetched row, keyed by column name.
    """
    from contextlib import closing

    def sync_query():
        # closing() releases the connection even when the query raises.
        with closing(sqlite3.connect(db_path)) as conn:
            conn.row_factory = sqlite3.Row
            rows = conn.execute(query).fetchall()
            return [dict(row) for row in rows]

    # Inside a coroutine the running loop is the right one to use.
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, sync_query)

def _quote_sql_identifier(name: str, qualified: bool = False) -> str:
    """Return ``name`` double-quoted for SQL, rejecting unsafe identifiers.

    With ``qualified=True`` a dotted name such as ``schema.table`` is
    accepted and quoted part by part.
    """
    text = str(name)
    parts = text.split('.') if qualified else [text]
    if not all(part.isidentifier() for part in parts):
        raise ValueError(f"Invalid SQL identifier: {name!r}")
    return '.'.join(f'"{part}"' for part in parts)


@flo_tool(description="Insert data into database")
async def insert_data(table: str, data: Dict, db_path: str) -> str:
    """Insert one row into ``table`` of the SQLite database at ``db_path``.

    Args:
        table: Target table name (optionally schema-qualified).
        data: Mapping of column name to value for the new row.
        db_path: Path of the SQLite database file.

    Returns:
        A confirmation message naming the table.

    Raises:
        ValueError: If ``data`` is empty, or the table or a column name is
            not a plain identifier.
    """
    from contextlib import closing

    if not data:
        raise ValueError("data must contain at least one column")

    # Identifiers cannot be bound as parameters, so validate and quote them;
    # the values themselves still go through '?' placeholders.
    table_sql = _quote_sql_identifier(table, qualified=True)
    columns = ', '.join(_quote_sql_identifier(column) for column in data)
    placeholders = ', '.join('?' for _ in data)
    query = f"INSERT INTO {table_sql} ({columns}) VALUES ({placeholders})"
    values = list(data.values())

    def sync_insert():
        # closing() releases the connection even when the insert raises.
        with closing(sqlite3.connect(db_path)) as conn:
            conn.execute(query, values)
            conn.commit()
        return f"Inserted data into {table}"

    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, sync_insert)

MongoDB Tools

from motor.motor_asyncio import AsyncIOMotorClient

@flo_tool(description="Query MongoDB collection")
async def mongo_query(collection: str, query: dict, database: str = "mydb") -> List[dict]:
    """Return up to 100 documents from ``collection`` that match ``query``.

    Args:
        collection: Name of the collection to search.
        query: MongoDB filter document.
        database: Name of the database holding the collection.

    Returns:
        The matching documents, at most 100 of them.
    """
    client = AsyncIOMotorClient("mongodb://localhost:27017")
    try:
        cursor = client[database][collection].find(query)
        return await cursor.to_list(length=100)
    finally:
        # A client is created per call, so release its connections afterwards.
        client.close()

Web Scraping Tools

import aiohttp
from bs4 import BeautifulSoup

@flo_tool(description="Scrape webpage content")
async def scrape_webpage(url: str, selector: str = None) -> str:
    """Download ``url`` and return its text.

    When ``selector`` is given, only the text of elements matching that CSS
    selector is returned, one element per line; otherwise the text of the
    whole page is returned.
    """
    async with aiohttp.ClientSession() as http:
        async with http.get(url) as page:
            markup = await page.text()
            document = BeautifulSoup(markup, 'html.parser')

            # No selector: hand back the text of the entire document.
            if not selector:
                return document.get_text()

            matched = document.select(selector)
            return '\n'.join(node.get_text() for node in matched)

API Integration Tools

REST API Tools

import aiohttp
import json

@flo_tool(description="Call REST API endpoint")
async def call_api(
    url: str,
    method: str = "GET",
    headers: dict = None,
    data: dict = None
) -> dict:
    """Send an HTTP request and return its status and JSON body.

    ``data`` is sent as the JSON request body. The result is a dict with
    the keys ``"status"`` and ``"data"``.
    """
    async with aiohttp.ClientSession() as http:
        async with http.request(method, url, headers=headers, json=data) as reply:
            status_code = reply.status
            body = await reply.json()
            return {"status": status_code, "data": body}

# Example: GitHub API
@flo_tool(description="Get GitHub repository information")
async def get_github_repo(owner: str, repo: str) -> dict:
    """Look up ``owner/repo`` through the GitHub REST API via ``call_api``."""
    repo_url = f"https://api.github.com/repos/{owner}/{repo}"
    details = await call_api(repo_url)
    return details

GraphQL Tools

@flo_tool(description="Execute GraphQL query")
async def graphql_query(endpoint: str, query: str, variables: dict = None) -> dict:
    """POST a GraphQL ``query`` to ``endpoint`` through ``call_api``.

    ``variables`` defaults to an empty mapping when omitted or empty.
    """
    request_body = {
        "query": query,
        "variables": variables if variables else {}
    }
    json_headers = {"Content-Type": "application/json"}

    reply = await call_api(
        endpoint,
        method="POST",
        headers=json_headers,
        data=request_body
    )
    return reply

Image Processing Tools

from PIL import Image
import io
import base64

@flo_tool(description="Resize image")
async def resize_image(image_data: str, width: int, height: int) -> str:
    """Resize a base64-encoded image and return it as base64-encoded PNG.

    Args:
        image_data: Base64 text of the source image.
        width: Target width passed to ``Image.resize``.
        height: Target height passed to ``Image.resize``.
    """
    # base64 text -> raw bytes -> PIL image
    raw = base64.b64decode(image_data)
    source = Image.open(io.BytesIO(raw))

    scaled = source.resize((width, height))

    # PIL image -> PNG bytes -> base64 text
    out = io.BytesIO()
    scaled.save(out, format='PNG')
    encoded = base64.b64encode(out.getvalue())
    return encoded.decode()

@flo_tool(description="Extract text from image")
async def ocr_image(image_data: str) -> str:
    """Placeholder OCR tool: returns a fixed string and ignores ``image_data``."""
    # A real implementation would use pytesseract or a similar OCR library.
    extracted = "Extracted text from image"
    return extracted

Email Tools

import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart

@flo_tool(description="Send email")
async def send_email(
    to_email: str,
    subject: str,
    body: str,
    smtp_server: str = "smtp.gmail.com",
    smtp_port: int = 587
) -> str:
    """Send a plain-text email over SMTP with STARTTLS.

    The sender address and password are read from the ``EMAIL_USER`` and
    ``EMAIL_PASS`` environment variables. The blocking SMTP exchange runs
    in the event loop's default executor.

    Args:
        to_email: Recipient address.
        subject: Subject line.
        body: Plain-text message body.
        smtp_server: SMTP host to connect to.
        smtp_port: SMTP port to connect to.

    Returns:
        A confirmation message naming the recipient.

    Raises:
        RuntimeError: If ``EMAIL_USER`` or ``EMAIL_PASS`` is not set.
    """
    import os  # local import: this snippet's import block does not include os

    sender = os.getenv('EMAIL_USER')
    password = os.getenv('EMAIL_PASS')
    if not sender or not password:
        # Fail early with a clear message instead of an obscure login error.
        raise RuntimeError(
            "EMAIL_USER and EMAIL_PASS environment variables must be set"
        )

    def sync_send():
        msg = MIMEMultipart()
        msg['From'] = sender
        msg['To'] = to_email
        msg['Subject'] = subject
        msg.attach(MIMEText(body, 'plain'))

        # The context manager sends QUIT and closes the socket even on failure.
        with smtplib.SMTP(smtp_server, smtp_port) as server:
            server.starttls()
            server.login(sender, password)
            server.send_message(msg)

        return f"Email sent to {to_email}"

    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, sync_send)

Tool Composition

Chaining Tools

@flo_tool(description="Complete data processing pipeline")
async def process_data_pipeline(
    url: str,
    output_file: str
) -> str:
    """Fetch JSON from ``url``, summarise it, and save the summary as JSON.

    The summary holds the payload's ``"timestamp"`` and the number of
    entries under ``"items"``. Returns a message wrapping the result of
    ``write_file``.
    """
    # Fetch
    payload = await fetch_data(url)

    # Summarise (example transformation)
    stamp = payload.get("timestamp")
    item_count = len(payload.get("items", []))
    summary = {
        "timestamp": stamp,
        "processed_count": item_count,
        "summary": f"Processed {item_count} items"
    }

    # Persist
    serialized = json.dumps(summary, indent=2)
    outcome = await write_file(output_file, serialized)

    return f"Pipeline completed: {outcome}"

Tool Dependencies

# Base tool
@flo_tool(description="Get user profile")
async def get_user_profile(user_id: str) -> dict:
    """Return a hard-coded sample profile carrying the given ``user_id``."""
    profile = {"id": user_id, "name": "John Doe", "email": "john@example.com"}
    return profile

# Dependent tool
@flo_tool(description="Send personalized email")
async def send_personalized_email(user_id: str, message: str) -> str:
    """Greet the user by name and email them ``message``.

    The profile comes from ``get_user_profile``; delivery is delegated to
    ``send_email``, whose result is returned.
    """
    user = await get_user_profile(user_id)
    recipient = user['email']
    greeting_body = f"Hello {user['name']},\n\n{message}"

    delivery = await send_email(
        to_email=recipient,
        subject="Personalized Message",
        body=greeting_body
    )
    return delivery

Error Handling

Tool Error Handling

@flo_tool(description="Robust API call with error handling")
async def robust_api_call(url: str, max_retries: int = 3) -> dict:
    """GET ``url`` with retries and exponential backoff.

    Args:
        url: Address to request.
        max_retries: Number of attempts to make; values below 1 are
            treated as a single attempt.

    Returns:
        The decoded JSON body of the first HTTP 200 response, or a dict
        with an ``"error"`` key once every attempt has failed.
    """
    # Always try at least once so the function never falls through and
    # returns None when max_retries is zero or negative.
    attempts = max(1, max_retries)
    # aiohttp expects a ClientTimeout object; a bare number is deprecated.
    request_timeout = aiohttp.ClientTimeout(total=10)
    last_error = None

    for attempt in range(attempts):
        try:
            async with aiohttp.ClientSession() as session:
                async with session.get(url, timeout=request_timeout) as response:
                    if response.status == 200:
                        return await response.json()
                    last_error = f"HTTP {response.status}"
        except Exception as e:
            # Retry boundary: any failure counts as a failed attempt.
            last_error = e

        if attempt < attempts - 1:
            await asyncio.sleep(2 ** attempt)  # Exponential backoff

    return {"error": f"Failed after {attempts} attempts: {last_error}"}

Validation Tools

@flo_tool(description="Validate email address")
async def validate_email(email: str) -> dict:
    """Check whether ``email`` looks like a well-formed address.

    Returns:
        ``{"valid": True, "email": email}`` when the format matches,
        otherwise ``{"valid": False, "error": "Invalid email format"}``.
    """
    import re
    pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'

    # fullmatch() rejects a trailing newline, which match() with '$' lets
    # through; the isinstance guard keeps non-string input from raising.
    if isinstance(email, str) and re.fullmatch(pattern, email):
        return {"valid": True, "email": email}
    return {"valid": False, "error": "Invalid email format"}

@flo_tool(description="Validate and process data")
async def validate_and_process(data: dict) -> dict:
    """Validate a record and return it in normalised form.

    ``data`` must contain ``name`` and ``email`` (strings) and ``age``
    (a non-negative integer).

    Returns:
        ``{"status": "processed", "data": {...}}`` with the name
        title-cased and the email lower-cased, or ``{"error": ...}``
        describing the first validation failure.
    """
    required_fields = ['name', 'email', 'age']

    for field in required_fields:
        if field not in data:
            return {"error": f"Missing required field: {field}"}

    name, email, age = data['name'], data['email'], data['age']

    # .title() / .lower() below need real strings.
    if not isinstance(name, str) or not isinstance(email, str):
        return {"error": "Name and email must be strings"}

    # bool is a subclass of int, so exclude it explicitly.
    if isinstance(age, bool) or not isinstance(age, int) or age < 0:
        return {"error": "Age must be a non-negative integer"}

    # Process valid data
    return {
        "status": "processed",
        "data": {
            "name": name.title(),
            "email": email.lower(),
            "age": age
        }
    }

Best Practices

Tool Design

  1. Single Responsibility: Each tool should have one clear purpose
  2. Error Handling: Always handle errors gracefully
  3. Documentation: Provide clear descriptions and parameter info
  4. Validation: Validate inputs before processing
  5. Async by Default: Use async functions for better performance

Performance Optimization

# Use connection pooling for database tools
import asyncpg

class DatabaseTool:
    """Database helper that runs queries through a shared asyncpg pool."""

    def __init__(self):
        # Stays None until init_pool() has been awaited.
        self.pool = None

    async def init_pool(self, connection_string: str):
        """Create the connection pool for ``connection_string``."""
        self.pool = await asyncpg.create_pool(connection_string)

    # NOTE(review): flo_tool is applied to an instance method here — confirm
    # how the decorator handles ``self`` when the tool is registered.
    @flo_tool(description="Query database with connection pooling")
    async def query_with_pool(self, query: str) -> list:
        """Run ``query`` on a pooled connection and return the fetched rows.

        Raises:
            RuntimeError: If ``init_pool`` has not been awaited yet.
        """
        if self.pool is None:
            # Clearer than the AttributeError that None.acquire() would raise.
            raise RuntimeError(
                "Connection pool not initialised; call init_pool() first"
            )
        async with self.pool.acquire() as conn:
            return await conn.fetch(query)

Security Considerations

@flo_tool(description="Safe file operations")
async def safe_file_operation(file_path: str, operation: str) -> str:
    """Perform a file operation restricted to the allowed directory.

    Args:
        file_path: Path of the file to operate on.
        operation: ``"read"`` returns the file's text; ``"write"`` is not
            implemented yet.

    Returns:
        The file contents for ``"read"``, otherwise an ``"Error: ..."``
        message.
    """
    from pathlib import Path

    allowed_root = Path('/allowed/directory').resolve()

    # Validate path to prevent directory traversal. Compare whole path
    # components: a plain string-prefix test would also accept sibling
    # directories such as /allowed/directory_evil.
    safe_path = Path(file_path).resolve()
    if safe_path != allowed_root and allowed_root not in safe_path.parents:
        return "Error: Path not allowed"

    # Perform operation
    if operation == "read":
        with open(safe_path, 'r') as f:
            return f.read()
    if operation == "write":
        # Implementation for safe writing goes here; until then, report
        # that explicitly instead of returning None.
        return "Error: write operation not implemented"
    return f"Error: Unsupported operation: {operation}"

This comprehensive tool framework lets you build powerful, integrated AI agents that can interact with external systems and services.