Vector databases have emerged as critical infrastructure for modern AI applications, powering everything from semantic search to recommendation systems. As AI models generate increasingly complex embeddings, the need for efficient vector similarity search has become paramount. This post explores the architecture, implementation, and operational considerations for deploying vector databases at scale.
Understanding Vector Embeddings
Vector embeddings transform unstructured data (text, images, audio) into high-dimensional numerical representations that capture semantic meaning:
from sentence_transformers import SentenceTransformer
import numpy as np
# Generate embeddings for text
model = SentenceTransformer('all-MiniLM-L6-v2')
texts = [
"Machine learning powers modern AI applications",
"Deep neural networks require significant computational resources",
"Pizza is a popular Italian food"
]
embeddings = model.encode(texts)
print(f"Embedding shape: {embeddings.shape}") # (3, 384)
# Vectors close in embedding space are semantically similar
def cosine_similarity(v1, v2):
return np.dot(v1, v2) / (np.linalg.norm(v1) * np.linalg.norm(v2))
sim_0_1 = cosine_similarity(embeddings[0], embeddings[1])
sim_0_2 = cosine_similarity(embeddings[0], embeddings[2])
print(f"Similarity (ML texts): {sim_0_1:.3f}") # ~0.65
print(f"Similarity (ML vs Pizza): {sim_0_2:.3f}") # ~0.15
Core Vector Database Operations
Vector databases must efficiently support three primary operations:
1. Insertion
from typing import Callable, Dict, List, Optional
import numpy as np
class VectorDatabase:
def __init__(self, dimension: int):
self.dimension = dimension
self.vectors = []
self.metadata = []
self.index = None
def insert(self, vector: np.ndarray, metadata: Dict):
"""Insert vector with associated metadata"""
if vector.shape[0] != self.dimension:
raise ValueError(f"Expected dimension {self.dimension}, got {vector.shape[0]}")
# Normalize for cosine similarity
normalized = vector / np.linalg.norm(vector)
self.vectors.append(normalized)
self.metadata.append(metadata)
# Mark index as stale
if self.index is not None:
self.index.needs_rebuild = True
def batch_insert(self, vectors: List[np.ndarray], metadata_list: List[Dict]):
"""Efficient batch insertion"""
vectors_array = np.array(vectors)
# Normalize all vectors at once
norms = np.linalg.norm(vectors_array, axis=1, keepdims=True)
normalized = vectors_array / norms
self.vectors.extend(normalized)
self.metadata.extend(metadata_list)
# Mark index as stale, mirroring single insert
if self.index is not None:
    self.index.needs_rebuild = True
2. Similarity Search
def search(
self,
query_vector: np.ndarray,
k: int = 10,
filter_fn: Optional[Callable] = None
) -> List[Dict]:
"""Find k nearest neighbors"""
# Normalize query
query_norm = query_vector / np.linalg.norm(query_vector)
# Compute similarities
vectors_array = np.array(self.vectors)
similarities = np.dot(vectors_array, query_norm)
# Apply metadata filtering if specified
if filter_fn:
valid_indices = [
i for i, meta in enumerate(self.metadata)
if filter_fn(meta)
]
similarities_filtered = similarities[valid_indices]
indices_filtered = np.array(valid_indices)
else:
similarities_filtered = similarities
indices_filtered = np.arange(len(similarities))
# Get top k (guard against k exceeding the candidate set)
k = min(k, len(similarities_filtered))
top_k_positions = np.argpartition(similarities_filtered, -k)[-k:]
top_k_positions = top_k_positions[np.argsort(-similarities_filtered[top_k_positions])]
results = []
for pos in top_k_positions:
idx = indices_filtered[pos]
results.append({
'metadata': self.metadata[idx],
'similarity': float(similarities[idx]),
'vector': self.vectors[idx]
})
return results
3. Updates and Deletions
def update(self, doc_id: str, new_vector: np.ndarray, new_metadata: Dict):
"""Update existing vector"""
# Find document
idx = next(
(i for i, meta in enumerate(self.metadata) if meta.get('id') == doc_id),
None
)
if idx is None:
raise ValueError(f"Document {doc_id} not found")
# Update in place
normalized = new_vector / np.linalg.norm(new_vector)
self.vectors[idx] = normalized
self.metadata[idx] = new_metadata
if self.index:
self.index.needs_rebuild = True
def delete(self, doc_id: str):
"""Delete vector by document ID"""
idx = next(
(i for i, meta in enumerate(self.metadata) if meta.get('id') == doc_id),
None
)
if idx is not None:
del self.vectors[idx]
del self.metadata[idx]
if self.index:
self.index.needs_rebuild = True
Indexing Strategies for Performance
Brute-force search scales as O(n×d) per query, where n is the number of vectors and d is the dimensionality. For production systems with millions of vectors, approximate nearest neighbor (ANN) indexes are essential.
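For scale intuition, here is a minimal brute-force baseline (a sketch; it assumes the stored vectors and the query are already L2-normalized, so the dot product equals cosine similarity):

import numpy as np

def brute_force_knn(vectors: np.ndarray, query: np.ndarray, k: int = 10) -> np.ndarray:
    """Exact k-NN by scanning every vector: O(n*d) work per query."""
    similarities = vectors @ query  # (n,) dot products against all n vectors
    k = min(k, len(similarities))
    top_k = np.argpartition(similarities, -k)[-k:]  # unordered top k
    return top_k[np.argsort(-similarities[top_k])]  # sorted by similarity

Exact and simple, but a single query over 10 million 384-dimensional float32 vectors reads roughly 15 GB; ANN indexes trade a small amount of recall for orders of magnitude less work.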
Hierarchical Navigable Small World (HNSW)
HNSW builds a multi-layer graph structure for efficient search:
import hnswlib
class HNSWVectorDatabase:
def __init__(self, dimension: int, max_elements: int = 1000000):
self.dimension = dimension
self.max_elements = max_elements
# Initialize HNSW index
self.index = hnswlib.Index(space='cosine', dim=dimension)
self.index.init_index(
max_elements=max_elements,
ef_construction=200, # Higher = better quality, slower build
M=16 # Number of bidirectional links per node
)
self.metadata = {}
self.current_id = 0
def insert(self, vector: np.ndarray, metadata: Dict) -> int:
"""Insert vector and return assigned ID"""
doc_id = self.current_id
self.index.add_items(vector.reshape(1, -1), np.array([doc_id]))
self.metadata[doc_id] = metadata
self.current_id += 1
return doc_id
def batch_insert(self, vectors: np.ndarray, metadata_list: List[Dict]) -> List[int]:
"""Efficient batch insertion"""
start_id = self.current_id
ids = np.arange(start_id, start_id + len(vectors))
self.index.add_items(vectors, ids)
for doc_id, meta in zip(ids, metadata_list):
self.metadata[doc_id] = meta
self.current_id += len(vectors)
return ids.tolist()
def search(self, query_vector: np.ndarray, k: int = 10) -> List[Dict]:
"""Search with HNSW index"""
# Set search quality parameter
self.index.set_ef(max(k * 2, 50)) # Higher = better recall, slower
labels, distances = self.index.knn_query(query_vector.reshape(1, -1), k=k)
results = []
for label, distance in zip(labels[0], distances[0]):
results.append({
'id': int(label),
'metadata': self.metadata[label],
'similarity': 1 - distance  # hnswlib 'cosine' distance is 1 - cosine similarity
})
return results
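A brief usage sketch, reusing the sentence-transformer model from earlier (the texts and k are illustrative):

model = SentenceTransformer('all-MiniLM-L6-v2')  # 384-dim embeddings
db = HNSWVectorDatabase(dimension=384)

docs = ["HNSW enables fast graph-based search", "Pizza is a popular Italian food"]
db.batch_insert(model.encode(docs), [{'text': d} for d in docs])

hits = db.search(model.encode("graph search algorithms"), k=2)

Higher M and ef_construction improve recall at the cost of memory and build time; they are build-time parameters, while ef is tunable per query.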
Inverted File Index (IVF)
IVF partitions the vector space into clusters for faster search:
import faiss
import numpy as np
class IVFVectorDatabase:
def __init__(self, dimension: int, n_clusters: int = 100):
self.dimension = dimension
self.n_clusters = n_clusters
# Create quantizer and index; the quantizer's metric should match
# the inner-product metric used below
quantizer = faiss.IndexFlatIP(dimension)
self.index = faiss.IndexIVFFlat(
quantizer,
dimension,
n_clusters,
faiss.METRIC_INNER_PRODUCT
)
self.is_trained = False
self.metadata = []
def train(self, training_vectors: np.ndarray):
"""Train IVF clusters on representative data"""
# faiss expects float32; copy first so the caller's array is not
# mutated by faiss's in-place L2 normalization
training_vectors = np.array(training_vectors, dtype=np.float32)
faiss.normalize_L2(training_vectors)
self.index.train(training_vectors)
self.is_trained = True
def insert(self, vectors: np.ndarray, metadata_list: List[Dict]):
"""Insert vectors (must be trained first)"""
if not self.is_trained:
# Auto-train on first batch
self.train(vectors)
# Normalize a float32 copy so faiss's in-place normalization
# does not mutate the caller's array
vectors_normalized = np.array(vectors, dtype=np.float32)
faiss.normalize_L2(vectors_normalized)
# Add to index
self.index.add(vectors_normalized)
self.metadata.extend(metadata_list)
def search(self, query_vector: np.ndarray, k: int = 10, nprobe: int = 10) -> List[Dict]:
"""Search with IVF index
nprobe: number of clusters to search (higher = better recall, slower)
"""
self.index.nprobe = nprobe
# Normalize a float32 copy of the query for faiss
query_normalized = np.array(query_vector.reshape(1, -1), dtype=np.float32)
faiss.normalize_L2(query_normalized)
distances, indices = self.index.search(query_normalized, k)
results = []
for idx, distance in zip(indices[0], distances[0]):
if idx != -1:  # faiss pads results with -1 when fewer than k exist
results.append({
'metadata': self.metadata[idx],
'similarity': float(distance)  # inner product on unit vectors = cosine similarity
})
return results
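Usage follows a train-then-insert flow. A common heuristic (an assumption to tune for your workload, not a rule) is on the order of sqrt(n) clusters:

import numpy as np

n, dim = 100_000, 384
data = np.random.rand(n, dim).astype(np.float32)  # stand-in for real embeddings

db = IVFVectorDatabase(dim, n_clusters=int(n ** 0.5))
db.train(data[:20_000])  # train clusters on a representative sample
db.insert(data, [{'row': i} for i in range(n)])
hits = db.search(data[0], k=5, nprobe=16)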
Production Architecture
A production vector database needs to handle:
- High throughput: Millions of queries per second
- Low latency: Sub-100ms p99 latency
- Scalability: Billions of vectors
- Durability: No data loss
- Availability: 99.99% uptime
Sharding Strategy
import asyncio
import hashlib
from typing import Dict, List
class ShardedVectorDatabase:
def __init__(self, n_shards: int, dimension: int):
self.n_shards = n_shards
self.shards = [
HNSWVectorDatabase(dimension)
for _ in range(n_shards)
]
def _get_shard(self, doc_id: str) -> int:
"""Consistent hashing for shard assignment"""
hash_val = int(hashlib.md5(doc_id.encode()).hexdigest(), 16)
return hash_val % self.n_shards
def insert(self, doc_id: str, vector: np.ndarray, metadata: Dict):
"""Route to appropriate shard"""
shard_id = self._get_shard(doc_id)
metadata['doc_id'] = doc_id
self.shards[shard_id].insert(vector, metadata)
async def search(self, query_vector: np.ndarray, k: int = 10) -> List[Dict]:
"""Parallel search across all shards"""
# Search all shards in parallel
tasks = [
self._search_shard(shard, query_vector, k)
for shard in self.shards
]
shard_results = await asyncio.gather(*tasks)
# Merge and re-rank
all_results = []
for results in shard_results:
all_results.extend(results)
# Sort by similarity and return top k
all_results.sort(key=lambda x: x['similarity'], reverse=True)
return all_results[:k]
async def _search_shard(self, shard, query_vector, k):
"""Search single shard"""
return await asyncio.to_thread(shard.search, query_vector, k * 2)
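Each shard is over-fetched at k * 2: with exact per-shard search, k results per shard would already guarantee the global top k appears in the merged set, but the HNSW shards are approximate, so the extra candidates improve recall at modest cost.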
Replication for High Availability
import asyncio

class ReplicatedVectorDatabase:
def __init__(self, primary: HNSWVectorDatabase, replicas: List[HNSWVectorDatabase]):
self.primary = primary
self.replicas = replicas
async def insert(self, vector: np.ndarray, metadata: Dict):
"""Write to primary and replicate asynchronously"""
# Synchronous write to primary
doc_id = self.primary.insert(vector, metadata)
# Asynchronous replication
asyncio.create_task(self._replicate(vector, metadata))
return doc_id
async def _replicate(self, vector: np.ndarray, metadata: Dict):
"""Replicate to all replicas"""
tasks = [
asyncio.to_thread(replica.insert, vector, metadata)
for replica in self.replicas
]
await asyncio.gather(*tasks, return_exceptions=True)
async def search(self, query_vector: np.ndarray, k: int = 10):
"""Read from any available replica"""
# Try primary first
try:
return await asyncio.wait_for(
asyncio.to_thread(self.primary.search, query_vector, k),
timeout=0.1
)
except asyncio.TimeoutError:
# Fallback to replica
for replica in self.replicas:
try:
return await asyncio.to_thread(replica.search, query_vector, k)
except Exception:
continue
raise RuntimeError("All replicas unavailable")
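Because replication here is fire-and-forget, replicas are eventually consistent: a search that falls back to a replica may briefly miss the newest writes. If read-your-writes semantics matter, pin a session's reads to the primary or wait for replication acknowledgements before returning from insert.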
Hybrid Search: Combining Vector and Keyword Search
Vector search captures paraphrases and synonyms but can miss exact keyword matches (product codes, names, rare terms), while keyword search has the opposite failure mode; combining the two usually beats either alone:
from typing import Dict, List

import numpy as np
from rank_bm25 import BM25Okapi
class HybridSearchDatabase:
def __init__(self, dimension: int):
self.vector_db = HNSWVectorDatabase(dimension)
self.documents = []
self.bm25 = None
def insert(self, text: str, vector: np.ndarray, metadata: Dict):
"""Insert with both text and vector"""
doc_id = len(self.documents)  # matches the HNSW index's sequential id assignment
metadata['id'] = doc_id
metadata['text'] = text
# Vector index
self.vector_db.insert(vector, metadata)
# Text index (rebuilding BM25 on every insert is O(n);
# batch inserts and rebuild once in practice)
self.documents.append(text)
self._rebuild_bm25()
def _rebuild_bm25(self):
"""Rebuild BM25 index"""
tokenized = [doc.lower().split() for doc in self.documents]
self.bm25 = BM25Okapi(tokenized)
def hybrid_search(
self,
query_text: str,
query_vector: np.ndarray,
k: int = 10,
alpha: float = 0.5
) -> List[Dict]:
"""
Hybrid search combining semantic and keyword matching
alpha: weight for vector search (1-alpha for keyword search)
"""
# Vector search
vector_results = self.vector_db.search(query_vector, k=k*2)
# Keyword search
tokenized_query = query_text.lower().split()
bm25_scores = self.bm25.get_scores(tokenized_query)
# Normalize scores to [0, 1]
vector_scores = {r['id']: r['similarity'] for r in vector_results}
max_bm25 = max(bm25_scores) if max(bm25_scores) > 0 else 1
bm25_normalized = {i: score/max_bm25 for i, score in enumerate(bm25_scores)}
# Combine scores
combined_scores = {}
all_ids = set(vector_scores.keys()) | set(bm25_normalized.keys())
for doc_id in all_ids:
vec_score = vector_scores.get(doc_id, 0)
bm25_score = bm25_normalized.get(doc_id, 0)
combined_scores[doc_id] = alpha * vec_score + (1 - alpha) * bm25_score
# Sort and return top k
sorted_ids = sorted(
combined_scores.keys(),
key=lambda x: combined_scores[x],
reverse=True
)[:k]
results = []
for doc_id in sorted_ids:
results.append({
'id': doc_id,
'text': self.documents[doc_id],
'score': combined_scores[doc_id],
'vector_score': vector_scores.get(doc_id, 0),
'keyword_score': bm25_normalized.get(doc_id, 0)
})
return results
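Linear blending is sensitive to how the two score distributions are normalized, and alpha = 0.5 is only a starting point worth tuning on labeled queries. Reciprocal rank fusion (RRF) is a common alternative that combines ranks instead of scores and sidesteps calibration entirely; a minimal sketch (c = 60 is the constant from the original RRF paper):

from typing import Dict, List

def reciprocal_rank_fusion(rankings: List[List[int]], c: int = 60) -> Dict[int, float]:
    """Merge ranked lists of doc ids: score(d) = sum over lists of 1 / (c + rank)."""
    scores: Dict[int, float] = {}
    for ranking in rankings:
        for rank, doc_id in enumerate(ranking, start=1):
            scores[doc_id] = scores.get(doc_id, 0.0) + 1.0 / (c + rank)
    return scores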
Performance Optimization
Dimensionality Reduction
Reduce embedding dimensions while preserving information:
from sklearn.decomposition import PCA
class DimensionalityReducer:
def __init__(self, original_dim: int, reduced_dim: int):
self.original_dim = original_dim
self.pca = PCA(n_components=reduced_dim)
self.is_fitted = False
def fit(self, vectors: np.ndarray):
"""Fit PCA on representative sample"""
if vectors.shape[1] != self.original_dim:
    raise ValueError(f"Expected dimension {self.original_dim}, got {vectors.shape[1]}")
self.pca.fit(vectors)
self.is_fitted = True
variance_retained = sum(self.pca.explained_variance_ratio_)
print(f"Variance retained: {variance_retained:.2%}")
def transform(self, vectors: np.ndarray) -> np.ndarray:
"""Reduce dimensionality"""
if not self.is_fitted:
raise ValueError("Must fit before transform")
return self.pca.transform(vectors)
# Example: 768 -> 256 dimensions (sample_vectors / all_vectors are illustrative)
reducer = DimensionalityReducer(768, 256)
reducer.fit(sample_vectors)               # fit on a representative sample
reduced = reducer.transform(all_vectors)  # 3x smaller; validate recall on your own data
Quantization
Reduce memory footprint with scalar quantization:
class QuantizedVectorDB:
def __init__(self, dimension: int, n_bits: int = 8):
self.dimension = dimension
self.n_bits = n_bits
self.vectors_quantized = []
self.scale_factors = []
def quantize(self, vector: np.ndarray) -> tuple[np.ndarray, float]:
"""Quantize float32 to int8"""
max_val = np.abs(vector).max()
# Guard against the all-zero vector
scale = max_val / (2 ** (self.n_bits - 1) - 1) if max_val > 0 else 1.0
quantized = np.round(vector / scale).astype(np.int8)  # int8 storage assumes n_bits <= 8
return quantized, scale
def dequantize(self, quantized: np.ndarray, scale: float) -> np.ndarray:
"""Dequantize int8 to float32"""
return quantized.astype(np.float32) * scale
def insert(self, vector: np.ndarray):
"""Insert quantized vector"""
quantized, scale = self.quantize(vector)
self.vectors_quantized.append(quantized)
self.scale_factors.append(scale)
# Memory savings: 4x reduction (float32 -> int8)
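Similarity can be computed directly on the quantized vectors: since v ≈ q * s, the dot product rescales as v1 · v2 ≈ (q1 · q2) * s1 * s2. A search sketch for QuantizedVectorDB under that approximation (assuming vectors were L2-normalized before quantization, so dot products approximate cosine similarity):

def search(self, query: np.ndarray, k: int = 10) -> np.ndarray:
    """Approximate k-NN over the quantized store."""
    q_query, s_query = self.quantize(query)
    scores = np.array([
        # Cast to int32 so the int8 dot product cannot overflow,
        # then rescale back to the original float range
        np.dot(q.astype(np.int32), q_query.astype(np.int32)) * s * s_query
        for q, s in zip(self.vectors_quantized, self.scale_factors)
    ])
    k = min(k, len(scores))
    top_k = np.argpartition(scores, -k)[-k:]
    return top_k[np.argsort(-scores[top_k])]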
Conclusion
Vector databases are foundational infrastructure for AI applications. Key takeaways:
- Choose the right index - HNSW for general use, IVF for billion-scale
- Shard and replicate - horizontal scaling and high availability
- Hybrid search - combine semantic and keyword matching
- Optimize dimensions - PCA and quantization reduce costs
- Monitor performance - latency, recall, and throughput metrics (see the recall sketch below)
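On the monitoring point: recall@k (the fraction of the exact top-k results an ANN index actually returns) is the standard quality metric, measured against a brute-force ground truth. A minimal sketch:

import numpy as np

def recall_at_k(ann_ids: np.ndarray, exact_ids: np.ndarray) -> float:
    """Fraction of the exact top-k neighbors the ANN index found."""
    return len(set(ann_ids.tolist()) & set(exact_ids.tolist())) / len(exact_ids)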
As AI models continue to evolve, vector databases will become increasingly critical. Understanding their internals and operational characteristics is essential for building scalable, reliable AI applications.