Enterprise Customer Service Chatbot
Build a production-grade AI chatbot handling millions of customer conversations with intelligent routing and human handoff
Build one of the most widely deployed classes of enterprise AI application: an intelligent customer service chatbot that handles inquiries 24/7, can substantially reduce support costs, and seamlessly escalates to human agents when needed.
| Industry | SaaS / E-commerce / Enterprise |
| Difficulty | Advanced |
| Time | 2 weeks |
| Code | ~1500 lines |
TL;DR
Build an enterprise chatbot using intent classification (detect what customers want), RAG knowledge retrieval (answer from your docs), sentiment analysis (detect frustration), and intelligent escalation (hand off to humans when needed). Uses Redis for session state, ChromaDB for knowledge, and WebSockets for real-time chat.
What You'll Build
A production customer service chatbot that:
- Handles multi-turn conversations - Maintains context across messages
- Answers from knowledge base - Uses RAG for accurate, sourced responses
- Detects intent and sentiment - Routes based on customer needs
- Escalates intelligently - Hands off to humans with full context
- Supports multiple channels - Web widget, Slack, email, API
Architecture
┌─────────────────────────────────────────────────────────────────────────────┐
│ CUSTOMER SERVICE CHATBOT ARCHITECTURE │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────────────────────────────────────────────────────────────┐ │
│ │ CUSTOMER CHANNELS │ │
│ │ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ │ │
│ │ │Web Widget│ │ Slack │ │ Email │ │ API │ │ │
│ │ └────┬─────┘ └────┬─────┘ └────┬─────┘ └────┬─────┘ │ │
│ └────────┴────────────┴────────────┴────────────┴─────────────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────────────────────┐ │
│ │ MESSAGE GATEWAY │ │
│ │ Channel Router ──► Session Manager ──► Message Queue │ │
│ └─────────────────────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────────────────────┐ │
│ │ INTENT & CONTEXT │ │
│ │ ┌───────────────┐ ┌───────────────┐ ┌───────────────┐ │ │
│ │ │Intent │ │Sentiment │ │Entity │ │ │
│ │ │Classifier │ │Analyzer │ │Extractor │ │ │
│ │ └───────┬───────┘ └───────┬───────┘ └───────┬───────┘ │ │
│ │ └─────────────────┴─────────────────┘ │ │
│ │ │ │ │
│ │ ▼ │ │
│ │ Context Builder │ │
│ └─────────────────────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────────────────────┐ │
│ │ RESPONSE GENERATION │ │
│ │ Knowledge RAG ──┬──► LLM Generator ──► Guardrails │ │
│ │ Templates ──────┘ │ │
│ └─────────────────────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────────────────────┐ │
│ │ INTELLIGENT ROUTING │ │
│ │ Confidence Scorer ──► Escalation Rules │ │
│ │ │ │ │
│ │ ┌───────────────┴───────────────┐ │ │
│ │ ▼ ▼ │ │
│ │ [Auto Response] [Human Handoff] │ │
│ │ │ │ │ │
│ └──────────────┼───────────────────────────────┼──────────────────────┘ │
│ │ │ │
│ ▼ ▼ │
│ ┌─────────────────────────┐ ┌─────────────────────────────────────┐ │
│ │ CUSTOMER CHANNELS │ │ BACKEND SERVICES │ │
│ │ (Response Delivered) │ │ CRM ── Tickets ── Analytics │ │
│ └─────────────────────────┘ └─────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────────────┘

Project Structure
customer-service-chatbot/
├── src/
│ ├── __init__.py
│ ├── config.py
│ ├── channels/
│ │ ├── __init__.py
│ │ ├── web_widget.py # Web chat widget
│ │ ├── slack_bot.py # Slack integration
│ │ ├── email_handler.py # Email processing
│ │ └── api_gateway.py # REST API
│ ├── understanding/
│ │ ├── __init__.py
│ │ ├── intent.py # Intent classification
│ │ ├── sentiment.py # Sentiment analysis
│ │ ├── entities.py # Entity extraction
│ │ └── context.py # Context management
│ ├── knowledge/
│ │ ├── __init__.py
│ │ ├── indexer.py # Knowledge base indexing
│ │ ├── retriever.py # RAG retrieval
│ │ └── sources.py # Knowledge sources
│ ├── response/
│ │ ├── __init__.py
│ │ ├── generator.py # Response generation
│ │ ├── templates.py # Response templates
│ │ └── guardrails.py # Safety checks
│ ├── routing/
│ │ ├── __init__.py
│ │ ├── confidence.py # Confidence scoring
│ │ ├── escalation.py # Escalation logic
│ │ └── handoff.py # Human handoff
│ ├── integrations/
│ │ ├── __init__.py
│ │ ├── crm.py # CRM integration
│ │ └── ticketing.py # Ticket system
│ └── api/
│ ├── __init__.py
│ └── main.py # FastAPI application
├── widget/ # Embeddable web widget
├── tests/
└── requirements.txt

Tech Stack
| Technology | Purpose |
|---|---|
| LangChain | Conversation orchestration |
| OpenAI GPT-4o | Response generation |
| ChromaDB | Knowledge vector store |
| Redis | Session & cache |
| FastAPI | API backend |
| WebSocket | Real-time chat |
| Celery | Async processing |
Implementation
Configuration
# src/config.py
from pydantic_settings import BaseSettings
from typing import List, Dict, Optional
from enum import Enum
class EscalationTrigger(str, Enum):
LOW_CONFIDENCE = "low_confidence"
NEGATIVE_SENTIMENT = "negative_sentiment"
EXPLICIT_REQUEST = "explicit_request"
SENSITIVE_TOPIC = "sensitive_topic"
REPEATED_FAILURE = "repeated_failure"
class Settings(BaseSettings):
# LLM Settings
openai_api_key: str
model: str = "gpt-4o"
temperature: float = 0.3
# Knowledge Base
chroma_persist_dir: str = "./data/chroma"
embedding_model: str = "text-embedding-3-small"
# Session Management
redis_url: str = "redis://localhost:6379"
session_ttl: int = 3600 # 1 hour
max_history_length: int = 20
# Confidence Thresholds
auto_respond_threshold: float = 0.85
escalation_threshold: float = 0.5
sentiment_escalation_threshold: float = -0.6
# Rate Limiting
max_messages_per_minute: int = 20
max_sessions_per_user: int = 5
# Escalation Settings
escalation_triggers: List[EscalationTrigger] = [
EscalationTrigger.LOW_CONFIDENCE,
EscalationTrigger.NEGATIVE_SENTIMENT,
EscalationTrigger.EXPLICIT_REQUEST
]
# Business Hours (for human handoff)
business_hours_start: int = 9
business_hours_end: int = 18
timezone: str = "UTC"
class Config:
env_file = ".env"
settings = Settings()

Understanding the Configuration:
┌─────────────────────────────────────────────────────────────┐
│ CONFIDENCE-BASED ROUTING │
├─────────────────────────────────────────────────────────────┤
│ │
│ Customer Message │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ Confidence Score from LLM │ │
│ └─────────────────────────────────────────────────────┘ │
│ │ │
│ ├── Score ≥ 0.85 ──► AUTO-RESPOND (bot handles) │
│ │ │
│ ├── 0.50 ≤ Score < 0.85 ──► RESPOND WITH CAUTION │
│ │ │
│ └── Score < 0.50 ──► ESCALATE TO HUMAN │
│ │
└─────────────────────────────────────────────────────────────┘

| Setting | Effect | Why This Matters |
|---|---|---|
| temperature: 0.3 | Low creativity | Customer service needs consistent, predictable responses |
| auto_respond_threshold: 0.85 | High bar | Only auto-respond when very confident, to avoid errors |
| escalation_threshold: 0.5 | Safety net | Low confidence triggers human review |
| sentiment_escalation_threshold: -0.6 | Frustration detector | Negative sentiment (on a -1 to 1 scale) triggers escalation |
| max_history_length: 20 | Memory limit | Keeps context manageable and prevents token overflow |
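The three confidence bands above collapse into a small routing helper. A minimal sketch (the function name and return labels are illustrative, not part of the project code):

```python
def route_by_confidence(
    score: float,
    auto_threshold: float = 0.85,
    escalation_threshold: float = 0.5,
) -> str:
    """Map a response confidence score to a routing decision."""
    if score >= auto_threshold:
        return "auto_respond"          # bot answers directly
    if score >= escalation_threshold:
        return "respond_with_caution"  # bot answers, flagged for review
    return "escalate"                  # hand off to a human

print(route_by_confidence(0.92))  # auto_respond
print(route_by_confidence(0.70))  # respond_with_caution
print(route_by_confidence(0.40))  # escalate
```

Note the boundary behavior: a score exactly at a threshold takes the more automated path, which keeps the two thresholds as the sole tuning knobs.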
Intent Classification
# src/understanding/intent.py
from typing import List, Tuple
from enum import Enum
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from pydantic import BaseModel, Field
from ..config import settings
class CustomerIntent(str, Enum):
BILLING = "billing"
TECHNICAL_SUPPORT = "technical_support"
ACCOUNT_MANAGEMENT = "account_management"
PRODUCT_INQUIRY = "product_inquiry"
COMPLAINT = "complaint"
CANCELLATION = "cancellation"
FEEDBACK = "feedback"
GENERAL_QUESTION = "general_question"
HUMAN_REQUEST = "human_request"
class IntentResult(BaseModel):
primary_intent: CustomerIntent
confidence: float = Field(ge=0, le=1)
secondary_intents: List[CustomerIntent] = []
requires_authentication: bool = False
urgency: str = Field(default="normal") # low, normal, high, critical
class IntentClassifier:
"""Classifies customer message intent."""
def __init__(self):
self.llm = ChatOpenAI(
model=settings.model,
api_key=settings.openai_api_key,
temperature=0
).with_structured_output(IntentResult)
self.prompt = ChatPromptTemplate.from_messages([
("system", """You are a customer service intent classifier.
Classify the customer message into one of these intents:
- billing: Payment, invoices, charges, refunds
- technical_support: Bugs, errors, how-to questions
- account_management: Profile, settings, password
- product_inquiry: Features, pricing, availability
- complaint: Dissatisfaction, problems, issues
- cancellation: Cancel subscription, close account
- feedback: Suggestions, praise, general feedback
- general_question: Other inquiries
- human_request: Explicit request to speak with human
Also determine:
- Confidence (0-1)
- If authentication is required
- Urgency level (low/normal/high/critical)"""),
("human", """Customer message: {message}
Conversation context: {context}
Classify this message.""")
])
async def classify(
self,
message: str,
context: str = ""
) -> IntentResult:
"""Classify customer intent."""
chain = self.prompt | self.llm
result = await chain.ainvoke({
"message": message,
"context": context
})
return result
class SentimentAnalyzer:
"""Analyzes customer sentiment."""
def __init__(self):
self.llm = ChatOpenAI(
model="gpt-3.5-turbo", # Faster for sentiment
api_key=settings.openai_api_key,
temperature=0
)
async def analyze(self, message: str) -> Tuple[float, str]:
"""Analyze sentiment. Returns (score, label).
Score: -1 (very negative) to 1 (very positive)
"""
        # Literal braces are doubled so ChatPromptTemplate does not
        # treat the example JSON as template variables.
        prompt = ChatPromptTemplate.from_messages([
            ("system", """Analyze the sentiment of this customer message.
Return JSON only: {{"score": <number from -1 to 1>, "label": "positive/neutral/negative/frustrated/angry"}}"""),
            ("human", "{message}")
        ])
        chain = prompt | self.llm
        result = await chain.ainvoke({"message": message})
        import json
        try:
            data = json.loads(result.content)
            return float(data["score"]), data["label"]
        except (json.JSONDecodeError, KeyError, TypeError, ValueError):
            # Fall back to neutral rather than failing the conversation
            return 0.0, "neutral"

Why Intent + Sentiment Together:
┌─────────────────────────────────────────────────────────────┐
│ DUAL ANALYSIS PIPELINE │
├─────────────────────────────────────────────────────────────┤
│ │
│ "I've been charged twice and no one is helping!" │
│ │ │
│ ├──────────────────┬───────────────────────────── │
│ ▼ ▼ │
│ ┌───────────────┐ ┌───────────────┐ │
│ │ INTENT │ │ SENTIMENT │ │
│ │ "billing" │ │ score: -0.8 │ │
│ │ urgency: high │ │ label: angry │ │
│ │ confidence: │ └───────────────┘ │
│ │ 0.95 │ │ │
│ └───────────────┘ │ │
│ │ │ │
│ └────────┬─────────┘ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ DECISION: Route to billing + escalate (angry) │ │
│ └─────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────┘

| Component | Model Used | Why |
|---|---|---|
| IntentClassifier | GPT-4o | Needs nuanced understanding of customer requests |
| SentimentAnalyzer | GPT-3.5-turbo | Simpler task, faster response, lower cost |
Intent categories are chosen based on common support patterns:
- billing - Payment issues (highest urgency, often frustrated customers)
- technical_support - How-to and bug reports (needs knowledge base)
- cancellation - Churn risk (requires retention specialist)
- human_request - Explicit escalation request (always honor)
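Because each intent implies a default urgency and authentication requirement, a lookup table can seed sensible defaults before the LLM refines them. A hedged sketch (the table values are illustrative assumptions, not taken from the project code):

```python
# Illustrative per-intent defaults: (default_urgency, requires_authentication)
INTENT_DEFAULTS = {
    "billing": ("high", True),           # touches payment data
    "technical_support": ("normal", False),
    "account_management": ("normal", True),
    "cancellation": ("high", True),      # churn risk
    "complaint": ("high", False),
    "human_request": ("normal", False),  # always honored regardless
}

def defaults_for(intent: str) -> tuple[str, bool]:
    """Return (urgency, requires_authentication) defaults for an intent."""
    return INTENT_DEFAULTS.get(intent, ("normal", False))

print(defaults_for("billing"))   # ('high', True)
print(defaults_for("feedback"))  # ('normal', False) via the fallback
```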
Session and Context Management
# src/understanding/context.py
from typing import List, Dict, Optional
from dataclasses import dataclass, field
from datetime import datetime
import json
import redis.asyncio as redis
from ..config import settings
@dataclass
class Message:
role: str # user, assistant, system
content: str
timestamp: datetime = field(default_factory=datetime.now)
metadata: Dict = field(default_factory=dict)
@dataclass
class CustomerContext:
session_id: str
customer_id: Optional[str] = None
channel: str = "web"
messages: List[Message] = field(default_factory=list)
intent_history: List[str] = field(default_factory=list)
sentiment_trend: List[float] = field(default_factory=list)
escalated: bool = False
agent_id: Optional[str] = None
metadata: Dict = field(default_factory=dict)
class SessionManager:
"""Manages customer conversation sessions."""
def __init__(self):
self.redis = redis.from_url(settings.redis_url)
async def get_session(self, session_id: str) -> Optional[CustomerContext]:
"""Get or create session."""
data = await self.redis.get(f"session:{session_id}")
if data:
return self._deserialize(data)
return None
async def create_session(
self,
session_id: str,
customer_id: str = None,
channel: str = "web"
) -> CustomerContext:
"""Create new session."""
context = CustomerContext(
session_id=session_id,
customer_id=customer_id,
channel=channel
)
await self.save_session(context)
return context
async def save_session(self, context: CustomerContext):
"""Save session to Redis."""
await self.redis.setex(
f"session:{context.session_id}",
settings.session_ttl,
self._serialize(context)
)
async def add_message(
self,
session_id: str,
role: str,
content: str,
metadata: Dict = None
):
"""Add message to session."""
context = await self.get_session(session_id)
if not context:
context = await self.create_session(session_id)
context.messages.append(Message(
role=role,
content=content,
metadata=metadata or {}
))
# Trim history if needed
if len(context.messages) > settings.max_history_length:
context.messages = context.messages[-settings.max_history_length:]
await self.save_session(context)
return context
def build_context_string(
self,
context: CustomerContext,
max_messages: int = 10
) -> str:
"""Build context string for LLM."""
recent = context.messages[-max_messages:]
lines = []
for msg in recent:
role = "Customer" if msg.role == "user" else "Assistant"
lines.append(f"{role}: {msg.content}")
return "\n".join(lines)
def _serialize(self, context: CustomerContext) -> str:
"""Serialize context to JSON."""
data = {
"session_id": context.session_id,
"customer_id": context.customer_id,
"channel": context.channel,
"messages": [
{
"role": m.role,
"content": m.content,
"timestamp": m.timestamp.isoformat(),
"metadata": m.metadata
}
for m in context.messages
],
"intent_history": context.intent_history,
"sentiment_trend": context.sentiment_trend,
"escalated": context.escalated,
"agent_id": context.agent_id,
"metadata": context.metadata
}
return json.dumps(data)
def _deserialize(self, data: str) -> CustomerContext:
"""Deserialize context from JSON."""
d = json.loads(data)
return CustomerContext(
session_id=d["session_id"],
customer_id=d.get("customer_id"),
channel=d.get("channel", "web"),
messages=[
Message(
role=m["role"],
content=m["content"],
timestamp=datetime.fromisoformat(m["timestamp"]),
metadata=m.get("metadata", {})
)
for m in d.get("messages", [])
],
intent_history=d.get("intent_history", []),
sentiment_trend=d.get("sentiment_trend", []),
escalated=d.get("escalated", False),
agent_id=d.get("agent_id"),
metadata=d.get("metadata", {})
        )

Why Redis for Session Management:
┌─────────────────────────────────────────────────────────────┐
│ SESSION LIFECYCLE │
├─────────────────────────────────────────────────────────────┤
│ │
│ Customer opens chat │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ create_session(session_id) │ │
│ │ • Generate unique ID (UUID) │ │
│ │ • Store in Redis with TTL: 1 hour │ │
│ │ • Link to customer_id if authenticated │ │
│ └─────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ Each message │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ add_message(session_id, role, content) │ │
│ │ • Append to messages[] │ │
│ │ • Trim if > max_history_length (20) │ │
│ │ • Refresh TTL on save │ │
│ └─────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ For LLM context │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ build_context_string(context, max_messages=10) │ │
│ │ • Format: "Customer: ... \n Assistant: ..." │ │
│ │ • Only last 10 turns (keeps prompt short) │ │
│ └─────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────┘

| Design Decision | Why |
|---|---|
| Redis vs Database | Sub-millisecond reads, auto-expiry (TTL), horizontal scaling |
| JSON serialization | Simple, debuggable, no ORM overhead |
| Trim history at 20 | Prevents runaway costs from long conversations |
| Build context with 10 | LLM context window optimization (fewer tokens) |
| Track intent_history | Enables "this customer keeps asking about billing" patterns |
| Track sentiment_trend | Detect escalating frustration (3 negatives = escalate) |
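The "3 negatives = escalate" heuristic from the table can be expressed directly over sentiment_trend. A minimal sketch mirroring the escalation check used later in this guide:

```python
def is_trending_negative(
    sentiment_trend: list[float],
    threshold: float = -0.6,
    window: int = 3,
) -> bool:
    """True when the latest score crosses the escalation threshold AND
    the last `window` scores are all negative (sustained frustration)."""
    if not sentiment_trend or sentiment_trend[-1] >= threshold:
        return False
    recent = sentiment_trend[-window:]
    return len(recent) >= window and all(s < 0 for s in recent)

print(is_trending_negative([0.2, -0.1, -0.4, -0.8]))  # True: sustained dip
print(is_trending_negative([0.5, 0.3, -0.8]))         # False: one-off spike
```

Requiring a sustained trend, not just one bad message, avoids escalating on a single sarcastic or ambiguous reply.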
Knowledge RAG
# src/knowledge/retriever.py
from typing import List, Dict
from langchain_openai import OpenAIEmbeddings
from langchain_chroma import Chroma
from langchain_core.documents import Document
from ..config import settings
class KnowledgeRetriever:
"""RAG retrieval for knowledge base."""
def __init__(self):
self.embeddings = OpenAIEmbeddings(
model=settings.embedding_model,
api_key=settings.openai_api_key
)
self.vectorstore = Chroma(
persist_directory=settings.chroma_persist_dir,
embedding_function=self.embeddings,
collection_name="knowledge_base"
)
async def retrieve(
self,
query: str,
intent: str = None,
k: int = 5
) -> List[Dict]:
"""Retrieve relevant knowledge."""
# Build filter based on intent
filter_dict = None
if intent:
filter_dict = {"category": intent}
results = self.vectorstore.similarity_search_with_score(
query,
k=k,
filter=filter_dict
)
return [
{
"content": doc.page_content,
"source": doc.metadata.get("source", "knowledge_base"),
"category": doc.metadata.get("category", "general"),
"score": float(score),
"metadata": doc.metadata
}
for doc, score in results
]
async def get_faq_answer(self, query: str) -> Dict:
"""Get direct FAQ answer if available."""
# Search FAQ collection specifically
faq_store = Chroma(
persist_directory=settings.chroma_persist_dir,
embedding_function=self.embeddings,
collection_name="faqs"
)
results = faq_store.similarity_search_with_score(query, k=1)
        if results and results[0][1] < 0.3:  # Chroma returns distance: lower = more similar
doc, score = results[0]
return {
"question": doc.metadata.get("question"),
"answer": doc.page_content,
"confidence": 1 - score,
"source": "faq"
}
        return None

Two-Tier Knowledge Retrieval:
┌─────────────────────────────────────────────────────────────┐
│ KNOWLEDGE RETRIEVAL FLOW │
├─────────────────────────────────────────────────────────────┤
│ │
│ Customer: "How do I reset my password?" │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ 1. Check FAQ Collection (fast path) │ │
│ │ • Exact match search in "faqs" collection │ │
│ │ • If similarity score < 0.3 → return FAQ answer │ │
│ │ • Confidence = 1 - score (high confidence) │ │
│ └─────────────────────────────────────────────────────┘ │
│ │ │
│ │ No FAQ match │
│ ▼ │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ 2. Search Knowledge Base (full RAG) │ │
│ │ • Vector search in "knowledge_base" collection │ │
│ │ • Filter by intent category (billing, tech...) │ │
│ │ • Return top 5 relevant documents │ │
│ └─────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────┘

| Strategy | When to Use | Latency | Cost |
|---|---|---|---|
| FAQ lookup | Common questions with exact answers | ~50ms | Very low |
| Full RAG | Complex questions, unique situations | ~500ms | Higher |
| Intent filtering | Narrow search to relevant docs | ~400ms | Medium |
Why separate collections:
- faqs: Small, curated Q&A pairs → faster, more accurate
- knowledge_base: Large docs (manuals, policies) → comprehensive but slower
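The fast-path/fallback flow can be sketched independently of ChromaDB by injecting the two lookups as callables (function names and stub data here are illustrative):

```python
import asyncio
from typing import Awaitable, Callable, Optional

async def two_tier_answer(
    query: str,
    faq_lookup: Callable[[str], Awaitable[Optional[dict]]],
    rag_search: Callable[[str], Awaitable[list]],
) -> dict:
    """Try the curated FAQ collection first; fall back to full RAG."""
    faq = await faq_lookup(query)
    if faq is not None:
        return {"tier": "faq", **faq}          # fast path, high confidence
    docs = await rag_search(query)
    return {"tier": "rag", "documents": docs}  # slower, comprehensive

# Stub lookups standing in for the ChromaDB-backed retriever
async def fake_faq(q: str):
    return {"answer": "Use the reset link."} if "password" in q else None

async def fake_rag(q: str):
    return [{"content": "See the billing policy.", "score": 0.42}]

print(asyncio.run(two_tier_answer("reset my password", fake_faq, fake_rag)))
# {'tier': 'faq', 'answer': 'Use the reset link.'}
```

Injecting the lookups also makes the tiering logic unit-testable without a live vector store.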
Response Generation
# src/response/generator.py
from typing import Dict, List, Optional
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from pydantic import BaseModel, Field
from ..config import settings
from ..knowledge.retriever import KnowledgeRetriever
class GeneratedResponse(BaseModel):
content: str
confidence: float = Field(ge=0, le=1)
sources_used: List[str] = []
suggested_actions: List[str] = []
needs_clarification: bool = False
clarification_question: Optional[str] = None
class ResponseGenerator:
"""Generates customer service responses."""
def __init__(self):
self.llm = ChatOpenAI(
model=settings.model,
api_key=settings.openai_api_key,
temperature=settings.temperature
).with_structured_output(GeneratedResponse)
self.retriever = KnowledgeRetriever()
self.system_prompt = """You are a helpful, professional customer service assistant for {company_name}.
Your role:
- Answer customer questions accurately using the provided knowledge
- Be empathetic and understanding
- Keep responses concise but complete
- Always cite sources when using specific information
- Ask clarifying questions when the request is ambiguous
- Never make up information - if unsure, say so
- For sensitive topics (billing disputes, cancellations), be extra careful
Tone: Professional, friendly, and helpful.
Knowledge Base Context:
{knowledge_context}
Previous Conversation:
{conversation_history}"""
self.prompt = ChatPromptTemplate.from_messages([
("system", self.system_prompt),
("human", "{message}")
])
async def generate(
self,
message: str,
intent: str,
context: str,
company_name: str = "our company"
) -> GeneratedResponse:
"""Generate response for customer message."""
# Retrieve relevant knowledge
knowledge = await self.retriever.retrieve(message, intent)
knowledge_context = "\n\n".join([
f"[{k['source']}]: {k['content']}"
for k in knowledge[:3]
])
chain = self.prompt | self.llm
result = await chain.ainvoke({
"company_name": company_name,
"knowledge_context": knowledge_context,
"conversation_history": context,
"message": message
})
# Add sources
result.sources_used = [k["source"] for k in knowledge[:3]]
return result
class ResponseTemplates:
"""Pre-defined response templates."""
GREETING = "Hello! Welcome to {company_name} support. How can I help you today?"
ESCALATION = """I understand you'd like to speak with a human agent.
I'm connecting you now. Our current wait time is approximately {wait_time} minutes.
Is there anything else I can help you with while you wait?"""
AFTER_HOURS = """Thank you for contacting us. Our support team is currently offline.
Business hours: {hours}
I can still help with common questions, or you can leave a message and we'll respond as soon as possible."""
CLARIFICATION = "I want to make sure I help you correctly. Could you please clarify: {question}"
TICKET_CREATED = """I've created a support ticket for your issue.
Ticket number: {ticket_id}
A member of our team will follow up within {response_time}.
Is there anything else I can help you with?"""
SATISFACTION = """Before you go, we'd love to hear your feedback.
How would you rate your experience today? (1-5 stars)"""

Understanding Response Generation:
┌─────────────────────────────────────────────────────────────┐
│ RESPONSE GENERATION PIPELINE │
├─────────────────────────────────────────────────────────────┤
│ │
│ Input: "How do I upgrade my subscription?" │
│ │ │
│ ├── Intent: "account_management" │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ 1. RETRIEVE KNOWLEDGE │ │
│ │ retrieve(query, intent="account_management", k=5)│ │
│ │ Returns: upgrade docs, pricing page, FAQ │ │
│ └─────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ 2. BUILD CONTEXT │ │
│ │ [pricing_page]: "Plans: Basic $10, Pro $25..." │ │
│ │ [help_docs]: "To upgrade, go to Settings..." │ │
│ └─────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ 3. GENERATE WITH LLM │ │
│ │ System: "You are a helpful CS assistant..." │ │
│ │ + Knowledge context │ │
│ │ + Conversation history │ │
│ │ + Customer message │ │
│ └─────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ Output: GeneratedResponse │
│ • content: "To upgrade your subscription..." │
│ • confidence: 0.92 │
│ • sources_used: ["pricing_page", "help_docs"] │
│ • needs_clarification: false │
│ │
└─────────────────────────────────────────────────────────────┘

| Structured Output Field | Purpose |
|---|---|
| confidence | Drives the escalation decision (low = escalate) |
| sources_used | Enables "Learn more" links and an audit trail |
| suggested_actions | Proactive help ("You might also want to...") |
| needs_clarification | Triggers a follow-up question instead of an answer |
Template Pattern - Pre-defined responses for common scenarios reduce latency and ensure consistency (greeting, escalation, after-hours).
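Since the templates are plain str.format strings, rendering is a one-liner; a guard against missing placeholders keeps bad template data from crashing mid-chat. A small sketch using the ESCALATION template shape from above:

```python
ESCALATION = (
    "I understand you'd like to speak with a human agent.\n"
    "I'm connecting you now. Our current wait time is approximately "
    "{wait_time} minutes."
)

def render_template(template: str, **values) -> str:
    """Render a template; fall back to the raw text if a placeholder is
    missing (a clumsy message beats an exception mid-conversation)."""
    try:
        return template.format(**values)
    except KeyError:
        return template

print(render_template(ESCALATION, wait_time=4))
```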
Escalation and Human Handoff
# src/routing/escalation.py
from typing import Dict, Optional, Tuple
from datetime import datetime
from enum import Enum
from dataclasses import dataclass
from ..config import settings, EscalationTrigger
from ..understanding.context import CustomerContext
class EscalationReason(str, Enum):
LOW_CONFIDENCE = "AI confidence too low"
NEGATIVE_SENTIMENT = "Customer frustration detected"
EXPLICIT_REQUEST = "Customer requested human"
SENSITIVE_TOPIC = "Sensitive issue detected"
REPEATED_FAILURE = "Multiple failed attempts"
COMPLEX_ISSUE = "Issue requires human judgment"
VIP_CUSTOMER = "Priority customer"
@dataclass
class EscalationDecision:
should_escalate: bool
reason: Optional[EscalationReason] = None
priority: str = "normal" # low, normal, high, urgent
suggested_department: Optional[str] = None
context_summary: Optional[str] = None
class EscalationManager:
"""Manages escalation decisions."""
SENSITIVE_KEYWORDS = [
"cancel", "refund", "lawsuit", "lawyer", "sue",
"scam", "fraud", "stolen", "unauthorized",
"complaint", "manager", "supervisor"
]
HUMAN_REQUEST_PHRASES = [
"speak to human", "talk to person", "real person",
"human agent", "live agent", "speak to someone",
"talk to agent", "representative"
]
def __init__(self):
self.failure_counts = {} # session_id -> count
async def evaluate(
self,
context: CustomerContext,
response_confidence: float,
sentiment_score: float,
message: str
) -> EscalationDecision:
"""Evaluate if escalation is needed."""
message_lower = message.lower()
# Check explicit human request
if any(phrase in message_lower for phrase in self.HUMAN_REQUEST_PHRASES):
return EscalationDecision(
should_escalate=True,
reason=EscalationReason.EXPLICIT_REQUEST,
priority="normal"
)
# Check sensitive keywords
if any(word in message_lower for word in self.SENSITIVE_KEYWORDS):
return EscalationDecision(
should_escalate=True,
reason=EscalationReason.SENSITIVE_TOPIC,
priority="high",
suggested_department=self._get_department(message_lower)
)
# Check confidence threshold
if response_confidence < settings.escalation_threshold:
return EscalationDecision(
should_escalate=True,
reason=EscalationReason.LOW_CONFIDENCE,
priority="normal"
)
# Check sentiment
if sentiment_score < settings.sentiment_escalation_threshold:
# Check sentiment trend
if len(context.sentiment_trend) >= 3:
recent = context.sentiment_trend[-3:]
if all(s < 0 for s in recent): # Consistently negative
return EscalationDecision(
should_escalate=True,
reason=EscalationReason.NEGATIVE_SENTIMENT,
priority="high"
)
# Check repeated failures
session_failures = self.failure_counts.get(context.session_id, 0)
if session_failures >= 3:
return EscalationDecision(
should_escalate=True,
reason=EscalationReason.REPEATED_FAILURE,
priority="normal"
)
return EscalationDecision(should_escalate=False)
def record_failure(self, session_id: str):
"""Record a response failure."""
self.failure_counts[session_id] = self.failure_counts.get(session_id, 0) + 1
def _get_department(self, message: str) -> str:
"""Suggest department based on message content."""
if any(w in message for w in ["refund", "charge", "billing", "payment"]):
return "billing"
if any(w in message for w in ["cancel", "subscription"]):
return "retention"
if any(w in message for w in ["bug", "error", "broken", "not working"]):
return "technical"
return "general"
class HumanHandoff:
"""Handles handoff to human agents."""
async def initiate_handoff(
self,
context: CustomerContext,
decision: EscalationDecision
) -> Dict:
"""Initiate handoff to human agent."""
# Build context summary for agent
summary = self._build_summary(context)
handoff_data = {
"session_id": context.session_id,
"customer_id": context.customer_id,
"channel": context.channel,
"priority": decision.priority,
"reason": decision.reason.value if decision.reason else "Unknown",
"department": decision.suggested_department or "general",
"summary": summary,
"conversation_history": [
{"role": m.role, "content": m.content}
for m in context.messages[-10:]
],
"customer_sentiment": context.sentiment_trend[-1] if context.sentiment_trend else 0,
"intents_detected": context.intent_history[-5:],
"timestamp": datetime.now().isoformat()
}
# Here you would integrate with your ticketing/routing system
# Example: await self.zendesk.create_ticket(handoff_data)
return handoff_data
def _build_summary(self, context: CustomerContext) -> str:
"""Build conversation summary for agent."""
if not context.messages:
return "No conversation history"
# Get key points from conversation
user_messages = [m.content for m in context.messages if m.role == "user"]
summary_parts = [
f"Customer contacted via {context.channel}.",
f"Main topics: {', '.join(context.intent_history[-3:]) if context.intent_history else 'Unknown'}.",
f"Last message: {user_messages[-1][:200] if user_messages else 'N/A'}..."
]
        return " ".join(summary_parts)

Understanding Escalation Logic:
┌─────────────────────────────────────────────────────────────┐
│ ESCALATION DECISION TREE │
├─────────────────────────────────────────────────────────────┤
│ │
│ Evaluate Escalation │
│ │ │
│ ├── "talk to a human" ──► EXPLICIT_REQUEST (normal) │
│ │ │
│ ├── "cancel", "refund", "lawyer" ──► SENSITIVE_TOPIC│
│ │ (high priority) │
│ │ │
│ ├── confidence < 0.5 ──► LOW_CONFIDENCE (normal) │
│ │ │
│ ├── sentiment < -0.6 AND │
│ │ last 3 sentiments negative ──► NEGATIVE_SENTIMENT│
│ │ (high priority) │
│ │ │
│ ├── 3+ failures in session ──► REPEATED_FAILURE │
│ │ │
│ └── None matched ──► NO ESCALATION (bot continues) │
│ │
└─────────────────────────────────────────────────────────────┘

| Escalation Trigger | Detection Method | Priority |
|---|---|---|
| EXPLICIT_REQUEST | Phrase matching ("speak to human") | Normal |
| SENSITIVE_TOPIC | Keyword detection (cancel, refund, lawyer) | High |
| LOW_CONFIDENCE | Response confidence below 0.5 | Normal |
| NEGATIVE_SENTIMENT | Score below -0.6 + trend analysis | High |
| REPEATED_FAILURE | 3+ failures in one session | Normal |
| VIP_CUSTOMER | Customer metadata flag | Urgent |
Human Handoff Data:
- Full conversation history (last 10 messages)
- Detected intents and sentiment trend
- Summary for quick agent onboarding
- Suggested department routing (billing, retention, technical)
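Stripped of the session plumbing, the handoff payload is just a dictionary assembled from context; a pure sketch of that assembly (field names follow initiate_handoff above, the helper name is illustrative):

```python
from datetime import datetime, timezone

def build_handoff_payload(
    session_id: str,
    channel: str,
    priority: str,
    reason: str,
    messages: list,            # [{"role": ..., "content": ...}, ...]
    sentiment_trend: list,
    intent_history: list,
) -> dict:
    """Assemble the context an agent sees when a chat is escalated."""
    return {
        "session_id": session_id,
        "channel": channel,
        "priority": priority,
        "reason": reason,
        "conversation_history": messages[-10:],  # last 10 turns only
        "customer_sentiment": sentiment_trend[-1] if sentiment_trend else 0,
        "intents_detected": intent_history[-5:],
        "timestamp": datetime.now(timezone.utc).isoformat(),
    }

payload = build_handoff_payload(
    "sess-123", "web", "high", "Customer frustration detected",
    [{"role": "user", "content": "This is the third time I'm asking!"}],
    [-0.2, -0.7, -0.8], ["billing", "complaint"],
)
print(payload["customer_sentiment"])  # -0.8
```

Keeping the payload a flat dict makes it trivial to forward to whichever ticketing or agent-desk API the deployment actually uses.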
Main Chatbot Orchestrator
# src/api/chatbot.py
from typing import Dict, Optional
from dataclasses import dataclass
from ..understanding.intent import IntentClassifier, SentimentAnalyzer
from ..understanding.context import SessionManager, CustomerContext
from ..response.generator import ResponseGenerator, ResponseTemplates
from ..routing.escalation import EscalationManager, HumanHandoff
from ..config import settings
@dataclass
class ChatResponse:
message: str
session_id: str
confidence: float
escalated: bool = False
escalation_reason: Optional[str] = None
sources: list = None
suggested_actions: list = None
class CustomerServiceChatbot:
"""Main chatbot orchestrator."""
def __init__(self, company_name: str = "Acme Corp"):
self.company_name = company_name
self.intent_classifier = IntentClassifier()
self.sentiment_analyzer = SentimentAnalyzer()
self.session_manager = SessionManager()
self.response_generator = ResponseGenerator()
self.escalation_manager = EscalationManager()
self.handoff = HumanHandoff()
async def process_message(
self,
message: str,
session_id: str,
customer_id: str = None,
channel: str = "web"
) -> ChatResponse:
"""Process incoming customer message."""
# Get or create session
context = await self.session_manager.get_session(session_id)
if not context:
context = await self.session_manager.create_session(
session_id, customer_id, channel
)
# Add user message to context
context = await self.session_manager.add_message(
session_id, "user", message
)
# Build context string
context_str = self.session_manager.build_context_string(context)
# Classify intent
intent_result = await self.intent_classifier.classify(message, context_str)
context.intent_history.append(intent_result.primary_intent.value)
# Analyze sentiment
sentiment_score, sentiment_label = await self.sentiment_analyzer.analyze(message)
context.sentiment_trend.append(sentiment_score)
# Generate response
response = await self.response_generator.generate(
message=message,
intent=intent_result.primary_intent.value,
context=context_str,
company_name=self.company_name
)
# Check for escalation
escalation = await self.escalation_manager.evaluate(
context=context,
response_confidence=response.confidence,
sentiment_score=sentiment_score,
message=message
)
if escalation.should_escalate:
# Initiate human handoff
handoff_data = await self.handoff.initiate_handoff(context, escalation)
context.escalated = True
await self.session_manager.save_session(context)
return ChatResponse(
message=ResponseTemplates.ESCALATION.format(
wait_time=self._estimate_wait_time()
),
session_id=session_id,
confidence=1.0,
escalated=True,
escalation_reason=escalation.reason.value if escalation.reason else None
)
# Add assistant message to context
await self.session_manager.add_message(
session_id, "assistant", response.content
)
return ChatResponse(
message=response.content,
session_id=session_id,
confidence=response.confidence,
escalated=False,
sources=response.sources_used,
suggested_actions=response.suggested_actions
)
def _estimate_wait_time(self) -> int:
"""Estimate wait time for human agent."""
# In production, integrate with queue management
        return 5  # Default 5 minutes

Understanding the Orchestration Flow:
┌─────────────────────────────────────────────────────────────┐
│ CHATBOT ORCHESTRATOR - SINGLE MESSAGE FLOW │
├─────────────────────────────────────────────────────────────┤
│ │
│ process_message(message, session_id) │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ 1. SESSION MANAGEMENT │ │
│ │ • Get or create session from Redis │ │
│ │ • Add user message to history │ │
│ │ • Build context string for LLM │ │
│ └─────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ 2. UNDERSTAND │ │
│ │ • Classify intent (billing, support, etc.) │ │
│ │ • Analyze sentiment (-1 to 1) │ │
│ │ • Track intent and sentiment history │ │
│ └─────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ 3. GENERATE RESPONSE │ │
│ │ • Retrieve relevant knowledge (RAG) │ │
│ │ • Generate response with confidence score │ │
│ └─────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ 4. EVALUATE ESCALATION │ │
│ │ • Check confidence, sentiment, keywords │ │
│ │ • If escalate → hand off to human │ │
│ │ • If not → return AI response │ │
│ └─────────────────────────────────────────────────────┘ │
│ │ │
│ ├── Escalation needed ──► Return escalation message │
│ │ + initiate handoff │
│ │ │
│ └── No escalation ──► Save assistant message │
│ + return response │
│ │
└─────────────────────────────────────────────────────────────┘

Design Pattern: Layered Processing
| Layer | Responsibility | Latency Budget |
|---|---|---|
| Session | State management | ~10ms (Redis) |
| Understanding | Intent + Sentiment | ~200ms (GPT-3.5) |
| Generation | Knowledge retrieval + LLM | ~800ms (RAG + GPT-4) |
| Routing | Escalation decision | ~5ms (rule-based) |
| Total | End-to-end response | ~1 second |
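To verify each layer actually stays inside its budget, the orchestrator can wrap each stage in a timing context. This is a sketch assuming an async pipeline; the `timed` helper and `BUDGETS_MS` map are illustrative, and the `asyncio.sleep` call stands in for a real LLM or Redis call.

```python
import asyncio
import time
from contextlib import asynccontextmanager

# Illustrative per-layer budgets in milliseconds, matching the table above
BUDGETS_MS = {"session": 10, "understanding": 200, "generation": 800, "routing": 5}

@asynccontextmanager
async def timed(layer: str, timings: dict):
    """Record wall-clock milliseconds for one pipeline layer."""
    start = time.perf_counter()
    try:
        yield
    finally:
        timings[layer] = (time.perf_counter() - start) * 1000

async def demo():
    timings = {}
    async with timed("understanding", timings):
        await asyncio.sleep(0.01)   # stand-in for the intent/sentiment calls
    # Flag any layer that blew its budget so it can be alerted on
    over = {k: v for k, v in timings.items() if v > BUDGETS_MS.get(k, float("inf"))}
    return timings, over

timings, over_budget = asyncio.run(demo())
```

In production the `over` dict would feed a metrics backend rather than a return value, but the budget-vs-actual comparison is the same.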
FastAPI Application
# src/api/main.py
from fastapi import FastAPI, WebSocket, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from typing import Optional
import uuid
from .chatbot import CustomerServiceChatbot
app = FastAPI(
title="Customer Service Chatbot API",
description="AI-powered customer service chatbot"
)
app.add_middleware(
CORSMiddleware,
    allow_origins=["*"],  # restrict to known origins in production
allow_methods=["*"],
allow_headers=["*"]
)
chatbot = CustomerServiceChatbot(company_name="Acme Corp")
class MessageRequest(BaseModel):
message: str
session_id: Optional[str] = None
customer_id: Optional[str] = None
channel: str = "api"
class MessageResponse(BaseModel):
message: str
session_id: str
confidence: float
escalated: bool
escalation_reason: Optional[str]
sources: Optional[list]
@app.post("/chat", response_model=MessageResponse)
async def chat(request: MessageRequest):
"""Send a message and get a response."""
session_id = request.session_id or str(uuid.uuid4())
response = await chatbot.process_message(
message=request.message,
session_id=session_id,
customer_id=request.customer_id,
channel=request.channel
)
return MessageResponse(
message=response.message,
session_id=response.session_id,
confidence=response.confidence,
escalated=response.escalated,
escalation_reason=response.escalation_reason,
sources=response.sources
)
@app.websocket("/ws/{session_id}")
async def websocket_chat(websocket: WebSocket, session_id: str):
"""WebSocket endpoint for real-time chat."""
await websocket.accept()
try:
while True:
data = await websocket.receive_json()
message = data.get("message", "")
response = await chatbot.process_message(
message=message,
session_id=session_id,
channel="websocket"
)
await websocket.send_json({
"message": response.message,
"confidence": response.confidence,
"escalated": response.escalated
})
    except Exception:
        await websocket.close()
@app.get("/health")
async def health():
    return {"status": "healthy"}

API Design Decisions:
┌─────────────────────────────────────────────────────────────┐
│ TWO API PATTERNS │
├─────────────────────────────────────────────────────────────┤
│ │
│ PATTERN 1: REST (POST /chat) │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ • Request/Response model │ │
│ │ • Client sends message, waits for response │ │
│ │ • Good for: API integrations, email processing │ │
│ │ • Latency: ~1 second per message │ │
│ └─────────────────────────────────────────────────────┘ │
│ │
│ PATTERN 2: WebSocket (WS /ws/{session_id}) │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ • Persistent connection │ │
│ │ • Bi-directional communication │ │
│ │ • Good for: Web widget, real-time chat │ │
│ │ • Lower overhead for multi-turn conversations │ │
│ └─────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────┘

| Endpoint | Method | Use Case |
|---|---|---|
| POST /chat | REST | Slack bots, email integration, API clients |
| WS /ws/{session_id} | WebSocket | Web widget, mobile apps (real-time) |
| GET /health | REST | Load balancer health checks, monitoring |
Session ID Strategy:
- Auto-generated (UUID): For anonymous users starting fresh
- Provided by client: For returning users, cross-channel continuity
- Based on customer_id: For authenticated users (link to CRM)
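The three strategies above can be collapsed into one resolution helper: a client-provided ID wins, an authenticated customer gets a deterministic ID derived from their CRM key, and anonymous users get a fresh UUID. The `resolve_session_id` function is a hypothetical sketch, not part of the chapter's API code.

```python
import hashlib
import uuid
from typing import Optional

def resolve_session_id(provided_id: Optional[str] = None,
                       customer_id: Optional[str] = None) -> str:
    """Resolve a session ID per the strategy list above."""
    if provided_id:
        # Returning user / cross-channel continuity: honor the client's ID
        return provided_id
    if customer_id:
        # Authenticated user: stable ID derived from the CRM customer key,
        # so web, Slack, and email sessions all converge on one thread
        return hashlib.sha256(f"customer:{customer_id}".encode()).hexdigest()[:32]
    # Anonymous user starting fresh
    return str(uuid.uuid4())
```

Note the deterministic customer-derived ID pins all channels to one long-lived session; a real system would typically also fold in a time bucket or conversation counter so old threads eventually close.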
Deployment
Docker Configuration
# docker-compose.yml
version: '3.8'
services:
chatbot-api:
build: .
ports:
- "8000:8000"
environment:
- OPENAI_API_KEY=${OPENAI_API_KEY}
- REDIS_URL=redis://redis:6379
depends_on:
- redis
- chroma
redis:
image: redis:7-alpine
ports:
- "6379:6379"
chroma:
image: chromadb/chroma:latest
ports:
- "8001:8000"
volumes:
- chroma_data:/chroma/chroma
widget:
build: ./widget
ports:
- "3000:3000"
volumes:
  chroma_data:

Business Impact
| Metric | Before | After | Improvement |
|---|---|---|---|
| Response time | 4 hours avg | 30 seconds | 99% faster |
| Cost per interaction | $6.00 | $0.50 | 92% reduction |
| First contact resolution | 45% | 72% | 60% improvement |
| Customer satisfaction | 3.2/5 | 4.4/5 | 38% higher |
| Agent handling time | 15 min | 8 min | 47% reduction |
| 24/7 availability | No | Yes | Always on |
Key Learnings
- Intent classification is critical - Accurate intent detection routes to the right response strategy
- Sentiment tracking prevents churn - Detecting frustration early enables proactive escalation
- Context is king - Maintaining conversation history dramatically improves responses
- Graceful escalation builds trust - Seamless handoff to humans maintains customer confidence
Key Concepts Recap
| Concept | What It Is | Why It Matters |
|---|---|---|
| Intent Classification | Detect what customer wants (billing, support, cancellation) | Routes to right knowledge base, triggers appropriate workflows |
| Sentiment Analysis | Score from -1 (angry) to +1 (happy) with trend tracking | Enables proactive escalation before customer churns |
| Session Management | Redis-backed conversation state with TTL | Maintains context across messages, enables multi-turn conversations |
| RAG Knowledge Retrieval | Two-tier: FAQ lookup (fast) + full knowledge search (comprehensive) | Accurate, sourced answers without hallucination |
| Confidence Scoring | LLM self-rates response quality (0-1) | Drives automation vs escalation decision |
| Escalation Triggers | Rules: low confidence, negative sentiment, sensitive keywords, explicit request | Ensures humans handle what AI can't |
| Human Handoff | Package: conversation history, intents, sentiment, summary | Agent gets full context, no customer repeat |
| WebSocket API | Persistent connection for real-time chat | Lower latency, better UX for web widgets |
Next Steps
- Add voice channel support with speech-to-text
- Implement proactive outreach for common issues
- Build analytics dashboard for conversation insights
- Add multilingual support