I built an event-driven system last year using basic pub/sub, and it lost messages every single time a consumer restarted for a deployment. If a worker was down for even three seconds, any events sent during that window vanished into the ether. I needed a system that persisted events, supported multiple consumer groups, and handled worker crashes without losing data.
That is when I migrated to Redis Streams. Unlike simple pub/sub, Redis Streams acts as an append-only log. Entries stay in Redis until you explicitly trim or delete them, and consumer groups track which entries each consumer has acknowledged. Let us build a production-ready real-time event processor using Node.js and the official Redis client.

Why I Ditched Pub/Sub for Redis Streams
Before writing code, we need to understand why simple pub/sub loses data. If you read my guide on building a Redis Pub/Sub Event Broker in Node.js, you know that Pub/Sub is a fire-and-forget mechanism: Redis delivers each message only to the subscribers connected at that moment and never stores it. If your Node.js process restarts due to an unhandled exception or a routine deployment, any events published while it is down are permanently lost.
Redis Streams solves this by keeping a persistent history of messages. It introduces three core concepts that make our system resilient:
- Append-Only Log: Every event gets a unique ID (based on timestamps) and is stored sequentially in Redis memory.
- Consumer Groups: Multiple workers can join a group to split the workload. Each message is delivered to only one worker in the group.
- Pending Entries List (PEL): Redis tracks which messages were delivered to a worker but have not been acknowledged yet. If a worker dies, we can inspect this list and claim the stuck messages.
For official details on how the underlying commands operate, check out the Redis Streams Documentation.
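To make these three concepts concrete, here is a deliberately tiny in-memory model in plain JavaScript. It is not Redis and not how Redis implements streams; the ToyStream class and its method names are hypothetical, chosen only to mirror XADD, XGROUP CREATE, XREADGROUP with ">", XACK, and XPENDING. It shows that every group sees every entry, that within one group an entry goes to exactly one consumer, and that acknowledging clears the PEL while the log keeps the entry.

```javascript
// A toy, in-memory model of Redis Streams semantics. NOT Redis: it only mimics
// an append-only log, consumer groups with their own read position, and a
// per-group Pending Entries List (PEL).
class ToyStream {
  constructor() {
    this.entries = []; // append-only log of { id, fields }
    this.groups = new Map(); // group name -> { cursor, pending: Map<id, consumer> }
    this.nextSeq = 1;
  }

  // Like XADD: append an entry and return its ID (simplified "<n>-0" IDs)
  add(fields) {
    const id = `${this.nextSeq++}-0`;
    this.entries.push({ id, fields });
    return id;
  }

  // Like XGROUP CREATE <stream> <group> 0: the group starts at the beginning
  createGroup(name) {
    this.groups.set(name, { cursor: 0, pending: new Map() });
  }

  // Like XREADGROUP ... ">" with COUNT 1: hand the group's next unseen entry
  // to exactly one consumer and record it in that group's PEL
  readGroup(group, consumer) {
    const g = this.groups.get(group);
    if (g.cursor >= this.entries.length) return null;
    const entry = this.entries[g.cursor];
    g.cursor += 1;
    g.pending.set(entry.id, consumer);
    return entry;
  }

  // Like XACK: drop the entry from the group's PEL; the log keeps the entry
  ack(group, id) {
    return this.groups.get(group).pending.delete(id) ? 1 : 0;
  }

  // Like XPENDING: delivered-but-unacknowledged entries and their owners
  pending(group) {
    return [...this.groups.get(group).pending].map(([id, owner]) => ({ id, owner }));
  }
}

const demo = new ToyStream();
demo.createGroup('billing_group');
demo.createGroup('analytics_group');
demo.add({ userId: 'user_1001' });
demo.add({ userId: 'user_1002' });

const forA = demo.readGroup('billing_group', 'worker_a'); // gets 1-0
demo.readGroup('billing_group', 'worker_b'); // gets 2-0, never 1-0
demo.ack('billing_group', forA.id);

console.log(demo.pending('billing_group')); // [ { id: '2-0', owner: 'worker_b' } ]
console.log(demo.readGroup('analytics_group', 'worker_x').id); // 1-0
```

In real Redis the same guarantees hold across processes and restarts, which is exactly what the in-memory toy cannot give you.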
Setting Up the Node.js Project
Let us set up our project directory. I always recommend using a clean, modular structure for event-driven applications. If you want to see how to organize larger backends, look at my guide on the best project folder structure.
Create a new directory and initialize the project:
```bash
mkdir redis-streams-processor
cd redis-streams-processor
npm init -y
npm install redis dotenv
```
We will use the official Node Redis client. Let us create a helper file to manage our Redis connection and avoid duplicate connections across our producer and consumer scripts.
Create a file named redisClient.js:
```javascript
const { createClient } = require('redis');
require('dotenv').config();

const client = createClient({
  url: process.env.REDIS_URL || 'redis://localhost:6379',
});

client.on('error', (err) => console.error('Redis Client Error', err));

async function connectRedis() {
  if (!client.isOpen) {
    await client.connect();
  }
  return client;
}

module.exports = { connectRedis, client };
```
Make sure you have a Redis instance running locally. If you run Docker, you can start one instantly with this command:
```bash
docker run --name local-redis -p 6379:6379 -d redis
```
Producing Events to the Stream
Now we need to write a script that sends events to our stream. In Redis, we append messages using the XADD command. Each message is a set of key-value pairs.
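Because every field and value in a stream entry is stored as a flat string, nested payloads (such as a webhook body) need to be serialized before XADD. Here is a small sketch of that step. toStreamFields is a hypothetical helper, and JSON-encoding non-string values is just one reasonable convention, not something Redis or node-redis prescribes.

```javascript
// Hypothetical helper: convert an event object into the flat string
// field/value map that XADD stores. Strings pass through unchanged;
// numbers, booleans, objects, and arrays are JSON-encoded; undefined is dropped.
function toStreamFields(event) {
  const fields = {};
  for (const [key, value] of Object.entries(event)) {
    if (value === undefined) continue; // JSON.stringify(undefined) is not a string
    fields[key] = typeof value === 'string' ? value : JSON.stringify(value);
  }
  return fields;
}

console.log(toStreamFields({ userId: 'user_1001', attempt: 1, meta: { plan: 'pro' } }));
// { userId: 'user_1001', attempt: '1', meta: '{"plan":"pro"}' }
```

On the consumer side you would JSON.parse the fields you encoded. Handing the client explicit strings also means you are not relying on how it converts other types.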
Create a file named producer.js. This script simulates incoming webhooks, such as user registration events or payment confirmations.
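```javascript
const { connectRedis } = require('./redisClient');

async function publishEvent(streamName, eventData) {
  const redis = await connectRedis();
  try {
    // "*" tells Redis to automatically generate a unique timestamp-based ID
    const messageId = await redis.xAdd(streamName, '*', eventData);
    console.log(`[Producer] Published event to ${streamName} with ID: ${messageId}`);
    return messageId;
  } catch (error) {
    console.error('[Producer] Failed to publish event:', error);
  }
}

async function run() {
  const streamName = 'user_events';

  // Publish five events: user_1001 through user_1005
  for (let i = 1; i <= 5; i++) {
    // Stream field values are strings; the consumer deliberately fails on user_1003
    const eventData = { userId: `user_${1000 + i}` };
    await publishEvent(streamName, eventData);
    // Space the events out by half a second
    await new Promise((resolve) => setTimeout(resolve, 500));
  }

  process.exit(0);
}

run();
```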
Run the producer script in your terminal to populate the stream:
```bash
node producer.js
```
You will see output showing five unique IDs generated by Redis, looking like 1715639120481-0. The first part is a Unix timestamp in milliseconds, and the second part is a sequential counter for messages created within that exact millisecond.
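Because the two halves are separate integers, comparing IDs as plain strings goes wrong ("9-0" sorts after "10-0"). Below is a minimal sketch for parsing and ordering IDs. parseStreamId and compareStreamIds are hypothetical helper names, and BigInt is used because Redis treats both parts as 64-bit unsigned integers.

```javascript
// Hypothetical helpers for Redis stream IDs of the form "<milliseconds>-<sequence>".
function parseStreamId(id) {
  const match = /^(\d+)-(\d+)$/.exec(id);
  if (!match) throw new Error(`Invalid stream ID: ${id}`);
  // BigInt keeps full precision for both 64-bit parts
  return { ms: BigInt(match[1]), seq: BigInt(match[2]) };
}

// Sort comparator: orders by timestamp first, then by sequence number
function compareStreamIds(a, b) {
  const x = parseStreamId(a);
  const y = parseStreamId(b);
  if (x.ms !== y.ms) return x.ms < y.ms ? -1 : 1;
  if (x.seq !== y.seq) return x.seq < y.seq ? -1 : 1;
  return 0;
}

const { ms } = parseStreamId('1715639120481-0');
console.log(new Date(Number(ms)).toISOString()); // 2024-05-13T22:25:20.481Z
console.log(['10-0', '9-0', '9-1'].sort(compareStreamIds)); // [ '9-0', '9-1', '10-0' ]
```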
Building a Resilient Consumer Group
To process these messages efficiently, we must create a Consumer Group. This allows us to scale horizontally by running multiple consumer processes. Each message is delivered to only one worker in the group.
We use the XGROUP CREATE command to initialize the group. We must handle the case where the group already exists, because Redis rejects a second XGROUP CREATE for the same group with a BUSYGROUP error.
Create a file named consumer.js:
```javascript
const { connectRedis } = require('./redisClient');

const STREAM_NAME = 'user_events';
const GROUP_NAME = 'user_processor_group';
const CONSUMER_NAME = `worker_${process.pid}`;

async function initializeConsumerGroup(redis) {
  try {
    // MKSTREAM automatically creates the stream if it does not exist yet
    await redis.xGroupCreate(STREAM_NAME, GROUP_NAME, '0', { MKSTREAM: true });
    console.log(`[Consumer] Created consumer group: ${GROUP_NAME}`);
  } catch (error) {
    if (error.message.includes('BUSYGROUP')) {
      console.log(`[Consumer] Consumer group ${GROUP_NAME} already exists.`);
    } else {
      throw error;
    }
  }
}

async function processMessage(messageId, fields) {
  console.log(`[${CONSUMER_NAME}] Processing message ${messageId}:`, fields);
  // Simulate processing time (e.g., sending an email, updating database)
  await new Promise((resolve) => setTimeout(resolve, 300));
  // Simulate an occasional processing failure for testing
  if (fields.userId === 'user_1003') {
    throw new Error('Database timeout while saving user_1003');
  }
}

async function startConsumer() {
  const redis = await connectRedis();
  await initializeConsumerGroup(redis);
  console.log(`[${CONSUMER_NAME}] Started listening for events...`);

  while (true) {
    try {
      // Read new messages that have not been delivered to anyone else
      // ">" means only new messages
      const response = await redis.xReadGroup(
        GROUP_NAME,
        CONSUMER_NAME,
        { key: STREAM_NAME, id: '>' },
        {
          COUNT: 1,
          BLOCK: 2000, // Block for up to 2 seconds if no messages are available
        }
      );

      if (!response || response.length === 0) {
        continue;
      }

      const { messages } = response[0];
      for (const message of messages) {
        const { id, message: fields } = message;
        try {
          await processMessage(id, fields);
          // Acknowledge the message so it is removed from the Pending Entries List
          await redis.xAck(STREAM_NAME, GROUP_NAME, id);
          console.log(`[${CONSUMER_NAME}] Successfully acknowledged message ${id}`);
        } catch (processError) {
          console.error(`[${CONSUMER_NAME}] Failed to process message ${id}:`, processError.message);
          // We DO NOT acknowledge the message here. It remains in the PEL.
        }
      }
    } catch (error) {
      console.error('[Consumer] Error reading from stream:', error);
      await new Promise((resolve) => setTimeout(resolve, 5000)); // Cool down
    }
  }
}

startConsumer();
```
Run this consumer in a terminal window. You will see it process the events we created earlier. Notice that user_1003 fails and is not acknowledged. It remains in our Pending Entries List (PEL).
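If you want to unit-test the ack/no-ack decision without a live Redis server, one option is to pull it out of the loop into a function that takes the XREADGROUP reply and reports which IDs are safe to acknowledge. This is a sketch, not part of the tutorial's consumer.js: handleReply is a hypothetical name, and the reply shape ([{ name, messages: [{ id, message }] }]) is the one node-redis v4 returns from xReadGroup.

```javascript
// Hypothetical helper: run `handler` for every entry in an XREADGROUP reply
// (node-redis v4 shape) and report which IDs succeeded and which failed.
// Only the succeeded IDs should be passed to XACK; failed ones stay in the PEL.
async function handleReply(reply, handler) {
  const result = { acked: [], failed: [] };
  if (!reply) return result; // BLOCK timed out with no new entries

  for (const { messages } of reply) {
    for (const { id, message } of messages) {
      try {
        await handler(id, message);
        result.acked.push(id);
      } catch (err) {
        result.failed.push({ id, reason: err.message });
      }
    }
  }
  return result;
}
```

In the consumer loop you would then pass result.acked to redis.xAck(STREAM_NAME, GROUP_NAME, ids), which also accepts an array of IDs, and leave the failed entries for the recovery routine.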
Handling Crashes: Claiming Stuck Messages
This is where standard tutorials stop, but real production environments require crash recovery. If a worker pulls a message from the stream and immediately crashes, that message sits in the PEL forever. No other active worker will ever receive it unless we explicitly claim it.
To fix this, we need a recovery loop that runs periodically. It inspects the PEL using XPENDING, identifies messages that have been stuck for too long (e.g., 10 seconds), and assigns them to an active worker using XCLAIM.
For complex deduplication needs, you can also look at building a Redis-Backed Idempotency Middleware to prevent double-processing. Here is how we implement the claim logic directly in Node.js with XPENDING and XCLAIM (on Redis 6.2 and later, the single XAUTOCLAIM command can perform both steps):
```javascript
async function claimStuckMessages(redis) {
  const MIN_IDLE_TIME = 10000; // 10 seconds in milliseconds
  const BATCH_SIZE = 10;

  try {
    // Get pending messages for our group. The IDLE filter (Redis 6.2+) makes Redis
    // skip recently delivered entries, so fresh messages at the head of the PEL
    // cannot crowd older stuck ones out of this batch.
    const pendingInfo = await redis.xPendingRange(
      STREAM_NAME,
      GROUP_NAME,
      '-',
      '+',
      BATCH_SIZE,
      { IDLE: MIN_IDLE_TIME }
    );

    if (pendingInfo.length === 0) {
      return;
    }

    const messageIdsToClaim = [];
    for (const item of pendingInfo) {
      // item.millisecondsSinceLastDelivery tells us how long the message has been idle
      if (item.millisecondsSinceLastDelivery >= MIN_IDLE_TIME) {
        messageIdsToClaim.push(item.id);
        console.log(`[Recovery] Found stuck message ${item.id} (Idle for ${item.millisecondsSinceLastDelivery}ms)`);
      }
    }

    if (messageIdsToClaim.length > 0) {
      // Claim ownership of these messages for our current consumer
      const claimedMessages = await redis.xClaim(
        STREAM_NAME,
        GROUP_NAME,
        CONSUMER_NAME,
        MIN_IDLE_TIME,
        messageIdsToClaim
      );

      for (const message of claimedMessages) {
        console.log(`[Recovery] Claimed message ${message.id}. Retrying processing...`);
        try {
          // In a real app, you might want to track retry counts to avoid infinite loops
          await processMessage(message.id, message.message);
          await redis.xAck(STREAM_NAME, GROUP_NAME, message.id);
          console.log(`[Recovery] Successfully processed and acknowledged claimed message ${message.id}`);
        } catch (retryError) {
          console.error(`[Recovery] Failed retry for message ${message.id}:`, retryError.message);
        }
      }
    }
  } catch (error) {
    console.error('[Recovery] Error running auto-claim routine:', error);
  }
}
```
To run this recovery routine, you can set up a simple interval inside your consumer startup logic:
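```javascript
// Add this inside startConsumer() right before the main while(true) loop.
// connectRedis() returns the shared client, so these commands queue behind any
// in-flight blocking XREADGROUP (for up to its 2-second BLOCK timeout).
setInterval(async () => {
  const recoveryClient = await connectRedis();
  await claimStuckMessages(recoveryClient);
}, 15000); // Check every 15 seconds
```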
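One caveat with setInterval: it does not wait for an async callback to finish, so if a recovery pass ever takes longer than 15 seconds (for example, a batch of slow retries), two passes can overlap and work on the same messages. A small guard avoids that. nonOverlapping is a hypothetical helper, sketched under the assumption that simply skipping a tick is acceptable for your workload.

```javascript
// Hypothetical helper: wrap an async task so a new run is skipped while the
// previous run is still in flight. Resolves to true if the task ran, false if skipped.
function nonOverlapping(task) {
  let running = false;
  return async function run(...args) {
    if (running) return false; // previous pass still busy: skip this tick
    running = true;
    try {
      await task(...args);
      return true;
    } finally {
      running = false; // always release, even if the task throws
    }
  };
}

// Usage inside startConsumer(), assuming `redis` is the connected client:
// const recover = nonOverlapping(() => claimStuckMessages(redis));
// setInterval(() => recover().catch(console.error), 15000);
```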
How I Tested This Under Heavy Load
I wanted to see how this implementation held up under stress. I modified the producer script to fire 10,000 events as fast as possible, and ran three separate instances of the consumer script in parallel terminal windows.
To spin up multiple consumers, I ran:
```bash
node consumer.js & node consumer.js & node consumer.js
```
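The results were stable. The three consumers split the load evenly and processed all 10,000 events in 847ms on my local machine. That figure is only reachable with the simulated 300ms delay in processMessage removed, since that delay alone caps each worker at about three messages per second. Because we used XREADGROUP with the > ID, Redis handled the distribution internally, with no coordination logic needed in our application code.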
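Throughput numbers are easy to sanity-check with back-of-the-envelope arithmetic. The sketch below (estimateDrainMs is a hypothetical helper) assumes each worker handles one message at a time, as consumer.js does with COUNT: 1 and a sequential await, and that per-message work dominates Redis round-trip time.

```javascript
// Hypothetical helper: rough time to drain a backlog when each consumer
// processes one message at a time and work is split evenly across consumers.
function estimateDrainMs(messageCount, perMessageMs, consumers) {
  const perConsumer = Math.ceil(messageCount / consumers); // the busiest worker
  return perConsumer * perMessageMs;
}

// 10,000 events, the 300 ms simulated delay from processMessage, 3 workers
const drainMs = estimateDrainMs(10000, 300, 3);
console.log(`${(drainMs / 60000).toFixed(1)} minutes`); // 16.7 minutes
```

Swap in your measured per-message latency to estimate how many workers a given backlog needs.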
If you are building external integrations that hit third-party API limits during these bursts, you might also want to read about how to handle external API rate limits with BullMQ and Redis.
Common Gotchas and How I Fixed Them
When running Redis Streams in production, you will inevitably hit a few operational hurdles. Here is what broke for me and how I resolved it:
1. Memory Growth (The Stream Never Stops Growing)
By default, Redis Streams keep messages forever, even after every consumer group has acknowledged them. If you process millions of events daily, Redis memory usage will keep growing until it hits its maxmemory limit or the server runs out of RAM. To prevent this, trim your stream during the XADD call by passing the MAXLEN option.
```javascript
// Trim the stream to keep only the latest 10,000 events
await redis.xAdd(STREAM_NAME, '*', eventData, {
  TRIM: {
    strategy: 'MAXLEN',
    strategyModifier: '~',
    threshold: 10000,
  },
});
```
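The tilde (~) modifier is important: it enables approximate trimming. Redis only evicts whole internal nodes of the stream, so it may keep somewhat more than 10,000 entries, but each trim is far cheaper than an exact one, which keeps writes fast. Set the threshold well above your worst-case consumer backlog, because trimming does not check whether an entry is still pending in a consumer group.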
2. Poison Pill Messages Causing Infinite Loops
If a message contains malformed data that crashes your processing function every time, your recovery worker will claim it, attempt to process it, fail, and repeat this cycle forever. To fix this, inspect the delivery count returned by XPENDING (XCLAIM increments it on every claim). If a message has been delivered more than 5 times, copy it to a dead-letter queue (DLQ) and acknowledge it so it leaves the Pending Entries List.
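The decision itself is simple to express. In the sketch below, partitionPoisonPills and toDeadLetterFields are hypothetical helpers; the entry shape { id, owner, millisecondsSinceLastDelivery, deliveriesCounter } is what node-redis v4 returns from xPendingRange, and the limit of 5 matches the rule above.

```javascript
// Hypothetical helper: split PEL entries into ones worth retrying and
// "poison pills" that have exceeded the delivery limit.
function partitionPoisonPills(pendingEntries, maxDeliveries = 5) {
  const retry = [];
  const deadLetter = [];
  for (const entry of pendingEntries) {
    // deliveriesCounter grows on every delivery, including each XCLAIM
    if (entry.deliveriesCounter > maxDeliveries) {
      deadLetter.push(entry.id);
    } else {
      retry.push(entry.id);
    }
  }
  return { retry, deadLetter };
}

// Hypothetical helper: fields for the copy written to a dead-letter stream,
// keeping the original ID and delivery count for later debugging.
function toDeadLetterFields(originalId, fields, deliveries) {
  return { ...fields, originalId, deliveries: String(deliveries) };
}
```

XPENDING does not return the entry's fields, so for each dead-letter ID you would read the entry (from the XCLAIM reply or with XRANGE), XADD toDeadLetterFields(...) to a separate stream (a name such as user_events:dlq is only an example), and then XACK the original ID so the recovery loop stops claiming it.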
Frequently Asked Questions
How does Redis Streams compare to RabbitMQ or Kafka?
Redis Streams is much lighter and easier to run than Kafka or RabbitMQ. If you already use Redis for caching, you do not need to manage a whole new infrastructure stack. However, Kafka is better suited for very high sustained throughput and long-term historical log retention, because it keeps its log on disk rather than in memory.
Can I have multiple different apps read the same stream?
Yes. You can create multiple separate consumer groups on the same stream. For example, you can have a billing_group and an analytics_group. Both groups will receive every single message published to the stream independently.
What is the difference between Redis Streams and BullMQ?
BullMQ is a full-featured job queue library built on Redis, mainly on lists, sorted sets, and hashes driven by Lua scripts. It handles retries, delays, and parent-child dependencies out of the box. If you need simple event processing, raw Redis Streams gives you fewer moving parts and no extra dependency. If you need complex job management, use BullMQ. You can check out my guide on how to Queue GoHighLevel Webhooks with Node.js and BullMQ to see it in action.
Next Steps
Now that you have a crash-resilient event processor running, the logical next step is protecting your downstream APIs from double-processing. Check out my tutorial on building a Redis-Backed Idempotency Middleware for Express to secure your API endpoints.

