Vector databases have emerged as critical infrastructure for modern AI applications, powering everything from semantic search to recommendation systems. As AI models generate increasingly complex embeddings, the need for efficient vector similarity search has become paramount. This post explores the architecture, implementation, and operational considerations for deploying vector databases at scale.

Understanding Vector Embeddings

Vector embeddings transform unstructured data (text, images, audio) into high-dimensional numerical representations that capture semantic meaning:

from sentence_transformers import SentenceTransformer
import numpy as np

# Sample sentences: the first two are about ML, the third is unrelated.
texts = [
    "Machine learning powers modern AI applications",
    "Deep neural networks require significant computational resources",
    "Pizza is a popular Italian food"
]

# Load the sentence-embedding model and encode all sentences in one call.
model = SentenceTransformer('all-MiniLM-L6-v2')
embeddings = model.encode(texts)

# One row per input sentence.
print(f"Embedding shape: {embeddings.shape}")  # (3, 384)

# Vectors close in embedding space are semantically similar
def cosine_similarity(v1, v2):
    """Return the cosine similarity of two 1-D vectors.

    The result is the dot product divided by the product of the two
    Euclidean norms. If either vector has zero norm the similarity is
    undefined; 0.0 is returned instead of dividing by zero (which would
    produce NaN and a runtime warning).
    """
    denominator = np.linalg.norm(v1) * np.linalg.norm(v2)
    if denominator == 0:
        return 0.0
    return np.dot(v1, v2) / denominator

# Compare the first sentence against each of the other two.
sim_0_1, sim_0_2 = (
    cosine_similarity(embeddings[0], embeddings[other])
    for other in (1, 2)
)

print(f"Similarity (ML texts): {sim_0_1:.3f}")  # ~0.65
print(f"Similarity (ML vs Pizza): {sim_0_2:.3f}")  # ~0.15

Core Vector Database Operations

Vector databases must efficiently support three primary operations: insertion, similarity search, and updates/deletions.

1. Insertion

from typing import Callable, Dict, List, Optional

import numpy as np

class VectorDatabase:
    """Minimal in-memory vector store that keeps unit-length vectors.

    Vectors and metadata live in two parallel lists: position ``i`` of
    ``self.vectors`` belongs to position ``i`` of ``self.metadata``.
    Every vector is L2-normalized before it is stored.
    """

    def __init__(self, dimension: int):
        """Create an empty store for vectors of length ``dimension``."""
        self.dimension = dimension
        self.vectors = []    # normalized vectors, in insertion order
        self.metadata = []   # metadata dicts, parallel to self.vectors
        self.index = None    # optional index object; flagged stale on writes

    def _mark_index_stale(self) -> None:
        """Flag the attached index (if any) as needing a rebuild."""
        if self.index is not None:
            self.index.needs_rebuild = True

    def insert(self, vector: np.ndarray, metadata: Dict):
        """Insert one vector with its associated metadata.

        Raises:
            ValueError: if the vector length differs from ``self.dimension``
                or the vector has zero norm (it cannot be normalized).
        """
        if vector.shape[0] != self.dimension:
            raise ValueError(f"Expected dimension {self.dimension}, got {vector.shape[0]}")

        # Normalize for cosine similarity
        norm = np.linalg.norm(vector)
        if norm == 0:
            raise ValueError("Cannot normalize a zero-norm vector")

        self.vectors.append(vector / norm)
        self.metadata.append(metadata)
        self._mark_index_stale()

    def batch_insert(self, vectors: List[np.ndarray], metadata_list: List[Dict]):
        """Insert many vectors at once, normalizing them in one array operation.

        An empty batch is a no-op. Nothing is stored unless the whole batch
        passes validation.

        Raises:
            ValueError: if ``vectors`` and ``metadata_list`` differ in length,
                if the vectors do not form an ``(n, dimension)`` array, or if
                any vector has zero norm.
        """
        if len(vectors) != len(metadata_list):
            raise ValueError(
                f"Got {len(vectors)} vectors but {len(metadata_list)} metadata entries"
            )
        if len(vectors) == 0:
            return

        vectors_array = np.array(vectors)
        if vectors_array.ndim != 2 or vectors_array.shape[1] != self.dimension:
            raise ValueError(
                f"Expected vectors of shape (n, {self.dimension}), got {vectors_array.shape}"
            )

        # Normalize all vectors at once
        norms = np.linalg.norm(vectors_array, axis=1, keepdims=True)
        if np.any(norms == 0):
            raise ValueError("Cannot normalize a zero-norm vector")

        # extend() iterates the 2-D array row by row, so one entry per vector.
        self.vectors.extend(vectors_array / norms)
        self.metadata.extend(metadata_list)
        self._mark_index_stale()
