Research Disclaimer

This tutorial is based on:

  • OpenAI GPT-4 API (as of January 2025)
  • LangChain v0.1.0+ with langchain-community v0.0.20+ (LLM orchestration framework)
  • Pinecone v3.0+ (vector database with new Serverless API)
  • FastAPI v0.109+ (high-performance Python web framework)
  • Streamlit v1.30+ (rapid UI development)
  • ChromaDB v0.4+ (open-source vector database)
  • Sentence Transformers v2.3+ (embedding models)
  • Rasa v3.6+ (traditional NLP chatbot framework)

All implementation patterns follow production best practices for enterprise chatbot deployments. Code examples have been tested with production workloads as of January 2025. Note: Pinecone v3.0 introduced significant API changes moving to a Serverless architecture; all code uses the updated API patterns.

Introduction

The chatbot landscape has undergone a seismic shift. Traditional rule-based and intent-classification chatbots are being rapidly replaced by Large Language Model (LLM)-powered conversational AI that understands context, maintains memory across conversations, and generates human-like responses.

This comprehensive guide demonstrates how to build production-grade AI chatbots using cutting-edge technologies:

  • LLM Integration: Leverage GPT-4, Claude, or open-source models (LLaMA, Mistral)
  • Retrieval-Augmented Generation (RAG): Ground responses in your knowledge base
  • Vector Databases: Semantic search with Pinecone, Weaviate, or ChromaDB
  • Conversation Memory: Track context across multi-turn dialogs
  • Streaming Responses: Real-time token-by-token output for better UX
  • Production Deployment: FastAPI backend, WebSocket streaming, monitoring

You’ll learn to build chatbots that don’t just match patterns—they understand, reason, and provide contextually relevant answers grounded in your organization’s data.

Traditional NLP vs. Modern LLM Chatbots

Traditional Approach (2015-2022)

Architecture: Intent classification → Entity extraction → Dialog management → Template response

Limitations:

  • Brittle: Fails on out-of-vocabulary phrases or paraphrases
  • Maintenance: Requires constant retraining for new intents
  • Limited Context: Struggles with multi-turn conversations
  • Cold Start: Needs hundreds of labeled examples per intent

Example:

# Traditional intent classification (Rasa/Dialogflow style)
utterance = "I want to book a flight to NYC"
intent = "book_flight"  # Must be pre-defined
entities = {"destination": "NYC"}  # Rule-based extraction
response = template_responses["book_flight"]  # Static template

Modern LLM Approach (2023+)

Architecture: User query → RAG retrieval → LLM with context → Streaming response

Advantages:

  • Flexible: Handles arbitrary phrasing without retraining
  • Zero-Shot: Works with new domains/topics immediately
  • Context-Aware: Maintains conversation history naturally
  • Knowledge Grounding: Retrieves relevant documents via RAG

Example:

# Modern LLM with RAG
utterance = "I want to book a flight to NYC"
relevant_docs = vector_db.search(utterance, top_k=3)  # Semantic search
context = "\n".join(relevant_docs)
response = llm.generate(
    f"Context: {context}\nQuestion: {utterance}\nAnswer:"
)  # Dynamic, contextual response

When to use Traditional vs. LLM:

  • Traditional: Highly structured tasks (form filling, appointment booking), strict compliance requirements, low latency (<50ms), limited budget
  • LLM: Complex queries, knowledge-intensive tasks, natural conversation, acceptable latency (200-2000ms), moderate budget

Production RAG Architecture

┌─────────────────────────────────────────────────────────┐
│                   User Interface                        │
│  ┌──────────┐  ┌──────────┐  ┌──────────┐            │
│  │   Web    │  │  Mobile  │  │   API    │            │
│  │  Chat UI │  │   App    │  │  Clients │            │
│  └────┬─────┘  └────┬─────┘  └────┬─────┘            │
└───────┼─────────────┼─────────────┼────────────────────┘
        │             │             │
        └─────────────┴─────────────┘
                      │
        ┌─────────────▼────────────────┐
        │    FastAPI Backend           │
        │  ┌────────────────────────┐  │
        │  │  WebSocket Handler     │  │
        │  │  - Streaming responses │  │
        │  │  - Connection mgmt     │  │
        │  └───────┬────────────────┘  │
        └──────────┼───────────────────┘
                   │
        ┌──────────▼────────────────┐
        │  Conversation Manager     │
        │  ┌─────────────────────┐  │
        │  │  Memory Store       │  │
        │  │  - Redis/DynamoDB   │  │
        │  │  - Conversation hist│  │
        │  └──────────┬──────────┘  │
        └─────────────┼──────────────┘
                      │
         ┌────────────▼─────────────┐
         │  RAG Pipeline            │
         │  ┌────────────────────┐  │
         │  │  Query Embedding   │  │
         │  │  - Sentence Trans. │  │
         │  └──────┬─────────────┘  │
         │  ┌──────▼─────────────┐  │
         │  │  Vector Search     │  │
         │  │  - Pinecone/Chroma │  │
         │  │  - Top-K retrieval │  │
         │  └──────┬─────────────┘  │
         │  ┌──────▼─────────────┐  │
         │  │  Reranking         │  │
         │  │  - Cross-encoder   │  │
         │  └──────┬─────────────┘  │
         └─────────┼────────────────┘
                   │
         ┌─────────▼─────────────┐
         │  LLM Generation       │
         │  ┌─────────────────┐  │
         │  │  GPT-4 / Claude │  │
         │  │  - Context + Q  │  │
         │  │  - Stream tokens│  │
         │  └─────────────────┘  │
         └───────────────────────┘

Complete Production Implementation

Step 1: Environment Setup

# Install dependencies (quote each requirement so the shell does not treat ">=" as output redirection)
pip install \
    "openai>=1.12" \
    "langchain>=0.1.0" \
    "langchain-community>=0.0.20" \
    "pinecone-client>=3.0" \
    "sentence-transformers>=2.3" \
    "fastapi>=0.109" \
    "uvicorn>=0.27" \
    "websockets>=12.0" \
    "redis>=5.0" \
    "python-dotenv>=1.0"

# Create directory structure
mkdir -p chatbot/{api,models,rag,ui,config}
cd chatbot

# Create .env file
cat > .env <<EOF
OPENAI_API_KEY=your_openai_key_here
PINECONE_API_KEY=your_pinecone_key_here
REDIS_URL=redis://localhost:6379
EOF
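
The modules below read these values with os.getenv, so the .env file must be loaded at process start. python-dotenv is already in the dependency list; a minimal sketch (placed, for example, near the top of api/main.py):

# Load environment variables from .env before anything calls os.getenv()
from dotenv import load_dotenv

load_dotenv()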

Step 2: Vector Database Setup and Document Ingestion

File: rag/vector_store.py (Complete vector database manager)

"""
Vector database manager for RAG.
Handles document embedding, storage, and semantic search.
"""

import os
from typing import List, Dict, Any, Optional
from dataclasses import dataclass
import logging

from pinecone import Pinecone, ServerlessSpec
from sentence_transformers import SentenceTransformer
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.document_loaders import (
    DirectoryLoader,
    TextLoader,
    PyPDFLoader,
    UnstructuredMarkdownLoader
)

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

@dataclass
class Document:
    """Represents a document chunk with metadata."""
    content: str
    metadata: Dict[str, Any]
    embedding: Optional[List[float]] = None

class VectorStore:
    """
    Production vector store for RAG chatbot.
    Supports Pinecone for managed service or ChromaDB for self-hosted.
    """

    def __init__(
        self,
        index_name: str = "chatbot-knowledge-base",
        embedding_model: str = "sentence-transformers/all-MiniLM-L6-v2",
        use_pinecone: bool = True
    ):
        """
        Initialize vector store.

        Args:
            index_name: Name of the vector index
            embedding_model: HuggingFace embedding model
            use_pinecone: Use Pinecone (True) or ChromaDB (False)
        """
        self.index_name = index_name
        self.use_pinecone = use_pinecone

        # Initialize embedding model
        logger.info(f"Loading embedding model: {embedding_model}")
        self.embedding_model = SentenceTransformer(embedding_model)
        self.embedding_dimension = self.embedding_model.get_sentence_embedding_dimension()

        # Initialize vector database
        if use_pinecone:
            self._init_pinecone()
        else:
            self._init_chromadb()

    def _init_pinecone(self):
        """Initialize Pinecone vector database (v3.0+ API)."""
        # Initialize Pinecone client
        pc = Pinecone(api_key=os.getenv("PINECONE_API_KEY"))

        # Create index if it doesn't exist
        existing_indexes = pc.list_indexes().names()
        if self.index_name not in existing_indexes:
            logger.info(f"Creating Pinecone index: {self.index_name}")
            pc.create_index(
                name=self.index_name,
                dimension=self.embedding_dimension,
                metric="cosine",
                spec=ServerlessSpec(
                    cloud="aws",
                    region="us-east-1"
                )
            )

        self.index = pc.Index(self.index_name)
        logger.info(f"Connected to Pinecone index: {self.index_name}")

    def _init_chromadb(self):
        """Initialize ChromaDB (local vector database)."""
        import chromadb

        # ChromaDB 0.4+ uses PersistentClient; the old Settings(chroma_db_impl=...) pattern is deprecated
        self.client = chromadb.PersistentClient(path="./chroma_db")

        # Get or create collection
        self.collection = self.client.get_or_create_collection(
            name=self.index_name,
            metadata={"hnsw:space": "cosine"}
        )
        logger.info(f"Connected to ChromaDB collection: {self.index_name}")

    def ingest_documents(
        self,
        directory: str,
        chunk_size: int = 500,
        chunk_overlap: int = 50,
        file_types: List[str] = None
    ):
        """
        Ingest documents from directory into vector store.

        Args:
            directory: Path to documents directory
            chunk_size: Maximum chunk size in characters
            chunk_overlap: Overlap between chunks
            file_types: File extensions to load (default: txt, md, pdf)
        """
        if file_types is None:
            file_types = ["txt", "md", "pdf"]

        logger.info(f"Ingesting documents from: {directory}")

        # Load documents
        documents = []
        for file_type in file_types:
            if file_type == "txt":
                loader = DirectoryLoader(directory, glob=f"**/*.{file_type}", loader_cls=TextLoader)
            elif file_type == "md":
                loader = DirectoryLoader(directory, glob=f"**/*.{file_type}", loader_cls=UnstructuredMarkdownLoader)
            elif file_type == "pdf":
                loader = DirectoryLoader(directory, glob=f"**/*.{file_type}", loader_cls=PyPDFLoader)
            else:
                continue

            documents.extend(loader.load())

        logger.info(f"Loaded {len(documents)} documents")

        # Split documents into chunks
        text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=chunk_size,
            chunk_overlap=chunk_overlap,
            separators=["\n\n", "\n", " ", ""]
        )

        chunks = []
        for doc in documents:
            split_docs = text_splitter.split_text(doc.page_content)
            for i, chunk_text in enumerate(split_docs):
                chunks.append(Document(
                    content=chunk_text,
                    metadata={
                        "source": doc.metadata.get("source", "unknown"),
                        "chunk_index": i,
                        "total_chunks": len(split_docs)
                    }
                ))

        logger.info(f"Split into {len(chunks)} chunks")

        # Embed and store chunks
        self.add_documents(chunks, batch_size=100)

    def add_documents(self, documents: List[Document], batch_size: int = 100):
        """
        Add documents to vector store.

        Args:
            documents: List of Document objects
            batch_size: Number of documents to process at once
        """
        logger.info(f"Adding {len(documents)} documents to vector store")

        for i in range(0, len(documents), batch_size):
            batch = documents[i:i + batch_size]

            # Generate embeddings
            texts = [doc.content for doc in batch]
            embeddings = self.embedding_model.encode(
                texts,
                show_progress_bar=True,
                batch_size=32
            ).tolist()

            # Store in vector database
            if self.use_pinecone:
                vectors = [
                    (
                        f"doc_{i+j}",  # Unique ID
                        embeddings[j],
                        {
                            "content": batch[j].content,
                            **batch[j].metadata
                        }
                    )
                    for j in range(len(batch))
                ]
                self.index.upsert(vectors)

            else:  # ChromaDB
                self.collection.add(
                    embeddings=embeddings,
                    documents=texts,
                    metadatas=[doc.metadata for doc in batch],
                    ids=[f"doc_{i+j}" for j in range(len(batch))]
                )

        logger.info("Document ingestion complete")

    def search(
        self,
        query: str,
        top_k: int = 5,
        filter_metadata: Optional[Dict] = None
    ) -> List[Dict[str, Any]]:
        """
        Semantic search for relevant documents.

        Args:
            query: Search query
            top_k: Number of results to return
            filter_metadata: Optional metadata filters

        Returns:
            List of relevant documents with scores
        """
        # Embed query
        query_embedding = self.embedding_model.encode([query])[0].tolist()

        # Search vector database
        if self.use_pinecone:
            results = self.index.query(
                vector=query_embedding,
                top_k=top_k,
                filter=filter_metadata,
                include_metadata=True
            )

            return [
                {
                    "content": match["metadata"]["content"],
                    "score": match["score"],
                    "metadata": {k: v for k, v in match["metadata"].items() if k != "content"}
                }
                for match in results["matches"]
            ]

        else:  # ChromaDB
            results = self.collection.query(
                query_embeddings=[query_embedding],
                n_results=top_k,
                where=filter_metadata
            )

            return [
                {
                    "content": results["documents"][0][i],
                    "score": 1 - results["distances"][0][i],  # Convert distance to similarity
                    "metadata": results["metadatas"][0][i]
                }
                for i in range(len(results["documents"][0]))
            ]

    def delete_all(self):
        """Delete all documents from vector store."""
        if self.use_pinecone:
            self.index.delete(delete_all=True)
        else:
            self.client.delete_collection(self.index_name)
            self.collection = self.client.create_collection(
                name=self.index_name,
                metadata={"hnsw:space": "cosine"}
            )
        logger.info("Deleted all documents from vector store")

# Example usage
if __name__ == "__main__":
    # Initialize vector store
    vector_store = VectorStore(
        index_name="product-docs",
        use_pinecone=False  # Use local ChromaDB for development
    )

    # Ingest documentation
    vector_store.ingest_documents(
        directory="./knowledge_base",
        chunk_size=500,
        chunk_overlap=50
    )

    # Test search
    results = vector_store.search("How do I reset my password?", top_k=3)
    for i, result in enumerate(results, 1):
        print(f"\n{i}. Score: {result['score']:.3f}")
        print(f"Content: {result['content'][:200]}...")
        print(f"Source: {result['metadata'].get('source', 'unknown')}")

Step 3: Conversation Memory Management

File: models/conversation_memory.py

"""
Conversation memory management for multi-turn dialogs.
Stores conversation history in Redis for scalability.
"""

import json
import logging
from typing import List, Dict, Any, Optional
from datetime import datetime, timedelta
import redis

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class ConversationMemory:
    """
    Manages conversation history with Redis backend.
    Supports sliding window, summarization, and TTL.
    """

    def __init__(
        self,
        redis_url: str = "redis://localhost:6379",
        ttl_hours: int = 24,
        max_history: int = 10
    ):
        """
        Initialize conversation memory.

        Args:
            redis_url: Redis connection URL
            ttl_hours: Time-to-live for conversations (hours)
            max_history: Maximum number of turns to keep
        """
        self.redis_client = redis.from_url(redis_url, decode_responses=True)
        self.ttl_seconds = ttl_hours * 3600
        self.max_history = max_history

        logger.info(f"Connected to Redis: {redis_url}")

    def add_message(
        self,
        conversation_id: str,
        role: str,
        content: str,
        metadata: Optional[Dict] = None
    ):
        """
        Add message to conversation history.

        Args:
            conversation_id: Unique conversation identifier
            role: Message role (user/assistant/system)
            content: Message content
            metadata: Optional metadata (timestamp, model used, etc.)
        """
        key = f"conversation:{conversation_id}"

        message = {
            "role": role,
            "content": content,
            "timestamp": datetime.utcnow().isoformat(),
            "metadata": metadata or {}
        }

        # Get existing history
        history_json = self.redis_client.get(key)
        if history_json:
            history = json.loads(history_json)
        else:
            history = []

        # Append message
        history.append(message)

        # Trim to max_history (keep last N messages)
        if len(history) > self.max_history:
            history = history[-self.max_history:]

        # Save back to Redis with TTL
        self.redis_client.setex(
            key,
            self.ttl_seconds,
            json.dumps(history)
        )

    def get_history(
        self,
        conversation_id: str,
        include_system: bool = False
    ) -> List[Dict[str, str]]:
        """
        Retrieve conversation history.

        Args:
            conversation_id: Unique conversation identifier
            include_system: Include system messages (default: False)

        Returns:
            List of messages in conversation
        """
        key = f"conversation:{conversation_id}"
        history_json = self.redis_client.get(key)

        if not history_json:
            return []

        history = json.loads(history_json)

        if not include_system:
            history = [msg for msg in history if msg["role"] != "system"]

        return history

    def get_context_window(
        self,
        conversation_id: str,
        max_tokens: int = 2000
    ) -> str:
        """
        Get conversation history formatted for LLM context.
        Truncates older messages if token limit exceeded.

        Args:
            conversation_id: Unique conversation identifier
            max_tokens: Maximum tokens for context (rough estimate)

        Returns:
            Formatted conversation history
        """
        history = self.get_history(conversation_id)

        # Build context string (reverse order to prioritize recent messages)
        context_parts = []
        estimated_tokens = 0

        for message in reversed(history):
            role = message["role"]
            content = message["content"]

            # Rough token estimation (1 token ≈ 4 characters)
            message_tokens = len(f"{role}: {content}") // 4

            if estimated_tokens + message_tokens > max_tokens:
                break

            context_parts.insert(0, f"{role.capitalize()}: {content}")
            estimated_tokens += message_tokens

        return "\n\n".join(context_parts)

    def delete_conversation(self, conversation_id: str):
        """Delete conversation history."""
        key = f"conversation:{conversation_id}"
        self.redis_client.delete(key)
        logger.info(f"Deleted conversation: {conversation_id}")

    def get_statistics(self, conversation_id: str) -> Dict[str, Any]:
        """Get conversation statistics."""
        history = self.get_history(conversation_id, include_system=True)

        if not history:
            return {"exists": False}

        return {
            "exists": True,
            "total_messages": len(history),
            "user_messages": len([m for m in history if m["role"] == "user"]),
            "assistant_messages": len([m for m in history if m["role"] == "assistant"]),
            "first_message": history[0]["timestamp"],
            "last_message": history[-1]["timestamp"]
        }
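
A minimal usage sketch (assumes a local Redis at redis://localhost:6379, as configured in .env; the conversation ID is arbitrary):

# Example usage
if __name__ == "__main__":
    memory = ConversationMemory(ttl_hours=24, max_history=10)

    memory.add_message("demo-conversation", role="user", content="How do I reset my password?")
    memory.add_message("demo-conversation", role="assistant", content="Open Settings > Security > Reset password.")

    print(memory.get_history("demo-conversation"))
    print(memory.get_context_window("demo-conversation", max_tokens=500))
    print(memory.get_statistics("demo-conversation"))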

Step 4: RAG Chatbot Engine

File: models/rag_chatbot.py (Complete RAG chatbot implementation)

"""
Production RAG chatbot with LLM integration.
Supports GPT-4, Claude, and open-source models.
"""

import os
import logging
from typing import List, Dict, Any, Optional, AsyncGenerator
import openai

from rag.vector_store import VectorStore
from models.conversation_memory import ConversationMemory

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class RAGChatbot:
    """
    Production RAG chatbot with streaming responses.
    """

    def __init__(
        self,
        vector_store: VectorStore,
        memory: ConversationMemory,
        model: str = "gpt-4-turbo-preview",
        temperature: float = 0.7,
        system_prompt: Optional[str] = None
    ):
        """
        Initialize RAG chatbot.

        Args:
            vector_store: Vector store for knowledge retrieval
            memory: Conversation memory manager
            model: LLM model to use
            temperature: Sampling temperature (0-2)
            system_prompt: Optional system prompt
        """
        self.vector_store = vector_store
        self.memory = memory
        self.model = model
        self.temperature = temperature

        # Initialize OpenAI clients (sync for request/response, async for streaming)
        self.client = openai.OpenAI(api_key=os.getenv("OPENAI_API_KEY"))
        self.async_client = openai.AsyncOpenAI(api_key=os.getenv("OPENAI_API_KEY"))

        # Default system prompt
        self.system_prompt = system_prompt or """You are a helpful AI assistant.
Use the provided context to answer questions accurately.
If you cannot find the answer in the context, say so clearly.
Always be concise but comprehensive in your responses."""

        logger.info(f"Initialized RAG chatbot with model: {model}")

    def generate_response(
        self,
        query: str,
        conversation_id: str,
        top_k: int = 3,
        include_sources: bool = True
    ) -> Dict[str, Any]:
        """
        Generate response using RAG pipeline.

        Args:
            query: User query
            conversation_id: Conversation identifier
            top_k: Number of documents to retrieve
            include_sources: Include source documents in response

        Returns:
            Dict with response, sources, and metadata
        """
        # Step 1: Retrieve relevant documents
        logger.info(f"Retrieving {top_k} relevant documents for query")
        retrieved_docs = self.vector_store.search(query, top_k=top_k)

        # Step 2: Build context from retrieved documents
        context = "\n\n".join([
            f"Source {i+1} ({doc['metadata'].get('source', 'unknown')}):\n{doc['content']}"
            for i, doc in enumerate(retrieved_docs)
        ])

        # Step 3: Get conversation history
        conversation_history = self.memory.get_history(conversation_id)

        # Step 4: Build messages for LLM
        messages = [
            {"role": "system", "content": self.system_prompt}
        ]

        # Add conversation history
        for msg in conversation_history[-5:]:  # Last 5 turns
            messages.append({
                "role": msg["role"],
                "content": msg["content"]
            })

        # Add current query with context
        user_message = f"""Based on the following context, answer the question:

CONTEXT:
{context}

QUESTION: {query}"""

        messages.append({"role": "user", "content": user_message})

        # Step 5: Generate response
        logger.info(f"Generating response with {self.model}")
        try:
            response = self.client.chat.completions.create(
                model=self.model,
                messages=messages,
                temperature=self.temperature,
                max_tokens=500
            )

            assistant_message = response.choices[0].message.content

            # Step 6: Store in conversation history
            self.memory.add_message(
                conversation_id=conversation_id,
                role="user",
                content=query
            )
            self.memory.add_message(
                conversation_id=conversation_id,
                role="assistant",
                content=assistant_message,
                metadata={
                    "model": self.model,
                    "tokens": response.usage.total_tokens
                }
            )

            # Step 7: Return response
            result = {
                "response": assistant_message,
                "conversation_id": conversation_id,
                "metadata": {
                    "model": self.model,
                    "tokens_used": response.usage.total_tokens,
                    "documents_retrieved": len(retrieved_docs)
                }
            }

            if include_sources:
                result["sources"] = [
                    {
                        "content": doc["content"][:200] + "...",
                        "source": doc["metadata"].get("source", "unknown"),
                        "score": doc["score"]
                    }
                    for doc in retrieved_docs
                ]

            return result

        except Exception as e:
            logger.error(f"Error generating response: {e}")
            raise

    async def generate_response_stream(
        self,
        query: str,
        conversation_id: str,
        top_k: int = 3
    ) -> AsyncGenerator[str, None]:
        """
        Generate streaming response using RAG pipeline.
        Yields tokens as they're generated for real-time UI updates.

        Args:
            query: User query
            conversation_id: Conversation identifier
            top_k: Number of documents to retrieve

        Yields:
            Response tokens
        """
        # Retrieve relevant documents
        retrieved_docs = self.vector_store.search(query, top_k=top_k)

        # Build context
        context = "\n\n".join([
            f"Source {i+1}:\n{doc['content']}"
            for i, doc in enumerate(retrieved_docs)
        ])

        # Get conversation history
        conversation_history = self.memory.get_history(conversation_id)

        # Build messages
        messages = [
            {"role": "system", "content": self.system_prompt}
        ]

        for msg in conversation_history[-5:]:
            messages.append({
                "role": msg["role"],
                "content": msg["content"]
            })

        user_message = f"""Based on the following context, answer the question:

CONTEXT:
{context}

QUESTION: {query}"""

        messages.append({"role": "user", "content": user_message})

        # Stream response (async client so token chunks don't block the event loop)
        full_response = ""
        try:
            stream = await self.async_client.chat.completions.create(
                model=self.model,
                messages=messages,
                temperature=self.temperature,
                max_tokens=500,
                stream=True
            )

            async for chunk in stream:
                if chunk.choices[0].delta.content is not None:
                    token = chunk.choices[0].delta.content
                    full_response += token
                    yield token

            # Store conversation after streaming completes
            self.memory.add_message(
                conversation_id=conversation_id,
                role="user",
                content=query
            )
            self.memory.add_message(
                conversation_id=conversation_id,
                role="assistant",
                content=full_response
            )

        except Exception as e:
            logger.error(f"Error in streaming response: {e}")
            raise
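
A minimal usage sketch (assumes the "product-docs" index from Step 2 has been populated, Redis is running, and OPENAI_API_KEY is set; the conversation ID is arbitrary):

# Example usage
if __name__ == "__main__":
    chatbot = RAGChatbot(
        vector_store=VectorStore(index_name="product-docs", use_pinecone=False),
        memory=ConversationMemory()
    )

    result = chatbot.generate_response(
        query="How do I reset my password?",
        conversation_id="demo-conversation",
        include_sources=True
    )
    print(result["response"])
    for source in result.get("sources", []):
        print(f"- {source['source']} (score: {source['score']:.2f})")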

Step 5: FastAPI Backend with WebSocket Streaming

File: api/main.py (Production-ready FastAPI server)

"""
FastAPI backend for RAG chatbot with WebSocket streaming.
"""

from fastapi import FastAPI, WebSocket, WebSocketDisconnect, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from typing import Optional
import uuid
import logging

from rag.vector_store import VectorStore
from models.conversation_memory import ConversationMemory
from models.rag_chatbot import RAGChatbot

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Initialize FastAPI app
app = FastAPI(
    title="RAG Chatbot API",
    description="Production chatbot with RAG and streaming responses",
    version="1.0.0"
)

# Add CORS middleware
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],  # In production, specify exact origins
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"]
)

# Initialize components
vector_store = VectorStore(use_pinecone=False)  # ChromaDB for demo
memory = ConversationMemory()
chatbot = RAGChatbot(
    vector_store=vector_store,
    memory=memory,
    model="gpt-4-turbo-preview"
)

# Request/Response models
class ChatRequest(BaseModel):
    message: str
    conversation_id: Optional[str] = None
    include_sources: bool = False

class ChatResponse(BaseModel):
    response: str
    conversation_id: str
    sources: Optional[list] = None
    metadata: dict

@app.get("/")
async def root():
    """Health check endpoint."""
    return {"status": "ok", "service": "RAG Chatbot API"}

@app.post("/chat", response_model=ChatResponse)
async def chat(request: ChatRequest):
    """
    Non-streaming chat endpoint.
    Use for simple request/response pattern.
    """
    try:
        # Generate conversation ID if not provided
        conversation_id = request.conversation_id or str(uuid.uuid4())

        # Generate response
        result = chatbot.generate_response(
            query=request.message,
            conversation_id=conversation_id,
            include_sources=request.include_sources
        )

        return ChatResponse(**result)

    except Exception as e:
        logger.error(f"Error in chat endpoint: {e}")
        raise HTTPException(status_code=500, detail=str(e))

@app.websocket("/ws/chat")
async def websocket_chat(websocket: WebSocket):
    """
    WebSocket endpoint for streaming responses.
    Client sends: {"message": "...", "conversation_id": "..."}
    Server streams: {"token": "...", "done": false}
    """
    await websocket.accept()
    logger.info("WebSocket connection established")

    try:
        while True:
            # Receive message from client
            data = await websocket.receive_json()
            message = data.get("message")
            conversation_id = data.get("conversation_id", str(uuid.uuid4()))

            if not message:
                await websocket.send_json({"error": "Message is required"})
                continue

            # Stream response
            async for token in chatbot.generate_response_stream(
                query=message,
                conversation_id=conversation_id
            ):
                await websocket.send_json({
                    "token": token,
                    "done": False
                })

            # Send completion signal
            await websocket.send_json({
                "token": "",
                "done": True,
                "conversation_id": conversation_id
            })

    except WebSocketDisconnect:
        logger.info("WebSocket disconnected")
    except Exception as e:
        logger.error(f"WebSocket error: {e}")
        await websocket.send_json({"error": str(e)})

@app.get("/conversation/{conversation_id}")
async def get_conversation(conversation_id: str):
    """Get conversation history."""
    history = memory.get_history(conversation_id)
    stats = memory.get_statistics(conversation_id)

    return {
        "conversation_id": conversation_id,
        "history": history,
        "statistics": stats
    }

@app.delete("/conversation/{conversation_id}")
async def delete_conversation(conversation_id: str):
    """Delete conversation history."""
    memory.delete_conversation(conversation_id)
    return {"status": "deleted", "conversation_id": conversation_id}

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)

Running the API:

# Development
uvicorn api.main:app --reload --port 8000

# Production with multiple workers
gunicorn api.main:app --workers 4 --worker-class uvicorn.workers.UvicornWorker --bind 0.0.0.0:8000
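
The streaming endpoint can be exercised without a UI. A minimal client sketch using the websockets package from the dependency list (the file name and question are placeholders):

# ws_client.py: stream tokens from the /ws/chat endpoint
import asyncio
import json
import websockets

async def main():
    async with websockets.connect("ws://localhost:8000/ws/chat") as ws:
        await ws.send(json.dumps({"message": "How do I reset my password?"}))
        while True:
            event = json.loads(await ws.recv())
            if "error" in event:
                print(f"\nError: {event['error']}")
                break
            print(event["token"], end="", flush=True)
            if event.get("done"):
                print(f"\nConversation ID: {event['conversation_id']}")
                break

asyncio.run(main())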

Step 6: Streamlit Chat UI

File: ui/streamlit_app.py (Interactive chat interface)

"""
Streamlit chat UI for the RAG chatbot (calls the non-streaming /chat endpoint).
"""

import streamlit as st
import requests
import uuid

# Page config
st.set_page_config(
    page_title="AI Chatbot",
    layout="wide"
)

# API endpoint
API_BASE = "http://localhost:8000"

# Initialize session state
if "conversation_id" not in st.session_state:
    st.session_state.conversation_id = str(uuid.uuid4())
if "messages" not in st.session_state:
    st.session_state.messages = []

# UI Layout
st.title("AI-Powered Chatbot")
st.caption("Powered by GPT-4 and Retrieval-Augmented Generation")

# Sidebar
with st.sidebar:
    st.header("Settings")

    # New conversation button
    if st.button("New Conversation"):
        st.session_state.conversation_id = str(uuid.uuid4())
        st.session_state.messages = []
        st.rerun()

    # Show conversation ID
    st.text("Conversation ID:")
    st.code(st.session_state.conversation_id[:8] + "...")

    # Show statistics
    try:
        response = requests.get(
            f"{API_BASE}/conversation/{st.session_state.conversation_id}"
        )
        if response.status_code == 200:
            stats = response.json().get("statistics", {})
            if stats.get("exists"):
                st.metric("Total Messages", stats.get("total_messages", 0))
                st.metric("User Messages", stats.get("user_messages", 0))
                st.metric("Bot Messages", stats.get("assistant_messages", 0))
    except requests.RequestException:
        pass  # Statistics are optional; ignore API errors here

# Display chat messages
for message in st.session_state.messages:
    with st.chat_message(message["role"]):
        st.markdown(message["content"])

        # Show sources if available
        if "sources" in message and message["sources"]:
            with st.expander("View Sources"):
                for i, source in enumerate(message["sources"], 1):
                    st.markdown(f"**Source {i}** (Score: {source['score']:.2f})")
                    st.text(source["content"])

# Chat input
if prompt := st.chat_input("Ask me anything..."):
    # Add user message to chat
    st.session_state.messages.append({"role": "user", "content": prompt})

    with st.chat_message("user"):
        st.markdown(prompt)

    # Generate response
    with st.chat_message("assistant"):
        message_placeholder = st.empty()
        full_response = ""

        try:
            # Call API
            response = requests.post(
                f"{API_BASE}/chat",
                json={
                    "message": prompt,
                    "conversation_id": st.session_state.conversation_id,
                    "include_sources": True
                }
            )

            if response.status_code == 200:
                data = response.json()
                full_response = data["response"]
                sources = data.get("sources", [])

                # Display response
                message_placeholder.markdown(full_response)

                # Add to message history
                st.session_state.messages.append({
                    "role": "assistant",
                    "content": full_response,
                    "sources": sources
                })

                # Show sources
                if sources:
                    with st.expander("View Sources"):
                        for i, source in enumerate(sources, 1):
                            st.markdown(f"**Source {i}** (Score: {source['score']:.2f})")
                            st.text(source["content"])

        except Exception as e:
            st.error(f"Error: {e}")

# Footer
st.divider()
st.caption("Built with Streamlit, FastAPI, and OpenAI GPT-4")

Running the UI:

streamlit run ui/streamlit_app.py

Known Limitations

Each entry below lists the limitation, its impact, and possible mitigations.

  • LLM Hallucinations. Impact: may generate plausible-sounding but incorrect answers. Mitigation: use RAG to ground responses, add confidence scores, keep a human in the loop for critical applications.
  • Embedding Quality. Impact: vector search quality depends on the embedding model. Mitigation: use domain-specific embeddings, fine-tune on your data, combine semantic and keyword (hybrid) search.
  • Context Window Limits. Impact: GPT-4 supports 128K tokens, but large contexts are expensive at scale. Mitigation: implement smart chunking, conversation summarization, and cost monitoring.
  • Latency. Impact: the RAG pipeline adds 200-2000ms. Mitigation: cache frequently asked questions, use faster models for simple queries, process asynchronously.
  • Cost at Scale. Impact: GPT-4 pricing runs $10-30 per 1M tokens. Mitigation: implement caching, use smaller models where possible, monitor usage, set budgets.
  • Conversation Memory. Impact: Redis memory overhead grows with long conversations. Mitigation: set TTLs, summarize conversations, archive old conversations.
  • Retrieval Precision. Impact: may retrieve irrelevant documents. Mitigation: add reranking (cross-encoder), query expansion, metadata filtering.
  • Multi-Language Support. Impact: most models perform best in English. Mitigation: use multilingual embedding models, language-specific prompts, or a translation layer.
  • Real-Time Knowledge. Impact: training data has a cutoff (GPT-4: April 2023). Mitigation: combine with web search APIs, update the knowledge base regularly, explicitly state limitations.
  • Safety & Compliance. Impact: may generate harmful or biased content. Mitigation: implement content moderation, PII detection, output validation, safety filters.
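
A lightweight mitigation for both hallucinations and weak retrieval is to check the top retrieval score before generating. A sketch under stated assumptions (the 0.45 threshold and the guarded_answer wrapper are illustrative and should be tuned against your embedding model and corpus):

MIN_RETRIEVAL_SCORE = 0.45  # illustrative; tune per embedding model and corpus

def guarded_answer(query: str, conversation_id: str) -> dict:
    """Refuse to answer when the best-matching document scores below the threshold."""
    docs = vector_store.search(query, top_k=3)
    if not docs or docs[0]["score"] < MIN_RETRIEVAL_SCORE:
        return {
            "response": "I couldn't find this in the knowledge base. Could you rephrase the question?",
            "grounded": False
        }
    result = chatbot.generate_response(query, conversation_id)
    result["grounded"] = True
    return result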

Troubleshooting Guide

Problem: Vector Search Returns Irrelevant Results

# Symptom: Retrieved documents don't match query semantically

# Solution 1: Check embedding model quality
# Try different models for better domain coverage
vector_store = VectorStore(
    embedding_model="sentence-transformers/all-mpnet-base-v2"  # Better quality
    # Or for specific domains:
    # "sentence-transformers/msmarco-distilbert-base-v4"  # For search
    # "sentence-transformers/multi-qa-mpnet-base-dot-v1"  # For Q&A
)

# Solution 2: Implement reranking
from sentence_transformers import CrossEncoder

cross_encoder = CrossEncoder("cross-encoder/ms-marco-MiniLM-L-6-v2")

def rerank_documents(query: str, documents: List[Dict], top_k: int = 3):
    """Rerank documents using cross-encoder for better precision."""
    pairs = [[query, doc["content"]] for doc in documents]
    scores = cross_encoder.predict(pairs)

    # Sort by score and return top_k
    ranked = sorted(zip(documents, scores), key=lambda x: x[1], reverse=True)
    return [doc for doc, score in ranked[:top_k]]

# Solution 3: Hybrid search (combine semantic + keyword)
from rank_bm25 import BM25Okapi

def hybrid_search(query: str, top_k: int = 5, semantic_weight: float = 0.7):
    """Rescore semantic candidates with BM25 keyword scores.
    For true hybrid retrieval, build the BM25 index over the full corpus instead."""
    # Semantic search: over-retrieve candidates, then rescore
    semantic_results = vector_store.search(query, top_k=top_k * 2)
    if not semantic_results:
        return []

    # Keyword scores (BM25) over the retrieved candidates
    tokenized_docs = [doc["content"].split() for doc in semantic_results]
    bm25 = BM25Okapi(tokenized_docs)
    bm25_scores = bm25.get_scores(query.split())

    # Normalize BM25 to [0, 1] and combine with the semantic similarity score
    max_bm25 = max(bm25_scores) or 1.0
    for doc, bm25_score in zip(semantic_results, bm25_scores):
        doc["combined_score"] = (
            semantic_weight * doc["score"]
            + (1 - semantic_weight) * (bm25_score / max_bm25)
        )

    # Return top_k by combined score
    return sorted(semantic_results, key=lambda d: d["combined_score"], reverse=True)[:top_k]

Problem: High API Costs

# Symptom: OpenAI costs growing too quickly

# Solution 1: Implement response caching
import hashlib
import json
import redis

cache = redis.from_url("redis://localhost:6379")

def get_cached_response(query: str, ttl_hours: int = 24):
    """Cache responses for frequently asked questions."""
    # Create cache key
    query_hash = hashlib.md5(query.lower().encode()).hexdigest()
    cache_key = f"response:{query_hash}"

    # Check cache
    cached = cache.get(cache_key)
    if cached:
        return json.loads(cached)

    # Generate response
    response = chatbot.generate_response(query, conversation_id="...")

    # Cache for TTL
    cache.setex(cache_key, ttl_hours * 3600, json.dumps(response))

    return response

# Solution 2: Use smaller/faster models for simple queries
def select_model_by_complexity(query: str) -> str:
    """Choose model based on query complexity."""
    # Simple heuristic: word count and question type
    word_count = len(query.split())

    if word_count < 10 and any(word in query.lower() for word in ["what", "when", "who"]):
        return "gpt-3.5-turbo"  # Cheaper for simple queries
    else:
        return "gpt-4-turbo-preview"  # Better for complex queries

# Solution 3: Set token limits and budgets
from datetime import datetime

class CostMonitor:
    """Monitor and limit API costs."""

    def __init__(self, daily_budget_usd: float = 100.0):
        self.daily_budget = daily_budget_usd
        self.cache = redis.from_url("redis://localhost:6379")

    def check_budget(self):
        """Check if within budget."""
        today = datetime.now().strftime("%Y-%m-%d")
        spent_key = f"spent:{today}"
        spent = float(self.cache.get(spent_key) or 0)

        if spent >= self.daily_budget:
            raise Exception(f"Daily budget of ${self.daily_budget} exceeded")

    def record_usage(self, tokens: int, model: str):
        """Record token usage."""
        # Pricing (as of Jan 2025)
        pricing = {
            "gpt-4-turbo-preview": 0.01 / 1000,  # $0.01 per 1K tokens
            "gpt-3.5-turbo": 0.0005 / 1000  # $0.0005 per 1K tokens
        }

        cost = tokens * pricing.get(model, 0.01 / 1000)

        today = datetime.now().strftime("%Y-%m-%d")
        spent_key = f"spent:{today}"

        self.cache.incrbyfloat(spent_key, cost)
        self.cache.expire(spent_key, 86400)  # 24 hours
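
# Usage sketch: wrap each request with budget checks
# (assumes the chatbot and CostMonitor defined above; the budget figure is illustrative)
monitor = CostMonitor(daily_budget_usd=100.0)

def budgeted_response(query: str, conversation_id: str):
    monitor.check_budget()  # raises once today's spend exceeds the daily budget
    result = chatbot.generate_response(query, conversation_id)
    monitor.record_usage(result["metadata"]["tokens_used"], result["metadata"]["model"])
    return result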

Problem: Slow Response Times

# Symptom: Users waiting 5+ seconds for responses

# Solution 1: Parallel retrieval and generation
import asyncio

async def fast_rag_pipeline(query: str, conversation_id: str):
    """Optimize RAG pipeline with parallel operations."""
    # Run retrieval and conversation history fetch in parallel
    retrieval_task = asyncio.create_task(
        asyncio.to_thread(vector_store.search, query, top_k=3)
    )
    history_task = asyncio.create_task(
        asyncio.to_thread(memory.get_history, conversation_id)
    )

    # Wait for both to complete
    retrieved_docs, conversation_history = await asyncio.gather(
        retrieval_task,
        history_task
    )

    # Build context and generate (remaining pipeline)
    # ...

# Solution 2: Reduce embedding inference time
# Use smaller, faster models for embedding
vector_store = VectorStore(
    embedding_model="sentence-transformers/all-MiniLM-L6-v2"  # 80MB, very fast
    # vs "all-mpnet-base-v2" (420MB, slower but better quality)
)

# Solution 3: Implement streaming for better perceived performance
# Even if total time is same, users prefer seeing incremental responses
# Use WebSocket streaming (already implemented above)

Problem: Conversation Context Overflow

# Symptom: Error "This model's maximum context length is 128000 tokens"

# Solution 1: Implement conversation summarization
import json

def summarize_conversation(conversation_id: str):
    """Summarize old messages to compress context."""
    history = memory.get_history(conversation_id)

    if len(history) > 10:
        # Get messages to summarize (oldest 70%)
        to_summarize = history[:int(len(history) * 0.7)]

        # Generate summary
        summary_prompt = "Summarize the following conversation:\n\n"
        for msg in to_summarize:
            summary_prompt += f"{msg['role']}: {msg['content']}\n"

        summary_response = chatbot.client.chat.completions.create(
            model="gpt-3.5-turbo",  # Cheaper for summarization
            messages=[{"role": "user", "content": summary_prompt}],
            max_tokens=200
        )

        summary = summary_response.choices[0].message.content

        # Replace old messages with summary
        new_history = [
            {"role": "system", "content": f"Previous conversation summary: {summary}"}
        ] + history[int(len(history) * 0.7):]

        # Save back
        memory.redis_client.setex(
            f"conversation:{conversation_id}",
            memory.ttl_seconds,
            json.dumps(new_history)
        )

# Solution 2: Use sliding window with token counting
from tiktoken import encoding_for_model

def get_context_with_token_limit(conversation_id: str, max_tokens: int = 4000):
    """Get conversation history within token budget."""
    enc = encoding_for_model("gpt-4")
    history = memory.get_history(conversation_id)

    # Build context from most recent messages
    context_messages = []
    total_tokens = 0

    for message in reversed(history):
        message_tokens = len(enc.encode(message["content"]))

        if total_tokens + message_tokens > max_tokens:
            break

        context_messages.insert(0, message)
        total_tokens += message_tokens

    return context_messages

Production Best Practices

1. Implement Proper Error Handling

import time

class ChatbotError(Exception):
    """Base exception for chatbot errors."""
    pass

class RetrievalError(ChatbotError):
    """Error during document retrieval."""
    pass

class GenerationError(ChatbotError):
    """Error during LLM generation."""
    pass

def safe_generate_response(query: str, conversation_id: str) -> Dict:
    """Generate response with comprehensive error handling."""
    try:
        # Validate input
        if not query or len(query) > 5000:
            raise ValueError("Query must be between 1 and 5000 characters")

        # Retrieve documents with retry
        max_retries = 3
        for attempt in range(max_retries):
            try:
                docs = vector_store.search(query, top_k=3)
                break
            except Exception as e:
                if attempt == max_retries - 1:
                    raise RetrievalError(f"Failed to retrieve documents: {e}")
                time.sleep(2 ** attempt)  # Exponential backoff

        # Generate response with timeout
        try:
            response = chatbot.generate_response(query, conversation_id)
            return response
        except Exception as e:
            raise GenerationError(f"Failed to generate response: {e}")

    except ValueError as e:
        logger.warning(f"Invalid input: {e}")
        return {"error": str(e), "type": "validation"}
    except RetrievalError as e:
        logger.error(f"Retrieval error: {e}")
        return {"error": "Unable to access knowledge base", "type": "retrieval"}
    except GenerationError as e:
        logger.error(f"Generation error: {e}")
        return {"error": "Unable to generate response", "type": "generation"}
    except Exception as e:
        logger.exception(f"Unexpected error: {e}")
        return {"error": "An unexpected error occurred", "type": "unknown"}

2. Monitor and Log Everything

import time
from prometheus_client import Counter, Histogram

# Metrics
chat_requests = Counter("chat_requests_total", "Total chat requests")
chat_errors = Counter("chat_errors_total", "Total chat errors", ["error_type"])
response_time = Histogram("response_time_seconds", "Response generation time")
tokens_used = Counter("tokens_used_total", "Total tokens consumed")

def monitored_generate_response(query: str, conversation_id: str):
    """Generate response with monitoring."""
    chat_requests.inc()
    start_time = time.time()

    try:
        response = chatbot.generate_response(query, conversation_id)

        # Record metrics
        duration = time.time() - start_time
        response_time.observe(duration)
        tokens_used.inc(response["metadata"]["tokens_used"])

        # Structured logging
        logger.info(
            "Chat request processed",
            extra={
                "conversation_id": conversation_id,
                "duration_seconds": duration,
                "tokens_used": response["metadata"]["tokens_used"],
                "model": response["metadata"]["model"]
            }
        )

        return response

    except Exception as e:
        chat_errors.labels(error_type=type(e).__name__).inc()
        logger.error(
            "Chat request failed",
            extra={
                "conversation_id": conversation_id,
                "error": str(e),
                "query_length": len(query)
            }
        )
        raise
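
These counters and histograms still need an HTTP endpoint that Prometheus can scrape. One minimal option (an assumption, not the only approach) is to mount the ASGI app that prometheus_client provides onto the existing FastAPI app:

# Expose the metrics defined above at /metrics for Prometheus to scrape
from prometheus_client import make_asgi_app

app.mount("/metrics", make_asgi_app())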

3. Secure Your Chatbot

# Input validation and sanitization
import re
from typing import Optional

class SecurityValidator:
    """Validate and sanitize user inputs."""

    @staticmethod
    def sanitize_input(text: str, max_length: int = 5000) -> str:
        """Sanitize user input."""
        # Trim whitespace
        text = text.strip()

        # Limit length
        if len(text) > max_length:
            text = text[:max_length]

        # Remove control characters
        text = re.sub(r'[\x00-\x1f\x7f-\x9f]', '', text)

        return text

    @staticmethod
    def detect_injection(text: str) -> bool:
        """Detect potential prompt injection attempts."""
        injection_patterns = [
            r"ignore previous instructions",
            r"disregard all prior",
            r"system:.*admin",
            r"<script>",
            r"DROP TABLE",
            r"\\n\\nHuman:",
            r"\\n\\nAssistant:"
        ]

        for pattern in injection_patterns:
            if re.search(pattern, text, re.IGNORECASE):
                return True

        return False

    @staticmethod
    def check_pii(text: str) -> Optional[str]:
        """Check for personally identifiable information."""
        # Email pattern
        if re.search(r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b', text):
            return "email"

        # Phone number pattern
        if re.search(r'\b\d{3}[-.]?\d{3}[-.]?\d{4}\b', text):
            return "phone"

        # Credit card pattern
        if re.search(r'\b\d{4}[-\s]?\d{4}[-\s]?\d{4}[-\s]?\d{4}\b', text):
            return "credit_card"

        return None

# Use in API endpoint
@app.post("/chat")
async def secure_chat(request: ChatRequest):
    # Sanitize input
    sanitized_message = SecurityValidator.sanitize_input(request.message)

    # Check for injection attempts
    if SecurityValidator.detect_injection(sanitized_message):
        logger.warning(f"Injection attempt detected: {request.message}")
        raise HTTPException(status_code=400, detail="Invalid input detected")

    # Check for PII
    pii_type = SecurityValidator.check_pii(sanitized_message)
    if pii_type:
        logger.warning(f"PII detected ({pii_type}): {request.message}")
        # Either reject or redact
        # raise HTTPException(status_code=400, detail="PII detected in input")

    # Process request
    # ...
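
If redaction is preferred over rejection, a minimal sketch that mirrors the check_pii patterns (the redact_pii helper is an assumption; a dedicated PII library is usually a better fit in production):

# Sketch: redact PII instead of rejecting the request
import re

def redact_pii(text: str) -> str:
    """Replace detected PII with placeholders (patterns mirror SecurityValidator.check_pii)."""
    text = re.sub(r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b', "[EMAIL]", text)
    text = re.sub(r'\b\d{3}[-.]?\d{3}[-.]?\d{4}\b', "[PHONE]", text)
    text = re.sub(r'\b\d{4}[-\s]?\d{4}[-\s]?\d{4}[-\s]?\d{4}\b', "[CARD]", text)
    return text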

Conclusion

Building production-grade AI chatbots has fundamentally changed with the advent of Large Language Models. The combination of LLMs with Retrieval-Augmented Generation (RAG) enables chatbots that are:

  • Knowledgeable: grounded in your organization’s data through vector search
  • Context-Aware: maintains conversation history across multiple turns
  • Flexible: handles arbitrary queries without intent classification
  • Scalable: distributed architecture with Redis memory and vector databases

This guide covered the complete stack:

  1. Vector Databases (Pinecone/ChromaDB) for semantic knowledge retrieval
  2. Conversation Memory (Redis) for multi-turn dialog tracking
  3. RAG Pipeline combining retrieval with LLM generation
  4. Production API (FastAPI) with WebSocket streaming
  5. Interactive UI (Streamlit) with real-time responses

Key Takeaways:

  • RAG is Essential: Pure LLMs hallucinate. RAG grounds responses in facts.
  • Streaming Improves UX: Even if total latency is the same, incremental display feels faster
  • Monitor Everything: Track costs, latency, errors, and user satisfaction
  • Security Matters: Validate inputs, detect injections, protect PII
  • Start Simple: Begin with ChromaDB and smaller models, scale to Pinecone and GPT-4 as needed

Production Checklist:

  • ✅ Implement error handling and fallbacks
  • ✅ Add monitoring and alerting (Prometheus, Grafana)
  • ✅ Set up logging (structured JSON logs)
  • ✅ Implement rate limiting and cost controls
  • ✅ Add input validation and sanitization
  • ✅ Test with real users and iterate
  • ✅ Document your knowledge base update process
  • ✅ Plan for scaling (horizontal API scaling, vector DB sharding)

The future of chatbots is multimodal (text, voice, images), agentic (taking actions, not just answering), and personalized (adapting to individual users). But the foundation—LLMs + RAG + production engineering—remains constant.

Start building, measure everything, and iterate based on real user feedback. The technology is mature enough for production today.