This guide explains how to create custom system tools by importing and extending Laddr’s base tool classes.
Overview
Laddr provides three base system tool classes that you can import and use to build custom overrides:
TaskDelegationTool - Single-task delegation to other agents
ParallelDelegationTool - Parallel multi-task delegation (fan-out pattern)
ArtifactStorageTool - Storage and retrieval of large data artifacts
These classes are fully composable, meaning you can use them as building blocks within your custom tool implementations.
Importing Base Tools
You can import the base tools from any of three locations:
Top-Level Import (Recommended)
from laddr import TaskDelegationTool, ParallelDelegationTool, ArtifactStorageTool
Core Module Import
from laddr.core import TaskDelegationTool, ParallelDelegationTool, ArtifactStorageTool
Direct Module Import
from laddr.core.system_tools import TaskDelegationTool, ParallelDelegationTool, ArtifactStorageTool
Importing the Override Decorator
To register your custom tools, you’ll need the override decorator:
from laddr import override_system_tool
# or
from laddr.core import override_system_tool
Runtime Injection
When your custom tool override is called, the Laddr runtime automatically injects three special keyword arguments:
_message_bus - The message queue backend (Redis/Kafka)
_artifact_storage - The storage backend (MinIO/S3)
_agent - The current agent instance (for job_id propagation)
These are injected by the runtime - you don’t pass them manually.
All base tool classes accept these dependencies in their constructors:
# TaskDelegationTool and ParallelDelegationTool
tool = TaskDelegationTool(
    message_bus=_message_bus,
    artifact_storage=_artifact_storage,
    agent=_agent
)

# ArtifactStorageTool
tool = ArtifactStorageTool(
    storage_backend=_artifact_storage,
    default_bucket="my-bucket"
)
Common Patterns
Pattern 1: Enhance with Pre/Post Processing
Add logging, metrics, or validation while reusing base functionality:
from laddr import override_system_tool, TaskDelegationTool
import logging
logger = logging.getLogger(__name__)
@override_system_tool("system_delegate_parallel")
async def custom_parallel_delegation(
    agent_name: str,
    tasks: list[dict],
    timeout_seconds: int = 300,
    _message_bus=None,
    _artifact_storage=None,
    _agent=None
):
    """Enhanced parallel delegation with logging and metrics."""
    # Pre-processing
    logger.info(f"Starting parallel delegation of {len(tasks)} tasks to {agent_name}")

    # Reuse the base TaskDelegationTool for the actual delegation.
    # Note: this loop awaits each task before starting the next, so the
    # tasks run sequentially; see the concurrent variant below for true fan-out.
    delegation_tool = TaskDelegationTool(_message_bus, _artifact_storage, _agent)
    results = []
    for task in tasks:
        logger.debug(f"Delegating task: {task.get('task_description', 'N/A')}")
        result = await delegation_tool.delegate_task(
            agent_name=agent_name,
            task_description=task.get("task_description", ""),
            task=task.get("task"),
            task_data=task.get("task_data"),
            timeout_seconds=timeout_seconds
        )
        results.append(result)

    # Post-processing
    logger.info(f"Completed {len(results)} delegations")
    return {"results": results, "total": len(results)}
Pattern 2: Add Rate Limiting
Control delegation rate to prevent overwhelming downstream agents:
from laddr import override_system_tool, TaskDelegationTool
import asyncio
import time
class RateLimiter:
    def __init__(self, max_per_second: int):
        self.max_per_second = max_per_second
        self.last_call = 0.0

    async def wait(self):
        now = time.time()
        elapsed = now - self.last_call
        if elapsed < (1.0 / self.max_per_second):
            await asyncio.sleep((1.0 / self.max_per_second) - elapsed)
        self.last_call = time.time()

rate_limiter = RateLimiter(max_per_second=5)

@override_system_tool("system_delegate_task")
async def rate_limited_delegation(
    agent_name: str,
    task_description: str,
    task: str,
    task_data: dict = None,
    timeout_seconds: int = 300,
    _message_bus=None,
    _artifact_storage=None,
    _agent=None
):
    """Rate-limited task delegation."""
    # Wait for the rate limiter before delegating
    await rate_limiter.wait()

    # Use the base tool for the actual delegation
    delegation_tool = TaskDelegationTool(_message_bus, _artifact_storage, _agent)
    return await delegation_tool.delegate_task(
        agent_name=agent_name,
        task_description=task_description,
        task=task,
        task_data=task_data,
        timeout_seconds=timeout_seconds
    )
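One caveat: the RateLimiter above can let two concurrent coroutines through at once, because both may read last_call before either updates it. A concurrency-safe variant, sketched with a standard asyncio.Lock (asyncio and time are already imported above), drops in as a replacement:

class SafeRateLimiter:
    def __init__(self, max_per_second: int):
        self.min_interval = 1.0 / max_per_second
        self.last_call = 0.0
        self._lock = asyncio.Lock()  # serializes access to last_call

    async def wait(self):
        async with self._lock:
            elapsed = time.time() - self.last_call
            if elapsed < self.min_interval:
                await asyncio.sleep(self.min_interval - elapsed)
            self.last_call = time.time()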
Pattern 3: Add Retry Logic
Implement automatic retries for failed delegations:
from laddr import override_system_tool, TaskDelegationTool
import asyncio
import logging
logger = logging.getLogger(__name__)
@override_system_tool("system_delegate_task")
async def retry_delegation(
    agent_name: str,
    task_description: str,
    task: str,
    task_data: dict = None,
    timeout_seconds: int = 300,
    max_retries: int = 3,
    _message_bus=None,
    _artifact_storage=None,
    _agent=None
):
    """Task delegation with automatic retries."""
    delegation_tool = TaskDelegationTool(_message_bus, _artifact_storage, _agent)

    for attempt in range(max_retries):
        try:
            result = await delegation_tool.delegate_task(
                agent_name=agent_name,
                task_description=task_description,
                task=task,
                task_data=task_data,
                timeout_seconds=timeout_seconds
            )
            return result
        except Exception as e:
            logger.warning(f"Delegation attempt {attempt + 1} failed: {e}")
            if attempt == max_retries - 1:
                logger.error(f"All {max_retries} attempts failed")
                raise
            # Exponential backoff before the next attempt
            await asyncio.sleep(2 ** attempt)
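When many workers retry on the same schedule, fixed exponential backoff can make them retry in lockstep and hammer the failing agent together. A common refinement is to add random jitter; a sketch of the change to the sleep above (random is from the standard library):

import random

# Replace the fixed backoff with a jittered one
delay = (2 ** attempt) + random.uniform(0, 1)  # backoff plus up to 1s of jitter
await asyncio.sleep(delay)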
Pattern 4: Add Circuit Breaker
Prevent cascading failures by stopping delegation to failing agents:
from laddr import override_system_tool, TaskDelegationTool
from collections import defaultdict
import time
class CircuitBreaker:
    def __init__(self, failure_threshold: int = 5, timeout: int = 60):
        self.failure_threshold = failure_threshold
        self.timeout = timeout
        self.failures = defaultdict(int)
        self.last_failure_time = defaultdict(float)

    def is_open(self, agent_name: str) -> bool:
        """Check if the circuit breaker is open for this agent."""
        if self.failures[agent_name] < self.failure_threshold:
            return False
        # Check if the cool-down timeout has elapsed
        if time.time() - self.last_failure_time[agent_name] > self.timeout:
            # Reset the circuit breaker
            self.failures[agent_name] = 0
            return False
        return True

    def record_failure(self, agent_name: str):
        """Record a failure for this agent."""
        self.failures[agent_name] += 1
        self.last_failure_time[agent_name] = time.time()

    def record_success(self, agent_name: str):
        """Record a success for this agent."""
        self.failures[agent_name] = 0

circuit_breaker = CircuitBreaker(failure_threshold=5, timeout=60)

@override_system_tool("system_delegate_task")
async def circuit_breaker_delegation(
    agent_name: str,
    task_description: str,
    task: str,
    task_data: dict = None,
    timeout_seconds: int = 300,
    _message_bus=None,
    _artifact_storage=None,
    _agent=None
):
    """Task delegation with circuit breaker pattern."""
    # Fail fast if the breaker is open for this agent
    if circuit_breaker.is_open(agent_name):
        raise RuntimeError(f"Circuit breaker open for agent '{agent_name}'")

    delegation_tool = TaskDelegationTool(_message_bus, _artifact_storage, _agent)
    try:
        result = await delegation_tool.delegate_task(
            agent_name=agent_name,
            task_description=task_description,
            task=task,
            task_data=task_data,
            timeout_seconds=timeout_seconds
        )
        circuit_breaker.record_success(agent_name)
        return result
    except Exception:
        circuit_breaker.record_failure(agent_name)
        raise
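Because CircuitBreaker is plain Python with no Laddr dependencies, you can sanity-check its state transitions directly (the agent name here is illustrative):

breaker = CircuitBreaker(failure_threshold=2, timeout=60)
assert not breaker.is_open("worker")  # starts closed
breaker.record_failure("worker")
breaker.record_failure("worker")
assert breaker.is_open("worker")      # opens once the threshold is hit
breaker.record_success("worker")      # a success resets the failure count
assert not breaker.is_open("worker")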
Pattern 5: Custom Artifact Storage with Compression
Add automatic compression for large artifacts:
from laddr import override_system_tool, ArtifactStorageTool
import gzip
import json
@override_system_tool("system_store_artifact")
async def compressed_storage(
    data: dict,
    artifact_type: str,
    metadata: dict = None,
    bucket: str = None,
    compress: bool = True,
    _artifact_storage=None,
    **kwargs
):
    """Store artifacts with optional compression."""
    storage_tool = ArtifactStorageTool(
        storage_backend=_artifact_storage,
        default_bucket=bucket or "artifacts"
    )

    # Serialize once so the size check and the compression use the same bytes
    serialized = json.dumps(data)

    # Compress if enabled and the payload is larger than 1 KB
    if compress and len(serialized) > 1024:
        compressed_data = gzip.compress(serialized.encode())

        # Record compression details in metadata so retrieval can reverse it
        metadata = metadata or {}
        metadata["compressed"] = True
        metadata["original_size"] = len(serialized)
        metadata["compressed_size"] = len(compressed_data)

        # Hex-encode the compressed bytes so the payload stays JSON-serializable
        return await storage_tool.store_artifact(
            data={"compressed": compressed_data.hex()},
            artifact_type=artifact_type,
            metadata=metadata,
            bucket=bucket
        )
    else:
        # Store uncompressed
        return await storage_tool.store_artifact(
            data=data,
            artifact_type=artifact_type,
            metadata=metadata,
            bucket=bucket
        )
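If you compress on the way in, the retrieval path has to reverse the transform. A matching override sketch; it assumes retrieve_artifact returns the stored payload dict as-is (if your deployment returns metadata alongside the data, check the compressed flag there instead):

from laddr import override_system_tool, ArtifactStorageTool
import gzip
import json

@override_system_tool("system_retrieve_artifact")
async def decompressing_retrieval(
    artifact_id: str,
    bucket: str = None,
    _artifact_storage=None,
    **kwargs
):
    """Retrieve artifacts, transparently decompressing when needed."""
    storage_tool = ArtifactStorageTool(
        storage_backend=_artifact_storage,
        default_bucket=bucket or "artifacts"
    )
    payload = await storage_tool.retrieve_artifact(
        artifact_id=artifact_id,
        bucket=bucket
    )
    # Reverse the hex + gzip transform applied by compressed_storage above
    if isinstance(payload, dict) and "compressed" in payload:
        raw = gzip.decompress(bytes.fromhex(payload["compressed"]))
        return json.loads(raw)
    return payload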
API Reference
TaskDelegationTool
Handles single-task delegation to other agents.
class TaskDelegationTool:
    def __init__(
        self,
        message_bus: MessageBus,
        artifact_storage: Storage,
        agent: Agent
    ):
        """
        Initialize task delegation tool.

        Args:
            message_bus: Message queue backend
            artifact_storage: Storage backend for large payloads
            agent: Current agent instance
        """
        pass

    async def delegate_task(
        self,
        agent_name: str,
        task_description: str,
        task: str,
        task_data: dict = None,
        timeout_seconds: int = 300
    ) -> dict:
        """
        Delegate a task to another agent.

        Args:
            agent_name: Name of target agent
            task_description: Human-readable task description
            task: Task type/action identifier
            task_data: Additional task parameters
            timeout_seconds: Maximum wait time for result

        Returns:
            Task result from target agent
        """
        pass
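For reference, a typical call from inside an override that has received the injected dependencies (the agent name and task values here are illustrative):

tool = TaskDelegationTool(_message_bus, _artifact_storage, _agent)
result = await tool.delegate_task(
    agent_name="researcher",
    task_description="Summarize the latest report",
    task="summarize",
    task_data={"report_id": "r-123"},
    timeout_seconds=120
)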
ParallelDelegationTool
Handles parallel multi-task delegation (fan-out pattern).
class ParallelDelegationTool:
    def __init__(
        self,
        message_bus: MessageBus,
        artifact_storage: Storage,
        agent: Agent
    ):
        """
        Initialize parallel delegation tool.

        Args:
            message_bus: Message queue backend
            artifact_storage: Storage backend
            agent: Current agent instance
        """
        pass

    async def delegate_parallel(
        self,
        agent_name: str,
        tasks: list[dict],
        timeout_seconds: int = 300
    ) -> dict:
        """
        Delegate multiple tasks in parallel to an agent.

        Args:
            agent_name: Name of target agent
            tasks: List of task dictionaries with 'task' and 'task_data' keys
            timeout_seconds: Maximum wait time per task

        Returns:
            Dictionary with 'results' list and 'total' count
        """
        pass
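A typical call, again from inside an override (the agent name and URLs are illustrative; each task dict carries the 'task' and 'task_data' keys described above):

tool = ParallelDelegationTool(_message_bus, _artifact_storage, _agent)
outcome = await tool.delegate_parallel(
    agent_name="scraper",
    tasks=[
        {"task": "fetch", "task_data": {"url": "https://example.com/a"}},
        {"task": "fetch", "task_data": {"url": "https://example.com/b"}},
    ],
    timeout_seconds=120
)
print(outcome["total"], "tasks completed")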
ArtifactStorageTool
Handles storage and retrieval of large data artifacts.
class ArtifactStorageTool:
    def __init__(
        self,
        storage_backend: Storage,
        default_bucket: str = "artifacts"
    ):
        """
        Initialize artifact storage tool.

        Args:
            storage_backend: Storage backend (MinIO/S3)
            default_bucket: Default bucket name
        """
        pass

    async def store_artifact(
        self,
        data: dict,
        artifact_type: str,
        metadata: dict = None,
        bucket: str = None
    ) -> dict:
        """
        Store a data artifact.

        Args:
            data: Data to store
            artifact_type: Type/category of artifact
            metadata: Additional metadata
            bucket: Storage bucket (optional)

        Returns:
            Dictionary with artifact_id and storage location
        """
        pass

    async def retrieve_artifact(
        self,
        artifact_id: str,
        bucket: str = None
    ) -> dict:
        """
        Retrieve a stored artifact.

        Args:
            artifact_id: Unique artifact identifier
            bucket: Storage bucket (optional)

        Returns:
            Original artifact data
        """
        pass
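A round trip, assuming the dict returned by store_artifact exposes the artifact_id described above (the data values are illustrative):

storage = ArtifactStorageTool(
    storage_backend=_artifact_storage,
    default_bucket="artifacts"
)
stored = await storage.store_artifact(
    data={"rows": [1, 2, 3]},
    artifact_type="dataset",
    metadata={"source": "example"}
)
restored = await storage.retrieve_artifact(artifact_id=stored["artifact_id"])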
Overridable System Tools
You can override any of the following system tools:
system_delegate_task - Single task delegation
system_delegate_parallel - Parallel task delegation
system_store_artifact - Store data artifact
system_retrieve_artifact - Retrieve data artifact
Registering Your Overrides
In Agent Files
Define your overrides in your agent Python file:
from laddr import Agent, override_system_tool, TaskDelegationTool

@override_system_tool("system_delegate_task")
async def my_custom_delegate(
    agent_name: str,
    task_description: str,
    task: str,
    task_data: dict = None,
    timeout_seconds: int = 300,
    _message_bus=None,
    _artifact_storage=None,
    _agent=None
):
    """My custom delegation logic."""
    # Your custom logic here
    delegation_tool = TaskDelegationTool(_message_bus, _artifact_storage, _agent)
    return await delegation_tool.delegate_task(...)

class MyAgent(Agent):
    """Your agent implementation."""
    pass
In Separate Override Modules
Create a dedicated module for your overrides:
# overrides/delegation.py
from laddr import override_system_tool, TaskDelegationTool

@override_system_tool("system_delegate_task")
async def custom_delegate(...):
    """Custom delegation."""
    pass

@override_system_tool("system_delegate_parallel")
async def custom_parallel_delegate(...):
    """Custom parallel delegation."""
    pass
Then import in your agent:
# my_agent.py
from laddr import Agent
import overrides.delegation # Registers overrides on import
class MyAgent(Agent):
    pass
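If you grow multiple override modules, one way to keep registration down to a single import is a package __init__ that pulls them all in (a sketch, using the module layout from the example above):

# overrides/__init__.py
# Importing the package registers every override module in one step
from . import delegation  # noqa: F401

Then import overrides in your agent file is enough on its own.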
Testing Your Overrides
Verify Override Registration
from laddr.core.system_tools import get_tool_override, list_tool_overrides
# Check if your override is registered
override_func = get_tool_override("system_delegate_task")
print(f"Override registered: {override_func is not None}")
# List all registered overrides
all_overrides = list_tool_overrides()
print(f"All overrides: {all_overrides}")
Clear Overrides (for testing)
from laddr.core.system_tools import clear_tool_overrides
# Clear all overrides (useful in test cleanup)
clear_tool_overrides()
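In a pytest suite, you could wrap this in an autouse fixture so overrides registered by one test never leak into the next (pytest itself is an assumption; the helper is the one shown above):

import pytest
from laddr.core.system_tools import clear_tool_overrides

@pytest.fixture(autouse=True)
def isolated_overrides():
    # Run the test, then drop any overrides it registered
    yield
    clear_tool_overrides()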
Best Practices
1. Always Accept Runtime-Injected Parameters
Your override functions must accept these parameters:
async def my_override(
    # ... your parameters ...
    _message_bus=None,       # Always include
    _artifact_storage=None,  # Always include
    _agent=None              # Always include
):
    pass
2. Reuse the Base Tools
Don’t reimplement delegation logic - use the base tools:
# ❌ Bad - reimplementing delegation
async def my_override(...):
    # Manually publish to message bus, handle responses, etc.
    pass

# ✅ Good - reusing base tool
async def my_override(..., _message_bus=None, _artifact_storage=None, _agent=None):
    delegation_tool = TaskDelegationTool(_message_bus, _artifact_storage, _agent)
    return await delegation_tool.delegate_task(...)
3. Preserve Function Signatures
Maintain compatibility with the original system tool signatures.
4. Handle Errors Gracefully
import logging
logger = logging.getLogger(__name__)
@override_system_tool("system_delegate_task")
async def my_override(...):
    try:
        # Your logic
        delegation_tool = TaskDelegationTool(_message_bus, _artifact_storage, _agent)
        result = await delegation_tool.delegate_task(...)
        return result
    except Exception as e:
        logger.error(f"Delegation failed: {e}", exc_info=True)
        # Optionally re-raise or return an error result
        return {"error": str(e), "success": False}
5. Add Comprehensive Logging
import logging
logger = logging.getLogger(__name__)
@override_system_tool("system_delegate_task")
async def my_override(agent_name: str, task_description: str, ...):
    logger.info(f"Custom delegation to {agent_name}: {task_description}")
    try:
        result = await ...
        logger.info(f"Delegation succeeded: {result}")
        return result
    except Exception as e:
        logger.error(f"Delegation failed: {e}")
        raise
Troubleshooting
Override Not Being Called
- Check registration:

  from laddr.core.system_tools import get_tool_override

  override = get_tool_override("system_delegate_task")
  print(f"Registered: {override is not None}")

- Verify import: Make sure your override module is imported before the agent runs.
- Check function signature: Ensure your override accepts the required parameters, including the injected _message_bus, _artifact_storage, and _agent keyword arguments.
Runtime Injection Not Working
Make sure you’re accepting the injected parameters:
# ✅ Correct
async def my_override(..., _message_bus=None, _artifact_storage=None, _agent=None):
    delegation_tool = TaskDelegationTool(_message_bus, _artifact_storage, _agent)

# ❌ Wrong - missing injected parameters
async def my_override(...):
    delegation_tool = TaskDelegationTool(???, ???, ???)  # No dependencies available!

Ensure you’re passing the injected dependencies correctly:

# ✅ Correct
delegation_tool = TaskDelegationTool(_message_bus, _artifact_storage, _agent)

# ❌ Wrong - undefined names
delegation_tool = TaskDelegationTool(message_bus, storage, agent)  # Undefined!

# ❌ Wrong - wrong argument order
delegation_tool = TaskDelegationTool(_agent, _message_bus, _artifact_storage)
Next Steps