def search(
    self,
    query_vector: np.ndarray,
    k: int = 10,
    filter_fn: Optional[Callable] = None
) -> List[Dict]:
    """Find the k nearest neighbors of ``query_vector`` by brute force.

    Args:
        query_vector: query embedding; it is normalized before scoring.
        k: maximum number of results. Fewer are returned when fewer
            candidates exist; ``k <= 0`` yields an empty list.
        filter_fn: optional predicate called with each metadata dict; only
            entries for which it returns a truthy value are candidates.

    Returns:
        Dicts with ``'metadata'``, ``'similarity'`` and ``'vector'`` keys,
        ordered from most to least similar.
    """
    if k <= 0 or not self.vectors:
        return []

    # Normalize query
    query_norm = query_vector / np.linalg.norm(query_vector)

    # Stored vectors are unit length, so the dot product is the cosine.
    similarities = np.dot(np.array(self.vectors), query_norm)

    # Apply metadata filtering if specified
    if filter_fn is not None:
        candidate_indices = np.array(
            [i for i, meta in enumerate(self.metadata) if filter_fn(meta)],
            dtype=int,
        )
    else:
        candidate_indices = np.arange(len(similarities))

    if candidate_indices.size == 0:
        return []

    candidate_sims = similarities[candidate_indices]

    # argpartition raises if k exceeds the array length, so clamp first.
    k = min(k, candidate_sims.size)

    # Pick the k largest (unordered), then sort just those in descending order.
    top_k_positions = np.argpartition(candidate_sims, -k)[-k:]
    top_k_positions = top_k_positions[np.argsort(-candidate_sims[top_k_positions])]

    results = []
    for pos in top_k_positions:
        idx = int(candidate_indices[pos])
        results.append({
            'metadata': self.metadata[idx],
            'similarity': float(similarities[idx]),
            'vector': self.vectors[idx]
        })

    return results

3. Updates and Deletions

def update(self, doc_id: str, new_vector: np.ndarray, new_metadata: Dict):
    """Replace the vector and metadata of the entry whose metadata ``'id'`` is ``doc_id``.

    The metadata dict is replaced wholesale by ``new_metadata``, so it
    should carry the ``'id'`` key if the entry must stay addressable.

    Raises:
        ValueError: if no entry has that id, if ``new_vector`` does not
            match ``self.dimension``, or if it has zero norm.
    """
    # Find document (first match wins)
    idx = next(
        (i for i, meta in enumerate(self.metadata) if meta.get('id') == doc_id),
        None
    )

    if idx is None:
        raise ValueError(f"Document {doc_id} not found")

    # Same validation as insert, so a bad update cannot corrupt the store.
    if new_vector.shape[0] != self.dimension:
        raise ValueError(f"Expected dimension {self.dimension}, got {new_vector.shape[0]}")

    norm = np.linalg.norm(new_vector)
    if norm == 0:
        raise ValueError("Cannot normalize a zero-norm vector")

    # Update in place
    self.vectors[idx] = new_vector / norm
    self.metadata[idx] = new_metadata

    # Explicit None check: an index object may be falsy yet still present.
    if self.index is not None:
        self.index.needs_rebuild = True

def delete(self, doc_id: str):
    """Delete the first entry whose metadata ``'id'`` equals ``doc_id``.

    Deleting an unknown id is a silent no-op.
    """
    idx = next(
        (i for i, meta in enumerate(self.metadata) if meta.get('id') == doc_id),
        None
    )
    if idx is None:
        return

    # Remove from both parallel lists so positions stay aligned.
    del self.vectors[idx]
    del self.metadata[idx]

    # Explicit None check: an index object may be falsy yet still present.
    if self.index is not None:
        self.index.needs_rebuild = True

