
Vector Search RAG

Implement semantic search using vector embeddings for intelligent document retrieval

Search & Retrieval · Intermediate · Vector Search

Overview

Vector Search RAG combines the power of semantic understanding with traditional information retrieval to provide highly relevant search results. By converting text into high-dimensional vectors (embeddings), this approach can find documents that are semantically similar even when they don't share exact keywords.

This pattern is particularly effective for natural language queries, multilingual search, and finding related content across large document collections. It addresses the limitations of keyword-based search by understanding context and meaning.
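
As a small illustration of semantic matching, two sentences that share no keywords can still land close together in embedding space. The snippet below is a minimal sketch using the same all-MiniLM-L6-v2 model referenced throughout this guide; the example sentences are made up.

from sentence_transformers import SentenceTransformer, util

model = SentenceTransformer("all-MiniLM-L6-v2")
a = model.encode("How do I get my money back?")
b = model.encode("Refunds are issued within 30 days of purchase.")
c = model.encode("The stadium opens two hours before kickoff.")

print(util.cos_sim(a, b))  # relatively high: same topic, no shared keywords
print(util.cos_sim(a, c))  # much lower: unrelated topic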


Implementation Guide

1. Document Embedding Generation

import numpy as np
import openai
from sentence_transformers import SentenceTransformer
from typing import List

class DocumentEmbedder:
    def __init__(self, model_name: str = "all-MiniLM-L6-v2"):
        # Local embedding model, loaded once and reused for all calls
        self.model = SentenceTransformer(model_name)
        # Created lazily so no API key is needed when only local embeddings are used
        self._openai_client = None
    
    def generate_embeddings(self, documents: List[str]) -> np.ndarray:
        """Generate embeddings for a list of documents with the local model"""
        return self.model.encode(documents, show_progress_bar=True)
    
    def generate_openai_embeddings(self, documents: List[str]) -> List[List[float]]:
        """Generate embeddings using OpenAI's embeddings API"""
        if self._openai_client is None:
            self._openai_client = openai.OpenAI()  # reads OPENAI_API_KEY from the environment
        embeddings = []
        for doc in documents:
            response = self._openai_client.embeddings.create(
                model="text-embedding-ada-002",
                input=doc
            )
            embeddings.append(response.data[0].embedding)
        return embeddings
    
    def chunk_documents(self, documents: List[str], chunk_size: int = 512) -> List[str]:
        """Split documents into chunks of at most chunk_size words for better embedding"""
        chunks = []
        for doc in documents:
            words = doc.split()
            for i in range(0, len(words), chunk_size):
                chunks.append(" ".join(words[i:i + chunk_size]))
        return chunks
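
A quick usage sketch for the class above, with a couple of made-up documents. The overlapping chunker at the end is not part of the class; it is a hypothetical variant that mirrors the overlap: 50 setting shown in the configuration example later on this page.

from typing import List

# Example usage of DocumentEmbedder (illustrative values only)
embedder = DocumentEmbedder()
docs = ["Vector search retrieves documents by semantic similarity.",
        "Keyword search matches exact terms in the text."]
chunks = embedder.chunk_documents(docs, chunk_size=512)
vectors = embedder.generate_embeddings(chunks)
print(vectors.shape)  # (num_chunks, 384) for all-MiniLM-L6-v2

# Hypothetical overlapping chunker, mirroring the overlap setting in the config
def chunk_with_overlap(text: str, chunk_size: int = 512, overlap: int = 50) -> List[str]:
    words = text.split()
    step = max(1, chunk_size - overlap)
    return [" ".join(words[i:i + chunk_size]) for i in range(0, len(words), step)]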

2. Vector Database Setup

import chromadb
from chromadb.config import Settings
from typing import Dict, List

class VectorDatabase:
    def __init__(self, persist_directory: str = "./chroma_db"):
        self.client = chromadb.PersistentClient(
            path=persist_directory,
            settings=Settings(anonymized_telemetry=False)
        )
        self.collection = self.client.get_or_create_collection(
            name="documents",
            metadata={"hnsw:space": "cosine"}
        )
        # Reuse a single embedder so the model is loaded once, not on every call
        self.embedder = DocumentEmbedder()
    
    def add_documents(self, documents: List[str], metadata: List[Dict], ids: List[str]):
        """Add documents and their embeddings to the vector database"""
        embeddings = self.embedder.generate_embeddings(documents)
        
        self.collection.add(
            embeddings=embeddings.tolist(),
            documents=documents,
            metadatas=metadata,
            ids=ids
        )
    
    def search(self, query: str, n_results: int = 5) -> Dict:
        """Search for documents semantically similar to the query"""
        query_embedding = self.embedder.generate_embeddings([query])
        
        return self.collection.query(
            query_embeddings=query_embedding.tolist(),
            n_results=n_results,
            include=["documents", "metadatas", "distances"]
        )
    
    def hybrid_search(self, query: str, n_results: int = 5, 
                      semantic_weight: float = 0.7) -> Dict:
        """Combine semantic and keyword search"""
        # Semantic search
        semantic_results = self.search(query, n_results)
        
        # Keyword search; keyword_search and combine_results are not defined in
        # this class -- a minimal sketch of both follows this code block
        keyword_results = self.keyword_search(query, n_results)
        
        # Combine and rerank results
        return self.combine_results(
            semantic_results, keyword_results, semantic_weight
        )
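
hybrid_search relies on keyword_search and combine_results, which are not defined above. One way to supply them is a small subclass; the sketch below assumes Chroma's where_document $contains filter for the keyword side and a simple weighted score fusion. The class name HybridVectorDatabase and the scoring scheme are illustrative, not a fixed part of this pattern.

from typing import Dict

class HybridVectorDatabase(VectorDatabase):
    """Adds the keyword and fusion helpers that hybrid_search expects."""

    def keyword_search(self, query: str, n_results: int = 5) -> Dict:
        """Naive keyword search using Chroma's substring filter on document text."""
        return self.collection.get(
            where_document={"$contains": query},
            limit=n_results,
            include=["documents", "metadatas"]
        )

    def combine_results(self, semantic: Dict, keyword: Dict,
                        semantic_weight: float = 0.7) -> Dict:
        """Weighted fusion: semantic hits scored by similarity, keyword hits by a flat bonus."""
        scores: Dict[str, float] = {}
        docs: Dict[str, str] = {}

        # Semantic results come from collection.query: lists nested per query
        for doc_id, doc, dist in zip(semantic["ids"][0],
                                     semantic["documents"][0],
                                     semantic["distances"][0]):
            scores[doc_id] = semantic_weight * (1 - dist)  # cosine distance -> similarity
            docs[doc_id] = doc

        # Keyword results come from collection.get: flat lists
        for doc_id, doc in zip(keyword["ids"], keyword["documents"]):
            scores[doc_id] = scores.get(doc_id, 0.0) + (1 - semantic_weight)
            docs.setdefault(doc_id, doc)

        ranked = sorted(scores.items(), key=lambda kv: kv[1], reverse=True)
        return {"ids": [i for i, _ in ranked],
                "documents": [docs[i] for i, _ in ranked],
                "scores": [s for _, s in ranked]}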

3. RAG Pipeline Implementation

from typing import List, Dict
import openai
from dataclasses import dataclass

@dataclass
class SearchResult:
    document: str
    metadata: Dict
    similarity_score: float
    source: str

class VectorSearchRAG:
    def __init__(self, vector_db: VectorDatabase, openai_api_key: str):
        self.vector_db = vector_db
        self.openai_client = openai.OpenAI(api_key=openai_api_key)
    
    def retrieve_context(self, query: str, n_results: int = 5) -> List[SearchResult]:
        """Retrieve relevant context for a query"""
        search_results = self.vector_db.search(query, n_results)
        
        results = []
        for i in range(len(search_results['documents'][0])):
            result = SearchResult(
                document=search_results['documents'][0][i],
                metadata=search_results['metadatas'][0][i],
                similarity_score=1 - search_results['distances'][0][i],
                source=search_results['metadatas'][0][i].get('source', 'unknown')
            )
            results.append(result)
        
        return results
    
    def generate_response(self, query: str, context: List[SearchResult]) -> str:
        """Generate response using retrieved context"""
        # Prepare context for the LLM
        context_text = "\n\n".join([
            f"Document {i+1} (Score: {result.similarity_score:.3f}):\n{result.document}"
            for i, result in enumerate(context)
        ])
        
        prompt = f"""Based on the following context, answer the user's question. 
        If the context doesn't contain enough information to answer the question, 
        say so. Always cite the source documents when possible.

        Context:
        {context_text}

        Question: {query}

        Answer:"""
        
        response = self.openai_client.chat.completions.create(
            model="gpt-4",
            messages=[
                {"role": "system", "content": "You are a helpful assistant that answers questions based on provided context."},
                {"role": "user", "content": prompt}
            ],
            max_tokens=500,
            temperature=0.3
        )
        
        return response.choices[0].message.content
    
    def query(self, question: str, n_results: int = 5) -> Dict:
        """Complete RAG pipeline: retrieve context and generate response"""
        # Retrieve relevant context
        context = self.retrieve_context(question, n_results)
        
        # Generate response
        answer = self.generate_response(question, context)
        
        return {
            "question": question,
            "answer": answer,
            "context": context,
            "sources": [result.source for result in context]
        }
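
A minimal end-to-end sketch tying the three classes together. The document texts, metadata, and ids are placeholders, and the API key is read from the environment.

import os

# Wire the pieces together (illustrative values only)
vector_db = VectorDatabase(persist_directory="./chroma_db")
vector_db.add_documents(
    documents=["Our refund policy allows returns within 30 days.",
               "Support is available by email 24/7."],
    metadata=[{"source": "policies.md"}, {"source": "support.md"}],
    ids=["doc_0", "doc_1"]
)

rag = VectorSearchRAG(vector_db, openai_api_key=os.environ["OPENAI_API_KEY"])
result = rag.query("How long do customers have to return a product?", n_results=2)
print(result["answer"])
print(result["sources"])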

Search Strategies

Semantic Search

  • Vector similarity using cosine distance
  • Understanding of context and meaning
  • Handles synonyms and related concepts
  • Language-agnostic with multilingual models
  • Best for conceptual queries

Hybrid Search

  • Combines semantic and keyword search
  • Configurable weights for each approach
  • Reranking based on multiple factors (a reciprocal rank fusion sketch follows this list)
  • Better precision and recall
  • Handles both exact and fuzzy matching
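
The configuration example below selects reciprocal rank fusion as the reranking algorithm; a minimal sketch of that fusion step is shown here. The function name and the k constant are illustrative, not an API defined by this page.

from typing import Dict, List

def reciprocal_rank_fusion(rankings: List[List[str]], k: int = 60) -> List[str]:
    """Fuse several ranked lists of document ids; k dampens the influence of top ranks."""
    scores: Dict[str, float] = {}
    for ranking in rankings:
        for rank, doc_id in enumerate(ranking):
            scores[doc_id] = scores.get(doc_id, 0.0) + 1.0 / (k + rank + 1)
    return sorted(scores, key=scores.get, reverse=True)

# Example: fuse a semantic ranking with a keyword ranking
fused = reciprocal_rank_fusion([["doc_2", "doc_0", "doc_1"], ["doc_0", "doc_3"]])
print(fused)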


Configuration Example

# rag-config.yaml
vector_search:
  embedding_model:
    name: "all-MiniLM-L6-v2"
    max_length: 512
    device: "cpu"  # or "cuda" for GPU
  
  vector_database:
    type: "chromadb"
    persist_directory: "./vector_db"
    collection_name: "documents"
    similarity_metric: "cosine"
    index_type: "hnsw"
    m: 16  # HNSW parameter
    ef_construction: 200
  
  search:
    default_results: 5
    max_results: 20
    similarity_threshold: 0.7
    chunk_size: 512
    overlap: 50

hybrid_search:
  enabled: true
  semantic_weight: 0.7
  keyword_weight: 0.3
  reranking:
    enabled: true
    algorithm: "reciprocal_rank_fusion"
    max_candidates: 20

response_generation:
  model: "gpt-4"
  max_tokens: 500
  temperature: 0.3
  system_prompt: "You are a helpful assistant..."
  
  context_assembly:
    max_context_length: 4000
    include_metadata: true
    include_similarity_scores: true
    source_attribution: true

monitoring:
  logging:
    level: "info"
    format: "json"
    file: "./logs/rag.log"
  
  metrics:
    track_queries: true
    track_response_times: true
    track_similarity_scores: true
    track_user_feedback: true
  
  alerts:
    high_latency_threshold: 5000  # ms
    low_similarity_threshold: 0.5
    error_rate_threshold: 0.05
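
The page does not show how this file is consumed; one possible sketch using PyYAML is below. The key names match the example above, while everything else (file path, query text) is an assumption.

import yaml

# Read the YAML config and pull out the values the classes above need
with open("rag-config.yaml") as f:
    config = yaml.safe_load(f)

search_cfg = config["vector_search"]["search"]
vector_db = VectorDatabase(
    persist_directory=config["vector_search"]["vector_database"]["persist_directory"]
)
results = vector_db.search("example query", n_results=search_cfg["default_results"])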

Performance Optimization

Indexing Strategies

# Efficient indexing with batching
def batch_index_documents(vector_db: VectorDatabase, documents: List[str],
                          metadata: List[Dict], batch_size: int = 100):
    """Index documents in batches for better performance"""
    for i in range(0, len(documents), batch_size):
        batch = documents[i:i + batch_size]
        batch_metadata = metadata[i:i + batch_size]
        batch_ids = [f"doc_{j}" for j in range(i, i + len(batch))]
        
        # Process one batch at a time to keep memory usage bounded
        vector_db.add_documents(batch, batch_metadata, batch_ids)
        
        # Progress update
        print(f"Indexed {min(i + batch_size, len(documents))}/{len(documents)} documents")

# Parallel processing for large datasets
from concurrent.futures import ThreadPoolExecutor

def parallel_embedding_generation(documents: List[str], max_workers: int = 4) -> np.ndarray:
    """Generate embeddings in parallel, one worker per slice of documents"""
    embedder = DocumentEmbedder()
    # Avoid a zero-sized slice when there are fewer documents than workers
    chunk_size = max(1, len(documents) // max_workers)
    chunks = [documents[i:i + chunk_size] for i in range(0, len(documents), chunk_size)]
    
    # Threads help most when the embedding call releases the GIL or hits a remote API
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = [executor.submit(embedder.generate_embeddings, chunk) for chunk in chunks]
        results = [future.result() for future in futures]
    
    return np.concatenate(results)

Caching Strategies

import hashlib
import json
from functools import lru_cache

import redis

class CachedVectorSearchRAG:
    def __init__(self, vector_db: VectorDatabase, redis_url: str = "redis://localhost:6379"):
        self.vector_db = vector_db
        self.redis_client = redis.from_url(redis_url)
        self.cache_ttl = 3600  # 1 hour
        self.embedder = DocumentEmbedder()
    
    @lru_cache(maxsize=1000)
    def cached_embedding(self, text: str) -> List[float]:
        """Cache embeddings in memory (lru_cache keys on the text argument)"""
        return self.embedder.generate_embeddings([text])[0].tolist()
    
    def cached_search(self, query: str, n_results: int = 5) -> Dict:
        """Cache search results in Redis"""
        cache_key = self.generate_cache_key(query, n_results)
        
        # Try to get from cache first
        cached_result = self.redis_client.get(cache_key)
        if cached_result:
            return json.loads(cached_result)
        
        # Cache miss: perform the search and store the result
        result = self.vector_db.search(query, n_results)
        self.redis_client.setex(
            cache_key, 
            self.cache_ttl, 
            json.dumps(result)
        )
        
        return result
    
    def generate_cache_key(self, query: str, n_results: int) -> str:
        """Generate a deterministic cache key for a query/result-count pair"""
        content = f"{query}:{n_results}"
        return f"rag_search:{hashlib.md5(content.encode()).hexdigest()}"

Common Use Cases

Document Search & Q&A

Enable users to ask natural language questions about large document collections, with the system retrieving relevant context and generating accurate answers.

Knowledge Base Search

Power internal knowledge management systems with semantic search capabilities, helping employees find relevant information quickly and accurately.

Content Recommendation

Suggest related articles, products, or content based on semantic similarity, improving user engagement and discovery.