# Chatbot

Build a conversational AI chatbot with streaming responses and memory.

| Property | Value |
|---|---|
| Difficulty | Beginner |
| Time | ~2 hours |
| Code Size | ~200 LOC |
| Prerequisites | Python 3.10+, OpenAI API key |
## TL;DR

Build a chatbot with streaming responses (tokens appear as they're generated) and conversation memory (the bot remembers previous messages). Use Server-Sent Events (SSE) to push tokens to the frontend in real time for a ChatGPT-like experience.
## Why Streaming and Memory Matter

Users abandon chatbots that take more than 3 seconds to respond. Without streaming, the user stares at a blank screen while the LLM generates the full response. With streaming, the first token appears in roughly 200 ms, and the user starts reading immediately.

Without memory, every message is a fresh conversation. The user says "My name is Alice" and three messages later the bot has forgotten. Memory is what makes multi-turn conversation possible.
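Because chat APIs are stateless, "memory" just means resending the prior turns with every request. A minimal sketch of how history is assembled (plain dicts in the OpenAI-style role/content shape):

```python
# Conversation history accumulated so far
history = [
    {"role": "user", "content": "My name is Alice"},
    {"role": "assistant", "content": "Nice to meet you, Alice!"},
]

# Each new request resends the stored history plus the latest user message
payload = history + [{"role": "user", "content": "What's my name?"}]

# The model now sees all three turns and can answer "Alice"
print(len(payload))  # → 3
```

If the `history` list is dropped between requests, the model sees only the final question and cannot answer it.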
### The User Experience Gap

**No streaming, no memory.** A 3-second blank screen per message. Every question is standalone; the user must repeat context. Feels broken.

**Streaming + conversation memory (recommended).** First token in ~200 ms. The bot remembers previous messages. Natural, fluid conversation that feels like ChatGPT.
## What You'll Learn

- Setting up OpenAI/Anthropic API clients
- Implementing streaming responses
- Managing conversation history
- Building a chat UI with real-time updates
## Tech Stack

| Component | Technology | Why |
|---|---|---|
| LLM | OpenAI GPT-4o / Anthropic Claude | Best quality for conversational AI |
| Backend | FastAPI | Native async support for streaming |
| Frontend | Streamlit | Rapid prototyping with built-in chat components |
| Streaming | Server-Sent Events | Simpler than WebSockets for one-way token push |
## Architecture

*(Diagram: Streaming Chatbot Architecture)*
## Project Structure

```text
chatbot/
├── src/
│   ├── __init__.py
│   ├── chat.py          # Chat logic
│   ├── memory.py        # Conversation memory
│   ├── api.py           # FastAPI application
│   └── prompts.py       # System prompts
├── frontend/
│   └── app.py           # Streamlit UI
├── tests/
│   └── test_chat.py
├── requirements.txt
└── README.md
```

## Implementation
Step 1: Project Setup
mkdir chatbot && cd chatbot
python -m venv venv
source venv/bin/activateopenai>=1.0.0
anthropic>=0.18.0
fastapi>=0.100.0
uvicorn>=0.23.0
sse-starlette>=1.6.0
streamlit>=1.28.0
pydantic>=2.0.0
python-dotenv>=1.0.0pip install -r requirements.txtStep 2: Chat Engine
"""
Core chat functionality with streaming support.
"""
from typing import AsyncIterator, Optional
from dataclasses import dataclass, field
from openai import OpenAI, AsyncOpenAI
import anthropic
@dataclass
class Message:
"""A chat message."""
role: str # "user", "assistant", "system"
content: str
@dataclass
class ChatConfig:
"""Chat configuration."""
model: str = "gpt-4o-mini"
temperature: float = 0.7
max_tokens: int = 2000
system_prompt: str = "You are a helpful AI assistant."
class ChatEngine:
"""
Chat engine with streaming support.
Supports both OpenAI and Anthropic models.
"""
def __init__(self, config: ChatConfig = None, provider: str = "openai"):
self.config = config or ChatConfig()
self.provider = provider
if provider == "openai":
self.client = OpenAI()
self.async_client = AsyncOpenAI()
else:
self.client = anthropic.Anthropic()
self.async_client = anthropic.AsyncAnthropic()
def chat(self, messages: list[Message]) -> str:
"""
Send a chat message and get a complete response.
Args:
messages: Conversation history
Returns:
Assistant's response
"""
if self.provider == "openai":
return self._chat_openai(messages)
return self._chat_anthropic(messages)
async def chat_stream(
self,
messages: list[Message]
) -> AsyncIterator[str]:
"""
Stream a chat response token by token.
Args:
messages: Conversation history
Yields:
Response tokens as they arrive
"""
if self.provider == "openai":
async for token in self._stream_openai(messages):
yield token
else:
async for token in self._stream_anthropic(messages):
yield token
def _chat_openai(self, messages: list[Message]) -> str:
"""OpenAI chat completion."""
formatted = self._format_messages_openai(messages)
response = self.client.chat.completions.create(
model=self.config.model,
messages=formatted,
temperature=self.config.temperature,
max_tokens=self.config.max_tokens
)
return response.choices[0].message.content
async def _stream_openai(
self,
messages: list[Message]
) -> AsyncIterator[str]:
"""Stream OpenAI response."""
formatted = self._format_messages_openai(messages)
stream = await self.async_client.chat.completions.create(
model=self.config.model,
messages=formatted,
temperature=self.config.temperature,
max_tokens=self.config.max_tokens,
stream=True
)
async for chunk in stream:
if chunk.choices[0].delta.content:
yield chunk.choices[0].delta.content
def _chat_anthropic(self, messages: list[Message]) -> str:
"""Anthropic chat completion."""
formatted = self._format_messages_anthropic(messages)
response = self.client.messages.create(
model=self.config.model,
system=self.config.system_prompt,
messages=formatted,
max_tokens=self.config.max_tokens
)
return response.content[0].text
async def _stream_anthropic(
self,
messages: list[Message]
) -> AsyncIterator[str]:
"""Stream Anthropic response."""
formatted = self._format_messages_anthropic(messages)
async with self.async_client.messages.stream(
model=self.config.model,
system=self.config.system_prompt,
messages=formatted,
max_tokens=self.config.max_tokens
) as stream:
async for text in stream.text_stream:
yield text
def _format_messages_openai(self, messages: list[Message]) -> list[dict]:
"""Format messages for OpenAI API."""
formatted = [{"role": "system", "content": self.config.system_prompt}]
for msg in messages:
formatted.append({"role": msg.role, "content": msg.content})
return formatted
def _format_messages_anthropic(self, messages: list[Message]) -> list[dict]:
"""Format messages for Anthropic API."""
return [{"role": msg.role, "content": msg.content} for msg in messages]Understanding the Chat Engine:
### Streaming vs Non-Streaming: What's the Difference?

**Non-streaming (`chat` method).** Request → LLM generates the entire response → return it all at once. The user waits ~3 seconds, then sees the full answer. Simple, but feels slow.

**Streaming (`chat_stream` method, recommended).** Request → LLM yields tokens as they are generated → push each token to the client. The user sees text appearing with instant feedback: the first token arrives in ~200 ms instead of ~3 s for the full response, so the user can start reading immediately. Mimics natural typing.
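The streaming pattern can be demonstrated without any LLM API: an async generator yields chunks as they become available, and the consumer renders each one immediately. A minimal, self-contained sketch (the `fake_stream` helper is an illustration, not part of the project):

```python
import asyncio
from typing import AsyncIterator


async def fake_stream(text: str) -> AsyncIterator[str]:
    """Yield one word at a time, simulating tokens arriving from an LLM."""
    for word in text.split():
        await asyncio.sleep(0)  # stand-in for network latency
        yield word + " "


async def consume() -> str:
    response = ""
    async for token in fake_stream("Streaming feels fast"):
        response += token  # in a real UI, render each token here
    return response.strip()


print(asyncio.run(consume()))  # → Streaming feels fast
```

`chat_stream` has exactly this shape: an `AsyncIterator[str]` the caller drains with `async for`, rendering as it goes.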
### OpenAI vs Anthropic API Differences

**OpenAI.** The system prompt is included in the `messages` array as a message with role `"system"`. All messages (system, user, assistant) live in one list.

**Anthropic.** The system prompt is passed as a separate `system` parameter. The `messages` array contains only user and assistant messages.

| Parameter | Purpose | Typical Value |
|---|---|---|
| `temperature` | Randomness (0 = deterministic, 1 = creative) | 0.7 for chat |
| `max_tokens` | Maximum response length | 2000 for conversations |
| `model` | Which LLM to use | `gpt-4o-mini`, `claude-3-5-haiku` |
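The formatting difference is easy to see with plain dicts and no SDK calls — a standalone sketch of the two request shapes (illustrative only; the real project builds these inside the `_format_messages_*` helpers):

```python
system = "You are a helpful AI assistant."
history = [
    {"role": "user", "content": "Hi!"},
    {"role": "assistant", "content": "Hello! How can I help?"},
]

# OpenAI: the system prompt is just another message in the list
openai_messages = [{"role": "system", "content": system}] + history

# Anthropic: the system prompt is a separate top-level parameter,
# and the messages list holds user/assistant turns only
anthropic_request = {
    "system": system,
    "messages": history,
    "max_tokens": 2000,
}

print(openai_messages[0]["role"])  # → system
```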
### Step 3: Conversation Memory

`src/memory.py`:

```python
"""
Conversation memory management.
"""
from dataclasses import dataclass, field
from typing import Optional
import json
import uuid
from pathlib import Path

from .chat import Message


@dataclass
class Conversation:
    """A conversation with history."""
    id: str
    messages: list[Message] = field(default_factory=list)
    metadata: dict = field(default_factory=dict)


class ConversationMemory:
    """
    Manages conversation history with optional JSON persistence.
    """

    def __init__(self, storage_path: Optional[str] = None):
        self.conversations: dict[str, Conversation] = {}
        self.storage_path = Path(storage_path) if storage_path else None
        if self.storage_path:
            self._load_conversations()

    def create_conversation(self, conversation_id: Optional[str] = None) -> str:
        """Create a new conversation."""
        conv_id = conversation_id or str(uuid.uuid4())
        self.conversations[conv_id] = Conversation(id=conv_id)
        return conv_id

    def add_message(
        self,
        conversation_id: str,
        role: str,
        content: str
    ) -> None:
        """Add a message to a conversation."""
        if conversation_id not in self.conversations:
            self.create_conversation(conversation_id)
        self.conversations[conversation_id].messages.append(
            Message(role=role, content=content)
        )
        if self.storage_path:
            self._save_conversations()

    def get_messages(
        self,
        conversation_id: str,
        limit: Optional[int] = None
    ) -> list[Message]:
        """Get messages from a conversation (most recent `limit` if set)."""
        if conversation_id not in self.conversations:
            return []
        messages = self.conversations[conversation_id].messages
        if limit:
            return messages[-limit:]
        return messages

    def clear_conversation(self, conversation_id: str) -> None:
        """Clear a conversation's history."""
        if conversation_id in self.conversations:
            self.conversations[conversation_id].messages = []

    def delete_conversation(self, conversation_id: str) -> None:
        """Delete a conversation."""
        if conversation_id in self.conversations:
            del self.conversations[conversation_id]

    def _save_conversations(self) -> None:
        """Save conversations to disk."""
        if not self.storage_path:
            return
        self.storage_path.mkdir(parents=True, exist_ok=True)
        data = {
            conv_id: {
                "id": conv.id,
                "messages": [
                    {"role": m.role, "content": m.content}
                    for m in conv.messages
                ],
                "metadata": conv.metadata
            }
            for conv_id, conv in self.conversations.items()
        }
        with open(self.storage_path / "conversations.json", "w") as f:
            json.dump(data, f, indent=2)

    def _load_conversations(self) -> None:
        """Load conversations from disk."""
        if not self.storage_path:
            return
        conv_file = self.storage_path / "conversations.json"
        if not conv_file.exists():
            return
        with open(conv_file, "r") as f:
            data = json.load(f)
        for conv_id, conv_data in data.items():
            self.conversations[conv_id] = Conversation(
                id=conv_data["id"],
                messages=[
                    Message(role=m["role"], content=m["content"])
                    for m in conv_data["messages"]
                ],
                metadata=conv_data.get("metadata", {})
            )
```

**Understanding Conversation Memory:**
### Why Memory Matters: LLMs Are Stateless

**Without memory.** User: "My name is Alice" → Bot: "Nice to meet you, Alice!" → User: "What's my name?" → Bot: "I don't know your name." The LLM has no state between requests and cannot recall previous turns.

**With memory (recommended).** The stored history is sent with each new request: the full conversation `[user: "My name is Alice", assistant: "Nice to meet you..."]` is included alongside the new message. The bot correctly answers "Your name is Alice!" because it sees the full context.
### Why Limit to the Last N Messages?

**Unlimited history.** LLMs have finite context windows (GPT-4 Turbo: 128K tokens, Claude 3: 200K tokens). Sending every message increases cost and slows responses, and eventually exceeds the context window entirely.

**Limited history (`limit=20`, recommended).** Keep only the last 20 messages via `get_messages(conv_id, limit=20)`. In a 100-message conversation, messages 1-80 are discarded and messages 81-100 are kept. Old context is lost, but costs stay predictable and responses remain fast.

| Method | Purpose | When to Use |
|---|---|---|
| `create_conversation()` | Start a new chat session | User clicks "New Chat" |
| `add_message()` | Save a user/assistant message | After each turn |
| `get_messages(limit=20)` | Retrieve recent history | Before calling the LLM |
| `clear_conversation()` | Reset history but keep the ID | User wants a fresh start |
### Step 4: FastAPI Application

`src/api.py`:

```python
"""
FastAPI application with streaming support.
"""
import json
from typing import Optional

from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel, Field
from sse_starlette.sse import EventSourceResponse

from .chat import ChatEngine, ChatConfig
from .memory import ConversationMemory

app = FastAPI(
    title="Chatbot API",
    description="Conversational AI with streaming responses",
    version="1.0.0"
)

app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["*"],
    allow_headers=["*"],
)

# Initialize components
chat_engine = ChatEngine(ChatConfig())
memory = ConversationMemory(storage_path="./data/conversations")


class ChatRequest(BaseModel):
    """Chat request model."""
    message: str = Field(..., min_length=1, max_length=10000)
    conversation_id: Optional[str] = None
    stream: bool = True


class ChatResponse(BaseModel):
    """Chat response model."""
    response: str
    conversation_id: str


@app.get("/")
async def health():
    """Health check."""
    return {"status": "healthy", "service": "chatbot"}


@app.post("/chat", response_model=ChatResponse)
async def chat(request: ChatRequest):
    """
    Send a chat message and get a complete (non-streaming) response.
    """
    # Get or create conversation
    conv_id = request.conversation_id or memory.create_conversation()

    # Add user message to history
    memory.add_message(conv_id, "user", request.message)

    # Get recent conversation history
    messages = memory.get_messages(conv_id, limit=20)

    # Generate response
    response = chat_engine.chat(messages)

    # Save assistant response
    memory.add_message(conv_id, "assistant", response)

    return ChatResponse(
        response=response,
        conversation_id=conv_id
    )


@app.post("/chat/stream")
async def chat_stream(request: ChatRequest):
    """
    Stream a chat response using Server-Sent Events.
    """
    conv_id = request.conversation_id or memory.create_conversation()

    # Add user message
    memory.add_message(conv_id, "user", request.message)

    # Get history
    messages = memory.get_messages(conv_id, limit=20)

    async def generate():
        full_response = ""
        async for token in chat_engine.chat_stream(messages):
            full_response += token
            yield {"event": "token", "data": token}

        # Save complete response
        memory.add_message(conv_id, "assistant", full_response)

        # SSE data fields are strings, so serialize the payload as JSON
        yield {
            "event": "done",
            "data": json.dumps({"conversation_id": conv_id})
        }

    return EventSourceResponse(generate())


@app.get("/conversations/{conversation_id}")
async def get_conversation(conversation_id: str):
    """Get conversation history."""
    messages = memory.get_messages(conversation_id)
    if not messages:
        raise HTTPException(status_code=404, detail="Conversation not found")
    return {
        "conversation_id": conversation_id,
        "messages": [
            {"role": m.role, "content": m.content}
            for m in messages
        ]
    }


@app.delete("/conversations/{conversation_id}")
async def delete_conversation(conversation_id: str):
    """Delete a conversation."""
    memory.delete_conversation(conversation_id)
    return {"status": "deleted"}
```

**Understanding Server-Sent Events (SSE):**
### SSE: How Streaming Works Over HTTP

**Traditional HTTP.** The client sends a request, waits for the server to process it, receives a single complete response, and the connection closes. The client sees nothing until the entire response has arrived.

**Server-Sent Events (recommended).** The client sends a request, the connection stays open, and the server pushes multiple events over it: `event: token` / `data: "Hello"`, then `event: token` / `data: " world"`, then `event: token` / `data: "!"`, and finally `event: done` / `data: conv_id`. Each SSE message is terminated by a blank line.

**The streaming endpoint flow:**

*(Diagram: /chat/stream Endpoint, Step by Step)*

| Endpoint | Method | Streaming | Use Case |
|---|---|---|---|
| `/chat` | POST | No | Simple integrations, testing |
| `/chat/stream` | POST | Yes (SSE) | Real-time UI, ChatGPT-like UX |
| `/conversations/{id}` | GET | No | Load chat history |
| `/conversations/{id}` | DELETE | No | Clear user data |
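The SSE wire format itself is plain text, which is why it is simpler than WebSockets for one-way pushes. A self-contained sketch of how frames are serialized and parsed (`format_sse`/`parse_sse` are illustrative helpers, not part of `sse-starlette`):

```python
def format_sse(event: str, data: str) -> str:
    """Serialize one SSE message: field lines plus a blank-line terminator."""
    return f"event: {event}\ndata: {data}\n\n"


def parse_sse(raw: str) -> list[tuple[str, str]]:
    """Parse a raw SSE stream into (event, data) pairs."""
    events = []
    for block in raw.strip().split("\n\n"):
        event, data = "message", ""
        for line in block.splitlines():
            if line.startswith("event:"):
                event = line[len("event:"):].strip()
            elif line.startswith("data:"):
                # strip only the single space after the colon
                data = line[len("data:"):].lstrip(" ")
        events.append((event, data))
    return events


stream = (
    format_sse("token", "Hello")
    + format_sse("token", "world")
    + format_sse("done", "{}")
)
print(parse_sse(stream))
# → [('token', 'Hello'), ('token', 'world'), ('done', '{}')]
```

This is essentially what `EventSourceResponse` does on the server side and what the Streamlit frontend undoes when it reads `response.iter_lines()`.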
### Step 5: Streamlit Frontend

`frontend/app.py`:

```python
"""
Streamlit chat interface.
"""
import json

import requests
import streamlit as st

API_URL = "http://localhost:8000"

st.set_page_config(
    page_title="AI Chatbot",
    page_icon="🤖",
    layout="centered"
)

st.title("🤖 AI Chatbot")

# Initialize session state
if "messages" not in st.session_state:
    st.session_state.messages = []
if "conversation_id" not in st.session_state:
    st.session_state.conversation_id = None


def stream_response(message: str):
    """Yield tokens from the API's SSE streaming endpoint."""
    response = requests.post(
        f"{API_URL}/chat/stream",
        json={
            "message": message,
            "conversation_id": st.session_state.conversation_id
        },
        stream=True
    )
    event = None
    for line in response.iter_lines(decode_unicode=True):
        if not line:
            continue
        if line.startswith("event:"):
            event = line[len("event:"):].strip()
        elif line.startswith("data:"):
            # Strip only the single space after "data:" so whitespace
            # inside tokens is preserved
            data = line[len("data:"):]
            if data.startswith(" "):
                data = data[1:]
            if event == "done":
                parsed = json.loads(data)
                st.session_state.conversation_id = parsed["conversation_id"]
            elif data:
                yield data


# Display chat history
for msg in st.session_state.messages:
    with st.chat_message(msg["role"]):
        st.markdown(msg["content"])

# Chat input
if prompt := st.chat_input("Type your message..."):
    # Add user message
    st.session_state.messages.append({"role": "user", "content": prompt})
    with st.chat_message("user"):
        st.markdown(prompt)

    # Stream assistant response
    with st.chat_message("assistant"):
        response_placeholder = st.empty()
        full_response = ""
        for token in stream_response(prompt):
            full_response += token
            response_placeholder.markdown(full_response + "▌")
        response_placeholder.markdown(full_response)

    # Save assistant response
    st.session_state.messages.append({
        "role": "assistant",
        "content": full_response
    })

# Sidebar
with st.sidebar:
    st.header("Settings")
    if st.button("New Conversation"):
        st.session_state.messages = []
        st.session_state.conversation_id = None
        st.rerun()
    if st.session_state.conversation_id:
        st.caption(f"ID: {st.session_state.conversation_id[:8]}...")
```

**Understanding the Streamlit Chat UI:**
### Streamlit Session State: Why It Matters

**Without `session_state`.** Streamlit reruns the entire script on every interaction: the user types a message → the script reruns → `messages = []` → all chat history is lost.

**With `session_state` (recommended).** The user types a message → the script reruns → `st.session_state.messages` persists across reruns. Session state stores `messages` (the list of all chat messages) and `conversation_id` (which links to backend storage).
**Streaming response display: how the typing effect works.**

| Component | Purpose | Why |
|---|---|---|
| `st.session_state` | Persist data across reruns | Streamlit reruns the script on every action |
| `st.empty()` | Updatable placeholder | Allows replacing content as tokens arrive |
| `st.chat_message()` | Message bubble styling | Provides a chat-like appearance |
| `st.chat_input()` | Chat input box | Fixed at the bottom, auto-focus |
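The typing effect in the input handler is just "replace the placeholder with a longer prefix each time a token arrives, cursor appended". A Streamlit-free sketch of the frames that loop produces (`typing_frames` is a hypothetical helper for illustration):

```python
def typing_frames(tokens: list[str]) -> list[str]:
    """Return each intermediate render: growing text plus a cursor,
    then the final text without the cursor."""
    shown = ""
    frames = []
    for token in tokens:
        shown += token
        frames.append(shown + "▌")   # what st.empty() shows mid-stream
    frames.append(shown)             # final render, cursor removed
    return frames


print(typing_frames(["Hel", "lo", "!"]))
# → ['Hel▌', 'Hello▌', 'Hello!▌', 'Hello!']
```

In the real app, each frame is written to the same `st.empty()` placeholder via `response_placeholder.markdown(...)`, so the user sees one message growing rather than a stack of partial messages.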
## Running the Application

```bash
# Terminal 1: Start the API
export OPENAI_API_KEY="your-key"
uvicorn src.api:app --reload --port 8000

# Terminal 2: Start the frontend
streamlit run frontend/app.py
```

## Testing
```bash
# Test non-streaming
curl -X POST http://localhost:8000/chat \
  -H "Content-Type: application/json" \
  -d '{"message": "Hello! How are you?", "stream": false}'

# Test streaming (-N disables buffering so SSE events appear as they arrive)
curl -N -X POST http://localhost:8000/chat/stream \
  -H "Content-Type: application/json" \
  -d '{"message": "Tell me a short joke"}'
```

## Key Concepts Recap
| Concept | What It Is | Why It Matters |
|---|---|---|
| Streaming | Send tokens as they're generated | Better UX - user sees response forming |
| SSE | Server-Sent Events protocol | Push data from server without polling |
| Conversation Memory | Store message history | Enables multi-turn conversations |
| Context Window | Max tokens LLM can see | Must limit history to fit |
| AsyncIterator | Python async generator | Enables efficient token streaming |
| Conversation ID | Unique session identifier | Links messages together |
## Next Steps

- Text Summarization - Summarize documents
- Structured Extraction - Extract data from text