Indexing Strategies for Performance

Brute-force search costs O(n × d) per query, where n is the number of vectors and d is their dimensionality. For production systems with millions of vectors, approximate nearest neighbor (ANN) indexes are essential.

Hierarchical Navigable Small World (HNSW)

HNSW builds a multi-layer graph structure for efficient search:

import hnswlib

class HNSWVectorDatabase:
    """Vector store backed by an ``hnswlib`` cosine-space index.

    Integer ids are assigned sequentially starting at 0; metadata is kept
    in a dict keyed by those ids.
    """

    def __init__(
        self,
        dimension: int,
        max_elements: int = 1000000,
        ef_construction: int = 200,
        M: int = 16,
    ):
        """Create and initialize the index.

        Args:
            dimension: length of every stored vector.
            max_elements: capacity passed to ``init_index``.
            ef_construction: build-time quality knob
                (higher = better quality, slower build).
            M: number of bidirectional links per node.
        """
        self.dimension = dimension
        self.max_elements = max_elements

        # Initialize HNSW index
        self.index = hnswlib.Index(space='cosine', dim=dimension)
        self.index.init_index(
            max_elements=max_elements,
            ef_construction=ef_construction,
            M=M
        )

        self.metadata = {}    # doc id -> metadata dict
        self.current_id = 0   # next id to assign; also the count inserted so far

    def insert(self, vector: np.ndarray, metadata: Dict) -> int:
        """Insert one vector and return its assigned id."""
        doc_id = self.current_id
        self.index.add_items(vector.reshape(1, -1), np.array([doc_id]))
        self.metadata[doc_id] = metadata
        self.current_id += 1
        return doc_id

    def batch_insert(self, vectors: np.ndarray, metadata_list: List[Dict]) -> List[int]:
        """Insert many vectors in one ``add_items`` call and return their ids.

        Raises:
            ValueError: if ``vectors`` and ``metadata_list`` differ in length
                (zip would otherwise drop metadata silently).
        """
        if len(vectors) != len(metadata_list):
            raise ValueError(
                f"Got {len(vectors)} vectors but {len(metadata_list)} metadata entries"
            )
        if len(vectors) == 0:
            return []

        start_id = self.current_id
        ids = np.arange(start_id, start_id + len(vectors))

        self.index.add_items(vectors, ids)

        # Store plain-int keys so they match the ids returned to callers.
        for doc_id, meta in zip(ids.tolist(), metadata_list):
            self.metadata[doc_id] = meta

        self.current_id += len(vectors)
        return ids.tolist()

    def search(self, query_vector: np.ndarray, k: int = 10) -> List[Dict]:
        """Return up to ``k`` approximate nearest neighbors.

        ``k`` is clamped to the number of vectors inserted through this
        object, because asking the index for more neighbours than it holds
        fails; an empty store or ``k <= 0`` yields ``[]``.

        Returns:
            Dicts with ``'id'``, ``'metadata'`` and ``'similarity'``
            (1 - cosine distance, as a Python float).
        """
        k = min(k, self.current_id)
        if k <= 0:
            return []

        # Set search quality parameter
        self.index.set_ef(max(k * 2, 50))  # Higher = better recall, slower

        labels, distances = self.index.knn_query(query_vector.reshape(1, -1), k=k)

        return [
            {
                'id': int(label),
                'metadata': self.metadata[int(label)],
                'similarity': float(1 - distance)  # Convert distance to similarity
            }
            for label, distance in zip(labels[0], distances[0])
        ]

Inverted File Index (IVF)

IVF partitions the vector space into clusters for faster search:

import faiss
import numpy as np

