AI agents represent the next evolution beyond simple LLM chatbots—systems that can plan, use tools, and work towards goals with minimal human intervention. This post explores how to build reliable agentic systems for production use.
Agent Architecture
import asyncio
import inspect
import time
from enum import Enum
from typing import Any, Callable, Dict, List
from dataclasses import dataclass, field

class AgentState(Enum):
    PLANNING = "planning"
    EXECUTING = "executing"
    OBSERVING = "observing"
    COMPLETE = "complete"
    FAILED = "failed"

@dataclass
class Task:
    goal: str
    constraints: List[str] = field(default_factory=list)
    max_steps: int = 10
    timeout_seconds: int = 300

@dataclass
class Action:
    tool_name: str
    parameters: Dict
    rationale: str

class Agent:
    def __init__(self, llm, tools: Dict[str, Callable], memory):
        self.llm = llm
        self.tools = tools
        self.memory = memory
        self.state = AgentState.PLANNING

    async def execute_task(self, task: Task) -> Dict:
        """Main agent loop: plan, act, observe, repeat."""
        self.memory.clear()
        self.memory.add("system", self._get_system_prompt())
        self.memory.add("user", f"Goal: {task.goal}")

        steps_taken = 0
        for step in range(task.max_steps):
            # Plan next action
            action = await self._plan_action(task)
            steps_taken = step + 1

            if action.tool_name == "finish":
                self.state = AgentState.COMPLETE
                break

            # Execute action
            observation = await self._execute_action(action)

            # Update memory
            self.memory.add("assistant", f"Action: {action.tool_name}({action.parameters})")
            self.memory.add("system", f"Observation: {observation}")
        else:
            # Ran out of steps without the model calling "finish"
            self.state = AgentState.FAILED

        return {
            "status": self.state.value,
            "steps": steps_taken,
            "result": self.memory.get_recent(1)[0] if self.state == AgentState.COMPLETE else None,
        }

    async def _plan_action(self, task: Task) -> Action:
        """Plan the next action based on current state."""
        prompt = self._build_planning_prompt(task)
        response = await self.llm.generate(prompt)
        return self._parse_action(response)

    async def _execute_action(self, action: Action) -> str:
        """Execute a tool and return the observation."""
        if action.tool_name not in self.tools:
            return f"Error: Unknown tool '{action.tool_name}'"
        try:
            result = await self.tools[action.tool_name](**action.parameters)
            return str(result)
        except Exception as e:
            return f"Error executing {action.tool_name}: {str(e)}"
Tool Use and Function Calling
class ToolRegistry:
    """Manage available tools for the agent."""

    def __init__(self):
        self.tools = {}

    def register(self, name: str, description: str):
        """Decorator to register tools."""
        def decorator(func):
            self.tools[name] = {
                "function": func,
                "description": description,
                "parameters": self._extract_parameters(func),
            }
            return func
        return decorator

    def _extract_parameters(self, func) -> Dict[str, str]:
        """Read parameter names and type hints off the function signature."""
        signature = inspect.signature(func)
        return {
            name: getattr(param.annotation, "__name__", str(param.annotation))
            if param.annotation is not inspect.Parameter.empty else "Any"
            for name, param in signature.parameters.items()
        }

    def get_tool_descriptions(self) -> str:
        """Format tools for the LLM prompt."""
        descriptions = []
        for name, tool in self.tools.items():
            params = ", ".join(f"{k}: {v}" for k, v in tool["parameters"].items())
            descriptions.append(f"{name}({params}): {tool['description']}")
        return "\n".join(descriptions)

# Example tools
registry = ToolRegistry()

@registry.register("search_web", "Search the web for information")
async def search_web(query: str) -> str:
    """Search implementation."""
    # Call your actual search API here
    return f"Search results for: {query}"

@registry.register("read_file", "Read contents of a file")
async def read_file(path: str) -> str:
    """File reading implementation."""
    with open(path, "r") as f:
        return f.read()

@registry.register("execute_code", "Execute Python code")
async def execute_code(code: str) -> str:
    """Sandboxed code execution."""
    # run_in_sandbox is a placeholder for your sandboxing layer;
    # never exec() untrusted model output in-process
    result = run_in_sandbox(code)
    return str(result)
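
Putting the pieces together: a hypothetical end-to-end run. Here `llm` stands in for any client exposing an async `generate(prompt)` coroutine, `AgentMemory` is the class defined under Memory and Context Management below, and the prompt-building helpers still need to be filled in as sketched above.

# Hypothetical wiring; nothing here is a fixed API
tools = {name: t["function"] for name, t in registry.tools.items()}
agent = Agent(llm=llm, tools=tools, memory=AgentMemory())

task = Task(
    goal="Find the latest Python release and summarize what's new",
    constraints=["read-only: do not execute code"],
)
result = asyncio.run(agent.execute_task(task))
print(result["status"], result["steps"])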
Planning and Reasoning
class PlanningAgent(Agent):
    """Agent with an explicit up-front planning phase."""

    def __init__(self, llm, registry: ToolRegistry, memory, max_replans: int = 3):
        # Keep the registry for prompt-friendly tool descriptions, and hand
        # the base Agent the plain callable dict it expects
        tools = {name: t["function"] for name, t in registry.tools.items()}
        super().__init__(llm, tools, memory)
        self.registry = registry
        self.max_replans = max_replans

    async def execute_task(self, task: Task) -> Dict:
        # Phase 1: Create plan
        plan = await self._create_plan(task)

        # Phase 2: Execute plan steps, replanning on failure
        results = []
        pending = list(plan["steps"])
        replans = 0
        while pending:
            step = pending.pop(0)
            result = await self._execute_step(step)
            results.append(result)

            # On failure, replan and switch to the new plan's steps
            # (a bounded budget prevents an endless replan loop)
            if result["status"] == "failed":
                if replans >= self.max_replans:
                    break
                replans += 1
                plan = await self._replan(task, results)
                pending = list(plan["steps"])

        return {
            "plan": plan,
            "results": results,
            "status": "complete" if all(r["status"] == "success" for r in results) else "failed",
        }

    async def _create_plan(self, task: Task) -> Dict:
        """Create a step-by-step plan."""
        prompt = f"""Create a detailed plan to accomplish this goal: {task.goal}

Break it down into specific, executable steps. For each step, specify:
1. What tool to use
2. What parameters to pass
3. What the expected outcome is

Available tools:
{self.registry.get_tool_descriptions()}

Output format:
Step 1: [tool_name](parameters) - expected outcome
Step 2: [tool_name](parameters) - expected outcome
...

Plan:"""
        response = await self.llm.generate(prompt)
        return self._parse_plan(response)

    async def _replan(self, task: Task, completed_steps: List[Dict]) -> Dict:
        """Replan based on what has already been accomplished."""
        lines = []
        for i, s in enumerate(completed_steps):
            outcome = "Success" if s["status"] == "success" else f"Failed: {s.get('error')}"
            lines.append(f"Step {i + 1}: {s['action']} - {outcome}")
        context = "\n".join(lines)

        prompt = f"""Original goal: {task.goal}

Steps completed so far:
{context}

Some steps failed. Create a new plan that accounts for what's already been done and works around the failures.

New plan:"""
        response = await self.llm.generate(prompt)
        return self._parse_plan(response)
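
`_parse_plan` is referenced but never shown. Below is one minimal sketch matching the "Step N: [tool](params) - outcome" format the prompt requests; real model output drifts, so a production parser needs to be far more defensive (or simply ask for JSON instead).

import re

STEP_PATTERN = re.compile(r"Step \d+:\s*\[?(\w+)\]?\((.*?)\)\s*-\s*(.+)")

# Belongs on PlanningAgent
def _parse_plan(self, response: str) -> Dict:
    steps = []
    for line in response.splitlines():
        match = STEP_PATTERN.match(line.strip())
        if match:
            tool, params, outcome = match.groups()
            steps.append({"tool": tool, "parameters": params, "expected": outcome})
    return {"steps": steps, "raw": response}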
Memory and Context Management
class AgentMemory:
    """Manage the agent's working memory and context."""

    def __init__(self, max_tokens: int = 4000):
        self.max_tokens = max_tokens
        self.messages = []
        self.important_context = []

    def clear(self):
        """Reset memory between tasks."""
        self.messages = []
        self.important_context = []

    def add(self, role: str, content: str, important: bool = False):
        """Add a message to memory."""
        self.messages.append({"role": role, "content": content})
        if important:
            self.important_context.append(content)
        # Manage context window
        self._manage_context()

    def get_recent(self, n: int) -> List[Dict]:
        """Return the n most recent messages."""
        return self.messages[-n:]

    def _manage_context(self):
        """Keep memory within the token limit."""
        total_tokens = self._count_tokens(self.messages)
        if total_tokens > self.max_tokens:
            # Summarize old messages, keeping the last 10 verbatim
            old_messages = self.messages[:-10]
            summary = self._summarize_messages(old_messages)
            self.messages = [
                {"role": "system", "content": f"Previous conversation summary: {summary}"}
            ] + self.messages[-10:]

    def _count_tokens(self, messages: List[Dict]) -> int:
        """Rough estimate: ~4 characters per token for English text."""
        return sum(len(m["content"]) for m in messages) // 4

    def _summarize_messages(self, messages: List[Dict]) -> str:
        """Summarize old messages."""
        # Use the LLM to create a concise summary
        conversation = "\n".join(f"{m['role']}: {m['content']}" for m in messages)
        summary_prompt = f"""Summarize this conversation history concisely:

{conversation}

Summary:"""
        # This would call the LLM with summary_prompt
        return "Summary placeholder"

    def get_context(self) -> List[Dict]:
        """Get messages for the LLM context."""
        return self.messages
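
The character-count heuristic in `_count_tokens` is deliberately crude. If your stack already depends on tiktoken, a more accurate drop-in is simple; `cl100k_base` is an assumption here, so pick the encoding that matches your model.

import tiktoken

_ENCODING = tiktoken.get_encoding("cl100k_base")

# Drop-in replacement for AgentMemory._count_tokens
def _count_tokens(self, messages: List[Dict]) -> int:
    return sum(len(_ENCODING.encode(m["content"])) for m in messages)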
Safety and Guardrails
class SafeAgent(Agent):
    """Agent with safety guardrails."""

    def __init__(self, llm, tools, memory, safety_config):
        super().__init__(llm, tools, memory)
        self.safety_config = safety_config
        self.rejected_actions = []

    async def _execute_action(self, action: Action) -> str:
        """Execute an action with safety checks."""
        # Check if the action is allowed
        if not self._is_action_safe(action):
            self.rejected_actions.append(action)
            return f"Action rejected by safety policy: {action.tool_name}"

        # Execute with resource limits (see the sketch below)
        result = await self._execute_with_limits(action)

        # Validate the output
        if not self._is_output_safe(result):
            return "Output rejected by safety policy"
        return result

    def _is_action_safe(self, action: Action) -> bool:
        """Check whether an action violates safety policies."""
        # Check against the allowlist
        if self.safety_config.get("allowlist"):
            if action.tool_name not in self.safety_config["allowlist"]:
                return False

        # Check parameter constraints
        for param, value in action.parameters.items():
            if not self._is_parameter_safe(param, value):
                return False

        # Check rate limits
        if not self._check_rate_limit(action.tool_name):
            return False

        return True

    def _is_parameter_safe(self, param: str, value: Any) -> bool:
        """Validate parameter values."""
        # Reject path traversal attempts
        if isinstance(value, str) and ".." in value:
            return False

        # Crude screen for destructive SQL keywords; real deployments should
        # rely on parameterized queries, not keyword matching
        if isinstance(value, str) and any(kw in value.lower() for kw in ["drop", "delete", "truncate"]):
            return False

        # Enforce size limits
        if isinstance(value, str) and len(value) > 10000:
            return False

        return True
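
`_execute_with_limits`, `_is_output_safe`, and `_check_rate_limit` are left to the reader above. For the first, here is a minimal sketch, assuming a per-tool wall-clock budget lives in `safety_config`; true resource limits such as memory or CPU need OS-level isolation, not just a timeout.

# Inside SafeAgent
async def _execute_with_limits(self, action: Action) -> str:
    timeout = self.safety_config.get("tool_timeout_seconds", 30)
    try:
        # Bound wall-clock time per tool call; wait_for cancels the
        # underlying coroutine on expiry
        return await asyncio.wait_for(super()._execute_action(action), timeout=timeout)
    except asyncio.TimeoutError:
        return f"Error: {action.tool_name} timed out after {timeout}s"

Returning an "Error: ..." string on timeout keeps the observation format consistent with the base class, so downstream checks like the success heuristic in MonitoredAgent still work.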
Multi-Agent Collaboration
class AgentTeam:
    """Coordinate multiple specialized agents."""

    def __init__(self, agents: Dict[str, Agent], coordinator_llm):
        self.agents = agents
        self.coordinator = coordinator_llm

    async def solve(self, task: Task) -> Dict:
        """Decompose the task and assign subtasks to agents."""
        # Break down the task
        subtasks = await self._decompose_task(task)

        # Assign to appropriate agents
        assignments = self._assign_subtasks(subtasks)

        # Execute in parallel where possible
        results = await self._execute_assignments(assignments)

        # Synthesize results
        final_result = await self._synthesize_results(task, results)
        return final_result

    async def _decompose_task(self, task: Task) -> List[Dict]:
        """Break the task into subtasks."""
        # Assumes each agent carries a short `description` attribute
        agent_descriptions = "\n".join(
            f"{name}: {agent.description}"
            for name, agent in self.agents.items()
        )
        prompt = f"""Break down this task into subtasks that can be handled by specialized agents:

Task: {task.goal}

Available agents:
{agent_descriptions}

Subtasks (specify which agent should handle each):"""
        response = await self.coordinator.generate(prompt)
        return self._parse_subtasks(response)

    async def _execute_assignments(self, assignments: List[Dict]) -> List[Dict]:
        """Execute subtasks concurrently."""
        coros = []
        for assignment in assignments:
            agent = self.agents[assignment["agent"]]
            subtask = Task(goal=assignment["subtask"])
            coros.append(agent.execute_task(subtask))
        results = await asyncio.gather(*coros)
        return list(results)
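
A hypothetical two-agent team wired from the example tools above; the names, descriptions, and shared `llm` client are all illustrative, and the coordinator's parsing and synthesis helpers are still elided.

# Hypothetical setup; each Agent gains a `description` attribute
# for the coordinator prompt
researcher = Agent(llm, {"search_web": search_web}, AgentMemory())
researcher.description = "Finds and summarizes information from the web"

coder = Agent(llm, {"execute_code": execute_code}, AgentMemory())
coder.description = "Writes and runs Python code in a sandbox"

team = AgentTeam({"researcher": researcher, "coder": coder}, coordinator_llm=llm)
result = asyncio.run(team.solve(Task(goal="Compare two sorting algorithms empirically")))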
Monitoring and Debugging
class MonitoredAgent(Agent):
    """Agent with detailed logging and metrics."""

    def __init__(self, llm, tools, memory, metrics):
        super().__init__(llm, tools, memory)
        self.metrics = metrics
        self.trace = []

    async def execute_task(self, task: Task) -> Dict:
        """Execute with full tracing."""
        start_time = time.time()
        try:
            result = await super().execute_task(task)
            # Record success metrics
            self.metrics.record_task(
                success=True,
                duration=time.time() - start_time,
                steps=result["steps"],
            )
            return result
        except Exception as e:
            # Record failure
            self.metrics.record_task(
                success=False,
                duration=time.time() - start_time,
                error=str(e),
            )
            raise

    async def _execute_action(self, action: Action) -> str:
        """Execute with detailed logging."""
        self.trace.append({
            "timestamp": time.time(),
            "action": action.tool_name,
            "parameters": action.parameters,
            "rationale": action.rationale,
        })
        start_time = time.time()
        result = await super()._execute_action(action)
        duration = time.time() - start_time

        self.trace[-1].update({
            "result": result[:200],  # Truncate for the trace
            "duration": duration,
            "success": not result.startswith("Error"),
        })

        # Record metrics
        self.metrics.record_action(
            tool=action.tool_name,
            duration=duration,
            success=not result.startswith("Error"),
        )
        return result
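
The `metrics` collaborator only needs `record_task` and `record_action`. Here is a toy in-memory sink that satisfies the calls above; in production you would forward these to your real metrics backend (Prometheus, StatsD, and so on).

from collections import defaultdict

class InMemoryMetrics:
    """Minimal sink matching the record_task/record_action calls above."""

    def __init__(self):
        self.tasks = []
        self.actions = defaultdict(list)

    def record_task(self, success: bool, duration: float, steps: int = 0, error: str = None):
        self.tasks.append({"success": success, "duration": duration,
                           "steps": steps, "error": error})

    def record_action(self, tool: str, duration: float, success: bool):
        self.actions[tool].append({"duration": duration, "success": success})

    def tool_error_rate(self, tool: str) -> float:
        calls = self.actions[tool]
        return sum(not c["success"] for c in calls) / len(calls) if calls else 0.0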
Conclusion
Production AI agents require:
- Structured planning - Don’t just react; plan ahead
- Safe tool use - Validate all actions and outputs
- Memory management - Handle long conversations
- Error recovery - Retry and replan on failures
- Monitoring - Track every action and outcome
- Guardrails - Enforce safety policies
Agents are powerful but complex. Start simple, add capabilities incrementally, and always prioritize safety and observability.