Vector Search RAG
Implement semantic search using vector embeddings for intelligent document retrieval
Overview
Vector Search RAG combines the power of semantic understanding with traditional information retrieval to provide highly relevant search results. By converting text into high-dimensional vectors (embeddings), this approach can find documents that are semantically similar even when they don't share exact keywords.
This pattern is particularly effective for natural language queries, multilingual search, and finding related content across large document collections. It addresses the limitations of keyword-based search by understanding context and meaning.
Architecture Overview
Implementation Guide
1. Document Embedding Generation
import openai
from sentence_transformers import SentenceTransformer
import numpy as np
from typing import List, Dict

class DocumentEmbedder:
    def __init__(self, model_name: str = "all-MiniLM-L6-v2"):
        self.model = SentenceTransformer(model_name)
        self.openai_client = openai.OpenAI()

    def generate_embeddings(self, documents: List[str]) -> np.ndarray:
        """Generate embeddings for a list of documents"""
        return self.model.encode(documents, show_progress_bar=True)

    def generate_openai_embeddings(self, documents: List[str]) -> List[List[float]]:
        """Generate embeddings using OpenAI's API"""
        embeddings = []
        for doc in documents:
            response = self.openai_client.embeddings.create(
                model="text-embedding-ada-002",
                input=doc
            )
            embeddings.append(response.data[0].embedding)
        return embeddings

    def chunk_documents(self, documents: List[str], chunk_size: int = 512) -> List[str]:
        """Split documents into smaller chunks for better embedding"""
        chunks = []
        for doc in documents:
            words = doc.split()
            for i in range(0, len(words), chunk_size):
                chunk = " ".join(words[i:i + chunk_size])
                chunks.append(chunk)
        return chunks
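The chunker above produces non-overlapping chunks, while the configuration example later in this guide also specifies a 50-word overlap to preserve context across chunk boundaries. A minimal sketch of an overlap-aware variant; the function name and defaults are illustrative, not part of the listing above:

from typing import List

def chunk_with_overlap(documents: List[str], chunk_size: int = 512, overlap: int = 50) -> List[str]:
    """Split documents into word-based chunks that share `overlap` words with their neighbors."""
    chunks = []
    stride = max(1, chunk_size - overlap)
    for doc in documents:
        words = doc.split()
        for i in range(0, len(words), stride):
            chunk = " ".join(words[i:i + chunk_size])
            if chunk:
                chunks.append(chunk)
    return chunks

# Example usage (hypothetical document text):
# embedder = DocumentEmbedder()
# chunks = chunk_with_overlap(["long document text ..."], chunk_size=512, overlap=50)
# embeddings = embedder.generate_embeddings(chunks)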
2. Vector Database Setup
import chromadb
from chromadb.config import Settings
import numpy as np
from typing import List, Dict, Optional

class VectorDatabase:
    def __init__(self, persist_directory: str = "./chroma_db"):
        self.client = chromadb.PersistentClient(
            path=persist_directory,
            settings=Settings(anonymized_telemetry=False)
        )
        self.collection = self.client.get_or_create_collection(
            name="documents",
            metadata={"hnsw:space": "cosine"}
        )
        # Reuse a single embedder so the model is loaded only once
        self.embedder = DocumentEmbedder()

    def add_documents(self, documents: List[str], metadata: List[Dict], ids: List[str]):
        """Add documents to the vector database"""
        # Generate embeddings for documents
        embeddings = self.embedder.generate_embeddings(documents)
        # Add to collection
        self.collection.add(
            embeddings=embeddings.tolist(),
            documents=documents,
            metadatas=metadata,
            ids=ids
        )

    def search(self, query: str, n_results: int = 5) -> Dict:
        """Search for similar documents"""
        query_embedding = self.embedder.generate_embeddings([query])
        results = self.collection.query(
            query_embeddings=query_embedding.tolist(),
            n_results=n_results,
            include=["documents", "metadatas", "distances"]
        )
        return results

    def hybrid_search(self, query: str, n_results: int = 5,
                      semantic_weight: float = 0.7) -> Dict:
        """Combine semantic and keyword search"""
        # Semantic search
        semantic_results = self.search(query, n_results)
        # Keyword search (keyword_search is sketched after this listing)
        keyword_results = self.keyword_search(query, n_results)
        # Combine and rerank results (combine_results is sketched after this listing)
        combined_results = self.combine_results(
            semantic_results, keyword_results, semantic_weight
        )
        return combined_results
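hybrid_search calls two helpers, keyword_search and combine_results, that are not defined in the listing above. One possible sketch, assuming a simple term-overlap keyword score and a weighted combination of normalized scores; the method names match the calls above, everything else is an assumption:

    # Possible implementations of the helpers referenced by hybrid_search.
    # These belong inside the VectorDatabase class.

    def keyword_search(self, query: str, n_results: int = 5) -> Dict:
        """Naive keyword search: score stored documents by query-term overlap."""
        records = self.collection.get(include=["documents", "metadatas"])
        query_terms = set(query.lower().split())
        scored = []
        for doc_id, doc, meta in zip(records["ids"], records["documents"], records["metadatas"]):
            overlap = len(query_terms & set(doc.lower().split()))
            if overlap > 0:
                scored.append((overlap, doc_id, doc, meta))
        scored.sort(key=lambda item: item[0], reverse=True)
        top = scored[:n_results]
        max_overlap = top[0][0] if top else 1
        return {
            "ids": [item[1] for item in top],
            "documents": [item[2] for item in top],
            "metadatas": [item[3] for item in top],
            "scores": [item[0] / max_overlap for item in top],  # normalized to [0, 1]
        }

    def combine_results(self, semantic_results: Dict, keyword_results: Dict,
                        semantic_weight: float) -> Dict:
        """Merge the two result sets with a weighted score per document id."""
        combined = {}
        # Chroma returns cosine distances; convert them to similarity scores.
        for doc_id, doc, meta, dist in zip(semantic_results["ids"][0],
                                           semantic_results["documents"][0],
                                           semantic_results["metadatas"][0],
                                           semantic_results["distances"][0]):
            combined[doc_id] = {"document": doc, "metadata": meta,
                                "score": semantic_weight * (1 - dist)}
        for doc_id, doc, meta, score in zip(keyword_results["ids"],
                                            keyword_results["documents"],
                                            keyword_results["metadatas"],
                                            keyword_results["scores"]):
            entry = combined.setdefault(doc_id, {"document": doc, "metadata": meta, "score": 0.0})
            entry["score"] += (1 - semantic_weight) * score
        ranked = sorted(combined.items(), key=lambda item: item[1]["score"], reverse=True)
        return {
            "ids": [doc_id for doc_id, _ in ranked],
            "documents": [entry["document"] for _, entry in ranked],
            "metadatas": [entry["metadata"] for _, entry in ranked],
            "scores": [entry["score"] for _, entry in ranked],
        }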
3. RAG Pipeline Implementation
from typing import List, Dict
import openai
from dataclasses import dataclass

@dataclass
class SearchResult:
    document: str
    metadata: Dict
    similarity_score: float
    source: str

class VectorSearchRAG:
    def __init__(self, vector_db: VectorDatabase, openai_api_key: str):
        self.vector_db = vector_db
        self.openai_client = openai.OpenAI(api_key=openai_api_key)

    def retrieve_context(self, query: str, n_results: int = 5) -> List[SearchResult]:
        """Retrieve relevant context for a query"""
        search_results = self.vector_db.search(query, n_results)
        results = []
        for i in range(len(search_results['documents'][0])):
            result = SearchResult(
                document=search_results['documents'][0][i],
                metadata=search_results['metadatas'][0][i],
                # Chroma reports cosine distance; convert to a similarity score
                similarity_score=1 - search_results['distances'][0][i],
                source=search_results['metadatas'][0][i].get('source', 'unknown')
            )
            results.append(result)
        return results

    def generate_response(self, query: str, context: List[SearchResult]) -> str:
        """Generate response using retrieved context"""
        # Prepare context for the LLM
        context_text = "\n\n".join([
            f"Document {i+1} (Score: {result.similarity_score:.3f}):\n{result.document}"
            for i, result in enumerate(context)
        ])
        prompt = f"""Based on the following context, answer the user's question.
If the context doesn't contain enough information to answer the question, say so.
Always cite the source documents when possible.

Context:
{context_text}

Question: {query}

Answer:"""
        response = self.openai_client.chat.completions.create(
            model="gpt-4",
            messages=[
                {"role": "system", "content": "You are a helpful assistant that answers questions based on provided context."},
                {"role": "user", "content": prompt}
            ],
            max_tokens=500,
            temperature=0.3
        )
        return response.choices[0].message.content

    def query(self, question: str, n_results: int = 5) -> Dict:
        """Complete RAG pipeline: retrieve context and generate response"""
        # Retrieve relevant context
        context = self.retrieve_context(question, n_results)
        # Generate response
        answer = self.generate_response(question, context)
        return {
            "question": question,
            "answer": answer,
            "context": context,
            "sources": [result.source for result in context]
        }
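A minimal end-to-end sketch that ties the three pieces together; the document texts, metadata, and API key below are placeholders:

documents = ["Chroma is an open-source vector database.", "RAG pairs retrieval with generation."]
metadata = [{"source": "notes/chroma.md"}, {"source": "notes/rag.md"}]
ids = ["doc_0", "doc_1"]

vector_db = VectorDatabase(persist_directory="./chroma_db")
vector_db.add_documents(documents, metadata, ids)

rag = VectorSearchRAG(vector_db, openai_api_key="YOUR_OPENAI_API_KEY")
result = rag.query("What does RAG pair retrieval with?", n_results=2)
print(result["answer"])
print(result["sources"])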
Search Strategies
Semantic Search
- Vector similarity using cosine distance (illustrated after this list)
- Understanding of context and meaning
- Handles synonyms and related concepts
- Language-agnostic with multilingual models
- Best for conceptual queries
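To make the similarity measure concrete: cosine similarity compares the angle between two embedding vectors, and ChromaDB's cosine distance is one minus that similarity. A small illustration, not part of the pipeline code above:

import numpy as np

def cosine_similarity(a: np.ndarray, b: np.ndarray) -> float:
    """Cosine similarity in [-1, 1]; cosine distance = 1 - cosine similarity."""
    return float(np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b)))

# embedder = DocumentEmbedder()
# v1, v2 = embedder.generate_embeddings(["vector search", "semantic retrieval"])
# print(cosine_similarity(v1, v2))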
Hybrid Search
- Combines semantic and keyword search
- Configurable weights for each approach
- Reranking based on multiple factors (see the fusion sketch after this list)
- Better precision and recall
- Handles both exact and fuzzy matching
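The configuration example below names reciprocal_rank_fusion as the reranking algorithm. A minimal sketch of that fusion over ranked id lists; the constant k = 60 is the value commonly used in the literature, and the function itself is illustrative:

from typing import Dict, List

def reciprocal_rank_fusion(rankings: List[List[str]], k: int = 60) -> List[str]:
    """Fuse ranked lists of document ids: score(d) = sum over lists of 1 / (k + rank)."""
    scores: Dict[str, float] = {}
    for ranking in rankings:
        for rank, doc_id in enumerate(ranking, start=1):
            scores[doc_id] = scores.get(doc_id, 0.0) + 1.0 / (k + rank)
    return sorted(scores, key=scores.get, reverse=True)

# Example: fuse a semantic ranking with a keyword ranking (ids are hypothetical)
# fused = reciprocal_rank_fusion([["doc_2", "doc_0", "doc_1"], ["doc_0", "doc_2"]])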
Data Flow Diagram
Configuration Example
# rag-config.yaml
vector_search:
  embedding_model:
    name: "all-MiniLM-L6-v2"
    max_length: 512
    device: "cpu"  # or "cuda" for GPU

  vector_database:
    type: "chromadb"
    persist_directory: "./vector_db"
    collection_name: "documents"
    similarity_metric: "cosine"
    index_type: "hnsw"
    m: 16  # HNSW parameter
    ef_construction: 200

  search:
    default_results: 5
    max_results: 20
    similarity_threshold: 0.7
    chunk_size: 512
    overlap: 50

  hybrid_search:
    enabled: true
    semantic_weight: 0.7
    keyword_weight: 0.3
    reranking:
      enabled: true
      algorithm: "reciprocal_rank_fusion"
      max_candidates: 20

  response_generation:
    model: "gpt-4"
    max_tokens: 500
    temperature: 0.3
    system_prompt: "You are a helpful assistant..."

  context_assembly:
    max_context_length: 4000
    include_metadata: true
    include_similarity_scores: true
    source_attribution: true

monitoring:
  logging:
    level: "info"
    format: "json"
    file: "./logs/rag.log"
  metrics:
    track_queries: true
    track_response_times: true
    track_similarity_scores: true
    track_user_feedback: true
  alerts:
    high_latency_threshold: 5000  # ms
    low_similarity_threshold: 0.5
    error_rate_threshold: 0.05
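One way this configuration might be consumed at startup, assuming PyYAML and the classes defined earlier; the key paths mirror the file as laid out above, and the query string is a placeholder:

import yaml

with open("rag-config.yaml") as f:
    config = yaml.safe_load(f)

search_cfg = config["vector_search"]["search"]
vector_db = VectorDatabase(
    persist_directory=config["vector_search"]["vector_database"]["persist_directory"]
)

results = vector_db.search("example query", n_results=search_cfg["default_results"])
# Drop hits below the configured similarity threshold (Chroma reports cosine distance)
kept = [
    (doc, 1 - dist)
    for doc, dist in zip(results["documents"][0], results["distances"][0])
    if 1 - dist >= search_cfg["similarity_threshold"]
]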
Performance Optimization
Indexing Strategies
# Efficient indexing with batching
from typing import Dict, List

def batch_index_documents(documents: List[str], metadata: List[Dict],
                          vector_db: VectorDatabase, batch_size: int = 100):
    """Index documents in batches for better performance"""
    for i in range(0, len(documents), batch_size):
        batch = documents[i:i + batch_size]
        batch_metadata = metadata[i:i + batch_size]
        batch_ids = [f"doc_{j}" for j in range(i, i + len(batch))]
        # Process batch
        vector_db.add_documents(batch, batch_metadata, batch_ids)
        # Progress update
        print(f"Indexed {min(i + batch_size, len(documents))}/{len(documents)} documents")
# Parallel processing for large datasets
from concurrent.futures import ThreadPoolExecutor
import numpy as np

# Shared embedder so worker threads reuse a single loaded model
shared_embedder = DocumentEmbedder()

def generate_chunk_embeddings(chunk: List[str]) -> np.ndarray:
    """Embed one chunk of documents with the shared embedder"""
    return shared_embedder.generate_embeddings(chunk)

def parallel_embedding_generation(documents: List[str], max_workers: int = 4) -> np.ndarray:
    """Generate embeddings in parallel"""
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Split documents into chunks (at least one document per chunk)
        chunk_size = max(1, len(documents) // max_workers)
        chunks = [documents[i:i + chunk_size] for i in range(0, len(documents), chunk_size)]
        # Process chunks in parallel
        futures = [executor.submit(generate_chunk_embeddings, chunk) for chunk in chunks]
        results = [future.result() for future in futures]
        return np.concatenate(results)
Caching Strategies
import json
import redis
from functools import lru_cache
import hashlib
from typing import Dict, List

class CachedVectorSearchRAG:
    def __init__(self, vector_db: VectorDatabase, redis_url: str = "redis://localhost:6379"):
        self.vector_db = vector_db
        self.redis_client = redis.from_url(redis_url)
        self.embedder = DocumentEmbedder()
        self.cache_ttl = 3600  # 1 hour

    @lru_cache(maxsize=1000)
    def cached_embedding(self, text: str) -> List[float]:
        """Cache embeddings in memory (lru_cache requires hashable arguments, which strings are)"""
        return self.embedder.generate_embeddings([text])[0].tolist()

    def cached_search(self, query: str, n_results: int = 5) -> Dict:
        """Cache search results in Redis"""
        cache_key = self.generate_cache_key(query, n_results)
        # Try to get from cache
        cached_result = self.redis_client.get(cache_key)
        if cached_result:
            return json.loads(cached_result)
        # Perform search
        result = self.vector_db.search(query, n_results)
        # Cache result
        self.redis_client.setex(
            cache_key,
            self.cache_ttl,
            json.dumps(result)
        )
        return result

    def generate_cache_key(self, query: str, n_results: int) -> str:
        """Generate unique cache key"""
        content = f"{query}:{n_results}"
        return f"rag_search:{hashlib.md5(content.encode()).hexdigest()}"
Common Use Cases
Document Search & Q&A
Enable users to ask natural language questions about large document collections, with the system retrieving relevant context and generating accurate answers.
Knowledge Base Search
Power internal knowledge management systems with semantic search capabilities, helping employees find relevant information quickly and accurately.
Content Recommendation
Suggest related articles, products, or content based on semantic similarity, improving user engagement and discovery.
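For the recommendation case, the same index can answer "more like this" queries by embedding a document instead of a user question. A brief sketch reusing the VectorDatabase class above; the seed id is hypothetical:

from typing import List

def recommend_similar(vector_db: VectorDatabase, seed_document: str,
                      seed_id: str, n_results: int = 5) -> List[str]:
    """Return ids of the documents most similar to the seed, excluding the seed itself."""
    results = vector_db.search(seed_document, n_results=n_results + 1)
    return [doc_id for doc_id in results["ids"][0] if doc_id != seed_id][:n_results]

# recommendations = recommend_similar(vector_db, seed_document=documents[0], seed_id="doc_0")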