class IVFVectorDatabase:
    """Vector store backed by a FAISS ``IndexIVFFlat`` scored by inner product.

    Vectors are L2-normalized before training, insertion and search, so
    the inner product reported as ``'similarity'`` is the cosine.
    Metadata is kept in a list whose positions are the row numbers FAISS
    returns from ``search``.
    """

    def __init__(self, dimension: int, n_clusters: int = 100):
        """Create an untrained IVF index with ``n_clusters`` partitions."""
        self.dimension = dimension
        self.n_clusters = n_clusters

        # Create quantizer and index
        # NOTE(review): the coarse quantizer is L2 while the IVF index uses
        # inner product; presumably equivalent for unit-length vectors --
        # confirm against the FAISS documentation.
        quantizer = faiss.IndexFlatL2(dimension)
        self.index = faiss.IndexIVFFlat(
            quantizer,
            dimension,
            n_clusters,
            faiss.METRIC_INNER_PRODUCT
        )

        self.is_trained = False
        self.metadata = []

    @staticmethod
    def _normalized_copy(vectors: np.ndarray) -> np.ndarray:
        """Return an L2-normalized float32 copy; the input is left untouched."""
        # np.array copies by default, so the in-place normalize_L2 below
        # never writes into the caller's array.
        copied = np.array(vectors, dtype=np.float32, order='C')
        faiss.normalize_L2(copied)
        return copied

    def train(self, training_vectors: np.ndarray):
        """Train the IVF clusters on representative data.

        ``training_vectors`` is not modified; a normalized copy is used.
        """
        self.index.train(self._normalized_copy(training_vectors))
        self.is_trained = True

    def insert(self, vectors: np.ndarray, metadata_list: List[Dict]):
        """Insert a batch of vectors; trains on the first batch if untrained.

        Raises:
            ValueError: if ``vectors`` and ``metadata_list`` differ in length,
                which would misalign metadata with FAISS row numbers.
        """
        if len(vectors) != len(metadata_list):
            raise ValueError(
                f"Got {len(vectors)} vectors but {len(metadata_list)} metadata entries"
            )
        if len(vectors) == 0:
            return

        vectors_normalized = self._normalized_copy(vectors)

        if not self.is_trained:
            # Auto-train on first batch
            self.index.train(vectors_normalized)
            self.is_trained = True

        # Add to index
        self.index.add(vectors_normalized)
        self.metadata.extend(metadata_list)

    def search(self, query_vector: np.ndarray, k: int = 10, nprobe: int = 10) -> List[Dict]:
        """Search the IVF index.

        Args:
            query_vector: single query embedding (any shape that flattens
                to one row); it is not modified.
            k: maximum number of results.
            nprobe: number of clusters to search
                (higher = better recall, slower).

        Returns:
            Dicts with ``'metadata'`` and ``'similarity'``. An empty store
            or ``k <= 0`` yields ``[]``.
        """
        if k <= 0 or not self.metadata:
            return []

        self.index.nprobe = nprobe

        # Normalize query
        query_normalized = self._normalized_copy(np.reshape(query_vector, (1, -1)))

        distances, indices = self.index.search(query_normalized, k)

        # FAISS pads missing results with index -1; skip those slots.
        return [
            {
                'metadata': self.metadata[int(idx)],
                'similarity': float(distance)
            }
            for idx, distance in zip(indices[0], distances[0])
            if idx != -1
        ]

Production Architecture

A production vector database needs to handle:

  • High throughput: Millions of queries per second
  • Low latency: Sub-100ms p99 latency
  • Scalability: Billions of vectors
  • Durability: No data loss
  • Availability: 99.99% uptime

Sharding Strategy

import hashlib
from typing import List

