AI AgentsAdvanced
Multi-Agent System
Build a system where multiple specialized agents collaborate to solve complex problems
Multi-Agent System
Orchestrate multiple AI agents that work together on complex tasks
TL;DR
Complex tasks often need multiple perspectives: a researcher gathers info, a writer creates content, a critic reviews it. Multi-agent systems orchestrate specialized agents that collaborate, with a supervisor routing tasks and maintaining shared context.
What You'll Learn
- Multi-agent architectures and patterns
- Agent specialization and role assignment
- Inter-agent communication protocols
- Supervisor and worker patterns with LangGraph
Tech Stack
| Component | Technology |
|---|---|
| Framework | LangGraph |
| LLM | OpenAI GPT-4 |
| Orchestration | Custom supervisor |
| Communication | Message passing |
Multi-Agent Patterns
┌─────────────────────────────────────────────────────────────────────────────┐
│ MULTI-AGENT ARCHITECTURE PATTERNS │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ HIERARCHICAL COLLABORATIVE PIPELINE │
│ (Supervisor/Worker) (Peer-to-Peer) (Sequential) │
│ │
│ ┌────────────┐ ┌─────────┐ ┌─────────┐ │
│ │ Supervisor │ │ Agent 1 │◄────────────►│ Agent 1 │ │
│ └─────┬──────┘ └────┬────┘ └────┬────┘ │
│ │ │ │ │
│ ┌─────┼─────┐ ┌────┴────┐ ▼ │
│ │ │ │ │ │ ┌─────────┐ │
│ ▼ ▼ ▼ ▼ ▼ │ Agent 2 │ │
│ ┌───┐ ┌───┐ ┌───┐ ┌─────────┐ ┌─────────┐ └────┬────┘ │
│ │W1 │ │W2 │ │W3 │ │ Agent 2 │◄│ Agent 3 │ │ │
│ └───┘ └───┘ └───┘ └────┬────┘ └────┬────┘ ▼ │
│ └─────►─────┘ ┌─────────┐ │
│ │ Agent 3 │ │
│ └─────────┘ │
│ │
│ Best for: Best for: Best for: │
│ • Task delegation • Debates/reviews • Content creation │
│ • Clear hierarchy • Consensus building • Data processing │
│ • Parallel work • Complex decisions • Step-by-step tasks │
│ │
└─────────────────────────────────────────────────────────────────────────────┘Project Structure
multi-agent-system/
├── src/
│ ├── __init__.py
│ ├── agents/
│ │ ├── __init__.py
│ │ ├── researcher.py # Research agent
│ │ ├── writer.py # Writing agent
│ │ ├── critic.py # Review agent
│ │ └── coder.py # Coding agent
│ ├── supervisor.py # Agent orchestration
│ ├── graph.py # LangGraph workflow
│ └── api.py
├── tests/
└── requirements.txtImplementation
Step 1: Setup
openai>=1.0.0
langchain>=0.1.0
langchain-openai>=0.0.5
langgraph>=0.0.40
pydantic>=2.0.0
fastapi>=0.100.0
uvicorn>=0.23.0Step 2: Base Agent Class
"""
Base agent class and shared utilities.
"""
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Optional, Any
from openai import OpenAI
@dataclass
class AgentMessage:
"""Message passed between agents."""
sender: str
content: str
metadata: dict = None
def __post_init__(self):
if self.metadata is None:
self.metadata = {}
@dataclass
class AgentResponse:
"""Response from an agent."""
agent_name: str
content: str
status: str # "complete", "needs_help", "error"
next_agent: Optional[str] = None
class BaseAgent(ABC):
"""Base class for all agents in the system."""
def __init__(
self,
name: str,
model: str = "gpt-4-turbo-preview",
temperature: float = 0.7
):
self.name = name
self.model = model
self.temperature = temperature
self.client = OpenAI()
self.system_prompt = self._get_system_prompt()
@abstractmethod
def _get_system_prompt(self) -> str:
"""Return the agent's system prompt."""
pass
@abstractmethod
def process(self, message: AgentMessage, context: dict) -> AgentResponse:
"""Process a message and return a response."""
pass
def _call_llm(self, messages: list[dict]) -> str:
"""Call the LLM with messages."""
response = self.client.chat.completions.create(
model=self.model,
messages=[
{"role": "system", "content": self.system_prompt},
*messages
],
temperature=self.temperature
)
return response.choices[0].message.contentStep 3: Specialized Agents
"""Research agent - gathers information."""
from . import BaseAgent, AgentMessage, AgentResponse
import httpx
class ResearcherAgent(BaseAgent):
"""Agent specialized in research and information gathering."""
def __init__(self):
super().__init__(name="researcher")
def _get_system_prompt(self) -> str:
return """You are a Research Agent specialized in gathering information.
Your responsibilities:
1. Analyze research requests and identify key topics
2. Search for relevant information
3. Summarize findings clearly and accurately
4. Cite sources when possible
When you need more specific research, indicate what's needed.
When research is complete, summarize your findings."""
def process(self, message: AgentMessage, context: dict) -> AgentResponse:
# Extract research query
query = message.content
# Perform search (simplified - use real search API in production)
search_results = self._search(query)
# Have LLM synthesize findings
synthesis_prompt = f"""Research request: {query}
Search results:
{search_results}
Provide a comprehensive research summary. If more research is needed, specify what."""
response = self._call_llm([{"role": "user", "content": synthesis_prompt}])
# Determine if research is complete
needs_more = "need more" in response.lower() or "further research" in response.lower()
return AgentResponse(
agent_name=self.name,
content=response,
status="needs_help" if needs_more else "complete",
next_agent="supervisor" if needs_more else None
)
def _search(self, query: str) -> str:
"""Search for information."""
try:
url = f"https://en.wikipedia.org/api/rest_v1/page/summary/{query.replace(' ', '_')}"
response = httpx.get(url, timeout=10.0)
if response.status_code == 200:
data = response.json()
return f"Wikipedia: {data.get('extract', 'No results')}"
return "No search results found."
except Exception as e:
return f"Search error: {e}""""Writer agent - creates content."""
from . import BaseAgent, AgentMessage, AgentResponse
class WriterAgent(BaseAgent):
"""Agent specialized in writing and content creation."""
def __init__(self):
super().__init__(name="writer")
def _get_system_prompt(self) -> str:
return """You are a Writer Agent specialized in creating content.
Your responsibilities:
1. Transform research and information into well-written content
2. Adapt tone and style to the requested format
3. Structure content logically with clear sections
4. Request clarification or more research if needed
Output formats you support:
- Blog posts
- Technical documentation
- Reports
- Summaries"""
def process(self, message: AgentMessage, context: dict) -> AgentResponse:
# Get research context if available
research = context.get("research", "")
format_type = context.get("format", "article")
writing_prompt = f"""Task: {message.content}
Research/Context provided:
{research}
Write a {format_type} based on this information.
If you need more information or research, specify what's missing."""
response = self._call_llm([{"role": "user", "content": writing_prompt}])
needs_more = "need more" in response.lower() or "missing" in response.lower()
return AgentResponse(
agent_name=self.name,
content=response,
status="needs_help" if needs_more else "complete",
next_agent="researcher" if needs_more else "critic"
)"""Critic agent - reviews and improves content."""
from . import BaseAgent, AgentMessage, AgentResponse
class CriticAgent(BaseAgent):
"""Agent specialized in reviewing and critiquing work."""
def __init__(self):
super().__init__(name="critic", temperature=0.3)
def _get_system_prompt(self) -> str:
return """You are a Critic Agent specialized in reviewing content.
Your responsibilities:
1. Review content for accuracy, clarity, and completeness
2. Identify factual errors or inconsistencies
3. Suggest specific improvements
4. Approve content when it meets quality standards
Provide structured feedback with:
- Overall assessment (approve/needs_revision)
- Specific issues found
- Suggested improvements"""
def process(self, message: AgentMessage, context: dict) -> AgentResponse:
content_to_review = context.get("draft", message.content)
original_request = context.get("original_request", "")
review_prompt = f"""Review this content:
Original request: {original_request}
Content to review:
{content_to_review}
Provide your assessment. If approved, say "APPROVED".
If revisions needed, specify exactly what to fix."""
response = self._call_llm([{"role": "user", "content": review_prompt}])
is_approved = "APPROVED" in response.upper()
return AgentResponse(
agent_name=self.name,
content=response,
status="complete" if is_approved else "needs_help",
next_agent=None if is_approved else "writer"
)Understanding Specialized Agent Design:
┌─────────────────────────────────────────────────────────────────────────────┐
│ WHY SPECIALIZED AGENTS OUTPERFORM A SINGLE "DO EVERYTHING" AGENT │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ Single Agent: Specialized Agents: │
│ ┌─────────────────────────────┐ ┌─────────────────────────────────┐ │
│ │ "You are a helpful AI that │ │ Researcher: Only gathers info │ │
│ │ researches, writes, AND │ │ Writer: Only creates content │ │
│ │ reviews content" │ │ Critic: Only reviews quality │ │
│ │ │ │ │ │
│ │ Problems: │ │ Benefits: │ │
│ │ • Conflicting objectives │ │ • Clear, focused prompts │ │
│ │ • Long, confusing prompts │ │ • Easier to debug │ │
│ │ • Self-reviewing bias │ │ • Independent perspectives │ │
│ └─────────────────────────────┘ └─────────────────────────────────┘ │
│ │
│ Agent Communication Flow: │
│ ┌─────────────────────────────────────────────────────────────────────┐ │
│ │ │ │
│ │ User: "Write about meditation" │ │
│ │ │ │ │
│ │ ▼ │ │
│ │ Researcher ─────► "Here are facts about meditation..." │ │
│ │ │ (stored in context["research"]) │ │
│ │ ▼ │ │
│ │ Writer ─────────► "## Benefits of Meditation\n..." │ │
│ │ │ (stored in context["draft"]) │ │
│ │ ▼ │ │
│ │ Critic ─────────► "APPROVED: Well-structured..." │ │
│ │ │ OR "Needs more citations" → back to Writer │ │
│ │ ▼ │ │
│ │ Final Output │ │
│ │ │ │
│ └─────────────────────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────────────┘Key Design Patterns:
| Pattern | Implementation | Purpose |
|---|---|---|
status: "needs_help" | Agent signals it can't proceed | Enables graceful handoffs |
next_agent: "researcher" | Agent suggests who should help | Decentralized routing |
| Temperature variation | Critic uses 0.3, Writer uses 0.7 | Different creativity needs |
| Keyword detection | "need more", "missing" triggers re-route | Simple but effective signals |
Step 4: Supervisor Agent
"""
Supervisor that orchestrates multiple agents.
"""
from dataclasses import dataclass, field
from typing import Optional
from openai import OpenAI
from .agents import AgentMessage, AgentResponse
from .agents.researcher import ResearcherAgent
from .agents.writer import WriterAgent
from .agents.critic import CriticAgent
@dataclass
class WorkflowState:
"""Current state of the multi-agent workflow."""
task: str
current_agent: str
messages: list[AgentMessage] = field(default_factory=list)
context: dict = field(default_factory=dict)
iterations: int = 0
max_iterations: int = 10
status: str = "running"
class Supervisor:
"""
Orchestrates multiple agents to complete complex tasks.
The supervisor:
1. Analyzes the task and selects initial agent
2. Routes messages between agents
3. Maintains shared context
4. Determines when task is complete
"""
def __init__(self, model: str = "gpt-4-turbo-preview"):
self.client = OpenAI()
self.model = model
# Initialize agents
self.agents = {
"researcher": ResearcherAgent(),
"writer": WriterAgent(),
"critic": CriticAgent()
}
self.routing_prompt = """You are a supervisor coordinating a team of AI agents.
Available agents:
- researcher: Gathers information and does research
- writer: Creates written content
- critic: Reviews and improves content
Given a task, decide which agent should handle it first.
Respond with just the agent name: researcher, writer, or critic"""
def run(self, task: str) -> dict:
"""
Run the multi-agent workflow on a task.
Args:
task: The task to complete
Returns:
Final result and workflow history
"""
state = WorkflowState(
task=task,
current_agent=self._select_initial_agent(task),
context={"original_request": task}
)
while state.status == "running" and state.iterations < state.max_iterations:
state.iterations += 1
# Get current agent
agent = self.agents.get(state.current_agent)
if not agent:
state.status = "error"
break
# Create message for agent
message = AgentMessage(
sender="supervisor",
content=state.task if state.iterations == 1 else self._get_latest_content(state)
)
# Process with agent
response = agent.process(message, state.context)
# Record the exchange
state.messages.append(message)
state.messages.append(AgentMessage(
sender=response.agent_name,
content=response.content
))
# Update context based on agent
self._update_context(state, response)
# Check if complete
if response.status == "complete" and response.next_agent is None:
state.status = "complete"
elif response.status == "complete" and response.next_agent == "critic":
# Writer done, send to critic
state.current_agent = "critic"
elif response.next_agent:
state.current_agent = response.next_agent
else:
state.status = "complete"
return {
"task": task,
"status": state.status,
"result": self._get_final_result(state),
"iterations": state.iterations,
"agents_used": list(set(m.sender for m in state.messages if m.sender != "supervisor"))
}
def _select_initial_agent(self, task: str) -> str:
"""Select the first agent to handle the task."""
response = self.client.chat.completions.create(
model=self.model,
messages=[
{"role": "system", "content": self.routing_prompt},
{"role": "user", "content": f"Task: {task}"}
],
temperature=0
)
agent_name = response.choices[0].message.content.strip().lower()
return agent_name if agent_name in self.agents else "researcher"
def _update_context(self, state: WorkflowState, response: AgentResponse) -> None:
"""Update shared context based on agent response."""
if response.agent_name == "researcher":
state.context["research"] = response.content
elif response.agent_name == "writer":
state.context["draft"] = response.content
elif response.agent_name == "critic":
state.context["review"] = response.content
def _get_latest_content(self, state: WorkflowState) -> str:
"""Get the most recent content for the next agent."""
if state.messages:
return state.messages[-1].content
return state.task
def _get_final_result(self, state: WorkflowState) -> str:
"""Extract final result from state."""
# Prefer approved draft, otherwise latest content
if "draft" in state.context and "APPROVED" in state.context.get("review", "").upper():
return state.context["draft"]
return self._get_latest_content(state)Understanding the Supervisor Pattern:
┌─────────────────────────────────────────────────────────────────────────────┐
│ SUPERVISOR RESPONSIBILITIES AND STATE MANAGEMENT │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ The Supervisor's Role: │
│ ┌─────────────────────────────────────────────────────────────────────┐ │
│ │ │ │
│ │ 1. ROUTING: "Who handles this first?" │ │
│ │ Task: "Write a blog" │ │
│ │ └──► LLM decides: "researcher" (needs facts first) │ │
│ │ │ │
│ │ 2. CONTEXT: "What does each agent know?" │ │
│ │ context = { │ │
│ │ "original_request": "Write blog about meditation", │ │
│ │ "research": "Studies show...", ◄── Updated by Researcher │ │
│ │ "draft": "# Meditation...", ◄── Updated by Writer │ │
│ │ "review": "APPROVED" ◄── Updated by Critic │ │
│ │ } │ │
│ │ │ │
│ │ 3. FLOW CONTROL: "When to stop?" │ │
│ │ • Agent says "complete" + no next_agent → Done │ │
│ │ • Iterations >= max_iterations → Force stop │ │
│ │ • Error state → Stop with error │ │
│ │ │ │
│ └─────────────────────────────────────────────────────────────────────┘ │
│ │
│ Why max_iterations = 10? │
│ ┌─────────────────────────────────────────────────────────────────────┐ │
│ │ Without limit: Critic rejects → Writer rewrites → Critic rejects │ │
│ │ → Writer rewrites → Critic rejects → ... forever │ │
│ │ │ │
│ │ With limit: After 10 iterations, force completion with best draft │ │
│ │ Better to return "good enough" than loop infinitely │ │
│ └─────────────────────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────────────┘Supervisor vs LangGraph:
| Approach | Best For | Trade-off |
|---|---|---|
| Custom Supervisor | Full control, simple flows | More code to maintain |
| LangGraph | Complex graphs, checkpointing | Learning curve |
Step 5: LangGraph Workflow
"""
LangGraph-based multi-agent workflow.
"""
from typing import TypedDict, Annotated, Sequence
from langgraph.graph import StateGraph, END
from langchain_core.messages import BaseMessage, HumanMessage, AIMessage
from langchain_openai import ChatOpenAI
import operator
class AgentState(TypedDict):
"""State shared across all agents."""
messages: Annotated[Sequence[BaseMessage], operator.add]
task: str
research: str
draft: str
review: str
next_agent: str
iterations: int
def create_agent_graph():
"""Create a LangGraph workflow for multi-agent collaboration."""
llm = ChatOpenAI(model="gpt-4-turbo-preview")
def researcher_node(state: AgentState) -> dict:
"""Research agent node."""
prompt = f"""You are a researcher. Research this topic and provide findings:
Task: {state['task']}
Provide comprehensive research findings."""
response = llm.invoke([HumanMessage(content=prompt)])
return {
"messages": [AIMessage(content=f"[Researcher]: {response.content}")],
"research": response.content,
"next_agent": "writer",
"iterations": state["iterations"] + 1
}
def writer_node(state: AgentState) -> dict:
"""Writer agent node."""
prompt = f"""You are a writer. Create content based on this research:
Task: {state['task']}
Research:
{state['research']}
Write a well-structured article."""
response = llm.invoke([HumanMessage(content=prompt)])
return {
"messages": [AIMessage(content=f"[Writer]: {response.content}")],
"draft": response.content,
"next_agent": "critic",
"iterations": state["iterations"] + 1
}
def critic_node(state: AgentState) -> dict:
"""Critic agent node."""
prompt = f"""You are a critic. Review this draft:
Original task: {state['task']}
Draft:
{state['draft']}
If the draft is good, respond with "APPROVED: [brief praise]"
If it needs work, provide specific feedback."""
response = llm.invoke([HumanMessage(content=prompt)])
is_approved = "APPROVED" in response.content.upper()
return {
"messages": [AIMessage(content=f"[Critic]: {response.content}")],
"review": response.content,
"next_agent": "end" if is_approved else "writer",
"iterations": state["iterations"] + 1
}
def should_continue(state: AgentState) -> str:
"""Determine next node or end."""
if state["iterations"] >= 6:
return "end"
return state["next_agent"]
# Build the graph
workflow = StateGraph(AgentState)
workflow.add_node("researcher", researcher_node)
workflow.add_node("writer", writer_node)
workflow.add_node("critic", critic_node)
workflow.set_entry_point("researcher")
workflow.add_conditional_edges(
"researcher",
should_continue,
{"writer": "writer", "end": END}
)
workflow.add_conditional_edges(
"writer",
should_continue,
{"critic": "critic", "end": END}
)
workflow.add_conditional_edges(
"critic",
should_continue,
{"writer": "writer", "end": END}
)
return workflow.compile()Step 6: API
"""FastAPI application for multi-agent system."""
from fastapi import FastAPI
from pydantic import BaseModel
from .supervisor import Supervisor
from .graph import create_agent_graph
app = FastAPI(title="Multi-Agent System API")
supervisor = Supervisor()
agent_graph = create_agent_graph()
class TaskRequest(BaseModel):
task: str
use_langgraph: bool = False
class TaskResponse(BaseModel):
task: str
status: str
result: str
iterations: int
agents_used: list[str]
@app.post("/execute", response_model=TaskResponse)
async def execute_task(request: TaskRequest):
if request.use_langgraph:
# Use LangGraph workflow
result = agent_graph.invoke({
"messages": [],
"task": request.task,
"research": "",
"draft": "",
"review": "",
"next_agent": "researcher",
"iterations": 0
})
return TaskResponse(
task=request.task,
status="complete",
result=result["draft"],
iterations=result["iterations"],
agents_used=["researcher", "writer", "critic"]
)
else:
# Use custom supervisor
result = supervisor.run(request.task)
return TaskResponse(
task=request.task,
status=result["status"],
result=result["result"],
iterations=result["iterations"],
agents_used=result["agents_used"]
)Running the System
export OPENAI_API_KEY="your-key"
uvicorn src.api:app --reload --port 8000# Execute a multi-agent task
curl -X POST http://localhost:8000/execute \
-H "Content-Type: application/json" \
-d '{"task": "Write a blog post about the benefits of meditation"}'Multi-Agent Patterns
| Pattern | Use Case | Complexity |
|---|---|---|
| Supervisor | Central coordination | Medium |
| Pipeline | Sequential processing | Low |
| Collaborative | Peer discussion | High |
| Hierarchical | Complex organizations | High |
Key Concepts Recap
| Concept | What It Is | Why It Matters |
|---|---|---|
| Supervisor | Agent that routes tasks to workers | Central coordination point |
| Specialized Agents | Agents with focused roles (researcher, writer) | Better quality through expertise |
| Shared Context | State accessible to all agents | Enables collaboration |
| Message Passing | How agents communicate | Clean interface between agents |
| Workflow State | Tracks progress through agents | Enables debugging and monitoring |
| Agent Selection | Choosing which agent handles what | Critical for efficiency |
| Iteration Limit | Max loops through agents | Prevents infinite cycles |
Next Steps
- Autonomous Agent - Self-directed agents
- Agent Evaluation - Test agent systems