
How to Build Cai: The Production-Grade Blueprint

A deep dive into building production-ready Conversational AI (CAI). Full tech stack breakdown, LangGraph implementation, and scaling strategies for enterprise use cases.


Building Cai (Conversational AI) is easy in a notebook, but deploying it to production requires handling concurrency, latency, and state. Most developers stop at while True: chat(). That doesn’t scale.

Here is the architectural pattern I use for enterprise clients who need stateful, resilient, and observable Conversational AI agents.

🏗️ The Architecture

We are not just scripting; we are engineering a system. To build a production CAI, we move beyond simple “Chains” and adopt a State Machine architecture. This allows us to handle loops, corrections, and complex tool usage deterministically.

The Data Flow:

  1. Ingress: Client sends a message via WebSocket or SSE (Server-Sent Events) to a FastAPI endpoint.
  2. State Loading: The system fetches the conversation history (Thread) from Redis.
  3. Orchestration: LangGraph executes the workflow. It decides whether to call the LLM, execute a tool, or return a response.
  4. Generation: The LLM streams tokens back.
  5. Persistence: The new state (messages + artifacts) is saved back to Redis (Checkpointing).
  6. Egress: The response is streamed back to the client (a toy end-to-end sketch of this loop follows below).
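
To make the flow concrete, here is a framework-free toy version of one turn through that pipeline. Everything in it (the THREADS dict standing in for Redis, the fake_llm_stream stand-in for LangGraph plus the LLM) is illustrative only; the real stack follows in the implementation section.

import asyncio

THREADS: dict[str, list[tuple[str, str]]] = {}   # stand-in for Redis (steps 2 and 5)

async def fake_llm_stream(history):
    # Stand-in for LangGraph orchestration + LLM token streaming (steps 3-4)
    for token in f"You said: {history[-1][1]}".split():
        await asyncio.sleep(0)            # pretend tokens arrive over time
        yield token + " "

async def handle_turn(thread_id: str, user_message: str) -> None:
    history = THREADS.get(thread_id, []) + [("user", user_message)]   # 2. State Loading
    reply = ""
    async for token in fake_llm_stream(history):                      # 3-4. Orchestration + Generation
        reply += token
        print(token, end="", flush=True)                              # 6. Egress (stream to client)
    THREADS[thread_id] = history + [("assistant", reply.strip())]     # 5. Persistence

asyncio.run(handle_turn("user-123", "Hello"))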

🛠️ The Stack

  • Core: LangGraph (Python) - The orchestration framework for stateful, graph-based agents.
  • Runtime: FastAPI (Async)
  • State Store: Redis (via langgraph-checkpoint-redis)
  • LLM: OpenAI GPT-4o or Anthropic Claude 3.5 Sonnet
  • Observability: LangSmith

💻 Implementation

We will build a Stateful Conversational Agent that persists memory across sessions. We use LangGraph because it treats conversation as a graph of states, not a rigid chain.

Prerequisites

pip install langgraph langgraph-checkpoint-redis langchain-openai fastapi uvicorn redis

The Core Logic

This code handles the agent loop, state persistence, and streaming.

import os
from typing import Annotated, Literal, TypedDict
from uuid import uuid4

import uvicorn
from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse
from langchain_core.messages import BaseMessage, HumanMessage
from langchain_core.tools import tool
from langchain_openai import ChatOpenAI
from langgraph.checkpoint.redis.aio import AsyncRedisSaver
from langgraph.graph import StateGraph, END
from langgraph.prebuilt import ToolNode

# --- 1. Configuration & Setup ---
# In production, load these from environment variables
REDIS_URL = os.getenv("REDIS_URL", "redis://localhost:6379")
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")

# Define the State of our CAI
# We track the list of messages. 'add_messages' handles appending automatically.
from langgraph.graph.message import add_messages

class AgentState(TypedDict):
    messages: Annotated[list[BaseMessage], add_messages]

# --- 2. The Agent Logic (Nodes) ---

# Initialize the Model
# streaming=True is critical for perceived latency
model = ChatOpenAI(model="gpt-4o", streaming=True)

# Define Tools (Example: A dummy search tool)
@tool
def search_database(query: str) -> str:
    """Call this to search the internal database."""
    return f"Results for {query}: [Data found]"

tools = [search_database]
model_with_tools = model.bind_tools(tools)

# Node 1: The Assistant (LLM Decision Maker)
async def assistant_node(state: AgentState):
    response = await model_with_tools.ainvoke(state["messages"])
    return {"messages": [response]}

# Node 2: Tool Execution
# LangGraph provides a prebuilt node for executing tools safely
tool_node = ToolNode(tools)

# Edge Logic: Should we continue or stop?
def should_continue(state: AgentState) -> Literal["tools", "__end__"]:
    last_message = state["messages"][-1]
    # If the LLM made a tool call, route to 'tools' node
    if last_message.tool_calls:
        return "tools"
    # Otherwise, end the turn
    return "__end__"

# --- 3. Building the Graph ---

def build_graph():
    workflow = StateGraph(AgentState)

    # Add Nodes
    workflow.add_node("agent", assistant_node)
    workflow.add_node("tools", tool_node)

    # Set Entry Point
    workflow.set_entry_point("agent")

    # Add Edges
    workflow.add_conditional_edges(
        "agent",
        should_continue,
    )
    workflow.add_edge("tools", "agent") # Loop back to agent after tool use

    return workflow

# --- 4. The API Layer (Production Serving) ---

app = FastAPI(title="Cai Production API")

@app.post("/chat/stream")
async def chat_stream(request: Request):
    """
    Endpoint that handles full conversation state and streams response.
    Expects JSON: { "message": "Hello", "thread_id": "user-123" }
    """
    data = await request.json()
    user_message = data.get("message")
    thread_id = data.get("thread_id", str(uuid4()))

    # Configure the thread config for isolation
    config = {"configurable": {"thread_id": thread_id}}

    # Input payload
    inputs = {"messages": [HumanMessage(content=user_message)]}

    # Generator for Streaming Response
    async def event_generator():
        # Open the checkpointer inside the generator so the Redis connection
        # stays alive for the whole stream; it also means conversation state
        # survives server restarts. (In a high-traffic service you would create
        # the checkpointer once at startup and reuse it.)
        async with AsyncRedisSaver.from_conn_string(REDIS_URL) as checkpointer:
            # Create the Redis indices if they don't exist yet (idempotent)
            await checkpointer.asetup()

            # Compile the graph with persistence
            graph = build_graph().compile(checkpointer=checkpointer)

            # 'astream_events' yields events as the graph executes
            async for event in graph.astream_events(inputs, config=config, version="v1"):
                kind = event["event"]

                # Stream LLM tokens for low latency
                if kind == "on_chat_model_stream":
                    content = event["data"]["chunk"].content
                    if content:
                        # Server-Sent Events (SSE) format
                        yield f"data: {content}\n\n"

        yield "data: [DONE]\n\n"

    return StreamingResponse(event_generator(), media_type="text/event-stream")

if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8000)
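
To exercise the endpoint end to end, a minimal client might look like the sketch below. It assumes httpx is installed (it is not in the prerequisites above) and that the server is running locally on port 8000; the payload keys match what chat_stream expects.

import asyncio

import httpx

async def chat(message: str, thread_id: str = "user-123") -> None:
    async with httpx.AsyncClient(timeout=None) as client:
        async with client.stream(
            "POST",
            "http://localhost:8000/chat/stream",
            json={"message": message, "thread_id": thread_id},
        ) as response:
            async for line in response.aiter_lines():
                # Each SSE line looks like "data: <token>"
                if line.startswith("data: ") and line != "data: [DONE]":
                    print(line.removeprefix("data: "), end="", flush=True)

asyncio.run(chat("What do we know about quarterly revenue?"))

Because the same thread_id is reused across calls, the Redis checkpointer reloads the prior messages and the conversation continues where it left off.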

⚠️ Production Pitfalls (The “Senior” Perspective)

When scaling this to 10k concurrent users, here is what breaks:

1. State Bloat

The Issue: Storing the entire chat history in Redis indefinitely makes context windows expensive and Redis slow.

The Fix: Implement a trimming strategy. Before passing messages to the LLM, use a trim_messages utility to keep only the last k tokens, or summarize older messages.
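
As a concrete example, the assistant_node from the implementation above can trim on every call. trim_messages ships with recent langchain-core releases; the 3,000-token budget here is an arbitrary placeholder you would tune per model.

from langchain_core.messages import trim_messages

async def assistant_node(state: AgentState):
    trimmed = trim_messages(
        state["messages"],
        strategy="last",        # keep the most recent messages
        max_tokens=3000,        # context budget for the conversation window
        token_counter=model,    # let the chat model count its own tokens
        include_system=True,    # never drop the system prompt
        start_on="human",       # keep turn boundaries valid for the API
    )
    response = await model_with_tools.ainvoke(trimmed)
    return {"messages": [response]}

Note that this caps what the LLM sees on each turn; to keep Redis itself lean you would also summarize or expire old threads (Redis TTLs work well here).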

2. Concurrency & Locking

The Issue: If a user sends two messages rapidly, the second request might fetch an old state before the first one finishes writing.

The Fix: LangGraph versions each checkpoint within a thread (the checkpoint ID, formerly thread_ts), but that does not serialize concurrent writes for you. Debounce or queue inputs on the frontend, and serialize turns per thread on the server, to prevent race conditions on the conversation state.
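
A minimal server-side guard, as a sketch: serialize turns with an asyncio lock per thread_id so a second message for the same thread waits for the first to finish. This only works within a single worker process; across multiple workers you would need a distributed lock (for example, in Redis).

import asyncio
from collections import defaultdict

_thread_locks: defaultdict[str, asyncio.Lock] = defaultdict(asyncio.Lock)

async def locked_stream(thread_id: str, stream):
    """Wrap an async event stream (e.g. graph.astream_events) in a per-thread lock."""
    async with _thread_locks[thread_id]:
        async for item in stream:
            yield item

Inside event_generator you would then iterate locked_stream(thread_id, graph.astream_events(...)) instead of the raw stream.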

3. Tool Latency

The Issue: If search_database takes 5 seconds, the user sees a spinner.

The Fix: Emit “Intermediate Steps” to the frontend. Modify the event_generator to yield generic events like data: {"status": "searching_db"}\n\n so the UI can show “Searching…” instead of freezing.
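
A sketch of that change: a variant of the generator above, written to take the compiled graph, inputs, and config as parameters, which surfaces tool activity before the tokens arrive. The JSON payload shape ("status" / "tool") is illustrative, not a fixed protocol.

import json

async def event_generator_with_status(graph, inputs, config):
    async for event in graph.astream_events(inputs, config=config, version="v1"):
        kind = event["event"]

        if kind == "on_tool_start":
            # Tell the client which tool is about to run (e.g. search_database)
            payload = json.dumps({"status": "tool_running", "tool": event["name"]})
            yield f"data: {payload}\n\n"

        elif kind == "on_chat_model_stream":
            content = event["data"]["chunk"].content
            if content:
                yield f"data: {content}\n\n"

    yield "data: [DONE]\n\n"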

4. Cost Control

The Issue: Infinite loops. The model calls a tool, the tool fails, the model tries again forever.

The Fix: Set a recursion_limit in LangGraph (the default is 25; lower it to 5-10 for simple agents) and implement hard timeouts on tool execution.
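
A sketch of both guardrails, assuming the tool can be made async (ToolNode runs coroutine tools when the graph is invoked asynchronously); _do_search is a stand-in for the real data-access call.

import asyncio
from langchain_core.tools import tool

async def _do_search(query: str) -> str:
    await asyncio.sleep(0.1)  # placeholder for the real database query
    return f"Results for {query}: [Data found]"

@tool
async def search_database(query: str) -> str:
    """Call this to search the internal database."""
    try:
        return await asyncio.wait_for(_do_search(query), timeout=5.0)
    except asyncio.TimeoutError:
        return "The database search timed out. Tell the user; do not retry."

# recursion_limit sits alongside "configurable" in the run config
config = {
    "configurable": {"thread_id": "user-123"},
    "recursion_limit": 6,   # stop runaway agent <-> tool loops early
}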

🚀 Final Verdict

This architecture strikes a practical balance between speed and reliability.

  • For MVP: You can use memory-based checkpointers (a one-line swap, shown below).
  • For Production: You must use Redis or Postgres checkpointers.
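
For local development, the in-memory checkpointer that ships with LangGraph is a drop-in swap; state then lives in the process and disappears on restart.

from langgraph.checkpoint.memory import MemorySaver

# MVP / local dev: no external dependencies, but nothing survives a restart
dev_graph = build_graph().compile(checkpointer=MemorySaver())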

If you are building a “Character.ai” clone, your next step is to inject System Prompts dynamically based on the thread_id to load specific personas for each chat session.
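
A sketch of that persona injection: a variant of assistant_node that looks up the thread's persona and prepends it as a system message. PERSONA_BY_THREAD is a stand-in for whatever store maps chat sessions to characters.

from langchain_core.messages import SystemMessage
from langchain_core.runnables import RunnableConfig

PERSONA_BY_THREAD = {
    "user-123": "You are Captain Morrow, a weary pirate who answers in riddles.",
}

async def assistant_node(state: AgentState, config: RunnableConfig):
    # LangGraph passes the run config to nodes that accept a second argument,
    # so the thread_id set in the API layer is available here.
    thread_id = config["configurable"]["thread_id"]
    persona = PERSONA_BY_THREAD.get(thread_id, "You are a helpful assistant.")
    response = await model_with_tools.ainvoke(
        [SystemMessage(content=persona), *state["messages"]]
    )
    return {"messages": [response]}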