This guide explains how to create custom system tools by importing and extending Laddr’s base tool classes.

Overview

Laddr provides three base system tool classes that you can import and use to build custom overrides:
  • TaskDelegationTool - Single-task delegation to other agents
  • ParallelDelegationTool - Parallel multi-task delegation (fan-out pattern)
  • ArtifactStorageTool - Storage and retrieval of large data artifacts
These classes are fully composable, meaning you can use them as building blocks within your custom tool implementations.

Importing Base Tools

You can import the base tools from any of three locations:

Top-Level Import

from laddr import TaskDelegationTool, ParallelDelegationTool, ArtifactStorageTool

Core Module Import

from laddr.core import TaskDelegationTool, ParallelDelegationTool, ArtifactStorageTool

Direct Module Import

from laddr.core.system_tools import TaskDelegationTool, ParallelDelegationTool, ArtifactStorageTool


Importing the Override Decorator

To register your custom tools, you’ll need the override decorator:
from laddr import override_system_tool
# or
from laddr.core import override_system_tool


Understanding System Tool Architecture

Runtime Injection

When your custom tool override is called, the Laddr runtime automatically injects three special keyword arguments:
  • _message_bus - The message queue backend (Redis/Kafka)
  • _artifact_storage - The storage backend (MinIO/S3)
  • _agent - The current agent instance (for job_id propagation)
These are injected by the runtime - you don’t pass them manually.
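To make the injection step concrete, here is a minimal sketch of what "injected by the runtime" could look like from the override's point of view. `call_override`, `INJECTED`, and `example_override` are hypothetical names used only for illustration, and the placeholder objects stand in for the real backends; this is not Laddr's actual implementation. Only the three keyword names come from this guide.

```python
import asyncio

# Placeholder objects standing in for the real backends (assumptions).
INJECTED = {
    "_message_bus": object(),
    "_artifact_storage": object(),
    "_agent": object(),
}

async def call_override(func, **tool_args):
    """Hypothetical runtime step: merge the injected kwargs into the call."""
    return await func(**tool_args, **INJECTED)

async def example_override(agent_name, _message_bus=None,
                           _artifact_storage=None, _agent=None):
    # A real override would build a base tool from these values;
    # this one only reports which of them arrived.
    received = {"bus": _message_bus, "storage": _artifact_storage, "agent": _agent}
    present = sorted(name for name, value in received.items() if value is not None)
    return {"agent_name": agent_name, "injected": present}
```

The caller supplies only `agent_name`; the three underscore-prefixed arguments arrive without the caller mentioning them, which is why the override's signature has to declare them.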

Base Tool Initialization

All base tool classes accept these dependencies in their constructors:
# TaskDelegationTool and ParallelDelegationTool
tool = TaskDelegationTool(
    message_bus=_message_bus,
    artifact_storage=_artifact_storage,
    agent=_agent
)

# ArtifactStorageTool
tool = ArtifactStorageTool(
    storage_backend=_artifact_storage,
    default_bucket="my-bucket"
)


Common Patterns

Pattern 1: Enhance with Pre/Post Processing

Add logging, metrics, or validation while reusing base functionality:
from laddr import override_system_tool, TaskDelegationTool
import logging

logger = logging.getLogger(__name__)

@override_system_tool("system_delegate_parallel")
async def custom_parallel_delegation(
    agent_name: str,
    tasks: list[dict],
    timeout_seconds: int = 300,
    _message_bus=None,
    _artifact_storage=None,
    _agent=None
):
    """Parallel-delegation override that logs before and after delegating."""
    
    # Pre-processing
    logger.info(f"Starting parallel delegation of {len(tasks)} tasks to {agent_name}")
    
    # Reuse base TaskDelegationTool for the actual delegation.
    # Note: the loop below awaits one task at a time, so tasks run sequentially.
    delegation_tool = TaskDelegationTool(_message_bus, _artifact_storage, _agent)
    
    results = []
    for task in tasks:
        logger.debug(f"Delegating task: {task.get('task_description', 'N/A')}")
        
        result = await delegation_tool.delegate_task(
            agent_name=agent_name,
            task_description=task.get("task_description", ""),
            task=task.get("task"),
            task_data=task.get("task_data"),
            timeout_seconds=timeout_seconds
        )
        results.append(result)
    
    # Post-processing
    logger.info(f"Completed {len(results)} delegations")
    
    return {"results": results, "total": len(results)}
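The loop in Pattern 1 awaits each task in turn. If you want the tasks in flight at the same time while capping how many run at once, one option is `asyncio.gather` combined with an `asyncio.Semaphore`. The sketch below is an assumption-laden illustration: `fan_out` and `fake_delegate` are hypothetical names, and `fake_delegate` stands in for a call to `delegation_tool.delegate_task(...)`.

```python
import asyncio

async def fan_out(tasks, delegate_one, max_concurrency: int = 4):
    """Run delegate_one(task) for every task, at most max_concurrency at a time."""
    semaphore = asyncio.Semaphore(max_concurrency)

    async def run(task):
        async with semaphore:
            return await delegate_one(task)

    # gather returns results in input order. With return_exceptions=True,
    # a failed task appears in the list as an exception object instead of
    # being raised to the caller on the first failure.
    results = await asyncio.gather(*(run(t) for t in tasks), return_exceptions=True)
    return {"results": results, "total": len(results)}

async def fake_delegate(task):
    # Stand-in for delegation_tool.delegate_task(...) (hypothetical).
    await asyncio.sleep(0.01)
    return {"echo": task["task"]}
```

The return value keeps the `results` and `total` keys used in Pattern 1, so callers do not need to change.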

Pattern 2: Add Rate Limiting

Control delegation rate to prevent overwhelming downstream agents:
from laddr import override_system_tool, TaskDelegationTool
import asyncio
import time

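class RateLimiter:
    def __init__(self, max_per_second: int):
        self.min_interval = 1.0 / max_per_second
        self.next_slot = 0.0  # earliest monotonic time the next call may start
        
    async def wait(self):
        now = time.monotonic()  # unaffected by system clock changes
        # Reserve a slot before sleeping so concurrent callers queue up
        # instead of all sleeping for the same interval.
        slot = max(now, self.next_slot)
        self.next_slot = slot + self.min_interval
        if slot > now:
            await asyncio.sleep(slot - now)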

rate_limiter = RateLimiter(max_per_second=5)

@override_system_tool("system_delegate_task")
async def rate_limited_delegation(
    agent_name: str,
    task_description: str,
    task: str,
    task_data: dict = None,
    timeout_seconds: int = 300,
    _message_bus=None,
    _artifact_storage=None,
    _agent=None
):
    """Rate-limited task delegation."""
    
    # Wait for rate limiter
    await rate_limiter.wait()
    
    # Use base tool for actual delegation
    delegation_tool = TaskDelegationTool(_message_bus, _artifact_storage, _agent)
    
    return await delegation_tool.delegate_task(
        agent_name=agent_name,
        task_description=task_description,
        task=task,
        task_data=task_data,
        timeout_seconds=timeout_seconds
    )
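The limiter in Pattern 2 is shared by every target agent, so a burst aimed at one agent also delays calls to the others. If that is not what you want, a per-agent schedule is one alternative. The sketch below is hypothetical: `PerAgentRateLimiter` and its `clock` parameter are illustrative names, not part of Laddr.

```python
import asyncio
import time
from collections import defaultdict

class PerAgentRateLimiter:
    """Hypothetical variant: one independent schedule per agent name."""

    def __init__(self, max_per_second: float, clock=time.monotonic):
        self.min_interval = 1.0 / max_per_second
        self._next_slot = defaultdict(float)  # agent_name -> earliest allowed start
        self._clock = clock                   # injectable so tests can fix the time

    async def wait(self, agent_name: str) -> float:
        """Wait for the agent's next free slot and return the delay applied."""
        now = self._clock()
        slot = max(now, self._next_slot[agent_name])
        # Reserve the slot before sleeping so concurrent callers line up behind it.
        self._next_slot[agent_name] = slot + self.min_interval
        delay = slot - now
        if delay > 0:
            await asyncio.sleep(delay)
        return delay
```

Inside the override you would call `await limiter.wait(agent_name)` in place of the shared `rate_limiter.wait()`.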

Pattern 3: Add Retry Logic

Implement automatic retries for failed delegations:
from laddr import override_system_tool, TaskDelegationTool
import asyncio
import logging

logger = logging.getLogger(__name__)

@override_system_tool("system_delegate_task")
async def retry_delegation(
    agent_name: str,
    task_description: str,
    task: str,
    task_data: dict = None,
    timeout_seconds: int = 300,
    max_retries: int = 3,
    _message_bus=None,
    _artifact_storage=None,
    _agent=None
):
    """Task delegation with automatic retries."""
    
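    # Guard against a loop that never runs and silently returns None
    if max_retries < 1:
        raise ValueError("max_retries must be at least 1")
    
    delegation_tool = TaskDelegationTool(_message_bus, _artifact_storage, _agent)
    
    for attempt in range(max_retries):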
        try:
            result = await delegation_tool.delegate_task(
                agent_name=agent_name,
                task_description=task_description,
                task=task,
                task_data=task_data,
                timeout_seconds=timeout_seconds
            )
            return result
            
        except Exception as e:
            logger.warning(f"Delegation attempt {attempt + 1} failed: {e}")
            
            if attempt == max_retries - 1:
                logger.error(f"All {max_retries} attempts failed")
                raise
                
            # Exponential backoff
            await asyncio.sleep(2 ** attempt)
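Pattern 3 retries on any exception and sleeps for a fixed power of two. Two common refinements are retrying only errors that look transient and adding random jitter so that many callers do not retry in lockstep. The helper below is a generic sketch: `retry_async` and `FlakyCall` are hypothetical names, and the choice of `TimeoutError` and `ConnectionError` as retryable is an assumption, since this guide does not say which exceptions `delegate_task` raises.

```python
import asyncio
import random

async def retry_async(make_call, max_retries=3, base_delay=1.0,
                      retry_on=(TimeoutError, ConnectionError)):
    """Await make_call() up to max_retries times, backing off between attempts."""
    if max_retries < 1:
        raise ValueError("max_retries must be at least 1")
    for attempt in range(max_retries):
        try:
            return await make_call()
        except retry_on:
            if attempt == max_retries - 1:
                raise  # out of attempts: surface the last error
            # Exponential backoff with full jitter:
            # sleep a random time in [0, base_delay * 2**attempt].
            await asyncio.sleep(random.uniform(0, base_delay * 2 ** attempt))

class FlakyCall:
    """Test stand-in that fails a fixed number of times, then succeeds."""

    def __init__(self, failures: int):
        self.failures = failures
        self.calls = 0

    async def __call__(self):
        self.calls += 1
        if self.calls <= self.failures:
            raise ConnectionError("simulated transient failure")
        return "ok"
```

In an override, `make_call` could be a small lambda that wraps `delegation_tool.delegate_task(...)` with the arguments already bound. Exceptions outside `retry_on` propagate immediately without a retry.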

Pattern 4: Add Circuit Breaker

Prevent cascading failures by stopping delegation to failing agents:
from laddr import override_system_tool, TaskDelegationTool
from collections import defaultdict
import time

class CircuitBreaker:
    def __init__(self, failure_threshold: int = 5, timeout: int = 60):
        self.failure_threshold = failure_threshold
        self.timeout = timeout
        self.failures = defaultdict(int)
        self.last_failure_time = defaultdict(float)
        
    def is_open(self, agent_name: str) -> bool:
        """Check if circuit breaker is open for this agent."""
        if self.failures[agent_name] < self.failure_threshold:
            return False
            
        # Check if timeout has elapsed
        if time.time() - self.last_failure_time[agent_name] > self.timeout:
            # Reset circuit breaker
            self.failures[agent_name] = 0
            return False
            
        return True
        
    def record_failure(self, agent_name: str):
        """Record a failure for this agent."""
        self.failures[agent_name] += 1
        self.last_failure_time[agent_name] = time.time()
        
    def record_success(self, agent_name: str):
        """Record a success for this agent."""
        self.failures[agent_name] = 0

circuit_breaker = CircuitBreaker(failure_threshold=5, timeout=60)

@override_system_tool("system_delegate_task")
async def circuit_breaker_delegation(
    agent_name: str,
    task_description: str,
    task: str,
    task_data: dict = None,
    timeout_seconds: int = 300,
    _message_bus=None,
    _artifact_storage=None,
    _agent=None
):
    """Task delegation with circuit breaker pattern."""
    
    # Check circuit breaker
    if circuit_breaker.is_open(agent_name):
        raise RuntimeError(f"Circuit breaker open for agent '{agent_name}'")
    
    delegation_tool = TaskDelegationTool(_message_bus, _artifact_storage, _agent)
    
    try:
        result = await delegation_tool.delegate_task(
            agent_name=agent_name,
            task_description=task_description,
            task=task,
            task_data=task_data,
            timeout_seconds=timeout_seconds
        )
        circuit_breaker.record_success(agent_name)
        return result
        
    except Exception:
        circuit_breaker.record_failure(agent_name)
        raise
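To see when the breaker opens and when it closes again, it helps to drive it with a fake clock instead of waiting 60 seconds. The block below is a condensed copy of the Pattern 4 class (without `record_success`) with one addition that is not in the original: a `clock` parameter, introduced here purely so the walkthrough is deterministic.

```python
import time
from collections import defaultdict

class CircuitBreaker:
    """Condensed copy of the Pattern 4 class, plus an injectable clock."""

    def __init__(self, failure_threshold=5, timeout=60, clock=time.time):
        self.failure_threshold = failure_threshold
        self.timeout = timeout
        self.clock = clock  # assumption: added for testability
        self.failures = defaultdict(int)
        self.last_failure_time = defaultdict(float)

    def is_open(self, agent_name):
        if self.failures[agent_name] < self.failure_threshold:
            return False
        if self.clock() - self.last_failure_time[agent_name] > self.timeout:
            self.failures[agent_name] = 0  # cool-down elapsed: close the circuit
            return False
        return True

    def record_failure(self, agent_name):
        self.failures[agent_name] += 1
        self.last_failure_time[agent_name] = self.clock()

now = [0.0]  # mutable fake clock, in seconds
breaker = CircuitBreaker(failure_threshold=2, timeout=60, clock=lambda: now[0])

breaker.record_failure("researcher")
state_after_one = breaker.is_open("researcher")      # below the threshold
breaker.record_failure("researcher")
state_after_two = breaker.is_open("researcher")      # threshold reached
now[0] = 61.0
state_after_timeout = breaker.is_open("researcher")  # cool-down has elapsed
```

Note that this design resets the failure count to zero once the timeout passes, so the agent receives full traffic again rather than a single probe request.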

Pattern 5: Custom Artifact Storage with Compression

Add automatic compression for large artifacts:
from laddr import override_system_tool, ArtifactStorageTool
import gzip
import json

@override_system_tool("system_store_artifact")
async def compressed_storage(
    data: dict,
    artifact_type: str,
    metadata: dict = None,
    bucket: str = None,
    compress: bool = True,
    _artifact_storage=None,
    **kwargs
):
    """Store artifacts with optional compression."""
    
    storage_tool = ArtifactStorageTool(
        storage_backend=_artifact_storage,
        default_bucket=bucket or "artifacts"
    )
    
    # Serialize once; compress only when the JSON payload exceeds 1 KB
    serialized = json.dumps(data).encode("utf-8")
    if compress and len(serialized) > 1024:
        compressed_data = gzip.compress(serialized)
        
        # Copy the metadata so the caller's dict is not mutated
        metadata = dict(metadata or {})
        metadata["compressed"] = True
        metadata["original_size"] = len(serialized)
        metadata["compressed_size"] = len(compressed_data)
        
        # Store the compressed bytes hex-encoded so the payload stays a plain dict
        return await storage_tool.store_artifact(
            data={"compressed": compressed_data.hex()},
            artifact_type=artifact_type,
            metadata=metadata,
            bucket=bucket
        )
    else:
        # Store uncompressed
        return await storage_tool.store_artifact(
            data=data,
            artifact_type=artifact_type,
            metadata=metadata,
            bucket=bucket
        )
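Pattern 5 only covers the write path, so anything that reads such an artifact has to undo the hex and gzip encoding. A minimal sketch of that reverse step follows. `pack` and `unpack` are hypothetical helper names; `pack` mirrors the encoding used above so the round trip can be checked without a storage backend. Detecting compression by the payload's single `compressed` key is an assumption made here because this guide does not say whether `retrieve_artifact` returns the stored metadata.

```python
import gzip
import json

def pack(data: dict) -> dict:
    """Mirror of the storage step: gzip the JSON text and hex-encode it."""
    raw = json.dumps(data).encode("utf-8")
    return {"compressed": gzip.compress(raw).hex()}

def unpack(stored: dict) -> dict:
    """Reverse pack(); payloads that are not a lone 'compressed' key pass through."""
    if set(stored) != {"compressed"}:
        return stored
    raw = gzip.decompress(bytes.fromhex(stored["compressed"]))
    return json.loads(raw.decode("utf-8"))
```

A matching `system_retrieve_artifact` override could call `unpack` on whatever `retrieve_artifact` returns. Keep in mind that hex encoding uses two characters per byte, so compression only pays off when it shrinks the payload by more than half.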


System Tool Reference

TaskDelegationTool

Handles single-task delegation to other agents.
class TaskDelegationTool:
    def __init__(
        self,
        message_bus: MessageBus,
        artifact_storage: Storage,
        agent: Agent
    ):
        """
        Initialize task delegation tool.
        
        Args:
            message_bus: Message queue backend
            artifact_storage: Storage backend for large payloads
            agent: Current agent instance
        """
        pass
    
    async def delegate_task(
        self,
        agent_name: str,
        task_description: str,
        task: str,
        task_data: dict = None,
        timeout_seconds: int = 300
    ) -> dict:
        """
        Delegate a task to another agent.
        
        Args:
            agent_name: Name of target agent
            task_description: Human-readable task description
            task: Task type/action identifier
            task_data: Additional task parameters
            timeout_seconds: Maximum wait time for result
            
        Returns:
            Task result from target agent
        """
        pass

ParallelDelegationTool

Handles parallel multi-task delegation (fan-out pattern).
class ParallelDelegationTool:
    def __init__(
        self,
        message_bus: MessageBus,
        artifact_storage: Storage,
        agent: Agent
    ):
        """
        Initialize parallel delegation tool.
        
        Args:
            message_bus: Message queue backend
            artifact_storage: Storage backend
            agent: Current agent instance
        """
        pass
    
    async def delegate_parallel(
        self,
        agent_name: str,
        tasks: list[dict],
        timeout_seconds: int = 300
    ) -> dict:
        """
        Delegate multiple tasks in parallel to an agent.
        
        Args:
            agent_name: Name of target agent
            tasks: List of task dictionaries with 'task' and 'task_data' keys
            timeout_seconds: Maximum wait time per task
            
        Returns:
            Dictionary with 'results' list and 'total' count
        """
        pass

ArtifactStorageTool

Handles storage and retrieval of large data artifacts.
class ArtifactStorageTool:
    def __init__(
        self,
        storage_backend: Storage,
        default_bucket: str = "artifacts"
    ):
        """
        Initialize artifact storage tool.
        
        Args:
            storage_backend: Storage backend (MinIO/S3)
            default_bucket: Default bucket name
        """
        pass
    
    async def store_artifact(
        self,
        data: dict,
        artifact_type: str,
        metadata: dict = None,
        bucket: str = None
    ) -> dict:
        """
        Store a data artifact.
        
        Args:
            data: Data to store
            artifact_type: Type/category of artifact
            metadata: Additional metadata
            bucket: Storage bucket (optional)
            
        Returns:
            Dictionary with artifact_id and storage location
        """
        pass
    
    async def retrieve_artifact(
        self,
        artifact_id: str,
        bucket: str = None
    ) -> dict:
        """
        Retrieve a stored artifact.
        
        Args:
            artifact_id: Unique artifact identifier
            bucket: Storage bucket (optional)
            
        Returns:
            Original artifact data
        """
        pass


Available System Tools to Override

You can override any of the following system tools:
  • system_delegate_task - Single task delegation
  • system_delegate_parallel - Parallel task delegation
  • system_store_artifact - Store data artifact
  • system_retrieve_artifact - Retrieve data artifact

Registering Your Overrides

In Agent Files

Define your overrides in your agent Python file:
from laddr import Agent, override_system_tool, TaskDelegationTool

@override_system_tool("system_delegate_task")
async def my_custom_delegate(
    agent_name: str,
    task_description: str,
    task: str,
    task_data: dict = None,
    timeout_seconds: int = 300,
    _message_bus=None,
    _artifact_storage=None,
    _agent=None
):
    """My custom delegation logic."""
    # Your custom logic here
    delegation_tool = TaskDelegationTool(_message_bus, _artifact_storage, _agent)
    return await delegation_tool.delegate_task(...)

class MyAgent(Agent):
    """Your agent implementation."""
    pass

In Separate Override Modules

Create a dedicated module for your overrides:
# overrides/delegation.py
from laddr import override_system_tool, TaskDelegationTool

@override_system_tool("system_delegate_task")
async def custom_delegate(...):
    """Custom delegation."""
    pass

@override_system_tool("system_delegate_parallel")
async def custom_parallel_delegate(...):
    """Custom parallel delegation."""
    pass

Then import in your agent:
# my_agent.py
from laddr import Agent
import overrides.delegation  # Registers overrides on import

class MyAgent(Agent):
    pass


Testing Your Overrides

Verify Override Registration

from laddr.core.system_tools import get_tool_override, list_tool_overrides

# Check if your override is registered
override_func = get_tool_override("system_delegate_task")
print(f"Override registered: {override_func is not None}")

# List all registered overrides
all_overrides = list_tool_overrides()
print(f"All overrides: {all_overrides}")

Clear Overrides (for testing)

from laddr.core.system_tools import clear_tool_overrides

# Clear all overrides (useful in test cleanup)
clear_tool_overrides()


Best Practices

1. Always Accept Runtime-Injected Parameters

Your override functions must accept these parameters, either by name as shown here or through a **kwargs catch-all as in Pattern 5:
async def my_override(
    # ... your parameters ...
    _message_bus=None,      # Always include
    _artifact_storage=None,  # Always include
    _agent=None             # Always include
):
    pass
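A missing injected parameter is easy to catch before the agent runs. The check below uses the standard `inspect` module. `missing_injected_params`, `good`, and `bad` are hypothetical names, and treating `**kwargs` as sufficient is an assumption based on Pattern 5 accepting it.

```python
import inspect

REQUIRED = ("_message_bus", "_artifact_storage", "_agent")

def missing_injected_params(func) -> list:
    """Return the injected parameter names that func cannot accept."""
    params = inspect.signature(func).parameters
    if any(p.kind is inspect.Parameter.VAR_KEYWORD for p in params.values()):
        return []  # **kwargs absorbs any keyword the runtime passes
    return [name for name in REQUIRED if name not in params]

async def good(agent_name, _message_bus=None, _artifact_storage=None, _agent=None):
    ...

async def bad(agent_name, _message_bus=None):
    ...
```

Running this over your override functions in a unit test turns a runtime `TypeError` into an early, readable failure.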

2. Reuse Base Tools When Possible

Don’t reimplement delegation logic - use the base tools:
# ❌ Bad - reimplementing delegation
async def my_override(...):
    # Manually publish to message bus, handle responses, etc.
    pass

# ✅ Good - reusing base tool
async def my_override(..., _message_bus=None, _artifact_storage=None, _agent=None):
    delegation_tool = TaskDelegationTool(_message_bus, _artifact_storage, _agent)
    return await delegation_tool.delegate_task(...)

3. Preserve Function Signatures

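Maintain compatibility with the original system tool signatures: keep the parameter names and defaults listed in the System Tool Reference, and give any parameter you add (such as max_retries in Pattern 3 or compress in Pattern 5) a default value so existing calls keep working.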

4. Handle Errors Gracefully

import logging

logger = logging.getLogger(__name__)

@override_system_tool("system_delegate_task")
async def my_override(...):
    try:
        # Your logic
        delegation_tool = TaskDelegationTool(_message_bus, _artifact_storage, _agent)
        result = await delegation_tool.delegate_task(...)
        return result
        
    except Exception as e:
        logger.error(f"Delegation failed: {e}", exc_info=True)
        # Optionally re-raise or return error result
        return {"error": str(e), "success": False}

5. Add Comprehensive Logging

import logging

logger = logging.getLogger(__name__)

@override_system_tool("system_delegate_task")
async def my_override(agent_name: str, task_description: str, ...):
    logger.info(f"Custom delegation to {agent_name}: {task_description}")
    
    try:
        result = await ...
        logger.info(f"Delegation succeeded: {result}")
        return result
        
    except Exception as e:
        logger.error(f"Delegation failed: {e}")
        raise


Troubleshooting

Override Not Being Called

  1. Check registration:
    from laddr.core.system_tools import get_tool_override
    override = get_tool_override("system_delegate_task")
    print(f"Registered: {override is not None}")
    
  2. Verify import: Make sure your override module is imported before the agent runs
  3. Check function signature: Ensure your override has the required parameters

Runtime Injection Not Working

Make sure you’re accepting the injected parameters:
# ✅ Correct
async def my_override(..., _message_bus=None, _artifact_storage=None, _agent=None):
    delegation_tool = TaskDelegationTool(_message_bus, _artifact_storage, _agent)

# ❌ Wrong - missing injected parameters
async def my_override(...):
    delegation_tool = TaskDelegationTool(???, ???, ???)  # No dependencies!

Base Tool Not Working

Ensure you’re passing the injected dependencies correctly:
# ✅ Correct
delegation_tool = TaskDelegationTool(_message_bus, _artifact_storage, _agent)

# ❌ Wrong - wrong variable names
delegation_tool = TaskDelegationTool(message_bus, storage, agent)  # NameError: these names are not defined

# ❌ Wrong - wrong order
delegation_tool = TaskDelegationTool(_agent, _message_bus, _artifact_storage)

# ✅ Also correct - keyword arguments make the order irrelevant
delegation_tool = TaskDelegationTool(message_bus=_message_bus, artifact_storage=_artifact_storage, agent=_agent)


Next Steps