How to Build a RAG Pipeline with Pinecone: The Production-Grade Blueprint

A deep dive into building a RAG pipeline with Pinecone: full tech stack breakdown, implementation code, and scaling strategies for enterprise use cases.

Building a RAG Pipeline with Pinecone is easy in a Jupyter notebook. You follow the “Hello World” tutorial, ingest a PDF, and it works.

Then you deploy it.

Suddenly, you’re hitting rate limits. Your retrieval accuracy drops as the vector space gets crowded. Your LLM costs skyrocket because users keep asking the same questions. Latency creeps up to 5 seconds.

I have deployed RAG systems for Fortune 500 companies, and the difference between a “tutorial” pipeline and a “production” pipeline comes down to three things: state management, caching, and reranking.

Here is the architectural pattern I use for enterprise clients in late 2025.

🏗️ The Architecture

We are not just scripting; we are engineering a system. The core philosophy here is “Retrieve Broadly, Filter Strictly, Cache Aggressively.”

  1. Semantic Cache Layer (Redis): Before touching the LLM or Vector DB, we check if this question (or a semantically similar one) has been asked recently. This can reduce cost and latency by ~40% in high-traffic apps, though the actual saving tracks your cache hit rate.
  2. Vector Store (Pinecone Serverless): We use Pinecone’s Serverless infrastructure. It decouples storage from compute, allowing us to scale to millions of vectors without managing pods.
  3. Two-Stage Retrieval:
    • Stage 1 (Recall): Fetch a large candidate set (e.g., top 25 docs) using dense vector search.
    • Stage 2 (Precision): Use a Reranker (Cohere Rerank 3.5) to semantically re-score these 25 docs and select the top 5 most relevant ones.
  4. Generation: The LLM (GPT-4o or similar) receives only the highest-quality context.
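The two-stage idea can be sketched without any external service. The block below is a toy illustration, not the pipeline itself: `cosine`, `two_stage_retrieve`, and `overlap_score` are hypothetical names, the two-dimensional vectors are made up, and the word-overlap scorer merely stands in for a real reranker such as Cohere.

```python
import math
from typing import Callable, Dict, List, Sequence

def cosine(a: Sequence[float], b: Sequence[float]) -> float:
    """Cosine similarity between two equal-length vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm if norm else 0.0

def two_stage_retrieve(
    query: str,
    query_vec: Sequence[float],
    corpus: List[Dict],
    rerank_score: Callable[[str, str], float],
    recall_k: int = 25,
    final_k: int = 5,
) -> List[str]:
    # Stage 1 (recall): cheap vector similarity over the whole corpus.
    candidates = sorted(
        corpus, key=lambda d: cosine(query_vec, d["vec"]), reverse=True
    )[:recall_k]
    # Stage 2 (precision): a slower, stricter scorer over the candidates only.
    reranked = sorted(
        candidates, key=lambda d: rerank_score(query, d["text"]), reverse=True
    )
    return [d["text"] for d in reranked[:final_k]]

def overlap_score(query: str, text: str) -> float:
    """Toy stand-in for a reranker: fraction of query words found in the text."""
    q = set(query.lower().split())
    return len(q & set(text.lower().split())) / len(q) if q else 0.0

corpus = [
    {"text": "reset your password from the login page", "vec": [0.9, 0.1]},
    {"text": "password policy requires twelve characters", "vec": [0.95, 0.05]},
    {"text": "quarterly revenue report", "vec": [0.0, 1.0]},
]
top = two_stage_retrieve(
    "reset password", [1.0, 0.0], corpus, overlap_score, recall_k=2, final_k=1
)
```

In this toy corpus the password-policy document is the nearest vector, but the second-stage scorer promotes the document that actually covers resetting a password. That reordering is the job the reranker does in the real pipeline.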

🛠️ The Stack

  • Core: Python 3.11+, FastAPI (for the API layer).
  • Orchestration: LangChain (LCEL syntax - no deprecated Chain classes).
  • Vector Database: Pinecone Serverless (AWS us-east-1 region).
  • Reranking: Cohere Rerank 3.5 (State-of-the-art for context filtering).
  • Caching: Redis (with RedisVL or standard vector similarity).

💻 Implementation

This code implements the full pipeline: Cache Check -> Retrieval -> Reranking -> Generation.

Prerequisites

pip install pinecone langchain-core langchain-openai cohere redis

The Production Logic

import os
import time
import hashlib
from typing import List, Optional

# Core Libraries
from pinecone import Pinecone, ServerlessSpec
from langchain_openai import OpenAIEmbeddings, ChatOpenAI
from langchain_core.documents import Document
from langchain_core.runnables import RunnableParallel, RunnablePassthrough
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
import cohere
import redis

# --- CONFIGURATION & ENV ---
PINECONE_API_KEY = os.getenv("PINECONE_API_KEY")
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
COHERE_API_KEY = os.getenv("COHERE_API_KEY")
REDIS_URL = os.getenv("REDIS_URL", "redis://localhost:6379")
INDEX_NAME = "enterprise-rag-v1"

# --- 1. SETUP PINECONE (Serverless) ---
# We use the ServerlessSpec for auto-scaling and cost efficiency.
pc = Pinecone(api_key=PINECONE_API_KEY)

if INDEX_NAME not in pc.list_indexes().names():
    pc.create_index(
        name=INDEX_NAME,
        dimension=1536, # Matches OpenAI text-embedding-3-small
        metric="cosine",
        spec=ServerlessSpec(
            cloud="aws",
            region="us-east-1"
        )
    )

# Connect to the index
pinecone_index = pc.Index(INDEX_NAME)

# --- 2. SEMANTIC CACHE (The "Senior" Optimization) ---
# We use Redis to store Q&A pairs. If a user asks "How do I reset password?"
# and later asks "Password reset steps", we serve the cached answer.
redis_client = redis.Redis.from_url(REDIS_URL)

def get_cache_key(text: str) -> str:
    """Generate a consistent hash for exact match caching (Layer 1)."""
    return hashlib.sha256(text.encode()).hexdigest()

def check_semantic_cache(query: str, embeddings) -> Optional[str]:
    """
    Check Redis for semantically similar queries.
    (Simplified implementation: In production, use RedisVL or a dedicated vector index in Redis)
    """
    # For this blueprint, we'll simulate a fast lookup.
    # In prod: Embed query -> Search Redis Vector Index -> Return value if score > 0.9
    return None

def set_semantic_cache(query: str, response: str):
    """Store the result for future users."""
    # In prod: Embed query -> Store vector + response in Redis with TTL (e.g., 24h)
    pass

# --- 3. RERANKING RETRIEVER ---
class ProductionRetriever:
    def __init__(self, index, cohere_client):
        self.index = index
        self.cohere = cohere_client
        self.embeddings = OpenAIEmbeddings(model="text-embedding-3-small")

    def get_relevant_documents(self, query: str) -> List[Document]:
        # Step A: Dense Retrieval (Broad Recall)
        # Get top 25 results to ensure we don't miss anything
        query_vec = self.embeddings.embed_query(query)
        search_results = self.index.query(
            vector=query_vec,
            top_k=25,
            include_metadata=True
        )

        if not search_results['matches']:
            return []

        # Convert to format for Reranker
        docs = [
            match['metadata']['text']
            for match in search_results['matches']
            if 'text' in match['metadata']
        ]

        if not docs:
            return []

        # Step B: Reranking (High Precision)
        # We use Cohere to re-order the 25 docs and pick the top 5
        rerank_results = self.cohere.rerank(
            model="rerank-v3.5", # Matches the stack above; 'rerank-english-v3.0' is the older option
            query=query,
            documents=docs,
            top_n=5
        )

        # Return strictly the top re-ranked documents
        final_docs = []
        for result in rerank_results.results:
            final_docs.append(Document(page_content=docs[result.index]))

        return final_docs

# --- 4. THE PIPELINE (LCEL) ---
co = cohere.Client(COHERE_API_KEY)
retriever = ProductionRetriever(pinecone_index, co)
llm = ChatOpenAI(model="gpt-4o", temperature=0)

template = """Answer the question based ONLY on the following context:
{context}

Question: {question}
"""
prompt = ChatPromptTemplate.from_template(template)

def format_docs(docs):
    return "\n\n".join([d.page_content for d in docs])

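# The RAG Chain
# Input is {"question": "..."}. Both branches pull the string out of that dict,
# so the prompt receives plain text rather than the whole input dict.
rag_chain = (
    RunnableParallel(
        {"context": lambda x: format_docs(retriever.get_relevant_documents(x["question"])),
         "question": lambda x: x["question"]}
    )
    | prompt
    | llm
    | StrOutputParser()
)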

# --- 5. EXECUTION WRAPPER ---
def run_pipeline(user_query: str):
    start_time = time.time()

    # 1. Check Cache
    cached_response = check_semantic_cache(user_query, retriever.embeddings)
    if cached_response:
        print(f"⚡ Cache Hit ({time.time() - start_time:.2f}s)")
        return cached_response

    # 2. Run Chain
    try:
        response = rag_chain.invoke({"question": user_query})

        # 3. Update Cache (Async in production)
        set_semantic_cache(user_query, response)

        print(f"✅ Generated ({time.time() - start_time:.2f}s)")
        return response

    except Exception as e:
        # Log error to Sentry/Datadog
        print(f"❌ Error: {str(e)}")
        return "I'm experiencing high traffic. Please try again."

# Example Usage
if __name__ == "__main__":
    # Simulate Ingestion (Run once)
    # Embed with the same model the retriever uses; vectors from different models are not comparable.
    # vector = retriever.embeddings.embed_query("Pinecone serverless is cheaper.")
    # pinecone_index.upsert(vectors=[("id-1", vector, {"text": "Pinecone serverless is cheaper."})])

    print(run_pipeline("Why should I use serverless vectors?"))
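The `get_cache_key` helper in the listing above is defined but never called. Here is a minimal sketch of how an exact-match layer could sit in front of the semantic cache. `ExactMatchCache`, `normalize`, the `rag:exact:` key prefix, and the 24-hour default TTL are assumptions for illustration, and this variant of `get_cache_key` normalizes the query first, whereas the original hashes the raw text. The `store` argument is anything exposing `get` and `setex` the way a redis-py client does; the in-memory `FakeStore` exists only so the example runs without a Redis server.

```python
import hashlib
from typing import Dict, Optional

def normalize(text: str) -> str:
    """Lowercase and collapse whitespace so trivially different queries share a key."""
    return " ".join(text.lower().split())

def get_cache_key(text: str) -> str:
    """Hash the normalized query (assumption: the original hashes the raw text)."""
    return "rag:exact:" + hashlib.sha256(normalize(text).encode("utf-8")).hexdigest()

class ExactMatchCache:
    def __init__(self, store, ttl_seconds: int = 24 * 60 * 60):
        self.store = store
        self.ttl_seconds = ttl_seconds

    def get(self, query: str) -> Optional[str]:
        raw = self.store.get(get_cache_key(query))
        if raw is None:
            return None
        # redis-py returns bytes unless decode_responses=True is set on the client.
        return raw.decode("utf-8") if isinstance(raw, bytes) else raw

    def set(self, query: str, response: str) -> None:
        # SETEX stores the value and its expiry in a single call.
        self.store.setex(get_cache_key(query), self.ttl_seconds, response)

class FakeStore:
    """In-memory stand-in for Redis; it ignores TTL expiry."""
    def __init__(self):
        self.data: Dict[str, bytes] = {}

    def get(self, key):
        return self.data.get(key)

    def setex(self, key, ttl, value):
        self.data[key] = value.encode("utf-8")

cache = ExactMatchCache(FakeStore())
cache.set("How do I reset my password?", "Use the login page link.")
hit = cache.get("  how do I reset MY password? ")
miss = cache.get("Password reset steps")
```

The second lookup misses because the wording differs, which is the gap the semantic layer is meant to cover. In `run_pipeline`, a layer like this would be checked before `check_semantic_cache`, since a hash lookup needs no embedding call.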

⚠️ Production Pitfalls (The “Senior” Perspective)

When scaling this to 10k concurrent users, here is what breaks:

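1. The “Lost in the Middle” Phenomenon

Dense vector search (cosine similarity) is good at finding approximate matches, but it often returns documents that sit close to the query in embedding space without actually answering it. Stuffing all of them into the prompt makes things worse: LLMs tend to use information at the start and end of a long context more reliably than information buried in the middle.

  • The Fix: This is why the Cohere Reranker is non-negotiable. It acts as a “judge,” scoring each candidate against the query directly, so only a short, relevance-ordered context reaches the LLM. It adds ~200ms of latency and can increase accuracy by 30-50%, but measure both on your own evaluation set.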

2. Namespace Pollution

In a multi-tenant SaaS (e.g., a legal AI for different law firms), never mix vectors in a single namespace.

  • The Fix: Use Pinecone Namespaces. Pass namespace=company_id in every upsert and query call.
    index.query(vector=..., namespace="tenant_123")
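One way to make the namespace rule hard to forget is to hand application code a wrapper instead of the raw index. The sketch below assumes a hypothetical `TenantIndex` class; it relies only on the `upsert(vectors=..., namespace=...)` and `query(vector=..., top_k=..., namespace=...)` keyword arguments of a Pinecone index, and `RecordingIndex` is a test double so the example runs offline.

```python
from typing import Any, Dict, List, Sequence, Tuple

class TenantIndex:
    """Wraps an index object and forces one namespace onto every call."""

    def __init__(self, index: Any, tenant_id: str):
        if not tenant_id:
            raise ValueError("tenant_id must be a non-empty string")
        self._index = index
        self._namespace = tenant_id

    def upsert(self, vectors: List[Tuple[str, Sequence[float], Dict]]) -> Any:
        return self._index.upsert(vectors=vectors, namespace=self._namespace)

    def query(self, vector: Sequence[float], top_k: int = 10, **kwargs: Any) -> Any:
        # Refuse overrides so one tenant can never read another tenant's vectors.
        if "namespace" in kwargs:
            raise ValueError("namespace is fixed per tenant and cannot be overridden")
        return self._index.query(
            vector=vector, top_k=top_k, namespace=self._namespace, **kwargs
        )

class RecordingIndex:
    """Test double that records the keyword arguments it receives."""

    def __init__(self):
        self.calls = []

    def upsert(self, **kwargs):
        self.calls.append(("upsert", kwargs))

    def query(self, **kwargs):
        self.calls.append(("query", kwargs))
        return {"matches": []}

raw = RecordingIndex()
tenant = TenantIndex(raw, "tenant_123")
tenant.upsert([("id-1", [0.1, 0.2], {"text": "contract clause"})])
tenant.query([0.1, 0.2], top_k=5, include_metadata=True)
```

With a real index, `TenantIndex(pc.Index(INDEX_NAME), company_id)` would be built once per request from the authenticated tenant, never from user-supplied input.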

3. Metadata Filtering Latency

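Filtering after retrieval is a rookie mistake. It shrinks your effective k (result count) and ruins recall: if only 2 of your top 10 matches pass the filter, the LLM sees 2 documents, even when plenty of matching documents sit just below the cutoff.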

  • The Fix: Use Pre-filtering. Pinecone supports this natively.
    index.query(
        vector=...,
        filter={"genre": {"$eq": "technical"}}, # DB filters BEFORE searching
        top_k=10
    )
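The effect on result count is easy to see on a toy ranking. The sketch below uses made-up documents and hypothetical helpers `post_filter` and `pre_filter`. It does not call Pinecone; it only contrasts the two orderings of "filter" and "take top k".

```python
from typing import Dict, List

# Toy corpus, already sorted by similarity to some query (best first).
ranked = [
    {"id": "a", "genre": "marketing"},
    {"id": "b", "genre": "marketing"},
    {"id": "c", "genre": "technical"},
    {"id": "d", "genre": "marketing"},
    {"id": "e", "genre": "technical"},
    {"id": "f", "genre": "technical"},
]

def post_filter(docs: List[Dict], genre: str, top_k: int) -> List[str]:
    """Take the top_k nearest neighbours first, then drop non-matching ones."""
    return [d["id"] for d in docs[:top_k] if d["genre"] == genre]

def pre_filter(docs: List[Dict], genre: str, top_k: int) -> List[str]:
    """Restrict to matching docs first, then take the top_k nearest."""
    return [d["id"] for d in docs if d["genre"] == genre][:top_k]

post = post_filter(ranked, "technical", top_k=3)
pre = pre_filter(ranked, "technical", top_k=3)
```

With `top_k=3`, post-filtering leaves a single document (`c`), while filtering first returns the three requested (`c`, `e`, `f`).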

4. Cost Scalability

GPT-4o is expensive if you use it to answer “Hello”.

  • The Fix: Implement the Semantic Cache (Redis) that the code above stubs out. If 20% of your queries are repeats (common in enterprise helpdesks), caching saves you roughly 20% of your monthly LLM bill immediately, assuming repeated queries cost about as much as the rest.
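The cache functions in the listing are placeholders, so here is a minimal in-memory sketch of the logic their comments describe: embed the query, find the closest stored query, and return its answer if the score clears 0.9 and the 24-hour TTL has not expired. `SemanticCache`, `toy_embed`, and `VOCAB` are hypothetical names. The bag-of-words embedding stands in for a real embedding model, and the linear scan stands in for a Redis vector index.

```python
import math
import re
import time
from typing import Callable, List, Optional, Sequence, Tuple

def cosine(a: Sequence[float], b: Sequence[float]) -> float:
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm if norm else 0.0

class SemanticCache:
    def __init__(
        self,
        embed: Callable[[str], Sequence[float]],
        threshold: float = 0.9,
        ttl_seconds: float = 24 * 60 * 60,
        clock: Callable[[], float] = time.time,
    ):
        self.embed = embed
        self.threshold = threshold
        self.ttl_seconds = ttl_seconds
        self.clock = clock
        # Each entry is (query vector, cached response, expiry timestamp).
        self._entries: List[Tuple[Sequence[float], str, float]] = []

    def get(self, query: str) -> Optional[str]:
        now = self.clock()
        # Drop expired entries before searching.
        self._entries = [e for e in self._entries if e[2] > now]
        query_vec = self.embed(query)
        best_score, best_response = 0.0, None
        for vec, response, _ in self._entries:
            score = cosine(query_vec, vec)
            if score > best_score:
                best_score, best_response = score, response
        return best_response if best_score >= self.threshold else None

    def set(self, query: str, response: str) -> None:
        expires_at = self.clock() + self.ttl_seconds
        self._entries.append((self.embed(query), response, expires_at))

VOCAB = ["password", "reset", "invoice", "download"]

def toy_embed(text: str) -> List[float]:
    """Toy embedding: one dimension per vocabulary word (not a real model)."""
    words = set(re.findall(r"[a-z]+", text.lower()))
    return [1.0 if w in words else 0.0 for w in VOCAB]

now = [0.0]  # controllable clock so expiry can be shown without sleeping
cache = SemanticCache(toy_embed, clock=lambda: now[0])
cache.set("How do I reset password?", "Go to Settings > Security.")
hit = cache.get("Password reset steps")
miss = cache.get("Download my invoice")
```

The threshold is the knob to watch. Set it too low and users get answers to questions they did not ask; set it too high and the cache only catches near-verbatim repeats.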

🚀 Final Verdict

This architecture strikes a balance between speed and reliability.

If you are building an MVP, you can skip Redis and Reranking. If you are building for production, Reranking is mandatory for accuracy, and Redis is mandatory for cost control.

Don’t let your vector database become a “write-only” memory hole. Architect it for retrieval.