Skip to content

Streams Plugin

Event streaming solution built on Redis Streams with consumer groups, dead letter queues, and automatic retry handling.

Overview

The Streams Plugin provides Redis Streams support for NestJS, enabling event-driven architectures, background job processing, and data pipelines.

CapabilityDescription
Durable MessagingMessages persist in Redis until acknowledged
Consumer GroupsDistribute workload across multiple consumers
At-Least-Once DeliveryExplicit acknowledgment ensures processing
Message HistoryRead historical messages from any point
Dead Letter QueueIsolate failed messages for analysis
Auto-ClaimingRecover from consumer failures

Key Features

  • Producer API — Publish single or batch messages
  • Consumer Groups — Load-balanced message distribution with pending message tracking
  • Declarative Consumers@StreamConsumer decorator for consumer definitions
  • Automatic Retry — Configurable exponential backoff for failed messages
  • Dead Letter Queue — Failed messages isolated after max retries
  • Backpressure Handling — Configurable concurrency and batch sizes

Installation

bash
npm install @nestjs-redisx/core @nestjs-redisx/streams ioredis
bash
npm install @nestjs-redisx/core @nestjs-redisx/streams redis

Basic Configuration

typescript
import { Module } from '@nestjs/common';
import { RedisModule } from '@nestjs-redisx/core';
import { StreamsPlugin } from '@nestjs-redisx/streams';

@Module({
  imports: [
    RedisModule.forRoot({
      clients: {
        host: 'localhost',
        port: 6379,
      },
      plugins: [
        new StreamsPlugin({
          consumer: {
            batchSize: 10,
            concurrency: 1,
            blockTimeout: 5000,
            maxRetries: 3,
          },
          dlq: {
            enabled: true,
          },
        }),
      ],
    }),
  ],
})
export class AppModule {}

Publishing Messages

typescript
import { Injectable, Inject } from '@nestjs/common';
import { STREAM_PRODUCER, IStreamProducer } from '@nestjs-redisx/streams';
import { CreateOrderDto, Order, OrderRepository } from './types';

@Injectable()
export class OrderService {
  constructor(
    @Inject(STREAM_PRODUCER) private readonly producer: IStreamProducer,
    private readonly orderRepository: OrderRepository,
  ) {}

  async createOrder(dto: CreateOrderDto): Promise<Order> {
    const order = await this.orderRepository.create(dto);

    await this.producer.publish('orders', {
      type: 'ORDER_CREATED',
      orderId: order.id,
      customerId: order.customerId,
      total: order.total,
    });

    return order;
  }
}

Consuming Messages

typescript
import { Injectable } from '@nestjs/common';
import { StreamConsumer, IStreamMessage } from '@nestjs-redisx/streams';
import { OrderEvent, FulfillmentService } from './types';

@Injectable()
export class OrderProcessor {
  constructor(private readonly fulfillmentService: FulfillmentService) {}

  @StreamConsumer({
    stream: 'orders',
    group: 'order-processors',
    batchSize: 10,
  })
  async handleOrder(message: IStreamMessage<OrderEvent>): Promise<void> {
    const { orderId } = message.data;

    try {
      await this.fulfillmentService.process(orderId);
      await message.ack();
    } catch (error) {
      await message.reject(error);
    }
  }
}

Message Flow

Common Use Cases

Use CaseDescription
Event SourcingPersist domain events in order
Task QueueDistribute background jobs
Audit LogRecord system actions
NotificationsFan-out to multiple services
AnalyticsEvent processing
WebhooksDelivery with retry
Order ProcessingMulti-step workflows

Documentation

TopicDescription
Core ConceptsUnderstanding Redis Streams
ConfigurationConfiguration reference
Producer APIPublishing messages
Consumer APIConsuming messages
Consumer GroupsLoad balancing consumers
Dead Letter QueueHandling failed messages
Message HandlingAck, reject, retry patterns
BackpressureManaging throughput
PatternsArchitecture patterns
MonitoringMetrics and observability
TestingTesting stream processors
RecipesImplementation examples
TroubleshootingDebugging common issues

Released under the MIT License.