Streams Plugin
Event streaming solution built on Redis Streams with consumer groups, dead letter queues, and automatic retry handling.
Overview
The Streams Plugin provides Redis Streams support for NestJS, enabling event-driven architectures, background job processing, and data pipelines.
| Capability | Description |
|---|---|
| Durable Messaging | Messages persist in Redis until acknowledged |
| Consumer Groups | Distribute workload across multiple consumers |
| At-Least-Once Delivery | Explicit acknowledgment ensures processing |
| Message History | Read historical messages from any point |
| Dead Letter Queue | Isolate failed messages for analysis |
| Auto-Claiming | Recover from consumer failures |
Key Features
- Producer API — Publish single or batch messages
- Consumer Groups — Load-balanced message distribution with pending message tracking
- Declarative Consumers —
@StreamConsumerdecorator for consumer definitions - Automatic Retry — Configurable exponential backoff for failed messages
- Dead Letter Queue — Failed messages isolated after max retries
- Backpressure Handling — Configurable concurrency and batch sizes
Installation
bash
npm install @nestjs-redisx/core @nestjs-redisx/streams ioredisbash
npm install @nestjs-redisx/core @nestjs-redisx/streams redisBasic Configuration
typescript
import { Module } from '@nestjs/common';
import { RedisModule } from '@nestjs-redisx/core';
import { StreamsPlugin } from '@nestjs-redisx/streams';
@Module({
imports: [
RedisModule.forRoot({
clients: {
host: 'localhost',
port: 6379,
},
plugins: [
new StreamsPlugin({
consumer: {
batchSize: 10,
concurrency: 1,
blockTimeout: 5000,
maxRetries: 3,
},
dlq: {
enabled: true,
},
}),
],
}),
],
})
export class AppModule {}Publishing Messages
typescript
import { Injectable, Inject } from '@nestjs/common';
import { STREAM_PRODUCER, IStreamProducer } from '@nestjs-redisx/streams';
import { CreateOrderDto, Order, OrderRepository } from './types';
@Injectable()
export class OrderService {
constructor(
@Inject(STREAM_PRODUCER) private readonly producer: IStreamProducer,
private readonly orderRepository: OrderRepository,
) {}
async createOrder(dto: CreateOrderDto): Promise<Order> {
const order = await this.orderRepository.create(dto);
await this.producer.publish('orders', {
type: 'ORDER_CREATED',
orderId: order.id,
customerId: order.customerId,
total: order.total,
});
return order;
}
}Consuming Messages
typescript
import { Injectable } from '@nestjs/common';
import { StreamConsumer, IStreamMessage } from '@nestjs-redisx/streams';
import { OrderEvent, FulfillmentService } from './types';
@Injectable()
export class OrderProcessor {
constructor(private readonly fulfillmentService: FulfillmentService) {}
@StreamConsumer({
stream: 'orders',
group: 'order-processors',
batchSize: 10,
})
async handleOrder(message: IStreamMessage<OrderEvent>): Promise<void> {
const { orderId } = message.data;
try {
await this.fulfillmentService.process(orderId);
await message.ack();
} catch (error) {
await message.reject(error);
}
}
}Message Flow
Common Use Cases
| Use Case | Description |
|---|---|
| Event Sourcing | Persist domain events in order |
| Task Queue | Distribute background jobs |
| Audit Log | Record system actions |
| Notifications | Fan-out to multiple services |
| Analytics | Event processing |
| Webhooks | Delivery with retry |
| Order Processing | Multi-step workflows |
Documentation
| Topic | Description |
|---|---|
| Core Concepts | Understanding Redis Streams |
| Configuration | Configuration reference |
| Producer API | Publishing messages |
| Consumer API | Consuming messages |
| Consumer Groups | Load balancing consumers |
| Dead Letter Queue | Handling failed messages |
| Message Handling | Ack, reject, retry patterns |
| Backpressure | Managing throughput |
| Patterns | Architecture patterns |
| Monitoring | Metrics and observability |
| Testing | Testing stream processors |
| Recipes | Implementation examples |
| Troubleshooting | Debugging common issues |