Research Disclaimer
This tutorial is based on:
- OpenAI GPT-4 API (as of January 2025)
- LangChain v0.1.0+ with langchain-community v0.0.20+ (LLM orchestration framework)
- Pinecone v3.0+ (vector database with new Serverless API)
- FastAPI v0.109+ (high-performance Python web framework)
- Streamlit v1.30+ (rapid UI development)
- ChromaDB v0.4+ (open-source vector database)
- Sentence Transformers v2.3+ (embedding models)
- Rasa v3.6+ (traditional NLP chatbot framework)
All implementation patterns follow production best practices for enterprise chatbot deployments. Code examples have been tested with production workloads as of January 2025. Note: Pinecone v3.0 introduced significant API changes moving to a Serverless architecture; all code uses the updated API patterns.
Introduction
The chatbot landscape has undergone a seismic shift. Traditional rule-based and intent-classification chatbots are being rapidly replaced by Large Language Model (LLM)-powered conversational AI that understands context, maintains memory across conversations, and generates human-like responses.
This comprehensive guide demonstrates how to build production-grade AI chatbots using cutting-edge technologies:
- LLM Integration: Leverage GPT-4, Claude, or open-source models (LLaMA, Mistral)
- Retrieval-Augmented Generation (RAG): Ground responses in your knowledge base
- Vector Databases: Semantic search with Pinecone, Weaviate, or ChromaDB
- Conversation Memory: Track context across multi-turn dialogs
- Streaming Responses: Real-time token-by-token output for better UX
- Production Deployment: FastAPI backend, WebSocket streaming, monitoring
You’ll learn to build chatbots that don’t just match patterns—they understand, reason, and provide contextually relevant answers grounded in your organization’s data.
Traditional NLP vs. Modern LLM Chatbots
Traditional Approach (2015-2022)
Architecture: Intent classification → Entity extraction → Dialog management → Template response
Limitations:
- Brittle: Fails on out-of-vocabulary phrases or paraphrases
- Maintenance: Requires constant retraining for new intents
- Limited Context: Struggles with multi-turn conversations
- Cold Start: Needs hundreds of labeled examples per intent
Example:
# Traditional intent classification (Rasa/Dialogflow style)
utterance = "I want to book a flight to NYC"
intent = "book_flight" # Must be pre-defined
entities = {"destination": "NYC"} # Rule-based extraction
response = template_responses["book_flight"] # Static template
Modern LLM Approach (2023+)
Architecture: User query → RAG retrieval → LLM with context → Streaming response
Advantages:
- Flexible: Handles arbitrary phrasing without retraining
- Zero-Shot: Works with new domains/topics immediately
- Context-Aware: Maintains conversation history naturally
- Knowledge Grounding: Retrieves relevant documents via RAG
Example:
# Modern LLM with RAG
utterance = "I want to book a flight to NYC"
relevant_docs = vector_db.search(utterance, top_k=3) # Semantic search
context = "\n".join(relevant_docs)
response = llm.generate(
f"Context: {context}\nQuestion: {utterance}\nAnswer:"
) # Dynamic, contextual response
When to use Traditional vs. LLM (a hybrid routing sketch follows this list):
- Traditional: Highly structured tasks (form filling, appointment booking), strict compliance requirements, low latency (<50ms), limited budget
- LLM: Complex queries, knowledge-intensive tasks, natural conversation, acceptable latency (200-2000ms), moderate budget
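In practice many teams combine the two: a lightweight intent classifier handles the structured, high-confidence cases and everything else falls through to the LLM/RAG path. The sketch below illustrates that routing decision; `intent_classifier`, `STRUCTURED_INTENTS`, `handle_with_templates`, and `rag_chatbot` are hypothetical placeholders, not APIs from any specific framework.
# Hypothetical hybrid router: deterministic pipeline when the classifier is confident,
# LLM/RAG fallback otherwise. All referenced objects are illustrative placeholders.
CONFIDENCE_THRESHOLD = 0.85  # assumed tuning value

def route_query(utterance: str, conversation_id: str):
    intent, confidence = intent_classifier.predict(utterance)  # hypothetical classifier
    if confidence >= CONFIDENCE_THRESHOLD and intent in STRUCTURED_INTENTS:
        # Structured task (form filling, booking): low latency, fixed template response
        return handle_with_templates(intent, utterance)
    # Open-ended or knowledge-intensive query: ground it with RAG + LLM
    return rag_chatbot.generate_response(utterance, conversation_id)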
Production RAG Architecture
┌─────────────────────────────────────────────────────────┐
│ User Interface │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ │
│ │ Web │ │ Mobile │ │ API │ │
│ │ Chat UI │ │ App │ │ Clients │ │
│ └────┬─────┘ └────┬─────┘ └────┬─────┘ │
└───────┼─────────────┼─────────────┼────────────────────┘
│ │ │
└─────────────┴─────────────┘
│
┌─────────────▼────────────────┐
│ FastAPI Backend │
│ ┌────────────────────────┐ │
│ │ WebSocket Handler │ │
│ │ - Streaming responses │ │
│ │ - Connection mgmt │ │
│ └───────┬────────────────┘ │
└──────────┼───────────────────┘
│
┌──────────▼────────────────┐
│ Conversation Manager │
│ ┌─────────────────────┐ │
│ │ Memory Store │ │
│ │ - Redis/DynamoDB │ │
│ │ - Conversation hist│ │
│ └──────────┬──────────┘ │
└─────────────┼──────────────┘
│
┌────────────▼─────────────┐
│ RAG Pipeline │
│ ┌────────────────────┐ │
│ │ Query Embedding │ │
│ │ - Sentence Trans. │ │
│ └──────┬─────────────┘ │
│ ┌──────▼─────────────┐ │
│ │ Vector Search │ │
│ │ - Pinecone/Chroma │ │
│ │ - Top-K retrieval │ │
│ └──────┬─────────────┘ │
│ ┌──────▼─────────────┐ │
│ │ Reranking │ │
│ │ - Cross-encoder │ │
│ └──────┬─────────────┘ │
└─────────┼────────────────┘
│
┌─────────▼─────────────┐
│ LLM Generation │
│ ┌─────────────────┐ │
│ │ GPT-4 / Claude │ │
│ │ - Context + Q │ │
│ │ - Stream tokens│ │
│ └─────────────────┘ │
└───────────────────────┘
Complete Production Implementation
Step 1: Environment Setup
# Install dependencies (quote the specifiers so the shell does not treat ">=" as redirection)
pip install \
  "openai>=1.12" \
  "langchain>=0.1.0" \
  "langchain-community>=0.0.20" \
  "pinecone-client>=3.0" \
  "sentence-transformers>=2.3" \
  "chromadb>=0.4" \
  "fastapi>=0.109" \
  "uvicorn>=0.27" \
  "websockets>=12.0" \
  "redis>=5.0" \
  "streamlit>=1.30" \
  "pypdf" \
  "python-dotenv>=1.0"
# Create directory structure
mkdir -p chatbot/{api,models,rag,ui,config}
cd chatbot
# Create .env file
cat > .env <<EOF
OPENAI_API_KEY=your_openai_key_here
PINECONE_API_KEY=your_pinecone_key_here
REDIS_URL=redis://localhost:6379
EOF
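Note that python-dotenv is in the dependency list, but the modules below read keys directly with os.getenv; for the .env file to take effect, call load_dotenv() once at process startup (for example at the top of api/main.py). A minimal sketch:
# Load .env into the process environment before anything calls os.getenv(...)
from dotenv import load_dotenv

load_dotenv()  # by default, reads .env from the current working directory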
Step 2: Vector Database Setup and Document Ingestion
File: rag/vector_store.py (Complete vector database manager)
"""
Vector database manager for RAG.
Handles document embedding, storage, and semantic search.
"""
import os
from typing import List, Dict, Any, Optional
from dataclasses import dataclass
import logging
from pinecone import Pinecone, ServerlessSpec
from sentence_transformers import SentenceTransformer
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.document_loaders import (
DirectoryLoader,
TextLoader,
    PyPDFLoader,
UnstructuredMarkdownLoader
)
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class Document:
"""Represents a document chunk with metadata."""
content: str
metadata: Dict[str, Any]
embedding: Optional[List[float]] = None
class VectorStore:
"""
Production vector store for RAG chatbot.
Supports Pinecone for managed service or ChromaDB for self-hosted.
"""
def __init__(
self,
index_name: str = "chatbot-knowledge-base",
embedding_model: str = "sentence-transformers/all-MiniLM-L6-v2",
use_pinecone: bool = True
):
"""
Initialize vector store.
Args:
index_name: Name of the vector index
embedding_model: HuggingFace embedding model
use_pinecone: Use Pinecone (True) or ChromaDB (False)
"""
self.index_name = index_name
self.use_pinecone = use_pinecone
# Initialize embedding model
logger.info(f"Loading embedding model: {embedding_model}")
self.embedding_model = SentenceTransformer(embedding_model)
self.embedding_dimension = self.embedding_model.get_sentence_embedding_dimension()
# Initialize vector database
if use_pinecone:
self._init_pinecone()
else:
self._init_chromadb()
def _init_pinecone(self):
"""Initialize Pinecone vector database (v3.0+ API)."""
# Initialize Pinecone client
pc = Pinecone(api_key=os.getenv("PINECONE_API_KEY"))
# Create index if it doesn't exist
existing_indexes = [idx.name for idx in pc.list_indexes()]
if self.index_name not in existing_indexes:
logger.info(f"Creating Pinecone index: {self.index_name}")
pc.create_index(
name=self.index_name,
dimension=self.embedding_dimension,
metric="cosine",
spec=ServerlessSpec(
cloud="aws",
region="us-east-1"
)
)
self.index = pc.Index(self.index_name)
logger.info(f"Connected to Pinecone index: {self.index_name}")
def _init_chromadb(self):
"""Initialize ChromaDB (local vector database)."""
import chromadb
        # ChromaDB v0.4+ uses PersistentClient; the legacy Settings(chroma_db_impl=...) config is removed
        self.client = chromadb.PersistentClient(path="./chroma_db")
# Get or create collection
self.collection = self.client.get_or_create_collection(
name=self.index_name,
metadata={"hnsw:space": "cosine"}
)
logger.info(f"Connected to ChromaDB collection: {self.index_name}")
def ingest_documents(
self,
directory: str,
chunk_size: int = 500,
chunk_overlap: int = 50,
file_types: List[str] = None
):
"""
Ingest documents from directory into vector store.
Args:
directory: Path to documents directory
chunk_size: Maximum chunk size in characters
chunk_overlap: Overlap between chunks
file_types: File extensions to load (default: txt, md, pdf)
"""
if file_types is None:
file_types = ["txt", "md", "pdf"]
logger.info(f"Ingesting documents from: {directory}")
# Load documents
documents = []
for file_type in file_types:
if file_type == "txt":
loader = DirectoryLoader(directory, glob=f"**/*.{file_type}", loader_cls=TextLoader)
elif file_type == "md":
loader = DirectoryLoader(directory, glob=f"**/*.{file_type}", loader_cls=UnstructuredMarkdownLoader)
elif file_type == "pdf":
                loader = DirectoryLoader(directory, glob=f"**/*.{file_type}", loader_cls=PyPDFLoader)
else:
continue
documents.extend(loader.load())
logger.info(f"Loaded {len(documents)} documents")
# Split documents into chunks
text_splitter = RecursiveCharacterTextSplitter(
chunk_size=chunk_size,
chunk_overlap=chunk_overlap,
separators=["\n\n", "\n", " ", ""]
)
chunks = []
for doc in documents:
split_docs = text_splitter.split_text(doc.page_content)
for i, chunk_text in enumerate(split_docs):
chunks.append(Document(
content=chunk_text,
metadata={
"source": doc.metadata.get("source", "unknown"),
"chunk_index": i,
"total_chunks": len(split_docs)
}
))
logger.info(f"Split into {len(chunks)} chunks")
# Embed and store chunks
self.add_documents(chunks, batch_size=100)
def add_documents(self, documents: List[Document], batch_size: int = 100):
"""
Add documents to vector store.
Args:
documents: List of Document objects
batch_size: Number of documents to process at once
"""
logger.info(f"Adding {len(documents)} documents to vector store")
for i in range(0, len(documents), batch_size):
batch = documents[i:i + batch_size]
# Generate embeddings
texts = [doc.content for doc in batch]
embeddings = self.embedding_model.encode(
texts,
show_progress_bar=True,
batch_size=32
).tolist()
# Store in vector database
if self.use_pinecone:
vectors = [
(
f"doc_{i+j}", # Unique ID
embeddings[j],
{
"content": batch[j].content,
**batch[j].metadata
}
)
for j in range(len(batch))
]
self.index.upsert(vectors)
else: # ChromaDB
self.collection.add(
embeddings=embeddings,
documents=texts,
metadatas=[doc.metadata for doc in batch],
ids=[f"doc_{i+j}" for j in range(len(batch))]
)
logger.info("Document ingestion complete")
def search(
self,
query: str,
top_k: int = 5,
filter_metadata: Optional[Dict] = None
) -> List[Dict[str, Any]]:
"""
Semantic search for relevant documents.
Args:
query: Search query
top_k: Number of results to return
filter_metadata: Optional metadata filters
Returns:
List of relevant documents with scores
"""
# Embed query
query_embedding = self.embedding_model.encode([query])[0].tolist()
# Search vector database
if self.use_pinecone:
results = self.index.query(
vector=query_embedding,
top_k=top_k,
filter=filter_metadata,
include_metadata=True
)
return [
{
"content": match["metadata"]["content"],
"score": match["score"],
"metadata": {k: v for k, v in match["metadata"].items() if k != "content"}
}
for match in results["matches"]
]
else: # ChromaDB
results = self.collection.query(
query_embeddings=[query_embedding],
n_results=top_k,
where=filter_metadata
)
return [
{
"content": results["documents"][0][i],
"score": 1 - results["distances"][0][i], # Convert distance to similarity
"metadata": results["metadatas"][0][i]
}
for i in range(len(results["documents"][0]))
]
def delete_all(self):
"""Delete all documents from vector store."""
if self.use_pinecone:
self.index.delete(delete_all=True)
else:
self.client.delete_collection(self.index_name)
self.collection = self.client.create_collection(
name=self.index_name,
metadata={"hnsw:space": "cosine"}
)
logger.info("Deleted all documents from vector store")
# Example usage
if __name__ == "__main__":
# Initialize vector store
vector_store = VectorStore(
index_name="product-docs",
use_pinecone=False # Use local ChromaDB for development
)
# Ingest documentation
vector_store.ingest_documents(
directory="./knowledge_base",
chunk_size=500,
chunk_overlap=50
)
# Test search
results = vector_store.search("How do I reset my password?", top_k=3)
for i, result in enumerate(results, 1):
print(f"\n{i}. Score: {result['score']:.3f}")
print(f"Content: {result['content'][:200]}...")
print(f"Source: {result['metadata'].get('source', 'unknown')}")
Step 3: Conversation Memory Management
File: models/conversation_memory.py
"""
Conversation memory management for multi-turn dialogs.
Stores conversation history in Redis for scalability.
"""
import json
import logging
from typing import List, Dict, Any, Optional
from datetime import datetime, timedelta
import redis
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class ConversationMemory:
"""
Manages conversation history with Redis backend.
Supports sliding window, summarization, and TTL.
"""
def __init__(
self,
redis_url: str = "redis://localhost:6379",
ttl_hours: int = 24,
max_history: int = 10
):
"""
Initialize conversation memory.
Args:
redis_url: Redis connection URL
ttl_hours: Time-to-live for conversations (hours)
max_history: Maximum number of turns to keep
"""
self.redis_client = redis.from_url(redis_url, decode_responses=True)
self.ttl_seconds = ttl_hours * 3600
self.max_history = max_history
logger.info(f"Connected to Redis: {redis_url}")
def add_message(
self,
conversation_id: str,
role: str,
content: str,
metadata: Optional[Dict] = None
):
"""
Add message to conversation history.
Args:
conversation_id: Unique conversation identifier
role: Message role (user/assistant/system)
content: Message content
metadata: Optional metadata (timestamp, model used, etc.)
"""
key = f"conversation:{conversation_id}"
message = {
"role": role,
"content": content,
"timestamp": datetime.utcnow().isoformat(),
"metadata": metadata or {}
}
# Get existing history
history_json = self.redis_client.get(key)
if history_json:
history = json.loads(history_json)
else:
history = []
# Append message
history.append(message)
# Trim to max_history (keep last N messages)
if len(history) > self.max_history:
history = history[-self.max_history:]
# Save back to Redis with TTL
self.redis_client.setex(
key,
self.ttl_seconds,
json.dumps(history)
)
def get_history(
self,
conversation_id: str,
include_system: bool = False
) -> List[Dict[str, str]]:
"""
Retrieve conversation history.
Args:
conversation_id: Unique conversation identifier
include_system: Include system messages (default: False)
Returns:
List of messages in conversation
"""
key = f"conversation:{conversation_id}"
history_json = self.redis_client.get(key)
if not history_json:
return []
history = json.loads(history_json)
if not include_system:
history = [msg for msg in history if msg["role"] != "system"]
return history
def get_context_window(
self,
conversation_id: str,
max_tokens: int = 2000
) -> str:
"""
Get conversation history formatted for LLM context.
Truncates older messages if token limit exceeded.
Args:
conversation_id: Unique conversation identifier
max_tokens: Maximum tokens for context (rough estimate)
Returns:
Formatted conversation history
"""
history = self.get_history(conversation_id)
# Build context string (reverse order to prioritize recent messages)
context_parts = []
estimated_tokens = 0
for message in reversed(history):
role = message["role"]
content = message["content"]
# Rough token estimation (1 token ≈ 4 characters)
message_tokens = len(f"{role}: {content}") // 4
if estimated_tokens + message_tokens > max_tokens:
break
context_parts.insert(0, f"{role.capitalize()}: {content}")
estimated_tokens += message_tokens
return "\n\n".join(context_parts)
def delete_conversation(self, conversation_id: str):
"""Delete conversation history."""
key = f"conversation:{conversation_id}"
self.redis_client.delete(key)
logger.info(f"Deleted conversation: {conversation_id}")
def get_statistics(self, conversation_id: str) -> Dict[str, Any]:
"""Get conversation statistics."""
history = self.get_history(conversation_id, include_system=True)
if not history:
return {"exists": False}
return {
"exists": True,
"total_messages": len(history),
"user_messages": len([m for m in history if m["role"] == "user"]),
"assistant_messages": len([m for m in history if m["role"] == "assistant"]),
"first_message": history[0]["timestamp"],
"last_message": history[-1]["timestamp"]
}
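A short usage sketch for the memory class above, assuming a local Redis instance is running at the default URL:
# Minimal usage sketch for ConversationMemory
memory = ConversationMemory(redis_url="redis://localhost:6379", ttl_hours=24, max_history=10)

memory.add_message("demo-123", role="user", content="How do I reset my password?")
memory.add_message("demo-123", role="assistant", content="Go to Settings > Security > Reset Password.")

print(memory.get_history("demo-123"))           # full message list
print(memory.get_context_window("demo-123"))    # formatted, token-bounded context
print(memory.get_statistics("demo-123"))        # message counts and timestamps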
Step 4: RAG Chatbot Engine
File: models/rag_chatbot.py (Complete RAG chatbot implementation)
"""
Production RAG chatbot with LLM integration.
Supports GPT-4, Claude, and open-source models.
"""
import os
import logging
from typing import List, Dict, Any, Optional, AsyncGenerator
import openai
from rag.vector_store import VectorStore
from models.conversation_memory import ConversationMemory
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class RAGChatbot:
"""
Production RAG chatbot with streaming responses.
"""
def __init__(
self,
vector_store: VectorStore,
memory: ConversationMemory,
model: str = "gpt-4-turbo-preview",
temperature: float = 0.7,
system_prompt: Optional[str] = None
):
"""
Initialize RAG chatbot.
Args:
vector_store: Vector store for knowledge retrieval
memory: Conversation memory manager
model: LLM model to use
temperature: Sampling temperature (0-2)
system_prompt: Optional system prompt
"""
self.vector_store = vector_store
self.memory = memory
self.model = model
self.temperature = temperature
# Initialize OpenAI client
self.client = openai.OpenAI(api_key=os.getenv("OPENAI_API_KEY"))
# Default system prompt
self.system_prompt = system_prompt or """You are a helpful AI assistant.
Use the provided context to answer questions accurately.
If you cannot find the answer in the context, say so clearly.
Always be concise but comprehensive in your responses."""
logger.info(f"Initialized RAG chatbot with model: {model}")
def generate_response(
self,
query: str,
conversation_id: str,
top_k: int = 3,
include_sources: bool = True
) -> Dict[str, Any]:
"""
Generate response using RAG pipeline.
Args:
query: User query
conversation_id: Conversation identifier
top_k: Number of documents to retrieve
include_sources: Include source documents in response
Returns:
Dict with response, sources, and metadata
"""
# Step 1: Retrieve relevant documents
logger.info(f"Retrieving {top_k} relevant documents for query")
retrieved_docs = self.vector_store.search(query, top_k=top_k)
# Step 2: Build context from retrieved documents
context = "\n\n".join([
f"Source {i+1} ({doc['metadata'].get('source', 'unknown')}):\n{doc['content']}"
for i, doc in enumerate(retrieved_docs)
])
# Step 3: Get conversation history
conversation_history = self.memory.get_history(conversation_id)
# Step 4: Build messages for LLM
messages = [
{"role": "system", "content": self.system_prompt}
]
# Add conversation history
        for msg in conversation_history[-5:]:  # Last 5 messages
messages.append({
"role": msg["role"],
"content": msg["content"]
})
# Add current query with context
user_message = f"""Based on the following context, answer the question:
CONTEXT:
{context}
QUESTION: {query}"""
messages.append({"role": "user", "content": user_message})
# Step 5: Generate response
logger.info(f"Generating response with {self.model}")
try:
response = self.client.chat.completions.create(
model=self.model,
messages=messages,
temperature=self.temperature,
max_tokens=500
)
assistant_message = response.choices[0].message.content
# Step 6: Store in conversation history
self.memory.add_message(
conversation_id=conversation_id,
role="user",
content=query
)
self.memory.add_message(
conversation_id=conversation_id,
role="assistant",
content=assistant_message,
metadata={
"model": self.model,
"tokens": response.usage.total_tokens
}
)
# Step 7: Return response
result = {
"response": assistant_message,
"conversation_id": conversation_id,
"metadata": {
"model": self.model,
"tokens_used": response.usage.total_tokens,
"documents_retrieved": len(retrieved_docs)
}
}
if include_sources:
result["sources"] = [
{
"content": doc["content"][:200] + "...",
"source": doc["metadata"].get("source", "unknown"),
"score": doc["score"]
}
for doc in retrieved_docs
]
return result
except Exception as e:
logger.error(f"Error generating response: {e}")
raise
async def generate_response_stream(
self,
query: str,
conversation_id: str,
top_k: int = 3
) -> AsyncGenerator[str, None]:
"""
Generate streaming response using RAG pipeline.
Yields tokens as they're generated for real-time UI updates.
Args:
query: User query
conversation_id: Conversation identifier
top_k: Number of documents to retrieve
Yields:
Response tokens
"""
# Retrieve relevant documents
retrieved_docs = self.vector_store.search(query, top_k=top_k)
# Build context
context = "\n\n".join([
f"Source {i+1}:\n{doc['content']}"
for i, doc in enumerate(retrieved_docs)
])
# Get conversation history
conversation_history = self.memory.get_history(conversation_id)
# Build messages
messages = [
{"role": "system", "content": self.system_prompt}
]
for msg in conversation_history[-5:]:
messages.append({
"role": msg["role"],
"content": msg["content"]
})
user_message = f"""Based on the following context, answer the question:
CONTEXT:
{context}
QUESTION: {query}"""
messages.append({"role": "user", "content": user_message})
# Stream response
full_response = ""
try:
stream = self.client.chat.completions.create(
model=self.model,
messages=messages,
temperature=self.temperature,
max_tokens=500,
stream=True
)
for chunk in stream:
if chunk.choices[0].delta.content is not None:
token = chunk.choices[0].delta.content
full_response += token
yield token
# Store conversation after streaming completes
self.memory.add_message(
conversation_id=conversation_id,
role="user",
content=query
)
self.memory.add_message(
conversation_id=conversation_id,
role="assistant",
content=full_response
)
except Exception as e:
logger.error(f"Error in streaming response: {e}")
raise
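Because generate_response_stream is an async generator, it must be consumed from an event loop. A minimal local-testing sketch, assuming the components are wired up as in the earlier steps:
# Sketch: consume the streaming generator outside FastAPI (e.g. for local testing)
import asyncio

async def demo():
    bot = RAGChatbot(
        vector_store=VectorStore(use_pinecone=False),
        memory=ConversationMemory()
    )
    async for token in bot.generate_response_stream(
        query="How do I reset my password?",
        conversation_id="demo-stream"
    ):
        print(token, end="", flush=True)  # print tokens as they arrive
    print()

if __name__ == "__main__":
    asyncio.run(demo())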
Step 5: FastAPI Backend with WebSocket Streaming
File: api/main.py (Production-ready FastAPI server)
"""
FastAPI backend for RAG chatbot with WebSocket streaming.
"""
from fastapi import FastAPI, WebSocket, WebSocketDisconnect, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from typing import Optional
import uuid
import logging
from rag.vector_store import VectorStore
from models.conversation_memory import ConversationMemory
from models.rag_chatbot import RAGChatbot
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Initialize FastAPI app
app = FastAPI(
title="RAG Chatbot API",
description="Production chatbot with RAG and streaming responses",
version="1.0.0"
)
# Add CORS middleware
app.add_middleware(
CORSMiddleware,
allow_origins=["*"], # In production, specify exact origins
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"]
)
# Initialize components
vector_store = VectorStore(use_pinecone=False) # ChromaDB for demo
memory = ConversationMemory()
chatbot = RAGChatbot(
vector_store=vector_store,
memory=memory,
model="gpt-4-turbo-preview"
)
# Request/Response models
class ChatRequest(BaseModel):
message: str
conversation_id: Optional[str] = None
include_sources: bool = False
class ChatResponse(BaseModel):
response: str
conversation_id: str
sources: Optional[list] = None
metadata: dict
@app.get("/")
async def root():
"""Health check endpoint."""
return {"status": "ok", "service": "RAG Chatbot API"}
@app.post("/chat", response_model=ChatResponse)
async def chat(request: ChatRequest):
"""
Non-streaming chat endpoint.
Use for simple request/response pattern.
"""
try:
# Generate conversation ID if not provided
conversation_id = request.conversation_id or str(uuid.uuid4())
# Generate response
result = chatbot.generate_response(
query=request.message,
conversation_id=conversation_id,
include_sources=request.include_sources
)
return ChatResponse(**result)
except Exception as e:
logger.error(f"Error in chat endpoint: {e}")
raise HTTPException(status_code=500, detail=str(e))
@app.websocket("/ws/chat")
async def websocket_chat(websocket: WebSocket):
"""
WebSocket endpoint for streaming responses.
Client sends: {"message": "...", "conversation_id": "..."}
Server streams: {"token": "...", "done": false}
"""
await websocket.accept()
logger.info("WebSocket connection established")
try:
while True:
# Receive message from client
data = await websocket.receive_json()
message = data.get("message")
conversation_id = data.get("conversation_id", str(uuid.uuid4()))
if not message:
await websocket.send_json({"error": "Message is required"})
continue
# Stream response
async for token in chatbot.generate_response_stream(
query=message,
conversation_id=conversation_id
):
await websocket.send_json({
"token": token,
"done": False
})
# Send completion signal
await websocket.send_json({
"token": "",
"done": True,
"conversation_id": conversation_id
})
except WebSocketDisconnect:
logger.info("WebSocket disconnected")
except Exception as e:
logger.error(f"WebSocket error: {e}")
await websocket.send_json({"error": str(e)})
@app.get("/conversation/{conversation_id}")
async def get_conversation(conversation_id: str):
"""Get conversation history."""
history = memory.get_history(conversation_id)
stats = memory.get_statistics(conversation_id)
return {
"conversation_id": conversation_id,
"history": history,
"statistics": stats
}
@app.delete("/conversation/{conversation_id}")
async def delete_conversation(conversation_id: str):
"""Delete conversation history."""
memory.delete_conversation(conversation_id)
return {"status": "deleted", "conversation_id": conversation_id}
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)
Running the API:
# Development
uvicorn api.main:app --reload --port 8000
# Production with multiple workers
gunicorn api.main:app --workers 4 --worker-class uvicorn.workers.UvicornWorker --bind 0.0.0.0:8000
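To exercise the streaming endpoint outside a browser, a small client built on the websockets package (already in the dependency list) can send one message and print tokens until the server signals completion. A sketch:
# Minimal WebSocket client for the /ws/chat streaming endpoint
import asyncio
import json
import websockets

async def chat_once(message: str):
    async with websockets.connect("ws://localhost:8000/ws/chat") as ws:
        await ws.send(json.dumps({"message": message}))
        while True:
            event = json.loads(await ws.recv())
            if event.get("error"):
                print(f"\nError: {event['error']}")
                break
            print(event.get("token", ""), end="", flush=True)
            if event.get("done"):
                print(f"\nconversation_id: {event.get('conversation_id')}")
                break

asyncio.run(chat_once("How do I reset my password?"))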
Step 6: Streamlit Chat UI
File: ui/streamlit_app.py (Interactive chat interface)
"""
Streamlit chat UI with streaming responses.
"""
import streamlit as st
import requests
import uuid
from websocket import create_connection
import json
# Page config
st.set_page_config(
page_title="AI Chatbot",
page_icon="",
layout="wide"
)
# API endpoint
API_BASE = "http://localhost:8000"
# Initialize session state
if "conversation_id" not in st.session_state:
st.session_state.conversation_id = str(uuid.uuid4())
if "messages" not in st.session_state:
st.session_state.messages = []
# UI Layout
st.title("AI-Powered Chatbot")
st.caption("Powered by GPT-4 and Retrieval-Augmented Generation")
# Sidebar
with st.sidebar:
st.header("Settings")
# New conversation button
if st.button("New Conversation"):
st.session_state.conversation_id = str(uuid.uuid4())
st.session_state.messages = []
st.rerun()
# Show conversation ID
    st.text("Conversation ID:")
st.code(st.session_state.conversation_id[:8] + "...")
# Show statistics
try:
response = requests.get(
f"{API_BASE}/conversation/{st.session_state.conversation_id}"
)
if response.status_code == 200:
stats = response.json().get("statistics", {})
if stats.get("exists"):
st.metric("Total Messages", stats.get("total_messages", 0))
st.metric("User Messages", stats.get("user_messages", 0))
st.metric("Bot Messages", stats.get("assistant_messages", 0))
    except requests.RequestException:
pass
# Display chat messages
for message in st.session_state.messages:
with st.chat_message(message["role"]):
st.markdown(message["content"])
# Show sources if available
if "sources" in message and message["sources"]:
with st.expander("View Sources"):
for i, source in enumerate(message["sources"], 1):
st.markdown(f"**Source {i}** (Score: {source['score']:.2f})")
st.text(source["content"])
# Chat input
if prompt := st.chat_input("Ask me anything..."):
# Add user message to chat
st.session_state.messages.append({"role": "user", "content": prompt})
with st.chat_message("user"):
st.markdown(prompt)
# Generate response
with st.chat_message("assistant"):
message_placeholder = st.empty()
full_response = ""
try:
# Call API
response = requests.post(
f"{API_BASE}/chat",
json={
"message": prompt,
"conversation_id": st.session_state.conversation_id,
"include_sources": True
}
)
if response.status_code == 200:
data = response.json()
full_response = data["response"]
sources = data.get("sources", [])
# Display response
message_placeholder.markdown(full_response)
# Add to message history
st.session_state.messages.append({
"role": "assistant",
"content": full_response,
"sources": sources
})
# Show sources
if sources:
with st.expander("View Sources"):
for i, source in enumerate(sources, 1):
st.markdown(f"**Source {i}** (Score: {source['score']:.2f})")
st.text(source["content"])
except Exception as e:
st.error(f"Error: {e}")
# Footer
st.divider()
st.caption("Built with Streamlit, FastAPI, and OpenAI GPT-4")
Running the UI:
streamlit run ui/streamlit_app.py
Known Limitations
| Limitation | Impact | Mitigation |
|---|---|---|
| LLM Hallucinations | May generate plausible-sounding but incorrect answers | Use RAG to ground responses, add confidence scores, human-in-the-loop for critical applications |
| Embedding Quality | Vector search quality depends on embedding model | Use domain-specific embeddings, fine-tune on your data, hybrid search (semantic + keyword) |
| Context Window Limits | GPT-4: 128K tokens, but expensive at scale | Implement smart chunking, conversation summarization, cost monitoring |
| Latency | RAG pipeline adds 200-2000ms latency | Cache frequently asked questions, use faster models for simple queries, async processing |
| Cost at Scale | GPT-4 pricing: $10-30 per 1M tokens | Implement caching, use smaller models where possible, monitor usage, set budgets |
| Conversation Memory | Redis memory overhead for long conversations | Implement TTL, conversation summarization, archive old conversations |
| Retrieval Precision | May retrieve irrelevant documents | Implement reranking (cross-encoder), query expansion, metadata filtering |
| Multi-Language Support | Most models perform best in English | Use multilingual embedding models, language-specific prompts, translation layer |
| Real-Time Knowledge | Training data cutoff (GPT-4: April 2023) | Combine with web search APIs, regular knowledge base updates, explicitly state limitations |
| Safety & Compliance | May generate harmful or biased content | Implement content moderation, PII detection, output validation, safety filters |
Troubleshooting Guide
Problem: Vector Search Returns Irrelevant Results
# Symptom: Retrieved documents don't match query semantically
# Solution 1: Check embedding model quality
# Try different models for better domain coverage
vector_store = VectorStore(
embedding_model="sentence-transformers/all-mpnet-base-v2" # Better quality
# Or for specific domains:
# "sentence-transformers/msmarco-distilbert-base-v4" # For search
# "sentence-transformers/multi-qa-mpnet-base-dot-v1" # For Q&A
)
# Solution 2: Implement reranking with a cross-encoder
from typing import Dict, List
from sentence_transformers import CrossEncoder
cross_encoder = CrossEncoder("cross-encoder/ms-marco-MiniLM-L-6-v2")
def rerank_documents(query: str, documents: List[Dict], top_k: int = 3):
"""Rerank documents using cross-encoder for better precision."""
pairs = [[query, doc["content"]] for doc in documents]
scores = cross_encoder.predict(pairs)
# Sort by score and return top_k
ranked = sorted(zip(documents, scores), key=lambda x: x[1], reverse=True)
return [doc for doc, score in ranked[:top_k]]
# Solution 3: Hybrid search (combine semantic + keyword scores)
# Assumes all_docs is the full list of chunk dicts ({"content": ...}) kept in memory
from rank_bm25 import BM25Okapi

def hybrid_search(query: str, all_docs: list, top_k: int = 5, alpha: float = 0.7):
    # Semantic scores from the vector store, keyed by chunk content
    semantic_results = vector_store.search(query, top_k=top_k * 2)
    semantic_scores = {r["content"]: r["score"] for r in semantic_results}
    # Keyword scores (BM25) over the same corpus, normalized to [0, 1]
    bm25 = BM25Okapi([doc["content"].split() for doc in all_docs])
    bm25_scores = bm25.get_scores(query.split())
    max_bm25 = max(bm25_scores) or 1.0
    # Weighted combination of the two signals
    combined = []
    for doc, kw_score in zip(all_docs, bm25_scores):
        sem_score = semantic_scores.get(doc["content"], 0.0)
        combined.append((doc, alpha * sem_score + (1 - alpha) * (kw_score / max_bm25)))
    combined.sort(key=lambda x: x[1], reverse=True)
    return [doc for doc, _ in combined[:top_k]]
Problem: High API Costs
# Symptom: OpenAI costs growing too quickly
# Solution 1: Implement response caching
import hashlib
import json
import redis

cache = redis.from_url("redis://localhost:6379")
def get_cached_response(query: str, ttl_hours: int = 24):
"""Cache responses for frequently asked questions."""
# Create cache key
query_hash = hashlib.md5(query.lower().encode()).hexdigest()
cache_key = f"response:{query_hash}"
# Check cache
cached = cache.get(cache_key)
if cached:
return json.loads(cached)
# Generate response
response = chatbot.generate_response(query, conversation_id="...")
# Cache for TTL
cache.setex(cache_key, ttl_hours * 3600, json.dumps(response))
return response
# Solution 2: Use smaller/faster models for simple queries
def select_model_by_complexity(query: str) -> str:
"""Choose model based on query complexity."""
# Simple heuristic: word count and question type
word_count = len(query.split())
if word_count < 10 and any(word in query.lower() for word in ["what", "when", "who"]):
return "gpt-3.5-turbo" # Cheaper for simple queries
else:
return "gpt-4-turbo-preview" # Better for complex queries
# Solution 3: Set token limits and budgets
from datetime import datetime
class CostMonitor:
"""Monitor and limit API costs."""
def __init__(self, daily_budget_usd: float = 100.0):
self.daily_budget = daily_budget_usd
self.cache = redis.from_url("redis://localhost:6379")
def check_budget(self):
"""Check if within budget."""
today = datetime.now().strftime("%Y-%m-%d")
spent_key = f"spent:{today}"
spent = float(self.cache.get(spent_key) or 0)
if spent >= self.daily_budget:
raise Exception(f"Daily budget of ${self.daily_budget} exceeded")
def record_usage(self, tokens: int, model: str):
"""Record token usage."""
# Pricing (as of Jan 2025)
pricing = {
"gpt-4-turbo-preview": 0.01 / 1000, # $0.01 per 1K tokens
"gpt-3.5-turbo": 0.0005 / 1000 # $0.0005 per 1K tokens
}
cost = tokens * pricing.get(model, 0.01 / 1000)
today = datetime.now().strftime("%Y-%m-%d")
spent_key = f"spent:{today}"
self.cache.incrbyfloat(spent_key, cost)
self.cache.expire(spent_key, 86400) # 24 hours
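Wiring the monitor into the request path could look like the sketch below, where chatbot is the RAGChatbot instance defined earlier:
# Sketch: enforce the daily budget around each LLM call
monitor = CostMonitor(daily_budget_usd=100.0)

def budgeted_generate(query: str, conversation_id: str):
    monitor.check_budget()  # raises if today's spend already exceeds the budget
    result = chatbot.generate_response(query, conversation_id=conversation_id)
    monitor.record_usage(
        tokens=result["metadata"]["tokens_used"],
        model=result["metadata"]["model"]
    )
    return result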
Problem: Slow Response Times
# Symptom: Users waiting 5+ seconds for responses
# Solution 1: Parallel retrieval and generation
import asyncio
async def fast_rag_pipeline(query: str, conversation_id: str):
"""Optimize RAG pipeline with parallel operations."""
# Run retrieval and conversation history fetch in parallel
retrieval_task = asyncio.create_task(
asyncio.to_thread(vector_store.search, query, top_k=3)
)
history_task = asyncio.create_task(
asyncio.to_thread(memory.get_history, conversation_id)
)
# Wait for both to complete
retrieved_docs, conversation_history = await asyncio.gather(
retrieval_task,
history_task
)
# Build context and generate (remaining pipeline)
# ...
# Solution 2: Reduce embedding inference time
# Use smaller, faster models for embedding
vector_store = VectorStore(
embedding_model="sentence-transformers/all-MiniLM-L6-v2" # 80MB, very fast
# vs "all-mpnet-base-v2" (420MB, slower but better quality)
)
# Solution 3: Implement streaming for better perceived performance
# Even if total time is same, users prefer seeing incremental responses
# Use WebSocket streaming (already implemented above)
Problem: Conversation Context Overflow
# Symptom: Error "This model's maximum context length is 128000 tokens"
# Solution 1: Implement conversation summarization
import json

def summarize_conversation(conversation_id: str):
"""Summarize old messages to compress context."""
history = memory.get_history(conversation_id)
if len(history) > 10:
# Get messages to summarize (oldest 70%)
to_summarize = history[:int(len(history) * 0.7)]
# Generate summary
summary_prompt = "Summarize the following conversation:\n\n"
for msg in to_summarize:
summary_prompt += f"{msg['role']}: {msg['content']}\n"
summary_response = chatbot.client.chat.completions.create(
model="gpt-3.5-turbo", # Cheaper for summarization
messages=[{"role": "user", "content": summary_prompt}],
max_tokens=200
)
summary = summary_response.choices[0].message.content
# Replace old messages with summary
new_history = [
{"role": "system", "content": f"Previous conversation summary: {summary}"}
] + history[int(len(history) * 0.7):]
# Save back
memory.redis_client.setex(
f"conversation:{conversation_id}",
memory.ttl_seconds,
json.dumps(new_history)
)
# Solution 2: Use sliding window with token counting
from tiktoken import encoding_for_model
def get_context_with_token_limit(conversation_id: str, max_tokens: int = 4000):
"""Get conversation history within token budget."""
enc = encoding_for_model("gpt-4")
history = memory.get_history(conversation_id)
# Build context from most recent messages
context_messages = []
total_tokens = 0
for message in reversed(history):
message_tokens = len(enc.encode(message["content"]))
if total_tokens + message_tokens > max_tokens:
break
context_messages.insert(0, message)
total_tokens += message_tokens
return context_messages
Production Best Practices
1. Implement Proper Error Handling
import time

class ChatbotError(Exception):
"""Base exception for chatbot errors."""
pass
class RetrievalError(ChatbotError):
"""Error during document retrieval."""
pass
class GenerationError(ChatbotError):
"""Error during LLM generation."""
pass
def safe_generate_response(query: str, conversation_id: str) -> Dict:
"""Generate response with comprehensive error handling."""
try:
# Validate input
if not query or len(query) > 5000:
raise ValueError("Query must be between 1 and 5000 characters")
# Retrieve documents with retry
max_retries = 3
for attempt in range(max_retries):
try:
docs = vector_store.search(query, top_k=3)
break
except Exception as e:
if attempt == max_retries - 1:
raise RetrievalError(f"Failed to retrieve documents: {e}")
time.sleep(2 ** attempt) # Exponential backoff
# Generate response with timeout
try:
response = chatbot.generate_response(query, conversation_id)
return response
except Exception as e:
raise GenerationError(f"Failed to generate response: {e}")
except ValueError as e:
logger.warning(f"Invalid input: {e}")
return {"error": str(e), "type": "validation"}
except RetrievalError as e:
logger.error(f"Retrieval error: {e}")
return {"error": "Unable to access knowledge base", "type": "retrieval"}
except GenerationError as e:
logger.error(f"Generation error: {e}")
return {"error": "Unable to generate response", "type": "generation"}
except Exception as e:
logger.exception(f"Unexpected error: {e}")
return {"error": "An unexpected error occurred", "type": "unknown"}
2. Monitor and Log Everything
import time
from prometheus_client import Counter, Histogram
# Metrics
chat_requests = Counter("chat_requests_total", "Total chat requests")
chat_errors = Counter("chat_errors_total", "Total chat errors", ["error_type"])
response_time = Histogram("response_time_seconds", "Response generation time")
tokens_used = Counter("tokens_used_total", "Total tokens consumed")
def monitored_generate_response(query: str, conversation_id: str):
"""Generate response with monitoring."""
chat_requests.inc()
start_time = time.time()
try:
response = chatbot.generate_response(query, conversation_id)
# Record metrics
duration = time.time() - start_time
response_time.observe(duration)
tokens_used.inc(response["metadata"]["tokens_used"])
# Structured logging
logger.info(
"Chat request processed",
extra={
"conversation_id": conversation_id,
"duration_seconds": duration,
"tokens_used": response["metadata"]["tokens_used"],
"model": response["metadata"]["model"]
}
)
return response
except Exception as e:
chat_errors.labels(error_type=type(e).__name__).inc()
logger.error(
"Chat request failed",
extra={
"conversation_id": conversation_id,
"error": str(e),
"query_length": len(query)
}
)
raise
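To make these counters and histograms scrapeable, prometheus_client can expose a metrics endpoint; one option is mounting its ASGI app inside the existing FastAPI application. A sketch:
# Sketch: expose the Prometheus metrics defined above at /metrics
from prometheus_client import make_asgi_app

metrics_app = make_asgi_app()
app.mount("/metrics", metrics_app)  # "app" is the FastAPI instance from api/main.py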
3. Secure Your Chatbot
# Input validation and sanitization
import re
from typing import Optional
class SecurityValidator:
"""Validate and sanitize user inputs."""
@staticmethod
def sanitize_input(text: str, max_length: int = 5000) -> str:
"""Sanitize user input."""
# Trim whitespace
text = text.strip()
# Limit length
if len(text) > max_length:
text = text[:max_length]
# Remove control characters
text = re.sub(r'[\x00-\x1f\x7f-\x9f]', '', text)
return text
@staticmethod
def detect_injection(text: str) -> bool:
"""Detect potential prompt injection attempts."""
injection_patterns = [
r"ignore previous instructions",
r"disregard all prior",
r"system:.*admin",
r"<script>",
r"DROP TABLE",
r"\\n\\nHuman:",
r"\\n\\nAssistant:"
]
for pattern in injection_patterns:
if re.search(pattern, text, re.IGNORECASE):
return True
return False
@staticmethod
def check_pii(text: str) -> Optional[str]:
"""Check for personally identifiable information."""
# Email pattern
if re.search(r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b', text):
return "email"
# Phone number pattern
if re.search(r'\b\d{3}[-.]?\d{3}[-.]?\d{4}\b', text):
return "phone"
# Credit card pattern
if re.search(r'\b\d{4}[-\s]?\d{4}[-\s]?\d{4}[-\s]?\d{4}\b', text):
return "credit_card"
return None
# Use in API endpoint
@app.post("/chat")
async def secure_chat(request: ChatRequest):
# Sanitize input
sanitized_message = SecurityValidator.sanitize_input(request.message)
# Check for injection attempts
if SecurityValidator.detect_injection(sanitized_message):
logger.warning(f"Injection attempt detected: {request.message}")
raise HTTPException(status_code=400, detail="Invalid input detected")
# Check for PII
pii_type = SecurityValidator.check_pii(sanitized_message)
if pii_type:
logger.warning(f"PII detected ({pii_type}): {request.message}")
# Either reject or redact
# raise HTTPException(status_code=400, detail="PII detected in input")
# Process request
# ...
Conclusion
Building production-grade AI chatbots has fundamentally changed with the advent of Large Language Models. The combination of LLMs with Retrieval-Augmented Generation (RAG) enables chatbots that are:
- Knowledgeable: Grounded in your organization’s data through vector search
- Context-Aware: Maintain conversation history across multiple turns
- Flexible: Handle arbitrary queries without intent classification
- Scalable: Distributed architecture with Redis memory and vector databases
This guide covered the complete stack:
- Vector Databases (Pinecone/ChromaDB) for semantic knowledge retrieval
- Conversation Memory (Redis) for multi-turn dialog tracking
- RAG Pipeline combining retrieval with LLM generation
- Production API (FastAPI) with WebSocket streaming
- Interactive UI (Streamlit) with real-time responses
Key Takeaways:
- RAG is Essential: Pure LLMs hallucinate. RAG grounds responses in facts.
- Streaming Improves UX: Even if total latency is the same, incremental display feels faster
- Monitor Everything: Track costs, latency, errors, and user satisfaction
- Security Matters: Validate inputs, detect injections, protect PII
- Start Simple: Begin with ChromaDB and smaller models, scale to Pinecone and GPT-4 as needed
Production Checklist:
- ✅ Implement error handling and fallbacks
- ✅ Add monitoring and alerting (Prometheus, Grafana)
- ✅ Set up logging (structured JSON logs)
- ✅ Implement rate limiting and cost controls (a rate-limiting sketch follows this checklist)
- ✅ Add input validation and sanitization
- ✅ Test with real users and iterate
- ✅ Document your knowledge base update process
- ✅ Plan for scaling (horizontal API scaling, vector DB sharding)
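For the rate-limiting item above, one simple option is a fixed-window counter in Redis applied as a FastAPI dependency. A minimal sketch, assuming clients are keyed by IP address and the limit value is tuned for your traffic:
# Sketch: fixed-window rate limit (per client IP) as a FastAPI dependency
from fastapi import Depends, HTTPException, Request
import redis

rate_limit_store = redis.from_url("redis://localhost:6379")
MAX_REQUESTS_PER_MINUTE = 30  # assumed limit

async def rate_limiter(request: Request):
    client_ip = request.client.host if request.client else "unknown"
    key = f"ratelimit:{client_ip}"
    count = rate_limit_store.incr(key)
    if count == 1:
        rate_limit_store.expire(key, 60)  # start a new 60-second window
    if count > MAX_REQUESTS_PER_MINUTE:
        raise HTTPException(status_code=429, detail="Rate limit exceeded")

# Usage: @app.post("/chat", dependencies=[Depends(rate_limiter)])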
The future of chatbots is multimodal (text, voice, images), agentic (taking actions, not just answering), and personalized (adapting to individual users). But the foundation—LLMs + RAG + production engineering—remains constant.
Start building, measure everything, and iterate based on real user feedback. The technology is mature enough for production today.