
What is Arium?

Arium is Flo AI’s powerful workflow orchestration engine for creating complex multi-agent workflows. It allows you to chain agents together, implement conditional routing, and build sophisticated AI systems.

Basic Workflow Creation

Simple Agent Chain

Create a linear workflow with multiple agents:
from flo_ai.arium import AriumBuilder
from flo_ai.models.agent import Agent
from flo_ai.llm import OpenAI

async def simple_chain():
    llm = OpenAI(model='gpt-4o-mini')
    
    # Create agents
    analyst = Agent(
        name='content_analyst',
        system_prompt='Analyze the input and extract key insights.',
        llm=llm
    )
    
    summarizer = Agent(
        name='summarizer', 
        system_prompt='Create a concise summary based on the analysis.',
        llm=llm
    )
    
    # Build and run workflow
    result = await (
        AriumBuilder()
        .add_agents([analyst, summarizer])
        .start_with(analyst)
        .connect(analyst, summarizer)
        .end_with(summarizer)
        .build_and_run(["Analyze this complex business report..."])
    )
    
    return result
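
Since simple_chain is a coroutine, run it with asyncio from a script:
import asyncio

if __name__ == '__main__':
    print(asyncio.run(simple_chain()))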

Conditional Routing

Route to different agents based on conditions. A router is a plain function that inspects the workflow memory and returns the name of the next node:
from flo_ai.arium.memory import BaseMemory

def route_by_type(memory: BaseMemory) -> str:
    """Route based on classification result"""
    messages = memory.get()
    last_message = str(messages[-1]) if messages else ""
    
    if "technical" in last_message.lower():
        return "tech_specialist"
    else:
        return "business_specialist"

# Build workflow with conditional routing
result = await (
    AriumBuilder()
    .add_agents([classifier, tech_specialist, business_specialist, final_agent])
    .start_with(classifier)
    .add_edge(classifier, [tech_specialist, business_specialist], route_by_type)
    .connect(tech_specialist, final_agent)
    .connect(business_specialist, final_agent)
    .end_with(final_agent)
    .build_and_run(["How can we optimize our database performance?"])
)

YAML-Based Workflows

Define entire workflows in YAML for easy management:
metadata:
  name: "content-analysis-workflow"
  version: "1.0.0"
  description: "Multi-agent content analysis pipeline"

arium:
  agents:
    - name: "analyzer"
      role: "Content Analyst"
      job: "Analyze the input content and extract key insights."
      model:
        provider: "openai"
        name: "gpt-4o-mini"
    
    - name: "summarizer"
      role: "Content Summarizer"
      job: "Create a concise summary based on the analysis."
      model:
        provider: "anthropic"
        name: "claude-3-5-sonnet-20240620"

  workflow:
    start: "analyzer"
    edges:
      - from: "analyzer"
        to: ["summarizer"]
    end: ["summarizer"]

# Run YAML workflow
result = await (
    AriumBuilder()
    .from_yaml(yaml_file='workflow.yaml')
    .build_and_run(["Analyze this quarterly business report..."])
)

Advanced Routing

LLM-Powered Routers

Use LLMs for intelligent routing decisions:
routers:
  - name: "content_type_router"
    type: "smart"  # Uses LLM for intelligent routing
    routing_options:
      technical_writer: "Technical content, documentation, tutorials"
      creative_writer: "Creative writing, storytelling, fiction"
      marketing_writer: "Marketing copy, sales content, campaigns"
    model:
      provider: "openai"
      name: "gpt-4o-mini"

ReflectionRouter

For feedback patterns such as A → B → A → C (write, critique, revise, then hand off):
routers:
  - name: "reflection_router"
    type: "reflection"
    flow_pattern: [writer, critic, writer]  # A → B → A pattern
    model:
      provider: "openai"
      name: "gpt-4o-mini"

PlanExecuteRouter

For Cursor-style plan-and-execute workflows:
routers:
  - name: "plan_router"
    type: "plan_execute"
    agents:
      planner: "Creates detailed execution plans"
      developer: "Implements features according to plan"
      tester: "Tests implementations and validates functionality"
      reviewer: "Reviews and approves completed work"
    settings:
      planner_agent: planner
      executor_agent: developer
      reviewer_agent: reviewer

Workflow Patterns

Sequential Processing

# A → B → C
workflow = (
    AriumBuilder()
    .add_agents([agent_a, agent_b, agent_c])
    .start_with(agent_a)
    .connect(agent_a, agent_b)
    .connect(agent_b, agent_c)
    .end_with(agent_c)
)

Parallel Processing

# A → [B, C] → D
workflow = (
    AriumBuilder()
    .add_agents([agent_a, agent_b, agent_c, agent_d])
    .start_with(agent_a)
    .connect(agent_a, [agent_b, agent_c])
    .connect(agent_b, agent_d)
    .connect(agent_c, agent_d)
    .end_with(agent_d)
)

Fan-out/Fan-in

# A → [B, C, D] → E
workflow = (
    AriumBuilder()
    .add_agents([agent_a, agent_b, agent_c, agent_d, agent_e])
    .start_with(agent_a)
    .connect(agent_a, [agent_b, agent_c, agent_d])
    .connect(agent_b, agent_e)
    .connect(agent_c, agent_e)
    .connect(agent_d, agent_e)
    .end_with(agent_e)
)
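
These snippets only configure the builder; execute any of them with build_and_run, as in the earlier examples:
result = await workflow.build_and_run(
    ["Review this proposal from several perspectives..."]
)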

Memory Management

Shared Memory

from flo_ai.arium.memory import MessageMemory

# Create shared memory
shared_memory = MessageMemory()

workflow = (
    AriumBuilder()
    .add_agents([agent_a, agent_b])
    .with_memory(shared_memory)
    .start_with(agent_a)
    .connect(agent_a, agent_b)
    .end_with(agent_b)
)
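
Both agents read from and append to the same memory, so it can be inspected after the run. A short sketch, assuming MessageMemory exposes the no-argument get() used by the routing example above:
result = await workflow.build_and_run(["Draft a product announcement..."])

# Messages accumulated by both agents during the run
for message in shared_memory.get():
    print(message)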

Custom Memory

from flo_ai.arium.memory import BaseMemory

class CustomMemory(BaseMemory):
    """A minimal key-value memory store.

    Note: the routing example above calls memory.get() with no arguments,
    so match these signatures to whatever interface your agents expect.
    """

    def __init__(self):
        self.data = {}

    def add(self, key, value):
        self.data[key] = value

    def get(self, key):
        return self.data.get(key)

custom_memory = CustomMemory()
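
Once its methods match the interface your agents rely on, attach the custom memory just like the shared memory above:
workflow = (
    AriumBuilder()
    .add_agents([agent_a, agent_b])
    .with_memory(custom_memory)
    .start_with(agent_a)
    .connect(agent_a, agent_b)
    .end_with(agent_b)
)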

Event Handling

Workflow Events

async def on_workflow_start(workflow, input_data):
    print(f"Workflow started with input: {input_data}")

async def on_workflow_complete(workflow, result):
    print(f"Workflow completed with result: {result}")

async def on_agent_start(agent, input_data):
    print(f"Agent {agent.name} started")

workflow = (
    AriumBuilder()
    .add_agents([agent_a, agent_b])
    .with_event_handler('workflow_start', on_workflow_start)
    .with_event_handler('workflow_complete', on_workflow_complete)
    .with_event_handler('agent_start', on_agent_start)
    .start_with(agent_a)
    .connect(agent_a, agent_b)
    .end_with(agent_b)
)

Error Handling

Retry Logic

workflow = (
    AriumBuilder()
    .add_agents([agent_a, agent_b])
    .with_retries(3)  # Retry failed agents up to 3 times
    .with_timeout(60)  # 60 second timeout
    .start_with(agent_a)
    .connect(agent_a, agent_b)
    .end_with(agent_b)
)

Error Recovery

async def error_handler(agent, error):
    print(f"Agent {agent.name} failed: {error}")
    # Implement custom error recovery logic
    return "fallback_response"

workflow = (
    AriumBuilder()
    .add_agents([agent_a, agent_b])
    .with_error_handler(error_handler)
    .start_with(agent_a)
    .connect(agent_a, agent_b)
    .end_with(agent_b)
)

Performance Optimization

Parallel Execution

# Execute multiple agents in parallel
workflow = (
    AriumBuilder()
    .add_agents([agent_a, agent_b, agent_c])
    .start_with(agent_a)
    .connect_parallel(agent_a, [agent_b, agent_c])
    .end_with([agent_b, agent_c])
)

Caching

workflow = (
    AriumBuilder()
    .add_agents([agent_a, agent_b])
    .with_cache(ttl=3600)  # Cache results for 1 hour
    .start_with(agent_a)
    .connect(agent_a, agent_b)
    .end_with(agent_b)
)

Best Practices

Workflow Design

  1. Keep it simple: Start with linear workflows before adding complexity
  2. Use meaningful names: Name agents and workflows descriptively
  3. Handle errors: Always implement error handling and recovery
  4. Test thoroughly: Test workflows with various inputs

Performance Tips

  1. Use appropriate models: Choose models based on task complexity
  2. Implement caching: Cache expensive operations
  3. Optimize routing: Use efficient routing logic
  4. Monitor performance: Use telemetry to track workflow performance

Debugging

# Enable debug mode
workflow = (
    AriumBuilder()
    .add_agents([agent_a, agent_b])
    .with_debug(True)  # Enable debug logging
    .start_with(agent_a)
    .connect(agent_a, agent_b)
    .end_with(agent_b)
)