Designing and Implementing a Simple, Yet Powerful, Distributed Job Scheduler
Building a Distributed Background Job Scheduler with Redis and Node.js
The Problem
Complex workflows in distributed systems often require background processing that can take minutes to complete. Traditional vertical scaling approaches are cost-prohibitive due to low average utilization, while horizontal scaling introduces race conditions where multiple workers might process the same job.
Architecture Design
When designing distributed systems, we must work within the CAP theorem: when a network partition occurs, a system has to give up either Consistency or Availability, so the design decides which of the two to favor.
Leader-Follower vs Leaderless Architecture
Leader-Follower Pattern:
- Maximum consistency through centralized job scheduling
- Complex implementation with single points of failure
- Unnecessary overhead for most use cases
Leaderless Pattern (Recommended):
- Maximizes availability and partition tolerance
- Embraces eventual consistency
- Simpler implementation and better fault tolerance
Redis-Based Solution
Redis Lists provide atomic operations that eliminate race conditions inherently. The key insight is using Redis as both a distributed lock and a queue system.
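To make the race concrete, the following sketch contrasts a non-atomic peek-then-remove sequence with a single atomic pop. It uses a hypothetical in-memory `SharedQueue` class as a stand-in for a Redis list and interleaves two workers by hand, so it illustrates the idea rather than Redis itself.

```typescript
// Hypothetical in-memory stand-in for a Redis list (illustration only).
class SharedQueue {
  private items: string[];

  constructor(items: string[]) {
    this.items = [...items];
  }

  // Non-atomic pair: read the head, then remove it in a separate step.
  peek(): string | undefined {
    return this.items[0];
  }

  removeHead(): void {
    this.items.shift();
  }

  // Atomic equivalent: read and remove in one step, as LPOP/BLPOP do.
  pop(): string | undefined {
    return this.items.shift();
  }
}

// Racy interleaving: both workers peek before either one removes.
const racy = new SharedQueue(['job-1', 'job-2']);
const racyA = racy.peek();
const racyB = racy.peek();
racy.removeHead();
racy.removeHead(); // job-2 is removed without any worker having read it

// Atomic pops: each worker receives a distinct job.
const atomic = new SharedQueue(['job-1', 'job-2']);
const atomicA = atomic.pop();
const atomicB = atomic.pop();

console.log({ racyA, racyB, atomicA, atomicB });
```

In the racy case both workers see `job-1` and `job-2` is lost; with the atomic pop the two workers get `job-1` and `job-2` respectively.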
Key Technical Benefits
- Atomic Operations: LPUSH/RPUSH and BLPOP/BRPOP are atomic
- Multiple Queue Support: Separate queues for jobs, retries, and priorities
- Priority Queues: Enterprise vs standard user job prioritization
- Horizontal Scaling: Deploy multiple worker nodes behind a load balancer
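Priority handling relies on how a multi-key BLPOP behaves: Redis checks the keys in the order they are given and pops from the first non-empty list. The sketch below models only that ordering rule in memory (no blocking, no Redis connection); `popFirstAvailable` and the sample job names are hypothetical.

```typescript
// Hypothetical in-memory model of multi-key BLPOP ordering (no blocking, no Redis).
type Queues = Map<string, string[]>;

// Returns [key, value] from the first non-empty list in `keys`, or null if all are empty.
function popFirstAvailable(queues: Queues, keys: string[]): [string, string] | null {
  for (const key of keys) {
    const value = queues.get(key)?.shift();
    if (value !== undefined) return [key, value];
  }
  return null;
}

const sampleQueues: Queues = new Map<string, string[]>([
  ['jobs:high_priority', ['enterprise-report']],
  ['jobs:normal_priority', ['standard-report-1', 'standard-report-2']],
]);
const keyOrder = ['jobs:high_priority', 'jobs:normal_priority', 'jobs:low_priority'];

// The high-priority job comes out first even though normal jobs are also waiting.
const firstPop = popFirstAvailable(sampleQueues, keyOrder);
const secondPop = popFirstAvailable(sampleQueues, keyOrder);
console.log(firstPop, secondPop);
```

One consequence of this rule is that a steady stream of high-priority jobs can starve the lower queues, since they are only consulted when every earlier key is empty.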
Implementation
Job Queue Manager
import Redis from 'ioredis';
interface JobPayload {
id: string;
type: string;
data: any;
priority?: 'high' | 'normal' | 'low';
retryCount?: number;
maxRetries?: number;
}
class DistributedJobQueue {
private redis: Redis;
private readonly QUEUE_KEYS = {
high: 'jobs:high_priority',
normal: 'jobs:normal_priority',
low: 'jobs:low_priority',
retry: 'jobs:retry'
};
constructor(redisUrl: string) {
this.redis = new Redis(redisUrl);
}
// Enqueue job - any instance can call this
async enqueue(payload: JobPayload): Promise<void> {
const queueKey = this.getQueueKey(payload.priority || 'normal');
await this.redis.rpush(queueKey, JSON.stringify({
...payload,
enqueuedAt: Date.now(),
retryCount: payload.retryCount || 0
}));
console.log(`Job ${payload.id} enqueued to ${queueKey}`);
}
// Process jobs - atomic dequeue operation
async processJob(timeout: number = 5): Promise<void> {
try {
// Check queues by priority order
const queueKeys = [
this.QUEUE_KEYS.high,
this.QUEUE_KEYS.normal,
this.QUEUE_KEYS.low,
this.QUEUE_KEYS.retry
];
// BLPOP is atomic - only one worker gets each job
const result = await this.redis.blpop(...queueKeys, timeout);
if (!result) return; // Timeout - no jobs available
const [queueName, jobData] = result;
const job = JSON.parse(jobData) as JobPayload;
console.log(`Processing job ${job.id} from ${queueName}`);
await this.executeJob(job);
} catch (error) {
console.error('Error processing job:', error);
}
}
private async executeJob(job: JobPayload): Promise<void> {
try {
// Simulate job processing
await this.processJobByType(job);
console.log(`Job ${job.id} completed successfully`);
} catch (error) {
console.error(`Job ${job.id} failed:`, error);
await this.handleJobFailure(job);
}
}
private async handleJobFailure(job: JobPayload): Promise<void> {
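// Use ?? so that an explicit maxRetries of 0 disables retries
const maxRetries = job.maxRetries ?? 3;
const currentRetries = job.retryCount ?? 0;
if (currentRetries < maxRetries) {
// Re-queue with exponential backoff
const delay = Math.pow(2, currentRetries) * 1000; // 1s, 2s, 4s, 8s...
// The timer lives only in this process: a crash before it fires drops the retry
setTimeout(() => {
this.redis.rpush(
this.QUEUE_KEYS.retry,
JSON.stringify({
...job,
retryCount: currentRetries + 1,
lastRetryAt: Date.now()
})
).catch(err => console.error(`Failed to re-queue job ${job.id}:`, err));
}, delay);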
console.log(`Job ${job.id} scheduled for retry ${currentRetries + 1}/${maxRetries} in ${delay}ms`);
} else {
console.error(`Job ${job.id} exceeded max retries (${maxRetries})`);
// Send to dead letter queue or log failure
await this.redis.rpush('jobs:failed', JSON.stringify(job));
}
}
private async processJobByType(job: JobPayload): Promise<void> {
// Implement your business logic here based on job.type
switch (job.type) {
case 'data_processing':
await this.handleDataProcessing(job.data);
break;
case 'email_sending':
await this.handleEmailSending(job.data);
break;
case 'report_generation':
await this.handleReportGeneration(job.data);
break;
default:
throw new Error(`Unknown job type: ${job.type}`);
}
}
private getQueueKey(priority: string): string {
return this.QUEUE_KEYS[priority as keyof typeof this.QUEUE_KEYS] || this.QUEUE_KEYS.normal;
}
// Placeholder methods for different job types
private async handleDataProcessing(data: any): Promise<void> {
// Implement data processing logic
await new Promise(resolve => setTimeout(resolve, 2000));
}
private async handleEmailSending(data: any): Promise<void> {
// Implement email sending logic
await new Promise(resolve => setTimeout(resolve, 1000));
}
private async handleReportGeneration(data: any): Promise<void> {
// Implement report generation logic
await new Promise(resolve => setTimeout(resolve, 5000));
}
}
export { DistributedJobQueue, JobPayload };
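The retry delay in `handleJobFailure` doubles with each attempt and has no upper bound. As a small sketch, the same schedule can be pulled into a pure function with a cap; `retryDelayMs` and the 60-second `maxMs` ceiling are assumptions for illustration, while the 1-second base matches the code above.

```typescript
// Hypothetical helper: delay before the next retry, given how many retries already ran.
// baseMs mirrors the 1000 ms base used above; maxMs is an assumed ceiling.
function retryDelayMs(retryCount: number, baseMs = 1000, maxMs = 60000): number {
  const uncapped = baseMs * Math.pow(2, retryCount);
  return Math.min(uncapped, maxMs);
}

// Delays for retryCount 0 through 6.
const delaySchedule = [0, 1, 2, 3, 4, 5, 6].map(n => retryDelayMs(n));
console.log(delaySchedule);
// prints [ 1000, 2000, 4000, 8000, 16000, 32000, 60000 ]
```

Keeping the calculation separate from the queue class makes it easy to unit test and to adjust if jobs with a large `maxRetries` should not wait for minutes between attempts.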
Scheduler Service
import cron from 'node-cron';
import { DistributedJobQueue } from './jobQueue';
class JobScheduler {
private jobQueue: DistributedJobQueue;
private isRunning = false;
// Handles returned by cron.schedule, kept so stop() can halt each task
private tasks: ReturnType<typeof cron.schedule>[] = [];
constructor(redisUrl: string) {
this.jobQueue = new DistributedJobQueue(redisUrl);
}
start(): void {
if (this.isRunning) return;
// Poll for a job every 10 seconds; each tick handles at most one job
this.tasks.push(cron.schedule('*/10 * * * * *', async () => {
await this.jobQueue.processJob(5); // 5 second timeout
}));
// Health check and metrics collection every minute
this.tasks.push(cron.schedule('0 * * * * *', async () => {
await this.collectMetrics();
}));
// Cleanup failed jobs older than 24 hours (daily at midnight)
this.tasks.push(cron.schedule('0 0 * * *', async () => {
await this.cleanupOldJobs();
}));
this.isRunning = true;
console.log('Distributed job scheduler started');
}
stop(): void {
// Stop every task through the handle that cron.schedule returned
for (const task of this.tasks) {
task.stop();
}
this.tasks = [];
this.isRunning = false;
console.log('Distributed job scheduler stopped');
}
private async collectMetrics(): Promise<void> {
// Implement metrics collection logic
console.log('Collecting job queue metrics');
}
private async cleanupOldJobs(): Promise<void> {
// Implement cleanup logic for old failed jobs
console.log('Cleaning up old failed jobs');
}
}
export { JobScheduler };
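Because the cron tick calls `processJob` once every 10 seconds, each worker handles at most one job per tick. One possible alternative, sketched here under assumptions, is a loop that calls `processJob` again as soon as the previous call returns, relying on the blocking pop to wait when the queues are empty. `WorkerLoop` and `JobProcessor` are hypothetical names; any object with a matching `processJob` method, such as `DistributedJobQueue`, would fit.

```typescript
// Hypothetical minimal shape of a job source, e.g. the DistributedJobQueue class.
interface JobProcessor {
  processJob(timeoutSeconds: number): Promise<void>;
}

class WorkerLoop {
  private processor: JobProcessor;
  private timeoutSeconds: number;
  private running = false;
  private loop: Promise<void> | null = null;

  constructor(processor: JobProcessor, timeoutSeconds = 5) {
    this.processor = processor;
    this.timeoutSeconds = timeoutSeconds;
  }

  start(): void {
    if (this.running) return;
    this.running = true;
    this.loop = this.run();
  }

  // Resolves once the in-flight processJob call has returned.
  async stop(): Promise<void> {
    this.running = false;
    await this.loop;
  }

  private async run(): Promise<void> {
    while (this.running) {
      try {
        await this.processor.processJob(this.timeoutSeconds);
      } catch (error) {
        console.error('Worker loop error:', error);
        // Pause briefly so a persistent failure does not spin the loop.
        await new Promise<void>(resolve => setTimeout(resolve, 1000));
      }
    }
  }
}
```

Note that `processJob` as written catches its own errors, so during a Redis outage it would return immediately and this loop would spin; a real version would need `processJob` to rethrow or to pause on connection errors.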
Usage Example
import { JobScheduler } from './scheduler';
import { DistributedJobQueue } from './jobQueue';
// Initialize the scheduler
const scheduler = new JobScheduler('redis://localhost:6379');
const jobQueue = new DistributedJobQueue('redis://localhost:6379');
// Start the background processor
scheduler.start();
// Enqueue jobs from anywhere in your application
await jobQueue.enqueue({
id: 'job-001',
type: 'data_processing',
data: { userId: 123, dataset: 'analytics' },
priority: 'high',
maxRetries: 5
});
Scaling Strategy
- Deploy Multiple Instances: Each worker node runs the same scheduler code
- Load Balancer: Distribute API traffic across instances
- Redis Cluster: Scale Redis horizontally for high-throughput scenarios
- Monitoring: Track queue lengths, processing times, and failure rates
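One caveat when moving to Redis Cluster: a multi-key command such as the BLPOP used in `processJob` requires all of its keys to live in the same hash slot, otherwise Redis rejects it with a CROSSSLOT error. Hash tags are the usual way to arrange that. The sketch below shows which part of a key Redis Cluster hashes; the `{jobs}:` key names are hypothetical, since the queue keys in this article carry no hash tag.

```typescript
// Returns the portion of a key that Redis Cluster hashes to choose a slot.
// Rule: if the key has a "{" followed later by a "}" with at least one character
// between them, only that substring is hashed; otherwise the whole key is hashed.
function hashedPortion(key: string): string {
  const open = key.indexOf('{');
  if (open === -1) return key;
  const close = key.indexOf('}', open + 1);
  if (close === -1 || close === open + 1) return key;
  return key.slice(open + 1, close);
}

// Hypothetical tagged key names that all hash on "jobs".
const taggedKeys = [
  '{jobs}:high_priority',
  '{jobs}:normal_priority',
  '{jobs}:low_priority',
  '{jobs}:retry',
];
// Untagged keys hash on their full name and may land in different slots.
const untaggedKeys = ['jobs:high_priority', 'jobs:normal_priority'];

console.log(taggedKeys.map(k => hashedPortion(k)));
console.log(untaggedKeys.map(k => hashedPortion(k)));
```

The trade-off is that keys sharing a tag also share a node, so this keeps BLPOP working but does not spread a single set of queues across the cluster.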
Key Advantages
- Race Condition Free: Atomic BLPOP delivers each queued job to exactly one worker, so two workers never dequeue the same job
- Cost Effective: Horizontal scaling with automatic load distribution
- Fault Tolerant: Failed jobs automatically retry with exponential backoff
- Priority Support: Critical jobs process before standard ones
- Simple Deployment: Stateless workers behind a load balancer
This architecture provides a robust, scalable solution for distributed background job processing while maintaining simplicity and cost efficiency.