class ShardedVectorDatabase:
    """Fan a vector store out over ``n_shards`` HNSW-backed shards.

    Writes go to one shard chosen by hashing the document id; reads query
    every shard concurrently and merge the results.
    """

    def __init__(self, n_shards: int, dimension: int):
        """Create ``n_shards`` independent ``HNSWVectorDatabase`` shards."""
        self.n_shards = n_shards
        self.shards = [
            HNSWVectorDatabase(dimension)
            for _ in range(n_shards)
        ]

    def _get_shard(self, doc_id: str) -> int:
        """Map a document id to a shard number via MD5 hash modulo ``n_shards``.

        This is plain hash-mod placement, not consistent hashing: changing
        ``n_shards`` changes the shard of most ids.
        """
        hash_val = int(hashlib.md5(doc_id.encode()).hexdigest(), 16)
        return hash_val % self.n_shards

    def insert(self, doc_id: str, vector: np.ndarray, metadata: Dict):
        """Store the vector on the shard that owns ``doc_id``.

        The stored metadata is a shallow copy with ``'doc_id'`` added, so
        the caller's dict is left unmodified.
        """
        shard_id = self._get_shard(doc_id)
        record = dict(metadata)
        record['doc_id'] = doc_id
        self.shards[shard_id].insert(vector, record)

    async def search(self, query_vector: np.ndarray, k: int = 10) -> List[Dict]:
        """Search all shards concurrently and return the global top ``k``."""
        import asyncio

        # Search all shards in parallel
        shard_results = await asyncio.gather(*(
            self._search_shard(shard, query_vector, k)
            for shard in self.shards
        ))

        # Merge and re-rank
        all_results = [hit for hits in shard_results for hit in hits]

        # Sort by similarity and return top k
        all_results.sort(key=lambda hit: hit['similarity'], reverse=True)
        return all_results[:k]

    async def _search_shard(self, shard, query_vector, k):
        """Run one shard's blocking ``search`` in a worker thread.

        Each shard is asked for ``k * 2`` hits so the merge has spare
        candidates to rank.
        """
        # Imported here as well: the import inside search() is local to that method.
        import asyncio

        return await asyncio.to_thread(shard.search, query_vector, k * 2)

Replication for High Availability

class ReplicatedVectorDatabase:
    """Primary/replica wrapper: synchronous primary writes, background replication.

    Reads try the primary first and fall back to the replicas, in order,
    when the primary does not answer within ``primary_timeout`` seconds.
    """

    def __init__(
        self,
        primary: VectorDatabase,
        replicas: List[VectorDatabase],
        primary_timeout: float = 0.1,
    ):
        """Wrap ``primary`` and ``replicas``.

        Args:
            primary: store that receives every write first.
            replicas: stores that receive writes in the background.
            primary_timeout: seconds to wait for the primary on reads
                before falling back to the replicas.
        """
        self.primary = primary
        self.replicas = replicas
        self.primary_timeout = primary_timeout
        # Strong references to in-flight replication tasks: the event loop
        # only keeps weak references, so an unreferenced task can be
        # garbage-collected before it finishes.
        self._replication_tasks = set()

    async def insert(self, vector: np.ndarray, metadata: Dict):
        """Write to the primary, then replicate in the background.

        Returns whatever ``primary.insert`` returns.
        """
        import asyncio

        # Synchronous write to primary
        doc_id = self.primary.insert(vector, metadata)

        # Asynchronous replication
        task = asyncio.create_task(self._replicate(vector, metadata))
        self._replication_tasks.add(task)
        task.add_done_callback(self._replication_tasks.discard)

        return doc_id

    async def _replicate(self, vector: np.ndarray, metadata: Dict):
        """Insert into every replica concurrently (best effort).

        A failing replica does not affect the others; each failure is
        logged as a warning instead of being discarded silently.
        """
        import asyncio
        import logging

        outcomes = await asyncio.gather(
            *(
                asyncio.to_thread(replica.insert, vector, metadata)
                for replica in self.replicas
            ),
            return_exceptions=True,
        )
        for replica_no, outcome in enumerate(outcomes):
            if isinstance(outcome, Exception):
                logging.getLogger(__name__).warning(
                    "Replication to replica %d failed: %r", replica_no, outcome
                )

    async def search(self, query_vector: np.ndarray, k: int = 10):
        """Search the primary; on timeout, try each replica in order.

        Only a primary timeout triggers the fallback; other primary errors
        propagate to the caller.

        Raises:
            Exception: "All replicas unavailable" when the primary timed
                out and every replica raised.
        """
        import asyncio

        # Try primary first
        try:
            return await asyncio.wait_for(
                asyncio.to_thread(self.primary.search, query_vector, k),
                timeout=self.primary_timeout
            )
        except asyncio.TimeoutError:
            pass

        # Fallback to replica
        last_error = None
        for replica in self.replicas:
            try:
                return await asyncio.to_thread(replica.search, query_vector, k)
            except Exception as exc:
                last_error = exc

        raise Exception("All replicas unavailable") from last_error

