Skip to main content
Laddr provides everything you need to build production-ready multi-agent systems.

Scalability

Queue-based architecture with horizontal scaling support.

Parallel Workers

Run multiple agent workers in parallel to handle high throughput:
# Scale workers
laddr scale researcher 5
laddr scale coordinator 3

Queue Backends

Choose your message queue backend:
  • Redis - Fast, lightweight (development)
  • Kafka - Durable, scalable (production)
  • Memory - In-memory (testing)
# .env
QUEUE_BACKEND=redis  # or kafka, memory

Load Distribution

Kafka automatically distributes tasks across workers using consumer groups:

Observability

Complete visibility into agent execution with traces, metrics, and dashboards.

Real-Time Dashboard

Web-based dashboard at http://localhost:5173:
  • Agent Traces - See every execution step
  • Token Usage - Track LLM costs
  • System Metrics - Monitor performance
  • Job History - Review past executions

Structured Traces

Every agent execution is traced:
{
  "job_id": "abc-123",
  "agent_name": "researcher",
  "events": [
    {"type": "task_start", "timestamp": "..."},
    {"type": "llm_call", "tokens": 150},
    {"type": "tool_call", "tool": "web_search"},
    {"type": "task_complete", "result": "..."}
  ]
}

Logging

Comprehensive logging at multiple levels:
# View logs
laddr logs researcher --follow
laddr logs coordinator --tail 100

Extensibility

Connect your own tools, APIs, and models with full control.

Custom Tools

Create tools with the @tool decorator:
from laddr import tool

@tool(
    name="weather_lookup",
    description="Get weather for a city",
    parameters={
        "type": "object",
        "properties": {
            "city": {"type": "string"}
        }
    }
)
def weather_lookup(city: str):
    return get_weather(city)

agent = Agent(
    name="weather_agent",
    tools=[weather_lookup]
)

MCP Integration

Connect to Model Context Protocol servers:
from laddr.core.mcp_tools import MCPToolProvider

mcp = MCPToolProvider(
    command="npx -y @modelcontextprotocol/server-filesystem /path",
    transport="stdio"
)

agent = Agent(
    name="filesystem_agent",
    tools=[mcp]
)

System Tool Overrides

Customize delegation and storage behavior:
from laddr import override_system_tool, TaskDelegationTool

@override_system_tool("system_delegate_task")
async def custom_delegate(
    agent_name: str,
    task_description: str,
    task: str,
    _message_bus=None,
    _artifact_storage=None,
    _agent=None
):
    # Add custom logic
    logger.info(f"Delegating to {agent_name}")
    
    # Reuse base functionality
    tool = TaskDelegationTool(_message_bus, _artifact_storage, _agent)
    return await tool.delegate_task(...)


Configurability

Flexible configuration for every component.

Storage Backends

Choose your storage backend:
  • MinIO - S3-compatible object storage
  • S3 - AWS S3
  • Local - File system (development)
# .env
STORAGE_BACKEND=minio
MINIO_ENDPOINT=localhost:9000
MINIO_ACCESS_KEY=minioadmin
MINIO_SECRET_KEY=minioadmin

Database Options

  • PostgreSQL - Production database
  • SQLite - Local development
# .env
DB_BACKEND=postgresql
DATABASE_URL=postgresql://user:pass@localhost:5432/laddr

LLM Providers

Support for multiple LLM providers:
  • OpenAI - GPT-4, GPT-3.5
  • Anthropic - Claude
  • Google - Gemini
  • Ollama - Local models
from laddr.llms import openai, gemini, ollama

# OpenAI
agent = Agent(..., llm=openai(model="gpt-4o-mini"))

# Gemini
agent = Agent(..., llm=gemini(model="gemini-2.5-flash"))

# Ollama (local)
agent = Agent(..., llm=ollama(model="llama3.2:latest"))

APIs

REST API for integration with your systems.

Submit Jobs

POST /api/prompts
Content-Type: application/json

{
  "prompt_name": "researcher",
  "inputs": {"query": "Latest AI trends"}
}

Get Results

GET /api/prompts/{prompt_id}

Health Check

GET /api/health
See the API Reference for complete documentation.

Agent Orchestration

Coordinate multiple agents working together.

Task Delegation

Agents can delegate tasks to other agents:
# In coordinator agent
result = await self.delegate_task(
    agent_name="researcher",
    task_description="Research AI trends",
    task="research",
    task_data={"topic": "AI trends"}
)

Parallel Execution

Run multiple tasks in parallel:
results = await self.delegate_parallel(
    agent_name="analyzer",
    tasks=[
        {"task": "analyze", "task_data": {"data": data1}},
        {"task": "analyze", "task_data": {"data": data2}},
        {"task": "analyze", "task_data": {"data": data3}}
    ]
)

Workflow Patterns


Error Handling

Built-in retry logic and error recovery.

Automatic Retries

agent = Agent(
    name="researcher",
    max_retries=3,  # Retry failed tasks
    timeout=60      # Timeout in seconds
)

Error Tracking

All errors are logged and traced:
{
  "event_type": "tool_error",
  "tool": "web_search",
  "error": "API rate limit exceeded",
  "retry_count": 1
}

Next Steps