Backend & DevOps Blog

Real-world experiences with MongoDB, Docker, Kubernetes and more

Scaling MongoDB with Horizontal Sharding

When our e-commerce platform reached 10 million products and 50 million monthly visitors, our MongoDB infrastructure began showing serious strain. Queries slowed down, heavy writes blocked reads, and nightly aggregation jobs were taking over 8 hours to complete. After optimizing indexes and query patterns as far as we could, we realized we needed a more fundamental solution: horizontal sharding. This is the story of how we implemented MongoDB sharding, the challenges we faced, and the lessons we learned.

Understanding the Need for Sharding

Before diving into sharding, we explored several scaling options:

  1. Vertical scaling - We had already upgraded to the largest available instances, but were still experiencing performance issues
  2. Read replicas - We were using these, but they didn't help with write performance
  3. Data archiving - We implemented this for older data, but active data was still growing rapidly

Sharding became the clear choice when we realized:

  • Our product catalog was growing by over 100,000 items per week
  • Customer data and order history were growing linearly with our user base
  • Our analytics needs required keeping historical data accessible
  • Peak traffic was causing write operations to block reads

MongoDB Sharding Basics

For those unfamiliar, MongoDB sharding distributes data across multiple machines. A sharded cluster consists of:

  • Shard servers - Where the actual data is stored; each shard is itself a replica set
  • Config servers - Store metadata about the cluster's data distribution
  • Mongos routers - Query routers that direct operations to the appropriate shards

The data is partitioned using a shard key, which determines how records are distributed across shards.
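
As a quick illustration of what declaring a shard key looks like (a sketch with illustrative namespaces, not commands from our cluster), a ranged key keeps nearby values together while a hashed key spreads writes more evenly at the cost of range-query locality:

// Ranged shard key: documents with adjacent key values land in the same chunk
sh.shardCollection("blog.articles", { author_id: 1, published_at: 1 })

// Hashed shard key: values are hashed first, which evens out write distribution
sh.shardCollection("blog.sessions", { session_id: "hashed" })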

Planning the Sharded Cluster

Our planning phase took several weeks, with careful consideration of:

1. Shard Key Selection

This was our most critical decision. After analyzing our workload, we identified several potential shard keys for our main collections:

// Products collection candidates
- product_id (high cardinality, but sequential)
- category + product_id (good distribution, common query pattern)
- vendor_id + created_at (good distribution, but not aligned with common queries)

// Orders collection candidates
- customer_id (potential for hotspots with very active customers)
- order_date + customer_id (better distribution, aligned with queries)
- region + order_date (good for regional queries, but uneven distribution)

// User events collection
- user_id + event_date (good distribution for active users)
- event_type + timestamp (better for analytics, uneven distribution)

After testing with our dataset and query patterns, we settled on:

  • Products: {category: 1, product_id: 1} - This aligned with our most common query patterns while providing good distribution
  • Orders: {order_date: 1, customer_id: 1} - Most queries included date ranges, and this prevented hotspots
  • User events: {event_date: 1, user_id: 1} - Optimized for our daily event processing jobs

Key Shard Key Insights

Choose shard keys that: (1) have high cardinality, (2) distribute writes evenly, and (3) support your most frequent queries without requiring scatter-gather operations across all shards.
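
One way to sanity-check a candidate key before committing to it is to look at how documents group under its leading field on the existing, unsharded collection. This is a rough sketch of one such check, not our actual tooling; the field is real, but what counts as "too skewed" is a judgment call:

// Approximate how documents would spread across a candidate key prefix
db.products.aggregate([
  { $group: { _id: "$category", docs: { $sum: 1 } } },
  { $sort: { docs: -1 } },
  { $limit: 10 }   // the largest groups hint at potential hot chunks
], { allowDiskUse: true })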

2. Cluster Topology

We designed our initial cluster with:

  • 3 shards (each a 3-node replica set)
  • 3 config servers (replica set)
  • 2 mongos routers (with potential to add more)

This gave us redundancy within each shard and for the cluster metadata, while allowing us to handle our current load with room for growth.

3. Infrastructure Requirements

Based on our data growth projections and performance requirements, we calculated our resource needs:

// Per shard (3-node replica set):
- 16 CPU cores
- 64 GB RAM
- 2 TB SSD storage
- 10 Gbps network

// Config servers (3-node replica set):
- 4 CPU cores
- 16 GB RAM
- 100 GB SSD storage

// Mongos routers (2 instances):
- 8 CPU cores
- 32 GB RAM

We opted to deploy the entire infrastructure on AWS using EC2 instances, with separate VPC subnets for each component to control traffic flow.

Setting Up the Sharded Cluster

With our planning complete, we began the implementation. Here's a simplified version of our setup process:

1. Configuring the Config Server Replica Set

# Start the config server processes
mongod --configsvr --replSet configRS --dbpath /data/configdb --port 27019

# Connect to one config server and initiate the replica set
mongo --port 27019
> rs.initiate({
    _id: "configRS",
    configsvr: true,
    members: [
      { _id: 0, host: "config1:27019" },
      { _id: 1, host: "config2:27019" },
      { _id: 2, host: "config3:27019" }
    ]
  })

# Verify the config server replica set is healthy
> rs.status()

2. Setting Up Each Shard Replica Set

For each of our three shards, we followed this process:

# Start the shard members
mongod --shardsvr --replSet shard1RS --dbpath /data/shard1 --port 27018

# Initiate the replica set
mongo --port 27018
> rs.initiate({
    _id: "shard1RS",
    members: [
      { _id: 0, host: "shard1-node1:27018" },
      { _id: 1, host: "shard1-node2:27018" },
      { _id: 2, host: "shard1-node3:27018" }
    ]
  })

# Verify the shard replica set is healthy
> rs.status()

We repeated this for shard2RS and shard3RS.

3. Starting the Mongos Routers

# Start the mongos process, pointing to the config server replica set
mongos --configdb configRS/config1:27019,config2:27019,config3:27019 --port 27017

4. Adding Shards to the Cluster

# Connect to mongos
mongo --port 27017

# Add each shard to the cluster
> sh.addShard("shard1RS/shard1-node1:27018,shard1-node2:27018,shard1-node3:27018")
> sh.addShard("shard2RS/shard2-node1:27018,shard2-node2:27018,shard2-node3:27018")
> sh.addShard("shard3RS/shard3-node1:27018,shard3-node2:27018,shard3-node3:27018")

# Verify the shards were added
> sh.status()

5. Enabling Sharding for Databases and Collections

# Enable sharding for our database
> sh.enableSharding("ecommerce")

# Shard the products collection
> db.adminCommand({
    shardCollection: "ecommerce.products",
    key: { category: 1, product_id: 1 }
  })

# Shard the orders collection
> db.adminCommand({
    shardCollection: "ecommerce.orders",
    key: { order_date: 1, customer_id: 1 }
  })

# Shard the user events collection
> db.adminCommand({
    shardCollection: "ecommerce.events",
    key: { event_date: 1, user_id: 1 }
  })
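
At this point it's worth confirming that each collection actually reports as sharded with the intended key before loading any data (the exact output varies by MongoDB version):

# Shows databases, shards, and the shard key and chunk ranges for each sharded collection
> sh.status()

# Per-shard document and data size breakdown for a specific collection
> db.products.getShardDistribution()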

Migrating Data to the Sharded Cluster

With our cluster configured, we faced the challenge of migrating terabytes of data without downtime. We used a multi-phase approach:

1. Initial Bulk Load

For the initial data migration, we used MongoDB's native tools during a planned maintenance window:

# Dump data from the production system
mongodump --db ecommerce --collection products --out /backup/initial

# Restore to the sharded cluster via mongos
mongorestore --host mongos1:27017 --db ecommerce --collection products /backup/initial/ecommerce/products.bson

We repeated this for each of our collections. For the largest collections, however, we broke the process into smaller batches using date-range queries to avoid excessively long operations, roughly as sketched below.
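
For the date-based batches we passed a query filter to mongodump so each run covered a bounded slice of the collection. The commands below are a sketch with illustrative dates and paths, not our exact invocations:

# Dump one month of orders at a time (dates and output path are illustrative)
mongodump --db ecommerce --collection orders \
  --query '{ "order_date": { "$gte": { "$date": "2023-01-01T00:00:00Z" }, "$lt": { "$date": "2023-02-01T00:00:00Z" } } }' \
  --out /backup/orders-2023-01

# Restore that slice through a mongos router
mongorestore --host mongos1:27017 --db ecommerce --collection orders \
  /backup/orders-2023-01/ecommerce/orders.bson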

2. Incremental Synchronization

To catch up with changes that occurred after the initial dump, we built a custom synchronization tool:

// Simplified version of our sync logic
const { MongoClient } = require('mongodb');

// Connection strings for the legacy deployment and the new sharded cluster (via mongos)
const SOURCE_URI = process.env.SOURCE_URI; // e.g. the old replica set
const TARGET_URI = process.env.TARGET_URI; // e.g. mongodb://mongos1:27017

async function syncCollection(collection, lastSyncTime) {
  // Connect to source and target
  const sourceClient = new MongoClient(SOURCE_URI);
  const targetClient = new MongoClient(TARGET_URI); // mongos
  
  try {
    await sourceClient.connect();
    await targetClient.connect();
    
    const sourceDb = sourceClient.db('ecommerce');
    const targetDb = targetClient.db('ecommerce');
    
    // Find records modified since last sync
    const query = { 
      last_modified: { $gt: lastSyncTime } 
    };
    
    const cursor = sourceDb.collection(collection).find(query);
    
    // Process in batches
    const batchSize = 1000;
    let batch = [];
    
    while (await cursor.hasNext()) {
      const doc = await cursor.next();
      batch.push(doc);
      
      if (batch.length >= batchSize) {
        // Upsert batch to target
        await processBatch(targetDb, collection, batch);
        batch = [];
      }
    }
    
    // Process any remaining documents
    if (batch.length > 0) {
      await processBatch(targetDb, collection, batch);
    }
    
    return new Date();
  } finally {
    await sourceClient.close();
    await targetClient.close();
  }
}

async function processBatch(targetDb, collection, batch) {
  const bulkOps = batch.map(doc => ({
    updateOne: {
      filter: { _id: doc._id },
      update: { $set: doc },
      upsert: true
    }
  }));
  
  await targetDb.collection(collection).bulkWrite(bulkOps);
  console.log(`Processed ${batch.length} documents for ${collection}`);
}
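
The function above was driven by a small loop that re-ran the sync for each collection and carried the checkpoint forward between runs. A minimal sketch of such a driver, keeping checkpoints in memory (in practice you would persist them, e.g. in a small state collection):

// Simplified driver: repeatedly sync each collection, carrying checkpoints forward
const COLLECTIONS = ['products', 'orders', 'events'];
const checkpoints = new Map();

async function runSyncCycle() {
  for (const coll of COLLECTIONS) {
    const last = checkpoints.get(coll) || new Date(0); // first run picks up everything with last_modified set
    const next = await syncCollection(coll, last);
    checkpoints.set(coll, next);
  }
}

// Run a cycle every five minutes until the final cutover sync
setInterval(() => runSyncCycle().catch(console.error), 5 * 60 * 1000);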

3. Cutover Strategy

After the data was synchronized, we implemented the final cutover:

  1. Put the application into maintenance mode
  2. Run one final sync to catch any last-minute changes
  3. Verify data integrity between source and target (see the comparison sketch after this list)
  4. Update connection strings in our application configuration
  5. Restart the application with the new connection string
  6. Keep the old system running in read-only mode for a fallback
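
For the integrity check in step 3, anything exhaustive was impractical at our data volumes, so the comparison was necessarily coarse: document counts plus a spot-check of recently modified documents. A sketch of that kind of check, reusing source and target connections like those in the sync tool:

// Coarse integrity check: compare counts and spot-check recent documents
async function verifyCollection(sourceDb, targetDb, collection) {
  const [sourceCount, targetCount] = await Promise.all([
    sourceDb.collection(collection).countDocuments(),
    targetDb.collection(collection).countDocuments()
  ]);
  console.log(`${collection}: source=${sourceCount} target=${targetCount}`);

  // Compare the most recently modified documents field by field
  const recent = await sourceDb.collection(collection)
    .find().sort({ last_modified: -1 }).limit(100).toArray();
  for (const doc of recent) {
    const copy = await targetDb.collection(collection).findOne({ _id: doc._id });
    if (!copy || JSON.stringify(copy) !== JSON.stringify(doc)) {
      console.warn(`Mismatch in ${collection} for _id ${doc._id}`);
    }
  }
}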

Challenges and Solutions

Our migration wasn't without issues. Here are the major challenges we faced and how we solved them:

1. Uneven Data Distribution

Despite our careful shard key selection, we noticed uneven data distribution across shards after a few weeks of operation.

# Per-shard data and document counts
> db.products.getShardDistribution()

# Chunk counts per shard, from the config database
> use config
> db.chunks.aggregate([
    { $match: { ns: "ecommerce.products" } },
    { $group: { _id: "$shard", chunks: { $sum: 1 } } }
  ])

# Results showed roughly: Shard1: 50%, Shard2: 30%, Shard3: 20%

Solution: We adjusted our chunk size and implemented pre-splitting for new collections:

# Decrease chunk size to 32 MB (the default is 64 MB) for more granular distribution
> use config
> db.settings.updateOne(
   { _id: "chunksize" },
   { $set: { value: 32 } },
   { upsert: true }
)

# Pre-split chunks for better initial distribution
# (categories is the list of category values from the products collection)
> const categories = db.getSiblingDB("ecommerce").products.distinct("category")
> for (let i = 0; i < 20; i++) {
    let category = categories[i % categories.length];
    sh.splitAt("ecommerce.products", { 
      category: category, 
      product_id: ObjectId() 
    });
  }

2. Query Performance Issues

Some of our queries that performed well in a single-server environment were now slow due to scatter-gather operations.

// Problematic query
db.products.find({ price: { $lt: 50 }, in_stock: true })
  .sort({ popularity: -1 })
  .limit(20);

// This query had to check all shards because it didn't include the shard key

Solution: We modified our queries to leverage the shard key whenever possible and created compound indexes to support common query patterns:

// Improved query pattern
db.products.find({ 
  category: "electronics", 
  price: { $lt: 50 }, 
  in_stock: true 
}).sort({ popularity: -1 }).limit(20);

// Adding supportive indexes
db.products.createIndex({ 
  category: 1, 
  price: 1, 
  in_stock: 1, 
  popularity: -1 
});
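
To confirm that a rewritten query is actually targeted rather than broadcast, the explain output shows how many shards participated (the exact output shape varies by MongoDB version):

// A targeted query should involve only a single shard in the explain output
db.products.find({
  category: "electronics",
  price: { $lt: 50 },
  in_stock: true
}).explain("executionStats")
// Check queryPlanner.winningPlan (a single-shard stage vs. a scatter-gather merge)
// and how many shards appear in the per-shard execution statistics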

3. Managing Migrations and Updates

Applying schema changes and index builds across a sharded cluster proved more complex than in a single-server environment.

Solution: We developed a new approach to schema updates:

  1. Create indexes in the background with reduced priority to avoid performance impact
  2. Use a rolling update approach for schema changes
  3. Implement version checks in our application code to handle documents with different schemas (a sketch follows the index example below)

// Example of building an index on a sharded collection
// (createIndexes is a database command, so run it against the ecommerce database through mongos)
db.runCommand({
  createIndexes: "products",
  indexes: [{
    key: { name: "text", description: "text" },
    name: "text_search_idx",
    background: true
  }],
  writeConcern: { w: "majority" }
});
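
For the version checks in step 3, the idea is that the application reads an explicit schema_version field and upgrades old documents lazily as it touches them. A minimal sketch, with illustrative field names and versions:

// Lazily upgrade documents that still have the old shape (fields and versions are illustrative)
function normalizeProduct(doc) {
  if (!doc.schema_version || doc.schema_version < 2) {
    // e.g. v1 stored tags as a comma-separated string; v2 stores an array
    if (typeof doc.tags === 'string') {
      doc.tags = doc.tags.split(',').map(t => t.trim());
    }
    doc.schema_version = 2;
  }
  return doc;
}

async function getProduct(db, productId) {
  const doc = await db.collection('products').findOne({ product_id: productId });
  return doc ? normalizeProduct(doc) : null;
}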

4. Monitoring and Alerting

Our existing monitoring solution wasn't adequate for the complexity of a sharded cluster.

Solution: We expanded our monitoring stack to track:

  • Chunk migration statistics and balancer activity
  • Per-shard performance metrics
  • Mongos router throughput and latency
  • Query patterns that trigger scatter-gather operations

// Simplified version of our monitoring script
db.adminCommand({ serverStatus: 1 }).metrics.commands;
db.adminCommand({ serverStatus: 1 }).sharding;
db.adminCommand({ shardConnPoolStats: 1 });

// Check balancer status
use config
db.locks.find({ _id: "balancer" });
db.chunks.aggregate([
  { $group: { _id: "$shard", count: { $sum: 1 } } }
]);

// Monitor jumbo chunks
db.chunks.find({ jumbo: true });
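
The shell also exposes helpers that answer the balancer questions directly, which are convenient for quick checks from a mongos session:

// Quick balancer checks from a mongos session
sh.getBalancerState()     // is the balancer enabled?
sh.isBalancerRunning()    // is a balancing round in progress right now?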

Performance Results

After fully migrating to the sharded cluster and working through the initial challenges, we saw dramatic improvements:

  • Query Response Time: 75% reduction in average response time for our most common queries
  • Write Throughput: 5x increase in write operations per second
  • Nightly Aggregation Jobs: Reduced from 8+ hours to under 2 hours
  • Peak Load Handling: Successfully managed 3x our previous peak load during sales events

Lessons Learned

Throughout this journey, we learned several valuable lessons:

  1. Shard key selection is crucial and much harder than it initially seems. Take time to analyze your data distribution and query patterns thoroughly.
  2. Testing with realistic data volumes is essential. Our initial tests with small datasets missed several performance issues that only appeared at scale.
  3. Application code needs adjustment for sharded clusters. Many query patterns that work well on a single server can cause performance issues in a sharded environment.
  4. Pre-splitting chunks for large initial data loads saves significant time and prevents the balancer from becoming overwhelmed.
  5. Operational procedures must evolve for sharded clusters. Backups, maintenance, and monitoring all require new approaches.

Future Expansion Plans

With our sharded infrastructure in place, we're now well-positioned for continued growth. Our expansion strategy includes:

  • Adding two additional shards when we reach 70% capacity on the current shards
  • Implementing zone sharding to keep regional data closer to users (see the sketch after this list)
  • Exploring time-series collections for our rapidly growing analytics data
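
Zone sharding works by tagging each shard with a zone and pinning ranges of the shard key to that zone. Our current shard keys don't include a region field, so this would apply to a future, region-prefixed key; the commands below are a sketch with illustrative names and ranges:

// Assign shards to zones (shard and zone names are illustrative)
sh.addShardToZone("shard1RS", "EU")
sh.addShardToZone("shard2RS", "US")

// Pin a shard-key range to a zone; assumes a collection sharded on a
// region-prefixed key such as { region: 1, order_date: 1 }
sh.updateZoneKeyRange(
  "ecommerce.orders_by_region",
  { region: "EU", order_date: MinKey },
  { region: "EU", order_date: MaxKey },
  "EU"
)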

Conclusion

Sharding MongoDB was a significant undertaking that required careful planning, execution, and ongoing maintenance. While the process was complex, the performance gains and scalability benefits have been well worth the effort. Our application can now handle our growing data volumes without sacrificing performance, and we have a clear path for future scaling.

For teams considering MongoDB sharding, I'd offer this advice: start planning early, test thoroughly with realistic data, and be prepared to evolve both your infrastructure and application code to fully leverage the benefits of a sharded architecture. With proper preparation, MongoDB sharding can provide a robust solution for handling growing datasets while maintaining performance.