MCP Tool Integration
Build AI agents with Model Context Protocol for standardized tool integration and multi-system connectivity
| Property | Value |
|---|---|
| Difficulty | Intermediate |
| Time | ~6 hours |
| Code Size | ~550 LOC |
| Prerequisites | Tool Calling Agent |
TL;DR
Model Context Protocol (MCP) is an open standard that lets LLMs connect to tools, data sources, and systems through a universal interface. Build MCP servers to expose tools, resources, and prompts. Build MCP clients to discover and use them. No more custom integrations per tool.
Tech Stack
| Technology | Purpose |
|---|---|
| MCP Python SDK | Model Context Protocol |
| LangGraph | Agent orchestration |
| LangChain | Tool integration |
| FastAPI | Custom MCP server |
| OpenAI | LLM provider |
Prerequisites
- Python 3.10+
- OpenAI API key
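A minimal `.env` file matching the `Settings` class built in Step 1 (the API key is a placeholder; all other values are optional overrides of the defaults):

```ini
# .env — loaded by pydantic-settings
OPENAI_API_KEY=sk-your-key-here
MCP_SERVER_PORT=8080
MCP_TRANSPORT=streamable-http
LLM_MODEL=gpt-4o
```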
pip install mcp langchain langchain-openai langgraph fastapi uvicorn pydantic-settings pytz
What You'll Learn
- Understand the Model Context Protocol (MCP) standard
- Build MCP servers that expose tools and resources
- Create MCP clients that discover and use tools
- Integrate MCP with LangGraph agents
What is MCP?
Model Context Protocol is an open standard introduced by Anthropic in late 2024 that standardizes how LLMs connect to external tools, data sources, and systems.
| Before MCP | With MCP |
|---|---|
| Custom integrations per tool | Standardized protocol |
| Fragmented implementations | Universal tool discovery |
| Tight coupling | Loose coupling |
| Provider-specific | Provider-agnostic |
┌─────────────────────────────────────────────────────────────────────────────┐
│ BEFORE MCP vs WITH MCP │
├───────────────────────────────────┬─────────────────────────────────────────┤
│ BEFORE MCP │ WITH MCP │
│ │ │
│ ┌─────────┐ │ ┌─────────┐ │
│ │ Agent │ │ │ Agent │ │
│ └────┬────┘ │ └────┬────┘ │
│ │ │ │ │
│ ├──────► Tool 1 (custom) │ │ MCP Protocol │
│ │ │ ▼ │
│ ├──────► Tool 2 (custom) │ ┌──────────┐ │
│ │ │ │ Registry │ │
│ └──────► Tool 3 (custom) │ └────┬─────┘ │
│ │ │ │
│ Every tool = custom integration │ ├──► MCP Server 1 (DB tools) │
│ Different APIs, different code │ ├──► MCP Server 2 (File tools) │
│ │ └──► MCP Server 3 (API tools) │
│ │ │
│ │ One protocol, dynamic discovery │
└───────────────────────────────────┴─────────────────────────────────────────┘
MCP Core Concepts
| Concept | Description |
|---|---|
| Server | Exposes tools, resources, and prompts |
| Client | Discovers and invokes server capabilities |
| Tool | Executable function with parameters |
| Resource | Data that can be read (files, configs) |
| Prompt | Reusable prompt templates |
| Transport | Communication layer (stdio, HTTP, SSE) |
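To make the tool concept concrete before building anything: an MCP server publishes a JSON Schema for each tool's parameters, and a client can hand that schema almost directly to an LLM's function-calling API. A rough stdlib-only sketch (the dict below mirrors the shape of MCP's `inputSchema`, not the SDK's exact types):

```python
import json

# A hypothetical tool definition, shaped like what an MCP server exposes:
# a name, a description, and a JSON Schema for the parameters.
tool_definition = {
    "name": "get_current_time",
    "description": "Get the current date and time.",
    "inputSchema": {
        "type": "object",
        "properties": {"timezone": {"type": "string", "default": "UTC"}},
        "required": [],
    },
}

def to_openai_format(tool: dict) -> dict:
    """Convert an MCP-style tool definition to OpenAI function-calling format."""
    return {
        "type": "function",
        "function": {
            "name": tool["name"],
            "description": tool["description"],
            "parameters": tool["inputSchema"],
        },
    }

converted = to_openai_format(tool_definition)
print(json.dumps(converted, indent=2))
```

This conversion is exactly why discovery works: the client never needs to know the tool in advance, only how to translate its schema.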
Project Structure
mcp-tool-integration/
├── config.py # Configuration
├── mcp_server.py # Custom MCP server
├── mcp_client.py # MCP client wrapper
├── mcp_agent.py # LangGraph agent with MCP
├── tools/ # Tool implementations
│ ├── __init__.py
│ ├── database.py # Database tool
│ ├── api.py # API tool
│ └── file.py # File tool
├── app.py # FastAPI application
└── requirements.txt
Step 1: Configuration
# config.py
from pydantic_settings import BaseSettings
from pydantic import Field
from functools import lru_cache
from typing import List, Optional
from enum import Enum
class TransportType(str, Enum):
STDIO = "stdio"
HTTP = "streamable-http"
SSE = "sse"
class Settings(BaseSettings):
# API Keys
openai_api_key: str
# MCP Server
mcp_server_name: str = "AI Tools Server"
mcp_server_port: int = 8080
mcp_transport: TransportType = TransportType.HTTP
# LLM
llm_model: str = "gpt-4o"
temperature: float = 0.1
# Database (for demo tools)
database_url: str = "sqlite:///demo.db"
class Config:
env_file = ".env"
@lru_cache
def get_settings() -> Settings:
    return Settings()
Step 2: Build an MCP Server
Create an MCP server that exposes tools, resources, and prompts:
# mcp_server.py
from mcp.server.fastmcp import FastMCP
from typing import Dict, Any, List, Optional
import json
import sqlite3
from datetime import datetime
from config import get_settings
settings = get_settings()
# Initialize MCP server
mcp = FastMCP(
    settings.mcp_server_name,
    json_response=True,
    port=settings.mcp_server_port  # FastMCP defaults to port 8000, which would collide with the API server
)
# ============================================================
# TOOLS - Executable functions the LLM can call
# ============================================================
@mcp.tool()
def search_database(
query: str,
table: str = "documents",
limit: int = 10
) -> List[Dict[str, Any]]:
"""
Search the database for records matching a query.
Args:
query: Search term to look for
table: Table name to search in
limit: Maximum number of results
Returns:
List of matching records
"""
conn = sqlite3.connect(settings.database_url.replace("sqlite:///", ""))
conn.row_factory = sqlite3.Row
cursor = conn.cursor()
    # Security: table names cannot be parameterized, so validate before interpolating
    if not table.isidentifier():
        conn.close()
        return [{"error": f"Invalid table name: {table}"}]
    try:
        # Simple LIKE search (production would use FTS)
        cursor.execute(
            f"SELECT * FROM {table} WHERE content LIKE ? LIMIT ?",
            (f"%{query}%", limit)
        )
rows = cursor.fetchall()
return [dict(row) for row in rows]
except sqlite3.Error as e:
return [{"error": str(e)}]
finally:
conn.close()
@mcp.tool()
def execute_sql(
sql: str,
params: Optional[List[Any]] = None
) -> Dict[str, Any]:
"""
Execute a read-only SQL query.
Args:
sql: SQL SELECT query to execute
params: Optional query parameters
Returns:
Query results or error message
"""
# Security: Only allow SELECT queries
sql_upper = sql.strip().upper()
if not sql_upper.startswith("SELECT"):
return {"error": "Only SELECT queries are allowed"}
conn = sqlite3.connect(settings.database_url.replace("sqlite:///", ""))
conn.row_factory = sqlite3.Row
cursor = conn.cursor()
try:
cursor.execute(sql, params or [])
rows = cursor.fetchall()
return {
"success": True,
"row_count": len(rows),
"data": [dict(row) for row in rows]
}
except sqlite3.Error as e:
return {"error": str(e)}
finally:
conn.close()
@mcp.tool()
def fetch_api(
url: str,
method: str = "GET",
headers: Optional[Dict[str, str]] = None,
body: Optional[str] = None
) -> Dict[str, Any]:
"""
Make an HTTP API request.
Args:
url: URL to fetch
method: HTTP method (GET, POST)
headers: Optional headers
body: Optional request body for POST
Returns:
API response or error
"""
import urllib.request
import urllib.error
    # Security: block internal hosts by parsing the hostname (substring checks are trivially bypassed)
    from urllib.parse import urlparse
    hostname = (urlparse(url).hostname or "").lower()
    if hostname in {"localhost", "127.0.0.1", "0.0.0.0"} or hostname.endswith(".internal"):
        return {"error": "Internal URLs are not allowed"}
try:
req = urllib.request.Request(
url,
data=body.encode() if body else None,
headers=headers or {},
method=method
)
with urllib.request.urlopen(req, timeout=10) as response:
content = response.read().decode()
try:
data = json.loads(content)
except json.JSONDecodeError:
data = content
return {
"status": response.status,
"data": data
}
except urllib.error.URLError as e:
return {"error": str(e)}
@mcp.tool()
def read_file(
path: str,
encoding: str = "utf-8"
) -> Dict[str, Any]:
"""
Read contents of a file.
Args:
path: File path to read
encoding: File encoding
Returns:
File contents or error
"""
import os
    # Security: only allow files inside the data directory
    allowed_dir = os.path.abspath("./data")
    abs_path = os.path.abspath(path)
    # commonpath avoids the prefix pitfall: "./data" is also a string prefix of "./database.txt"
    if os.path.commonpath([allowed_dir, abs_path]) != allowed_dir:
        return {"error": "Access denied: path outside allowed directory"}
try:
with open(abs_path, encoding=encoding) as f:
content = f.read()
return {
"path": path,
"size": len(content),
"content": content[:10000] # Limit content size
}
except FileNotFoundError:
return {"error": f"File not found: {path}"}
except Exception as e:
return {"error": str(e)}
@mcp.tool()
def get_current_time(
timezone: str = "UTC"
) -> Dict[str, str]:
"""
Get the current date and time.
Args:
timezone: Timezone name (e.g., 'UTC', 'US/Eastern')
Returns:
Current datetime information
"""
from datetime import datetime
import pytz
try:
tz = pytz.timezone(timezone)
now = datetime.now(tz)
return {
"timezone": timezone,
"datetime": now.isoformat(),
"date": now.strftime("%Y-%m-%d"),
"time": now.strftime("%H:%M:%S"),
"day_of_week": now.strftime("%A")
}
except Exception as e:
return {"error": str(e), "timezone": timezone}
@mcp.tool()
def calculate(
expression: str
) -> Dict[str, Any]:
"""
Evaluate a mathematical expression safely.
Args:
expression: Math expression to evaluate (e.g., "2 + 2 * 3")
Returns:
Calculation result
"""
import ast
import operator
# Safe operators
ops = {
ast.Add: operator.add,
ast.Sub: operator.sub,
ast.Mult: operator.mul,
ast.Div: operator.truediv,
ast.Pow: operator.pow,
ast.USub: operator.neg,
}
def eval_expr(node):
if isinstance(node, ast.Constant):
return node.value
elif isinstance(node, ast.BinOp):
return ops[type(node.op)](eval_expr(node.left), eval_expr(node.right))
elif isinstance(node, ast.UnaryOp):
return ops[type(node.op)](eval_expr(node.operand))
else:
raise ValueError(f"Unsupported operation: {type(node)}")
try:
tree = ast.parse(expression, mode='eval')
result = eval_expr(tree.body)
return {"expression": expression, "result": result}
except Exception as e:
return {"error": str(e), "expression": expression}
# ============================================================
# RESOURCES - Data that can be read by the LLM
# ============================================================
@mcp.resource("config://settings")
def get_app_settings() -> str:
"""Expose application settings as a resource."""
return json.dumps({
"server_name": settings.mcp_server_name,
"available_tools": ["search_database", "execute_sql", "fetch_api", "read_file", "get_current_time", "calculate"],
"version": "1.0.0"
}, indent=2)
@mcp.resource("schema://database")
def get_database_schema() -> str:
"""Expose database schema information."""
conn = sqlite3.connect(settings.database_url.replace("sqlite:///", ""))
cursor = conn.cursor()
try:
cursor.execute("SELECT name FROM sqlite_master WHERE type='table'")
tables = cursor.fetchall()
schema = {}
for (table_name,) in tables:
cursor.execute(f"PRAGMA table_info({table_name})")
columns = cursor.fetchall()
schema[table_name] = [
{"name": col[1], "type": col[2]}
for col in columns
]
return json.dumps(schema, indent=2)
finally:
conn.close()
@mcp.resource("docs://api-reference")
def get_api_docs() -> str:
"""Provide API documentation."""
return """
# API Reference
## Available Tools
### search_database
Search for records in the database using keyword matching.
- query: Search term
- table: Table name (default: documents)
- limit: Max results (default: 10)
### execute_sql
Run read-only SQL queries against the database.
- sql: SELECT query
- params: Query parameters
### fetch_api
Make HTTP requests to external APIs.
- url: Target URL
- method: GET or POST
- headers: Optional headers
- body: Optional request body
### read_file
Read file contents from the data directory.
- path: Relative file path
- encoding: File encoding
### get_current_time
Get current date and time.
- timezone: Timezone name
### calculate
Evaluate mathematical expressions.
- expression: Math expression
"""
# ============================================================
# PROMPTS - Reusable prompt templates
# ============================================================
@mcp.prompt()
def analyze_data(data_description: str) -> str:
"""Generate a prompt for data analysis."""
return f"""Analyze the following data and provide insights:
Data Description: {data_description}
Please:
1. Identify key patterns and trends
2. Highlight any anomalies
3. Provide actionable recommendations
4. Summarize findings in a clear format
"""
@mcp.prompt()
def query_builder(natural_language: str, table_name: str = "documents") -> str:
"""Generate a prompt for SQL query building."""
return f"""Convert this natural language request into a SQL query:
Request: {natural_language}
Target Table: {table_name}
Requirements:
- Use only SELECT statements
- Be efficient with column selection
- Add appropriate WHERE clauses
- Include ORDER BY if relevant
- Limit results appropriately
"""
# ============================================================
# Server entry point
# ============================================================
def run_server():
"""Run the MCP server."""
mcp.run(transport=settings.mcp_transport.value)
if __name__ == "__main__":
    run_server()
★ Insight ─────────────────────────────────
MCP's decorator-based API (@mcp.tool(), @mcp.resource(), @mcp.prompt()) makes exposing capabilities simple. The key insight: tools are for actions, resources are for data, and prompts are for templates. This separation enables LLMs to understand what each capability is for.
─────────────────────────────────────────────────
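The `calculate` tool's safe-AST pattern deserves a closer look: instead of calling `eval()`, the expression is parsed into an AST and only whitelisted node types are evaluated, so arbitrary code can never run. A standalone version of the same logic (mirroring the tool above) can be exercised directly:

```python
import ast
import operator

# Whitelist of arithmetic operators; anything outside this table is rejected
OPS = {
    ast.Add: operator.add,
    ast.Sub: operator.sub,
    ast.Mult: operator.mul,
    ast.Div: operator.truediv,
    ast.Pow: operator.pow,
    ast.USub: operator.neg,
}

def safe_eval(expression: str) -> float:
    """Evaluate arithmetic expressions only; any other construct raises ValueError."""
    def walk(node):
        if isinstance(node, ast.Constant) and isinstance(node.value, (int, float)):
            return node.value
        if isinstance(node, ast.BinOp) and type(node.op) in OPS:
            return OPS[type(node.op)](walk(node.left), walk(node.right))
        if isinstance(node, ast.UnaryOp) and type(node.op) in OPS:
            return OPS[type(node.op)](walk(node.operand))
        raise ValueError(f"Unsupported operation: {type(node).__name__}")
    return walk(ast.parse(expression, mode="eval").body)

print(safe_eval("2 + 2 * 3"))       # 8
print(safe_eval("15 / 100 * 250"))  # 37.5
try:
    safe_eval("__import__('os')")   # a Call node — not whitelisted, so it is rejected
except ValueError as e:
    print("blocked:", e)
```

Note the constant check also restricts values to numbers, which closes off string tricks that a bare `ast.Constant` match would allow.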
Step 3: Build an MCP Client
Create a client that discovers and uses MCP servers:
# mcp_client.py
import asyncio
from typing import List, Dict, Any, Optional
from dataclasses import dataclass
from mcp import ClientSession
from mcp.client.stdio import StdioServerParameters, stdio_client
from mcp.client.streamable_http import streamablehttp_client
@dataclass
class MCPTool:
"""Represents a discovered MCP tool."""
name: str
description: str
parameters: Dict[str, Any]
def to_openai_format(self) -> Dict[str, Any]:
"""Convert to OpenAI function calling format."""
return {
"type": "function",
"function": {
"name": self.name,
"description": self.description,
"parameters": self.parameters
}
}
@dataclass
class MCPResource:
"""Represents a discovered MCP resource."""
uri: str
name: str
description: str
mime_type: Optional[str] = None
class MCPClient:
"""
Client for connecting to MCP servers.
Supports:
- Tool discovery and execution
- Resource reading
- Prompt retrieval
- Multiple transport types
"""
def __init__(self):
self._session: Optional[ClientSession] = None
self._tools: List[MCPTool] = []
self._resources: List[MCPResource] = []
async def connect_stdio(
self,
command: str,
args: List[str]
) -> None:
"""
Connect to an MCP server via stdio transport.
Args:
command: Command to run (e.g., "python")
args: Command arguments (e.g., ["mcp_server.py"])
"""
server_params = StdioServerParameters(
command=command,
args=args
)
        # Keep a handle on the transport context so close() can exit it cleanly
        self._transport_ctx = stdio_client(server_params)
        read_stream, write_stream = await self._transport_ctx.__aenter__()
self._session = ClientSession(read_stream, write_stream)
await self._session.__aenter__()
await self._session.initialize()
await self._discover_capabilities()
async def connect_http(self, url: str) -> None:
"""
Connect to an MCP server via HTTP transport.
Args:
url: Server URL (e.g., "http://localhost:8080/mcp")
"""
        # Keep a handle on the transport context so close() can exit it cleanly
        self._transport_ctx = streamablehttp_client(url)
        read_stream, write_stream, _ = await self._transport_ctx.__aenter__()
self._session = ClientSession(read_stream, write_stream)
await self._session.__aenter__()
await self._session.initialize()
await self._discover_capabilities()
async def _discover_capabilities(self) -> None:
"""Discover available tools and resources."""
if not self._session:
return
# Discover tools
tools_response = await self._session.list_tools()
self._tools = [
MCPTool(
name=tool.name,
description=tool.description or "",
parameters=tool.inputSchema or {}
)
for tool in tools_response.tools
]
# Discover resources
resources_response = await self._session.list_resources()
self._resources = [
MCPResource(
uri=resource.uri,
name=resource.name,
description=resource.description or "",
mime_type=resource.mimeType
)
for resource in resources_response.resources
]
@property
def tools(self) -> List[MCPTool]:
"""Get discovered tools."""
return self._tools
@property
def resources(self) -> List[MCPResource]:
"""Get discovered resources."""
return self._resources
def get_tools_for_openai(self) -> List[Dict[str, Any]]:
"""Get tools in OpenAI function calling format."""
return [tool.to_openai_format() for tool in self._tools]
async def call_tool(
self,
name: str,
arguments: Dict[str, Any]
) -> Any:
"""
Call an MCP tool.
Args:
name: Tool name
arguments: Tool arguments
Returns:
Tool result
"""
if not self._session:
raise RuntimeError("Not connected to MCP server")
result = await self._session.call_tool(name, arguments)
return result.content
async def read_resource(self, uri: str) -> str:
"""
Read an MCP resource.
Args:
uri: Resource URI
Returns:
Resource content
"""
if not self._session:
raise RuntimeError("Not connected to MCP server")
result = await self._session.read_resource(uri)
return result.contents[0].text if result.contents else ""
async def get_prompt(
self,
name: str,
arguments: Optional[Dict[str, str]] = None
) -> str:
"""
Get a prompt template.
Args:
name: Prompt name
arguments: Prompt arguments
Returns:
Rendered prompt
"""
if not self._session:
raise RuntimeError("Not connected to MCP server")
result = await self._session.get_prompt(name, arguments or {})
return result.messages[0].content.text if result.messages else ""
    async def close(self) -> None:
        """Close the session and, if present, the underlying transport."""
        if self._session:
            await self._session.__aexit__(None, None, None)
            self._session = None
        transport_ctx = getattr(self, "_transport_ctx", None)
        if transport_ctx is not None:
            await transport_ctx.__aexit__(None, None, None)
            self._transport_ctx = None
class MCPToolRegistry:
"""
Registry for multiple MCP servers.
Aggregates tools from multiple servers and routes calls appropriately.
"""
def __init__(self):
self._clients: Dict[str, MCPClient] = {}
self._tool_to_server: Dict[str, str] = {}
async def add_server(
self,
name: str,
url: Optional[str] = None,
command: Optional[str] = None,
args: Optional[List[str]] = None
) -> None:
"""
Add an MCP server to the registry.
Args:
name: Server identifier
url: HTTP URL (for HTTP transport)
command: Command (for stdio transport)
args: Command args (for stdio transport)
"""
client = MCPClient()
if url:
await client.connect_http(url)
elif command:
await client.connect_stdio(command, args or [])
else:
raise ValueError("Must provide url or command")
self._clients[name] = client
# Map tools to server
for tool in client.tools:
self._tool_to_server[tool.name] = name
def get_all_tools(self) -> List[MCPTool]:
"""Get all tools from all servers."""
tools = []
for client in self._clients.values():
tools.extend(client.tools)
return tools
def get_tools_for_openai(self) -> List[Dict[str, Any]]:
"""Get all tools in OpenAI format."""
return [tool.to_openai_format() for tool in self.get_all_tools()]
async def call_tool(
self,
name: str,
arguments: Dict[str, Any]
) -> Any:
"""Call a tool, routing to the correct server."""
server_name = self._tool_to_server.get(name)
if not server_name:
raise ValueError(f"Unknown tool: {name}")
client = self._clients[server_name]
return await client.call_tool(name, arguments)
async def close_all(self) -> None:
"""Close all server connections."""
for client in self._clients.values():
            await client.close()
Step 4: LangGraph Agent with MCP
Build an agent that uses MCP tools:
# mcp_agent.py
import asyncio
from typing import TypedDict, Annotated, List, Dict, Any, Optional
import json
import operator
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage, AIMessage, ToolMessage, SystemMessage
from langgraph.graph import StateGraph, END
from mcp_client import MCPClient, MCPToolRegistry
from config import get_settings
class AgentState(TypedDict):
"""State for the MCP agent."""
messages: Annotated[List[Any], operator.add]
tool_calls: List[Dict[str, Any]]
tool_results: List[Dict[str, Any]]
final_response: Optional[str]
iteration: int
class MCPAgent:
"""
LangGraph agent that uses MCP tools.
Flow:
1. Receive user message
2. Decide which tools to use
3. Execute tools via MCP
4. Process results
5. Generate response or continue
"""
def __init__(self, max_iterations: int = 10):
self.settings = get_settings()
self.max_iterations = max_iterations
self.registry = MCPToolRegistry()
self.llm = ChatOpenAI(
model=self.settings.llm_model,
api_key=self.settings.openai_api_key,
temperature=self.settings.temperature
)
self._workflow = None
async def initialize(self, servers: List[Dict[str, Any]]) -> None:
"""
Initialize the agent with MCP servers.
Args:
servers: List of server configs
[{"name": "tools", "url": "http://localhost:8080/mcp"}]
"""
for server in servers:
await self.registry.add_server(
name=server["name"],
url=server.get("url"),
command=server.get("command"),
args=server.get("args")
)
# Build workflow after tools are discovered
self._workflow = self._build_workflow()
def _build_workflow(self) -> StateGraph:
"""Build the LangGraph workflow."""
workflow = StateGraph(AgentState)
# Add nodes
workflow.add_node("think", self._think_node)
workflow.add_node("execute_tools", self._execute_tools_node)
workflow.add_node("process_results", self._process_results_node)
workflow.add_node("respond", self._respond_node)
# Set entry point
workflow.set_entry_point("think")
# Add edges
workflow.add_conditional_edges(
"think",
self._should_use_tools,
{
"use_tools": "execute_tools",
"respond": "respond"
}
)
workflow.add_edge("execute_tools", "process_results")
workflow.add_conditional_edges(
"process_results",
self._should_continue,
{
"continue": "think",
"respond": "respond"
}
)
workflow.add_edge("respond", END)
return workflow.compile()
async def _think_node(self, state: AgentState) -> Dict[str, Any]:
"""Decide what to do next."""
# Get tools in OpenAI format
tools = self.registry.get_tools_for_openai()
# Build messages
messages = state["messages"]
# Add system message with tool context
system_msg = SystemMessage(content="""You are a helpful AI assistant with access to various tools.
Use the available tools to help answer user questions accurately.
When you need information, use the appropriate tool.
After getting tool results, synthesize them into a helpful response.
Available capabilities:
- Database queries and searches
- API requests
- File reading
- Calculations
- Time/date information
""")
all_messages = [system_msg] + messages
# Call LLM with tools
        # ChatOpenAI supports native async invocation, so no thread hop is needed
        response = await self.llm.bind(tools=tools).ainvoke(all_messages)
# Extract tool calls if any
tool_calls = []
if response.tool_calls:
tool_calls = [
{
"id": tc["id"],
"name": tc["name"],
"arguments": tc["args"]
}
for tc in response.tool_calls
]
return {
"messages": [response],
"tool_calls": tool_calls,
"iteration": state["iteration"] + 1
}
def _should_use_tools(self, state: AgentState) -> str:
"""Determine if we should use tools."""
if state["tool_calls"]:
return "use_tools"
return "respond"
async def _execute_tools_node(self, state: AgentState) -> Dict[str, Any]:
"""Execute MCP tools."""
results = []
for tool_call in state["tool_calls"]:
try:
result = await self.registry.call_tool(
tool_call["name"],
tool_call["arguments"]
)
# Handle MCP result format
if hasattr(result, '__iter__') and not isinstance(result, (str, dict)):
result_text = json.dumps([
item.text if hasattr(item, 'text') else str(item)
for item in result
])
else:
result_text = json.dumps(result) if isinstance(result, dict) else str(result)
results.append({
"tool_call_id": tool_call["id"],
"name": tool_call["name"],
"result": result_text,
"success": True
})
except Exception as e:
results.append({
"tool_call_id": tool_call["id"],
"name": tool_call["name"],
"result": f"Error: {str(e)}",
"success": False
})
return {"tool_results": results}
async def _process_results_node(self, state: AgentState) -> Dict[str, Any]:
"""Process tool results and add to messages."""
tool_messages = []
for result in state["tool_results"]:
tool_messages.append(
ToolMessage(
content=result["result"],
tool_call_id=result["tool_call_id"]
)
)
return {
"messages": tool_messages,
"tool_calls": [], # Clear for next iteration
"tool_results": []
}
def _should_continue(self, state: AgentState) -> str:
"""Determine if we should continue reasoning."""
if state["iteration"] >= self.max_iterations:
return "respond"
return "continue"
async def _respond_node(self, state: AgentState) -> Dict[str, Any]:
"""Generate final response."""
messages = state["messages"]
# Get last AI message
for msg in reversed(messages):
if isinstance(msg, AIMessage) and msg.content:
return {"final_response": msg.content}
# Fallback: generate response
        response = await self.llm.ainvoke(messages)
return {"final_response": response.content}
async def run(self, user_message: str) -> str:
"""
Run the agent with a user message.
Args:
user_message: User's input
Returns:
Agent's response
"""
if not self._workflow:
raise RuntimeError("Agent not initialized. Call initialize() first.")
initial_state: AgentState = {
"messages": [HumanMessage(content=user_message)],
"tool_calls": [],
"tool_results": [],
"final_response": None,
"iteration": 0
}
final_state = await self._workflow.ainvoke(initial_state)
return final_state["final_response"] or "I couldn't generate a response."
async def close(self) -> None:
"""Clean up resources."""
await self.registry.close_all()
# Convenience function for quick usage
async def create_mcp_agent(
servers: List[Dict[str, Any]],
max_iterations: int = 10
) -> MCPAgent:
"""
Create and initialize an MCP agent.
Args:
servers: List of MCP server configurations
max_iterations: Maximum reasoning iterations
Returns:
Initialized MCPAgent
"""
agent = MCPAgent(max_iterations=max_iterations)
await agent.initialize(servers)
    return agent
★ Insight ─────────────────────────────────
The MCP agent uses LangGraph's conditional edges to create a flexible reasoning loop. The key pattern: the think → execute_tools → process_results cycle repeats until the LLM decides no more tools are needed. This mirrors how people research a question, gathering information iteratively until the answer is clear.
─────────────────────────────────────────────────
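The result-handling branch in `_execute_tools_node` is easy to get wrong, because MCP tool results arrive as a list of content items rather than plain values. That normalization can be checked in isolation; the `TextContent` class below is a stand-in for the SDK's content type, not the real one:

```python
import json
from dataclasses import dataclass

@dataclass
class TextContent:
    """Stand-in for the MCP SDK's text content item (only the .text attribute matters here)."""
    text: str

def normalize_tool_result(result) -> str:
    """Flatten an MCP tool result into a string for a ToolMessage, mirroring _execute_tools_node."""
    # A list of content items: pull out each item's text
    if hasattr(result, "__iter__") and not isinstance(result, (str, dict)):
        return json.dumps([
            item.text if hasattr(item, "text") else str(item)
            for item in result
        ])
    # A plain dict or string: serialize or pass through
    return json.dumps(result) if isinstance(result, dict) else str(result)

print(normalize_tool_result([TextContent('{"row_count": 2}')]))
print(normalize_tool_result({"status": 200}))
print(normalize_tool_result("plain text"))
```

Keeping this as a pure function makes the agent's tool loop testable without a running MCP server.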
Step 5: FastAPI Application
# app.py
import asyncio
from fastapi import FastAPI, HTTPException
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
from typing import List, Dict, Any, Optional
from contextlib import asynccontextmanager
import json
from mcp_agent import MCPAgent, create_mcp_agent
from mcp_server import mcp
from config import get_settings
settings = get_settings()
# Global agent instance
agent: Optional[MCPAgent] = None
@asynccontextmanager
async def lifespan(app: FastAPI):
"""Initialize agent on startup."""
global agent
# Initialize agent with local MCP server
agent = await create_mcp_agent([
{
"name": "local_tools",
"url": f"http://localhost:{settings.mcp_server_port}/mcp"
}
])
yield
# Cleanup
if agent:
await agent.close()
app = FastAPI(
title="MCP Agent API",
description="AI agent with Model Context Protocol tools",
version="1.0.0",
lifespan=lifespan
)
# Request/Response models
class ChatRequest(BaseModel):
message: str
class ChatResponse(BaseModel):
response: str
tools_used: List[str] = []
class ToolListResponse(BaseModel):
tools: List[Dict[str, Any]]
# Endpoints
@app.post("/chat", response_model=ChatResponse)
async def chat(request: ChatRequest):
"""Chat with the MCP agent."""
if not agent:
raise HTTPException(500, "Agent not initialized")
response = await agent.run(request.message)
return ChatResponse(
response=response,
tools_used=[] # Could track tools used in agent state
)
@app.get("/tools", response_model=ToolListResponse)
async def list_tools():
"""List available MCP tools."""
if not agent:
raise HTTPException(500, "Agent not initialized")
tools = agent.registry.get_all_tools()
return ToolListResponse(
tools=[
{
"name": t.name,
"description": t.description,
"parameters": t.parameters
}
for t in tools
]
)
@app.get("/health")
async def health():
"""Health check."""
return {
"status": "healthy",
"agent_ready": agent is not None
}
# Run MCP server in background
def run_mcp_server():
"""Run the MCP server."""
mcp.run(transport=settings.mcp_transport.value)
if __name__ == "__main__":
    import time
    import uvicorn
    import threading
    # Start MCP server in background thread
    mcp_thread = threading.Thread(target=run_mcp_server, daemon=True)
    mcp_thread.start()
    time.sleep(1)  # give the MCP server a moment to bind before the lifespan hook connects to it
    # Start FastAPI
    uvicorn.run(app, host="0.0.0.0", port=8000)
Step 6: Database Setup
# setup_db.py
"""Initialize demo database for MCP tools."""
import sqlite3
def setup_database(db_path: str = "demo.db"):
"""Create demo database with sample data."""
conn = sqlite3.connect(db_path)
cursor = conn.cursor()
# Create tables
cursor.execute("""
CREATE TABLE IF NOT EXISTS documents (
id INTEGER PRIMARY KEY,
title TEXT NOT NULL,
content TEXT NOT NULL,
category TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
""")
cursor.execute("""
CREATE TABLE IF NOT EXISTS users (
id INTEGER PRIMARY KEY,
name TEXT NOT NULL,
email TEXT NOT NULL,
role TEXT DEFAULT 'user'
)
""")
# Insert sample data
documents = [
("MCP Introduction", "Model Context Protocol (MCP) is an open standard for connecting LLMs to external systems.", "documentation"),
("Tool Integration Guide", "This guide covers how to integrate custom tools with MCP servers.", "tutorial"),
("Security Best Practices", "When building MCP servers, always validate inputs and restrict access.", "security"),
("Performance Optimization", "Use connection pooling and caching to improve MCP server performance.", "performance"),
]
cursor.executemany(
"INSERT OR IGNORE INTO documents (title, content, category) VALUES (?, ?, ?)",
documents
)
users = [
("Alice Smith", "alice@example.com", "admin"),
("Bob Johnson", "bob@example.com", "user"),
("Carol White", "carol@example.com", "user"),
]
cursor.executemany(
"INSERT OR IGNORE INTO users (name, email, role) VALUES (?, ?, ?)",
users
)
conn.commit()
conn.close()
print(f"Database initialized: {db_path}")
if __name__ == "__main__":
    setup_database()
Requirements
# requirements.txt
mcp>=1.0.0
langchain>=0.3.0
langchain-openai>=0.2.0
langgraph>=0.2.0
openai>=1.50.0
fastapi>=0.115.0
uvicorn>=0.32.0
pydantic>=2.9.0
pydantic-settings>=2.6.0
pytz>=2024.1
Usage Examples
Start the Server
# Initialize database
python setup_db.py
# Start MCP server (in one terminal)
python mcp_server.py
# Start API server (in another terminal)
python app.py
Chat with Agent
# Ask about data
curl -X POST "http://localhost:8000/chat" \
-H "Content-Type: application/json" \
-d '{"message": "Search for documents about security"}'
# Use calculation
curl -X POST "http://localhost:8000/chat" \
-H "Content-Type: application/json" \
-d '{"message": "What is 15% of 250?"}'
# Get current time
curl -X POST "http://localhost:8000/chat" \
-H "Content-Type: application/json" \
-d '{"message": "What time is it in Tokyo?"}'
List Available Tools
curl "http://localhost:8000/tools"
Architecture Diagram
┌─────────────────────────────────────────────────────────────────────────────┐
│ MCP AGENT ARCHITECTURE │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────────────────────────────────────────────────────────────┐ │
│ │ CLIENT LAYER │ │
│ │ User ──────► FastAPI (/chat, /tools) │ │
│ └───────────────────────────────┬─────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────────────────────┐ │
│ │ AGENT LAYER (LangGraph) │ │
│ │ ┌──────────┐ ┌───────────────┐ ┌─────────────────┐ │ │
│ │ │ Think │───►│ Execute Tools │───►│ Process Results │──┐ │ │
│ │ │ Node │ │ Node │ │ Node │ │ │ │
│ │ └──────────┘ └───────┬───────┘ └─────────────────┘ │ │ │
│ │ ▲ │ │ │ │
│ │ └──────────────────┼──────────────────────────────────┘ │ │
│ └───────────────────────────┼─────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────────────────────┐ │
│ │ MCP LAYER │ │
│ │ MCP Client ──► Tool Registry ──┬──► MCP Server 1 (DB, Files) │ │
│ │ └──► MCP Server 2 (External APIs) │ │
│ └─────────────────────────────────────────────────────────────────────┘ │
│ │
│ ┌─────────────────────────────────────────────────────────────────────┐ │
│ │ TOOL LAYER │ │
│ │ [Database] [File System] [External APIs] [Calculations] │ │
│ └─────────────────────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────────────┘
Key Takeaways
- MCP standardizes tool integration - One protocol for all external systems
- Servers expose capabilities - Tools, resources, and prompts
- Clients discover dynamically - No hardcoded tool definitions
- LangGraph orchestrates reasoning - Flexible multi-step tool use
- Registry aggregates servers - Scale to many tool sources
Key Concepts Recap
| Concept | What It Is | Why It Matters |
|---|---|---|
| MCP | Model Context Protocol | Open standard for LLM-tool integration |
| MCP Server | Exposes tools, resources, prompts | Makes capabilities discoverable |
| MCP Client | Discovers and calls server capabilities | Universal interface to any server |
| Tool | Executable function with parameters | Actions the LLM can take |
| Resource | Read-only data (files, configs) | Context the LLM can access |
| Prompt | Reusable prompt template | Standardized prompts across tools |
| Transport | Communication layer (stdio, HTTP) | Flexible deployment options |
| Registry | Aggregates multiple servers | Scale to many tool sources |