Hybrid Search

Hybrid search combines semantic (vector) similarity with BM25 keyword matching:

from typing import Set
from rank_bm25 import BM25Okapi

class HybridSearchDatabase:
    """Combine HNSW vector search with BM25 keyword scoring.

    Document ids are positions in ``self.documents``. The BM25 index is
    rebuilt lazily on first use after an insert, not on every insert,
    so bulk loading no longer re-tokenizes the whole corpus per document.
    """

    def __init__(self, dimension: int):
        """Create an empty hybrid store for vectors of length ``dimension``."""
        self.vector_db = HNSWVectorDatabase(dimension)
        self.documents = []
        self._bm25 = None
        self._bm25_stale = False

    @property
    def bm25(self):
        """Up-to-date BM25 index, or ``None`` while no document is stored."""
        if self._bm25_stale:
            self._rebuild_bm25()
        return self._bm25

    def insert(self, text: str, vector: np.ndarray, metadata: Dict):
        """Insert a document with both its text and its vector.

        The stored metadata is a shallow copy with ``'id'`` and ``'text'``
        added, so the caller's dict is left unmodified.
        """
        doc_id = len(self.documents)
        record = dict(metadata)
        record['id'] = doc_id
        record['text'] = text

        # Vector index
        self.vector_db.insert(vector, record)

        # Text index: only mark it stale; it is rebuilt on next use.
        self.documents.append(text)
        self._bm25_stale = True

    def _rebuild_bm25(self):
        """Rebuild the BM25 index from all stored documents."""
        tokenized = [doc.lower().split() for doc in self.documents]
        self._bm25 = BM25Okapi(tokenized)
        self._bm25_stale = False

    def hybrid_search(
        self,
        query_text: str,
        query_vector: np.ndarray,
        k: int = 10,
        alpha: float = 0.5
    ) -> List[Dict]:
        """
        Hybrid search combining semantic and keyword matching

        alpha: weight for vector search (1-alpha for keyword search)

        Returns up to ``k`` dicts with ``'id'``, ``'text'``, ``'score'``,
        ``'vector_score'`` and ``'keyword_score'``, best first. An empty
        store or ``k <= 0`` yields ``[]``.
        """
        bm25 = self.bm25
        if bm25 is None or k <= 0:
            return []

        # Vector search (over-fetch so the merge has spare candidates)
        vector_results = self.vector_db.search(query_vector, k=k*2)
        vector_scores = {r['id']: r['similarity'] for r in vector_results}

        # Keyword search: one BM25 score per stored document
        bm25_scores = bm25.get_scores(query_text.lower().split())

        # Scale keyword scores by the best score (left as-is if none is positive)
        best_bm25 = max(bm25_scores)
        max_bm25 = best_bm25 if best_bm25 > 0 else 1
        bm25_normalized = {
            i: float(score / max_bm25) for i, score in enumerate(bm25_scores)
        }

        # Combine scores; a document missing from one ranking scores 0 there
        all_ids = set(vector_scores) | set(bm25_normalized)
        combined_scores = {
            doc_id: alpha * vector_scores.get(doc_id, 0)
            + (1 - alpha) * bm25_normalized.get(doc_id, 0)
            for doc_id in all_ids
        }

        # Sort and return top k
        sorted_ids = sorted(
            combined_scores,
            key=combined_scores.get,
            reverse=True
        )[:k]

        return [
            {
                'id': doc_id,
                'text': self.documents[doc_id],
                'score': combined_scores[doc_id],
                'vector_score': vector_scores.get(doc_id, 0),
                'keyword_score': bm25_normalized.get(doc_id, 0)
            }
            for doc_id in sorted_ids
        ]

Performance Optimization

Dimensionality Reduction

Reduce embedding dimensions while preserving information:

from sklearn.decomposition import PCA

