AI agents represent the next evolution beyond simple LLM chatbots—systems that can plan, use tools, and work towards goals with minimal human intervention. This post explores how to build reliable agentic systems for production use.

Agent Architecture

from enum import Enum
from typing import Any, Callable, Dict, List
from dataclasses import dataclass, field

class AgentState(Enum):
    PLANNING = "planning"
    EXECUTING = "executing"
    OBSERVING = "observing"
    COMPLETE = "complete"
    FAILED = "failed"

@dataclass
class Task:
    goal: str
    constraints: List[str] = field(default_factory=list)
    max_steps: int = 10
    timeout_seconds: int = 300

@dataclass
class Action:
    tool_name: str
    parameters: Dict
    rationale: str

class Agent:
    def __init__(self, llm, tools: Dict[str, Callable], memory):
        self.llm = llm
        self.tools = tools
        self.memory = memory
        self.state = AgentState.PLANNING

    async def execute_task(self, task: Task) -> Dict:
        """Main agent loop"""

        self.state = AgentState.PLANNING
        self.memory.clear()
        self.memory.add("system", self._get_system_prompt())
        self.memory.add("user", f"Goal: {task.goal}")

        steps_taken = 0
        for step in range(task.max_steps):
            steps_taken = step + 1

            # Plan next action
            action = await self._plan_action(task)

            if action.tool_name == "finish":
                self.state = AgentState.COMPLETE
                break

            # Execute action
            observation = await self._execute_action(action)

            # Update memory
            self.memory.add("assistant", f"Action: {action.tool_name}({action.parameters})")
            self.memory.add("system", f"Observation: {observation}")
        else:
            # Step budget exhausted without the agent calling "finish"
            self.state = AgentState.FAILED

        return {
            "status": self.state.value,
            "steps": steps_taken,
            "result": self.memory.get_recent(1)[0] if self.state == AgentState.COMPLETE else None
        }

    async def _plan_action(self, task: Task) -> Action:
        """Plan next action based on current state"""

        prompt = self._build_planning_prompt(task)
        response = await self.llm.generate(prompt)

        return self._parse_action(response)

    async def _execute_action(self, action: Action) -> str:
        """Execute tool and return observation"""

        if action.tool_name not in self.tools:
            return f"Error: Unknown tool '{action.tool_name}'"

        try:
            result = await self.tools[action.tool_name](**action.parameters)
            return str(result)
        except Exception as e:
            return f"Error executing {action.tool_name}: {str(e)}"

Tool Use and Function Calling

import inspect

class ToolRegistry:
    """Manage available tools for agent"""

    def __init__(self):
        self.tools = {}

    def register(self, name: str, description: str):
        """Decorator to register tools"""
        def decorator(func):
            self.tools[name] = {
                "function": func,
                "description": description,
                "parameters": self._extract_parameters(func)
            }
            return func
        return decorator

    def _extract_parameters(self, func: Callable) -> Dict[str, str]:
        """Infer parameter names and annotated types from the signature"""
        params = {}
        for name, p in inspect.signature(func).parameters.items():
            if p.annotation is inspect.Parameter.empty:
                params[name] = "Any"
            else:
                params[name] = getattr(p.annotation, "__name__", str(p.annotation))
        return params

    def get_tool_descriptions(self) -> str:
        """Format tools for LLM prompt"""
        descriptions = []
        for name, tool in self.tools.items():
            params = ", ".join(f"{k}: {v}" for k, v in tool["parameters"].items())
            descriptions.append(f"{name}({params}): {tool['description']}")
        return "\n".join(descriptions)

# Example tools
registry = ToolRegistry()

@registry.register("search_web", "Search the web for information")
async def search_web(query: str) -> str:
    """Search implementation"""
    # Call actual search API
    return f"Search results for: {query}"

@registry.register("read_file", "Read contents of a file")
async def read_file(path: str) -> str:
    """File reading implementation"""
    with open(path, 'r') as f:
        return f.read()

@registry.register("execute_code", "Execute Python code")
async def execute_code(code: str) -> str:
    """Sandboxed code execution"""
    # Use sandbox environment
    result = run_in_sandbox(code)
    return str(result)
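
The Agent constructor expects a plain name-to-callable mapping, so the registry needs a one-line adapter when wiring everything together. A usage sketch (my_llm stands in for whatever LLM client you use; AgentMemory is defined later in this post):

# Flatten the registry into the Dict[str, Callable] the Agent expects
tools = {name: entry["function"] for name, entry in registry.tools.items()}
agent = Agent(llm=my_llm, tools=tools, memory=AgentMemory())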

Planning and Reasoning

class PlanningAgent(Agent):
    """Agent with explicit planning phase"""

    async def execute_task(self, task: Task) -> Dict:
        # Phase 1: Create plan
        plan = await self._create_plan(task)

        # Phase 2: Execute plan steps
        results = []
        steps = list(plan["steps"])
        replans = 0
        while steps:
            result = await self._execute_step(steps.pop(0))
            results.append(result)

            # Replan on failure; the new plan supplies the remaining steps.
            # The cap is arbitrary but prevents endless replanning loops.
            if result["status"] == "failed":
                if replans >= 3:
                    break
                replans += 1
                plan = await self._replan(task, results)
                steps = list(plan["steps"])

        return {
            "plan": plan,
            "results": results,
            "status": "complete" if all(r["status"] == "success" for r in results) else "failed"
        }

    async def _create_plan(self, task: Task) -> Dict:
        """Create step-by-step plan"""

        # Assumes a ToolRegistry is attached as self.registry (see ToolRegistry above)
        prompt = f"""Create a detailed plan to accomplish this goal: {task.goal}

Break down into specific, executable steps. For each step, specify:
1. What tool to use
2. What parameters to pass
3. What the expected outcome is

Available tools:
{self.registry.get_tool_descriptions()}

Output format:
Step 1: [tool_name](parameters) - expected outcome
Step 2: [tool_name](parameters) - expected outcome
...

Plan:"""

        response = await self.llm.generate(prompt)
        return self._parse_plan(response)

    async def _replan(self, task: Task, completed_steps: List[Dict]) -> Dict:
        """Replan based on what's been accomplished"""

        context = "\n".join(
            f"Step {i+1}: {s['action']} - "
            + ("Success" if s["status"] == "success" else f"Failed: {s.get('error')}")
            for i, s in enumerate(completed_steps)
        )

        prompt = f"""Original goal: {task.goal}

Steps completed so far:
{context}

Some steps failed. Create a new plan that accounts for what's already been done and works around the failures.

New plan:"""

        response = await self.llm.generate(prompt)
        return self._parse_plan(response)
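
_parse_plan is also left abstract. Below is a minimal regex-based sketch that matches the "Step N: [tool](parameters) - outcome" format requested in the prompt; real model output is messier, which is why production systems usually request JSON instead:

import re

def parse_plan(response: str) -> Dict:
    """Parse 'Step N: [tool](parameters) - outcome' lines into a plan dict"""
    pattern = re.compile(r"Step\s+\d+:\s*\[?(\w+)\]?\((.*?)\)\s*-\s*(.*)")
    steps = []
    for line in response.splitlines():
        match = pattern.match(line.strip())
        if match:
            steps.append({
                "tool": match.group(1),
                "parameters": match.group(2),
                "expected_outcome": match.group(3),
            })
    return {"steps": steps}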

Memory and Context Management

class AgentMemory:
    """Manage agent's working memory and context"""

    def __init__(self, max_tokens: int = 4000):
        self.max_tokens = max_tokens
        self.messages = []
        self.important_context = []

    def add(self, role: str, content: str, important: bool = False):
        """Add message to memory"""
        self.messages.append({"role": role, "content": content})

        if important:
            self.important_context.append(content)

        # Manage context window
        self._manage_context()

    def _manage_context(self):
        """Keep memory within token limit"""
        total_tokens = self._count_tokens(self.messages)

        if total_tokens > self.max_tokens:
            # Summarize old messages
            old_messages = self.messages[:-10]  # Keep last 10
            summary = self._summarize_messages(old_messages)

            # Replace with summary
            self.messages = [
                {"role": "system", "content": f"Previous conversation summary: {summary}"}
            ] + self.messages[-10:]

    def _summarize_messages(self, messages: List[Dict]) -> str:
        """Summarize old messages"""
        # Use LLM to create concise summary
        conversation = "\n".join([f"{m['role']}: {m['content']}" for m in messages])

        summary_prompt = f"""Summarize this conversation history concisely:

{conversation}

Summary:"""

        # This would call LLM
        return "Summary placeholder"

    def get_context(self) -> List[Dict]:
        """Get messages for LLM context"""
        return self.messages

    def get_recent(self, n: int) -> List[Dict]:
        """Return the n most recent messages (used by Agent to report results)"""
        return self.messages[-n:]

    def clear(self):
        """Reset working memory for a new task (called by Agent.execute_task)"""
        self.messages = []
        self.important_context = []

    def _count_tokens(self, messages: List[Dict]) -> int:
        """Rough estimate: ~4 characters per token"""
        return sum(len(m["content"]) for m in messages) // 4
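
The character-based estimate in _count_tokens is deliberately crude. If the tiktoken package is available, an exact count is straightforward (cl100k_base is an assumption; pick the encoding that matches your model):

import tiktoken

_ENCODING = tiktoken.get_encoding("cl100k_base")

def count_tokens(messages: List[Dict]) -> int:
    """Exact token count over message contents (ignores per-message overhead)"""
    return sum(len(_ENCODING.encode(m["content"])) for m in messages)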

Safety and Guardrails

class SafeAgent(Agent):
    """Agent with safety guardrails"""

    def __init__(self, llm, tools, memory, safety_config):
        super().__init__(llm, tools, memory)
        self.safety_config = safety_config
        self.rejected_actions = []

    async def _execute_action(self, action: Action) -> str:
        """Execute action with safety checks"""

        # Check if action is allowed
        if not self._is_action_safe(action):
            self.rejected_actions.append(action)
            return f"Action rejected by safety policy: {action.tool_name}"

        # Execute with resource limits
        result = await self._execute_with_limits(action)

        # Validate output
        if not self._is_output_safe(result):
            return "Output rejected by safety policy"

        return result

    def _is_action_safe(self, action: Action) -> bool:
        """Check if action violates safety policies"""

        # Check against allowlist
        if self.safety_config.get("allowlist"):
            if action.tool_name not in self.safety_config["allowlist"]:
                return False

        # Check parameter constraints
        for param, value in action.parameters.items():
            if not self._is_parameter_safe(param, value):
                return False

        # Check rate limits
        if not self._check_rate_limit(action.tool_name):
            return False

        return True

    def _is_parameter_safe(self, param: str, value: Any) -> bool:
        """Validate parameter values"""

        # Check for path traversal
        if isinstance(value, str) and ".." in value:
            return False

        # Naive SQL keyword screen; real systems should use parameterized
        # queries rather than rely on string filtering
        if isinstance(value, str) and any(kw in value.lower() for kw in ["drop", "delete", "truncate"]):
            return False

        # Check size limits
        if isinstance(value, str) and len(value) > 10000:
            return False

        return True
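
SafeAgent defers to _execute_with_limits, _is_output_safe, and _check_rate_limit, all left undefined here. A minimal sketch of the first, bounding each tool call with asyncio.wait_for (the 30-second default is an assumption):

import asyncio

class TimeLimitedAgent(SafeAgent):
    """Sketch: enforce a wall-clock timeout on every tool call"""

    async def _execute_with_limits(self, action: Action, timeout: float = 30.0) -> str:
        try:
            # Call the base Agent executor directly; going through
            # SafeAgent._execute_action again would recurse
            return await asyncio.wait_for(Agent._execute_action(self, action), timeout=timeout)
        except asyncio.TimeoutError:
            return f"Error: {action.tool_name} timed out after {timeout}s"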

Multi-Agent Collaboration

import asyncio

class AgentTeam:
    """Coordinate multiple specialized agents"""

    def __init__(self, agents: Dict[str, Agent], coordinator_llm):
        self.agents = agents
        self.coordinator = coordinator_llm

    async def solve(self, task: Task) -> Dict:
        """Decompose task and assign to agents"""

        # Break down task
        subtasks = await self._decompose_task(task)

        # Assign to appropriate agents
        assignments = self._assign_subtasks(subtasks)

        # Execute in parallel where possible
        results = await self._execute_assignments(assignments)

        # Synthesize results
        final_result = await self._synthesize_results(task, results)

        return final_result

    async def _decompose_task(self, task: Task) -> List[Dict]:
        """Break task into subtasks"""

        # Agent defines no description attribute, so fall back gracefully
        agent_descriptions = "\n".join([
            f"{name}: {getattr(agent, 'description', 'general-purpose agent')}"
            for name, agent in self.agents.items()
        ])

        prompt = f"""Break down this task into subtasks that can be handled by specialized agents:

Task: {task.goal}

Available agents:
{agent_descriptions}

Subtasks (specify which agent should handle each):"""

        response = await self.coordinator.generate(prompt)
        return self._parse_subtasks(response)

    async def _execute_assignments(self, assignments: List[Dict]) -> List[Dict]:
        """Execute subtasks concurrently"""

        coros = []
        for assignment in assignments:
            agent = self.agents[assignment["agent"]]
            subtask = Task(goal=assignment["subtask"])
            coros.append(agent.execute_task(subtask))

        return await asyncio.gather(*coros)
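
_synthesize_results is left abstract as well. One way to fill it in is to hand the subtask outputs back to the coordinator LLM (a method of AgentTeam; the return shape is an assumption):

    async def _synthesize_results(self, task: Task, results: List[Dict]) -> Dict:
        """Sketch: ask the coordinator LLM to merge subtask results"""
        summary = "\n".join(str(r.get("result")) for r in results)
        prompt = (
            f"Original goal: {task.goal}\n\n"
            f"Subtask results:\n{summary}\n\n"
            "Combine these into a single final answer:"
        )
        answer = await self.coordinator.generate(prompt)
        return {"status": "complete", "answer": answer, "subtask_results": results}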

Monitoring and Debugging

import time

class MonitoredAgent(Agent):
    """Agent with detailed logging and metrics"""

    def __init__(self, llm, tools, memory, metrics):
        super().__init__(llm, tools, memory)
        self.metrics = metrics
        self.trace = []

    async def execute_task(self, task: Task) -> Dict:
        """Execute with full tracing"""

        start_time = time.time()

        try:
            result = await super().execute_task(task)

            # Record success metrics
            self.metrics.record_task(
                success=True,
                duration=time.time() - start_time,
                steps=result["steps"]
            )

            return result

        except Exception as e:
            # Record failure
            self.metrics.record_task(
                success=False,
                duration=time.time() - start_time,
                error=str(e)
            )
            raise

    async def _execute_action(self, action: Action) -> str:
        """Execute with detailed logging"""

        self.trace.append({
            "timestamp": time.time(),
            "action": action.tool_name,
            "parameters": action.parameters,
            "rationale": action.rationale
        })

        start_time = time.time()
        result = await super()._execute_action(action)
        duration = time.time() - start_time

        self.trace[-1].update({
            "result": result[:200],  # Truncate
            "duration": duration,
            "success": not result.startswith("Error")
        })

        # Record metrics
        self.metrics.record_action(
            tool=action.tool_name,
            duration=duration,
            success=not result.startswith("Error")
        )

        return result
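
The accumulated trace is most useful when persisted for offline inspection. A small helper sketch (the JSON-lines format here is a choice, not a requirement):

import json

def dump_trace(agent: MonitoredAgent, path: str) -> None:
    """Write the agent's action trace as JSON lines for offline debugging"""
    with open(path, "w") as f:
        for entry in agent.trace:
            f.write(json.dumps(entry) + "\n")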

Conclusion

Production AI agents require:

  1. Structured planning - Don’t just react, plan ahead
  2. Safe tool use - Validate all actions and outputs
  3. Memory management - Handle long conversations
  4. Error recovery - Retry and replan on failures
  5. Monitoring - Track every action and outcome
  6. Guardrails - Enforce safety policies

Agents are powerful but complex. Start simple, add capabilities incrementally, and always prioritize safety and observability.