Scaling MongoDB with Horizontal Sharding
When our e-commerce platform reached 10 million products and 50 million monthly visitors, our MongoDB infrastructure began showing serious strain. Queries slowed down, writes caused blocking, and nightly aggregation jobs were taking over 8 hours to complete. After optimizing indexes and query patterns, we realized we needed a more fundamental solution: horizontal sharding. This is the story of how we successfully implemented MongoDB sharding, the challenges we faced, and the lessons we learned.
Understanding the Need for Sharding
Before diving into sharding, we explored several scaling options:
- Vertical scaling - We had already upgraded to the largest available instances, but were still experiencing performance issues
- Read replicas - We were using these, but they didn't help with write performance
- Data archiving - We implemented this for older data, but active data was still growing rapidly
Sharding became the clear choice when we realized:
- Our product catalog was growing by over 100,000 items per week
- Customer data and order history were growing linearly with our user base
- Our analytics needs required keeping historical data accessible
- Peak traffic was causing write operations to block reads
MongoDB Sharding Basics
For those unfamiliar, MongoDB sharding distributes data across multiple machines. A sharded cluster consists of:
- Shard servers - where the actual data is stored; each shard runs as a replica set
- Config servers - Store metadata about the cluster's data distribution
- Mongos routers - Query routers that direct operations to the appropriate shards
The data is partitioned using a shard key, which determines how records are distributed across shards.
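To make the idea concrete, here is a hypothetical sketch of how a ranged shard key carves a collection into chunks (the collection, key, and boundary values are illustrative, not from our cluster):
> sh.status()
// For each sharded collection, the output lists its chunk ranges, conceptually like:
//   { customer_id: MinKey } -->> { customer_id: "h" }    on : shard1RS
//   { customer_id: "h" }    -->> { customer_id: "r" }    on : shard2RS
//   { customer_id: "r" }    -->> { customer_id: MaxKey } on : shard3RS
// Each chunk covers a contiguous range of shard key values, and the balancer
// moves chunks between shards to keep the distribution even.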
Planning the Sharded Cluster
Our planning phase took several weeks, with careful consideration of:
1. Shard Key Selection
This was our most critical decision. After analyzing our workload, we identified several potential shard keys for our main collections:
// Products collection candidates
- product_id (high cardinality, but sequential)
- category + product_id (good distribution, common query pattern)
- vendor_id + created_at (good distribution, but not aligned with common queries)
// Orders collection candidates
- customer_id (potential for hotspots with very active customers)
- order_date + customer_id (better distribution, aligned with queries)
- region + order_date (good for regional queries, but uneven distribution)
// User events collection
- user_id + event_date (good distribution for active users)
- event_type + timestamp (better for analytics, uneven distribution)
After testing with our dataset and query patterns, we settled on:
- Products: {category: 1, product_id: 1} - this aligned with our most common query patterns while providing good distribution
- Orders: {order_date: 1, customer_id: 1} - most queries included date ranges, and this prevented hotspots
- User events: {event_date: 1, user_id: 1} - optimized for our daily event processing jobs
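Much of that testing came down to measuring skew for each candidate key. Here is a minimal sketch of the kind of check involved, run in the mongo shell against a representative copy of the data (the field names match the collections above; the limit of ten is arbitrary):
// Count documents per leading shard key value to spot hotspots
db.products.aggregate([
  { $group: { _id: "$category", docs: { $sum: 1 } } },
  { $sort: { docs: -1 } },
  { $limit: 10 }
])
// If a handful of categories hold most of the documents, the { category: 1, product_id: 1 }
// prefix alone cannot spread writes evenly and the second field has to do the work.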
Key Shard Key Insights
Choose shard keys that: (1) have high cardinality, (2) distribute writes evenly, and (3) support your most frequent queries without requiring scatter-gather operations across all shards.
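A practical way to check the third point is to run explain() through mongos and look at how many shards the plan targets; a quick sketch (the filter values are illustrative):
// Run through mongos: a query that includes the shard key prefix should be
// routed to one or a few shards rather than broadcast to all of them.
db.products.find({ category: "electronics", in_stock: true }).explain("queryPlanner")
// In the output, the winning plan's stage (SINGLE_SHARD vs. SHARD_MERGE) and its
// shards array show which shards were targeted; an entry for every shard means
// the query is scatter-gather.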
2. Cluster Topology
We designed our initial cluster with:
- 3 shards (each a 3-node replica set)
- 3 config servers (replica set)
- 2 mongos routers (with potential to add more)
This gave us redundancy within each shard and for the cluster metadata, while allowing us to handle our current load with room for growth.
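Because the application talks only to the mongos routers, its connection string simply lists both of them and the driver fails over between routers; a minimal sketch using the same Node.js driver as the sync tool later in this post (hostnames are placeholders):
// Connect through the mongos routers, never to the shards directly
const { MongoClient } = require('mongodb');
const client = new MongoClient('mongodb://mongos1:27017,mongos2:27017/ecommerce');

async function main() {
  await client.connect();
  console.log('connected via mongos');
  await client.close();
}

main().catch(console.error);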
3. Infrastructure Requirements
Based on our data growth projections and performance requirements, we calculated our resource needs:
// Per shard (3-node replica set):
- 16 CPU cores
- 64 GB RAM
- 2 TB SSD storage
- 10 Gbps network
// Config servers (3-node replica set):
- 4 CPU cores
- 16 GB RAM
- 100 GB SSD storage
// Mongos routers (2 instances):
- 8 CPU cores
- 32 GB RAM
We opted to deploy the entire infrastructure on AWS using EC2 instances, with separate VPC subnets for each component to control traffic flow.
Setting Up the Sharded Cluster
With our planning complete, we began the implementation. Here's a simplified version of our setup process:
1. Configuring the Config Server Replica Set
# Start the config server processes
mongod --configsvr --replSet configRS --dbpath /data/configdb --port 27019
# Connect to one config server and initiate the replica set
mongo --port 27019
> rs.initiate({
_id: "configRS",
configsvr: true,
members: [
{ _id: 0, host: "config1:27019" },
{ _id: 1, host: "config2:27019" },
{ _id: 2, host: "config3:27019" }
]
})
# Verify the config server replica set is healthy
> rs.status()
2. Setting Up Each Shard Replica Set
For each of our three shards, we followed this process:
# Start the shard members
mongod --shardsvr --replSet shard1RS --dbpath /data/shard1 --port 27018
# Initiate the replica set
mongo --port 27018
> rs.initiate({
_id: "shard1RS",
members: [
{ _id: 0, host: "shard1-node1:27018" },
{ _id: 1, host: "shard1-node2:27018" },
{ _id: 2, host: "shard1-node3:27018" }
]
})
# Verify the shard replica set is healthy
> rs.status()
We repeated this for shard2RS and shard3RS.
3. Starting the Mongos Routers
# Start the mongos process, pointing to the config server replica set
mongos --configdb configRS/config1:27019,config2:27019,config3:27019 --port 27017
4. Adding Shards to the Cluster
# Connect to mongos
mongo --port 27017
# Add each shard to the cluster
> sh.addShard("shard1RS/shard1-node1:27018,shard1-node2:27018,shard1-node3:27018")
> sh.addShard("shard2RS/shard2-node1:27018,shard2-node2:27018,shard2-node3:27018")
> sh.addShard("shard3RS/shard3-node1:27018,shard3-node2:27018,shard3-node3:27018")
# Verify the shards were added
> sh.status()
5. Enabling Sharding for Databases and Collections
# Enable sharding for our database
> sh.enableSharding("ecommerce")
# Shard the products collection
> db.adminCommand({
shardCollection: "ecommerce.products",
key: { category: 1, product_id: 1 }
})
# Shard the orders collection
> db.adminCommand({
shardCollection: "ecommerce.orders",
key: { order_date: 1, customer_id: 1 }
})
# Shard the user events collection
> db.adminCommand({
shardCollection: "ecommerce.events",
key: { event_date: 1, user_id: 1 }
})
Migrating Data to the Sharded Cluster
With our cluster configured, we faced the challenge of migrating terabytes of data without downtime. We used a multi-phase approach:
1. Initial Bulk Load
For the initial data migration, we used MongoDB's native tools during a planned maintenance window:
# Dump data from the production system
mongodump --db ecommerce --collection products --out /backup/initial
# Restore to the sharded cluster via mongos
mongorestore --host mongos1:27017 --db ecommerce --collection products /backup/initial/ecommerce/products.bson
We repeated this for each of our collections. However, for very large collections, we broke the process down into smaller chunks using date ranges to avoid excessively long operations.
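A sketch of what one of those date-range slices might look like (the collection, dates, and paths here are illustrative):
# Dump one month of orders at a time (illustrative date range)
mongodump --db ecommerce --collection orders \
  --query '{ "order_date": { "$gte": { "$date": "2023-01-01T00:00:00Z" }, "$lt": { "$date": "2023-02-01T00:00:00Z" } } }' \
  --out /backup/orders-2023-01
# Restore that slice through mongos
mongorestore --host mongos1:27017 --db ecommerce --collection orders /backup/orders-2023-01/ecommerce/orders.bson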
2. Incremental Synchronization
To catch up with changes that occurred after the initial dump, we built a custom synchronization tool:
// Simplified version of our sync logic
const { MongoClient } = require('mongodb');
async function syncCollection(collection, lastSyncTime) {
// Connect to source and target
const sourceClient = new MongoClient(SOURCE_URI);
const targetClient = new MongoClient(TARGET_URI); // mongos
try {
await sourceClient.connect();
await targetClient.connect();
const sourceDb = sourceClient.db('ecommerce');
const targetDb = targetClient.db('ecommerce');
// Find records modified since last sync
const query = {
last_modified: { $gt: lastSyncTime }
};
const cursor = sourceDb.collection(collection).find(query);
// Process in batches
const batchSize = 1000;
let batch = [];
while (await cursor.hasNext()) {
const doc = await cursor.next();
batch.push(doc);
if (batch.length >= batchSize) {
// Upsert batch to target
await processBatch(targetDb, collection, batch);
batch = [];
}
}
// Process any remaining documents
if (batch.length > 0) {
await processBatch(targetDb, collection, batch);
}
return new Date();
} finally {
await sourceClient.close();
await targetClient.close();
}
}
async function processBatch(targetDb, collection, batch) {
const bulkOps = batch.map(doc => ({
updateOne: {
filter: { _id: doc._id },
update: { $set: doc },
upsert: true
}
}));
await targetDb.collection(collection).bulkWrite(bulkOps);
console.log(`Processed ${batch.length} documents for ${collection}`);
}
3. Cutover Strategy
After the data was synchronized, we implemented the final cutover:
- Put the application into maintenance mode
- Run one final sync to catch any last-minute changes
- Verify data integrity between source and target (a count-comparison sketch follows this list)
- Update connection strings in our application configuration
- Restart the application with the new connection string
- Keep the old system running in read-only mode for a fallback
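The integrity check above was essentially a per-collection comparison. Here is a minimal sketch of one way to do that with the Node.js driver (the URIs and collection list are placeholders):
// Sketch: compare per-collection document counts between the old deployment
// and the sharded cluster. URIs and the collection list are placeholders.
const { MongoClient } = require('mongodb');

async function verifyCounts() {
  const source = new MongoClient('mongodb://old-primary:27017');
  const target = new MongoClient('mongodb://mongos1:27017,mongos2:27017'); // mongos routers
  try {
    await source.connect();
    await target.connect();
    for (const coll of ['products', 'orders', 'events']) {
      const a = await source.db('ecommerce').collection(coll).countDocuments();
      const b = await target.db('ecommerce').collection(coll).countDocuments();
      console.log(coll, a === b ? 'OK' : `MISMATCH (source ${a}, target ${b})`);
    }
  } finally {
    await source.close();
    await target.close();
  }
}

verifyCounts().catch(console.error);
Counts alone won't catch field-level drift, so spot-checking a sample of documents on both sides is a sensible complement.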
Challenges and Solutions
Our migration wasn't without issues. Here are the major challenges we faced and how we solved them:
1. Uneven Data Distribution
Despite our careful shard key selection, we noticed uneven data distribution across shards after a few weeks of operation.
# Checking chunk distribution (chunk counts per shard, from the config database)
> use config
> db.chunks.aggregate([
  { $match: { ns: "ecommerce.products" } },
  { $group: { _id: "$shard", chunks: { $sum: 1 } } }
])
# Shard distribution
> db.getSiblingDB("ecommerce").products.getShardDistribution()
# Results showed: Shard1: 50%, Shard2: 30%, Shard3: 20%
Solution: We adjusted our chunk size and implemented pre-splitting for new collections:
# Decrease chunk size to allow more granular distribution
> use config
> db.settings.updateOne(
{ _id: "chunksize" },
{ $set: { value: 32 } },
{ upsert: true }
)
# Pre-split chunks for better initial distribution
# Build the list of category values to use as split points
> let categories = db.getSiblingDB("ecommerce").products.distinct("category")
> for (let i = 0; i < 20; i++) {
let category = categories[i % categories.length];
sh.splitAt("ecommerce.products", {
category: category,
product_id: ObjectId()
});
}
2. Query Performance Issues
Some of our queries that performed well in a single-server environment were now slow due to scatter-gather operations.
// Problematic query
db.products.find({ price: { $lt: 50 }, in_stock: true })
.sort({ popularity: -1 })
.limit(20);
// This query had to check all shards because it didn't include the shard key
Solution: We modified our queries to leverage the shard key whenever possible and created compound indexes to support common query patterns:
// Improved query pattern
db.products.find({
category: "electronics",
price: { $lt: 50 },
in_stock: true
}).sort({ popularity: -1 }).limit(20);
// Adding supportive indexes
db.products.createIndex({
category: 1,
price: 1,
in_stock: 1,
popularity: -1
});
3. Managing Migrations and Updates
Applying schema changes and index builds across a sharded cluster proved more complex than in a single-server environment.
Solution: We developed a new approach to schema updates:
- Create indexes in the background with reduced priority to avoid performance impact
- Use a rolling update approach for schema changes
- Implement version checks in our application code to handle documents with different schemas
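For the version checks, the idea is to tag documents with a schema version and normalize on read. A hypothetical sketch (the schema_version field and the price change are invented for illustration, not our actual schema):
// Hypothetical: documents written after a schema change carry schema_version: 2.
// Older documents are upgraded in memory when the application reads them.
function normalizeProduct(doc) {
  if ((doc.schema_version || 1) < 2) {
    // e.g. version 1 stored price as a plain number, version 2 as an object
    return { ...doc, price: { amount: doc.price, currency: 'USD' }, schema_version: 2 };
  }
  return doc;
}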
// Example of building indexes on a sharded collection
db.adminCommand({
createIndexes: "products",
indexes: [{
key: { name: "text", description: "text" },
name: "text_search_idx",
background: true
}],
writeConcern: { w: "majority" }
});
4. Monitoring and Alerting
Our existing monitoring solution wasn't adequate for the complexity of a sharded cluster.
Solution: We expanded our monitoring stack to track:
- Chunk migration statistics and balancer activity
- Per-shard performance metrics
- Mongos router throughput and latency
- Query patterns that trigger scatter-gather operations
// Simplified version of our monitoring script
db.adminCommand({ serverStatus: 1 }).metrics.commands;
db.adminCommand({ serverStatus: 1 }).sharding;
db.adminCommand({ shardConnPoolStats: 1 });
// Check balancer status
use config
db.locks.find({ _id: "balancer" });
db.chunks.aggregate([
{ $group: { _id: "$shard", count: { $sum: 1 } } }
]);
// Monitor jumbo chunks
db.chunks.find({ jumbo: true });
Performance Results
After fully migrating to the sharded cluster and working through the initial challenges, we saw dramatic improvements:
- Query Response Time: 75% reduction in average response time for our most common queries
- Write Throughput: 5x increase in write operations per second
- Nightly Aggregation Jobs: Reduced from 8+ hours to under 2 hours
- Peak Load Handling: Successfully managed 3x our previous peak load during sales events
Lessons Learned
Throughout this journey, we learned several valuable lessons:
- Shard key selection is crucial and much harder than it initially seems. Take time to analyze your data distribution and query patterns thoroughly.
- Testing with realistic data volumes is essential. Our initial tests with small datasets missed several performance issues that only appeared at scale.
- Application code needs adjustment for sharded clusters. Many query patterns that work well on a single server can cause performance issues in a sharded environment.
- Pre-splitting chunks for large initial data loads saves significant time and prevents the balancer from becoming overwhelmed.
- Operational procedures must evolve for sharded clusters. Backups, maintenance, and monitoring all require new approaches.
Future Expansion Plans
With our sharded infrastructure in place, we're now well-positioned for continued growth. Our expansion strategy includes:
- Adding two additional shards when we reach 70% capacity on the current shards
- Implementing zone sharding to keep regional data closer to users (sketched below)
- Exploring time-series collections for our rapidly growing analytics data
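As a rough sketch of the zone sharding idea: shards are tagged with a zone, and shard key ranges are pinned to it. The collection and region-prefixed shard key below are hypothetical; our current orders key would need a region prefix for this to apply.
// Hypothetical sketch: assumes a collection sharded on { region: 1, order_date: 1 }
sh.addShardToZone("shard1RS", "EU")
sh.addShardToZone("shard2RS", "US")
sh.updateZoneKeyRange(
  "ecommerce.regional_orders",
  { region: "EU", order_date: MinKey },
  { region: "EU", order_date: MaxKey },
  "EU"
)
// The balancer then keeps chunks in the EU key range on the shards tagged "EU".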
Conclusion
Sharding MongoDB was a significant undertaking that required careful planning, execution, and ongoing maintenance. While the process was complex, the performance gains and scalability benefits have been well worth the effort. Our application can now handle our growing data volumes without sacrificing performance, and we have a clear path for future scaling.
For teams considering MongoDB sharding, I'd offer this advice: start planning early, test thoroughly with realistic data, and be prepared to evolve both your infrastructure and application code to fully leverage the benefits of a sharded architecture. With proper preparation, MongoDB sharding can provide a robust solution for handling growing datasets while maintaining performance.