class DimensionalityReducer:
    """Thin wrapper around a PCA model that tracks whether it has been fitted."""

    def __init__(self, original_dim: int, reduced_dim: int):
        """Prepare a PCA with ``reduced_dim`` components.

        ``original_dim`` is accepted but not used by this class.
        """
        self.is_fitted = False
        self.pca = PCA(n_components=reduced_dim)

    def fit(self, vectors: np.ndarray):
        """Fit the PCA on ``vectors`` and print the share of variance kept."""
        self.pca.fit(vectors)
        self.is_fitted = True

        # Sum of the per-component ratios = total fraction of variance retained.
        ratios = self.pca.explained_variance_ratio_
        variance_retained = sum(ratios)
        print(f"Variance retained: {variance_retained:.2%}")

    def transform(self, vectors: np.ndarray) -> np.ndarray:
        """Project ``vectors`` with the fitted PCA.

        Raises:
            ValueError: if ``fit`` has not been called yet.
        """
        if self.is_fitted:
            return self.pca.transform(vectors)
        raise ValueError("Must fit before transform")

# Example: project 768-dimensional vectors down to 256 dimensions
reducer = DimensionalityReducer(original_dim=768, reduced_dim=256)
reducer.fit(sample_vectors)
reduced = reducer.transform(all_vectors)  # 3x fewer dimensions per vector

Quantization

Reduce memory footprint with scalar quantization:

class QuantizedVectorDB:
    """Store vectors as symmetric scalar-quantized integers plus one scale each.

    Each vector is mapped to integers in ``[-q_max, q_max]`` where
    ``q_max = 2 ** (n_bits - 1) - 1``, and is recovered approximately as
    ``quantized * scale``.
    """

    def __init__(self, dimension: int, n_bits: int = 8):
        """Create an empty store.

        Args:
            dimension: nominal vector length (stored, not enforced).
            n_bits: bits per component, 2 to 16. Up to 8 bits are stored as
                int8, more as int16.

        Raises:
            ValueError: if ``n_bits`` is outside 2..16 (1 bit leaves no
                usable levels; wider values are not supported here).
        """
        if not 2 <= n_bits <= 16:
            raise ValueError(f"n_bits must be between 2 and 16, got {n_bits}")
        self.dimension = dimension
        self.n_bits = n_bits
        self.vectors_quantized = []
        self.scale_factors = []

    def _storage_dtype(self):
        """Smallest signed integer dtype that can hold ``n_bits`` levels."""
        return np.int8 if self.n_bits <= 8 else np.int16

    def quantize(self, vector: np.ndarray) -> tuple[np.ndarray, float]:
        """Quantize a float vector to integers and return ``(quantized, scale)``.

        The scale maps the largest absolute component to ``q_max``. An
        all-zero vector gets scale 1.0 so no division by zero occurs.
        """
        q_max = 2 ** (self.n_bits - 1) - 1
        max_val = float(np.abs(vector).max())
        scale = max_val / q_max if max_val > 0 else 1.0

        # Clip guards against float rounding nudging a value past q_max.
        levels = np.clip(np.round(vector / scale), -q_max, q_max)
        return levels.astype(self._storage_dtype()), scale

    def dequantize(self, quantized: np.ndarray, scale: float) -> np.ndarray:
        """Convert quantized integers back to approximate float32 values."""
        return quantized.astype(np.float32) * scale

    def insert(self, vector: np.ndarray):
        """Quantize ``vector`` and append it with its scale factor."""
        quantized, scale = self.quantize(vector)
        self.vectors_quantized.append(quantized)
        self.scale_factors.append(scale)

        # Memory savings: 4x reduction (float32 -> int8)

Conclusion

Vector databases are foundational infrastructure for AI applications. Key takeaways:

  1. Choose the right index - HNSW for general use, IVF for billion-scale
  2. Shard and replicate - horizontal scaling and high availability
  3. Hybrid search - combine semantic and keyword matching
  4. Optimize dimensions - PCA and quantization reduce costs
  5. Monitor performance - latency, recall, and throughput metrics

As AI models continue to evolve, vector databases will become increasingly critical. Understanding their internals and operational characteristics is essential for building scalable, reliable AI applications.