Laddr provides everything you need to build production-ready multi-agent systems.
Scalability
Queue-based architecture with horizontal scaling support.
Parallel Workers
Run multiple agent workers in parallel to handle high throughput:
# Scale workers
laddr scale researcher 5
laddr scale coordinator 3
Queue Backends
Choose your message queue backend:
- Redis - Fast, lightweight (development)
- Kafka - Durable, scalable (production)
- Memory - In-memory (testing)
# .env
QUEUE_BACKEND=redis # or kafka, memory
Load Distribution
Kafka automatically distributes tasks across workers in the same consumer group, so each task is delivered to exactly one worker.
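As a minimal illustration of the mechanism, here is a sketch using the kafka-python client rather than any Laddr API; the topic and group names are hypothetical placeholders:
# Consumer-group sketch (kafka-python, not Laddr code).
# Run this script twice: both processes join the same group, and Kafka
# assigns each one a disjoint set of partitions, sharing the load.
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    "laddr.tasks.researcher",        # hypothetical task topic
    group_id="researcher-workers",   # same group => partitions are split
    bootstrap_servers="localhost:9092",
)
for message in consumer:
    print(message.partition, message.value)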
Observability
Complete visibility into agent execution with traces, metrics, and dashboards.
Real-Time Dashboard
Web-based dashboard at http://localhost:5173:
- Agent Traces - See every execution step
- Token Usage - Track LLM costs
- System Metrics - Monitor performance
- Job History - Review past executions
Structured Traces
Every agent execution is traced:
{
  "job_id": "abc-123",
  "agent_name": "researcher",
  "events": [
    {"type": "task_start", "timestamp": "..."},
    {"type": "llm_call", "tokens": 150},
    {"type": "tool_call", "tool": "web_search"},
    {"type": "task_complete", "result": "..."}
  ]
}
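Because traces are plain JSON, they are easy to post-process. For example, a small standalone helper (not part of Laddr) can total LLM token usage, assuming the event shape shown above:
# Sum the tokens recorded by llm_call events in a trace dump.
import json

def total_llm_tokens(trace_json: str) -> int:
    trace = json.loads(trace_json)
    return sum(
        event.get("tokens", 0)
        for event in trace["events"]
        if event["type"] == "llm_call"
    )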
Logging
Comprehensive logging at multiple levels:
# View logs
laddr logs researcher --follow
laddr logs coordinator --tail 100
Extensibility
Connect your own tools, APIs, and models with full control.
Custom Tools
Create tools with the @tool decorator:
from laddr import Agent, tool

@tool(
    name="weather_lookup",
    description="Get weather for a city",
    parameters={
        "type": "object",
        "properties": {
            "city": {"type": "string"}
        }
    }
)
def weather_lookup(city: str):
    return get_weather(city)  # get_weather: your own lookup helper

agent = Agent(
    name="weather_agent",
    tools=[weather_lookup]
)
MCP Integration
Connect to Model Context Protocol servers:
from laddr.core.mcp_tools import MCPToolProvider

mcp = MCPToolProvider(
    command="npx -y @modelcontextprotocol/server-filesystem /path",
    transport="stdio"
)

agent = Agent(
    name="filesystem_agent",
    tools=[mcp]
)
System Tool Overrides
Customize delegation and storage behavior:
import logging

from laddr import override_system_tool, TaskDelegationTool

logger = logging.getLogger(__name__)

@override_system_tool("system_delegate_task")
async def custom_delegate(
    agent_name: str,
    task_description: str,
    task: str,
    _message_bus=None,
    _artifact_storage=None,
    _agent=None
):
    # Add custom logic
    logger.info(f"Delegating to {agent_name}")
    # Reuse base functionality
    tool = TaskDelegationTool(_message_bus, _artifact_storage, _agent)
    return await tool.delegate_task(...)
Configurability
Flexible configuration for every component.
Storage Backends
Choose your storage backend:
- MinIO - S3-compatible object storage
- S3 - AWS S3
- Local - File system (development)
# .env
STORAGE_BACKEND=minio
MINIO_ENDPOINT=localhost:9000
MINIO_ACCESS_KEY=minioadmin
MINIO_SECRET_KEY=minioadmin
Database Options
- PostgreSQL - Production database
- SQLite - Local development
# .env
DB_BACKEND=postgresql
DATABASE_URL=postgresql://user:pass@localhost:5432/laddr
LLM Providers
Support for multiple LLM providers:
- OpenAI - GPT-4, GPT-3.5
- Anthropic - Claude
- Google - Gemini
- Ollama - Local models
from laddr.llms import openai, gemini, ollama
# OpenAI
agent = Agent(..., llm=openai(model="gpt-4o-mini"))
# Gemini
agent = Agent(..., llm=gemini(model="gemini-2.5-flash"))
# Ollama (local)
agent = Agent(..., llm=ollama(model="llama3.2:latest"))
APIs
REST API for integration with your systems.
Submit Jobs
POST /api/prompts
Content-Type: application/json
{
  "prompt_name": "researcher",
  "inputs": {"query": "Latest AI trends"}
}
Get Results
GET /api/prompts/{prompt_id}
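For example, calling the API from Python with the requests library; the base URL and the "id" response field are assumptions for a default local deployment, so check the API Reference for the exact shapes:
# REST client sketch using the requests library.
import requests

BASE = "http://localhost:8000"  # assumed local Laddr API address

# Submit a job
resp = requests.post(f"{BASE}/api/prompts", json={
    "prompt_name": "researcher",
    "inputs": {"query": "Latest AI trends"},
})
prompt_id = resp.json()["id"]  # assumed response field; see the API Reference

# Fetch the result
print(requests.get(f"{BASE}/api/prompts/{prompt_id}").json())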
Health Check
A health-check endpoint is also exposed; see the API Reference for the full endpoint list and complete documentation.
Agent Orchestration
Coordinate multiple agents working together.
Task Delegation
Agents can delegate tasks to other agents:
# In coordinator agent
result = await self.delegate_task(
    agent_name="researcher",
    task_description="Research AI trends",
    task="research",
    task_data={"topic": "AI trends"}
)
Parallel Execution
Run multiple tasks in parallel:
results = await self.delegate_parallel(
    agent_name="analyzer",
    tasks=[
        {"task": "analyze", "task_data": {"data": data1}},
        {"task": "analyze", "task_data": {"data": data2}},
        {"task": "analyze", "task_data": {"data": data3}}
    ]
)
Workflow Patterns
Delegation and parallel execution compose into larger workflows, such as the fan-out/fan-in pattern sketched below.
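A minimal fan-out/fan-in sketch reusing the delegation calls shown above; the run method, agent names, and payload shapes are illustrative assumptions:
# In a coordinator agent (surrounding class body assumed)
async def run(self, topics):
    # Fan out: analyze every topic in parallel
    analyses = await self.delegate_parallel(
        agent_name="analyzer",
        tasks=[{"task": "analyze", "task_data": {"topic": t}} for t in topics],
    )
    # Fan in: hand all results to a single writer agent
    return await self.delegate_task(
        agent_name="writer",
        task_description="Combine the analyses into one report",
        task="summarize",
        task_data={"analyses": analyses},
    )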
Error Handling
Built-in retry logic and error recovery.
Automatic Retries
agent = Agent(
    name="researcher",
    max_retries=3,  # Retry failed tasks
    timeout=60      # Timeout in seconds
)
Error Tracking
All errors are logged and traced:
{
  "event_type": "tool_error",
  "tool": "web_search",
  "error": "API rate limit exceeded",
  "retry_count": 1
}
Next Steps