# Configuration

Configure the Streams plugin.

## Basic Configuration
```typescript
import { Module } from '@nestjs/common';
import { RedisModule } from '@nestjs-redisx/core';
import { StreamsPlugin } from '@nestjs-redisx/streams';

@Module({
  imports: [
    RedisModule.forRoot({
      clients: {
        host: 'localhost',
        port: 6379,
      },
      plugins: [
        new StreamsPlugin({
          consumer: {
            batchSize: 10,
            concurrency: 1,
            blockTimeout: 5000,
            maxRetries: 3,
          },
          dlq: {
            enabled: true,
          },
        }),
      ],
    }),
  ],
})
export class AppModule {}
```

## Async Configuration
Load the connection config from `ConfigService`:
```typescript
import { Module } from '@nestjs/common';
import { ConfigModule, ConfigService } from '@nestjs/config';
import { RedisModule } from '@nestjs-redisx/core';
import { StreamsPlugin } from '@nestjs-redisx/streams';

@Module({
  imports: [
    ConfigModule.forRoot(),
    RedisModule.forRootAsync({
      imports: [ConfigModule],
      inject: [ConfigService],
      // Plugin instances are constructed up front, so their options read
      // process.env directly; the factory below supplies only the connection.
      plugins: [
        new StreamsPlugin({
          consumer: {
            batchSize: parseInt(process.env.STREAMS_BATCH_SIZE || '10', 10),
            concurrency: parseInt(process.env.STREAMS_CONCURRENCY || '1', 10),
            maxRetries: parseInt(process.env.STREAMS_MAX_RETRIES || '3', 10),
          },
          dlq: {
            enabled: process.env.STREAMS_DLQ_ENABLED !== 'false',
          },
        }),
      ],
      useFactory: (config: ConfigService) => ({
        clients: {
          host: config.get<string>('REDIS_HOST', 'localhost'),
          port: config.get<number>('REDIS_PORT', 6379),
        },
      }),
    }),
  ],
})
export class AppModule {}
```

## Configuration Options

### Consumer Options
```typescript
interface ConsumerOptions {
  /**
   * Messages per batch
   * @default 10
   */
  batchSize?: number;

  /**
   * Block timeout when waiting for messages (ms)
   * @default 5000
   */
  blockTimeout?: number;

  /**
   * Maximum concurrent message processing
   * @default 1
   */
  concurrency?: number;

  /**
   * Maximum retry attempts before DLQ
   * @default 3
   */
  maxRetries?: number;

  /**
   * Idle time before claiming messages (ms)
   * @default 30000
   */
  claimIdleTimeout?: number;
}
```
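For intuition, `batchSize` and `blockTimeout` correspond to the `COUNT` and `BLOCK` arguments of Redis' `XREADGROUP`. The sketch below is illustrative only, not the plugin's implementation; it uses ioredis directly, and the stream, group, and consumer names are hypothetical:

```typescript
import Redis from 'ioredis';

const redis = new Redis();

async function readBatch() {
  // batchSize -> COUNT, blockTimeout -> BLOCK (milliseconds)
  const entries = await redis.xreadgroup(
    'GROUP', 'orders-group', 'worker-1', // hypothetical group and consumer names
    'COUNT', 10,                          // consumer.batchSize
    'BLOCK', 5000,                        // consumer.blockTimeout
    'STREAMS', 'orders', '>',             // only messages not yet delivered to this group
  );
  return entries; // null when the block times out with no new messages
}
```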
### Producer Options

```typescript
interface ProducerOptions {
  /**
   * Maximum stream length (auto-trim)
   * @default 100000
   */
  maxLen?: number;

  /**
   * Auto-create stream on first publish
   * @default true
   */
  autoCreate?: boolean;
}
```
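For context, a `maxLen` cap corresponds to trim-on-write with `XADD ... MAXLEN ~`. The following is a rough ioredis sketch, not the plugin's actual publish path; the stream name and payload are made up:

```typescript
import Redis from 'ioredis';

const redis = new Redis();

async function publish() {
  // maxLen maps to trim-on-write: XADD <stream> MAXLEN ~ <maxLen> * field value ...
  await redis.xadd(
    'orders',               // stream is created on first XADD
    'MAXLEN', '~', 100000,  // producer.maxLen, '~' = approximate trim
    '*',                    // let Redis assign the entry ID
    'payload', JSON.stringify({ orderId: 42 }),
  );
}
```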
### Dead Letter Queue Options

```typescript
interface DLQOptions {
  /**
   * Enable Dead Letter Queue
   * @default true
   */
  enabled?: boolean;

  /**
   * DLQ stream suffix
   * @default ':dlq'
   */
  streamSuffix?: string;

  /**
   * Maximum DLQ stream length
   * @default 10000
   */
  maxLen?: number;
}
```
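Assuming the DLQ stream name is simply the source stream plus `streamSuffix` (the `':dlq'` default suggests `orders` becomes `orders:dlq`), dead-lettered entries can be inspected with a plain `XRANGE`. A hypothetical helper:

```typescript
import Redis from 'ioredis';

const redis = new Redis();

// Hypothetical helper: the `stream + streamSuffix` naming is an assumption
// based on the ':dlq' default above.
async function inspectDlq(stream: string, streamSuffix = ':dlq') {
  const dlqStream = `${stream}${streamSuffix}`; // e.g. 'orders:dlq'
  // Read the 20 oldest dead-lettered entries
  return redis.xrange(dlqStream, '-', '+', 'COUNT', 20);
}
```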
### Retry Options

> **Info:** When a message is rejected and `attempt < maxRetries`, the library waits using exponential backoff (`initialDelay × multiplier^(attempt - 1)`, capped at `maxDelay`), then re-adds the message to the stream with `_attempt` incremented. See Message Handling for details.
```typescript
interface RetryOptions {
  /**
   * Maximum retry attempts
   * @default 3
   */
  maxRetries?: number;

  /**
   * Initial backoff delay (ms)
   * @default 1000
   */
  initialDelay?: number;

  /**
   * Maximum backoff delay (ms)
   * @default 30000
   */
  maxDelay?: number;

  /**
   * Backoff multiplier
   * @default 2
   */
  multiplier?: number;
}
```

### Trim Options
```typescript
interface TrimOptions {
  /**
   * Enable auto-trimming
   * @default true
   */
  enabled?: boolean;

  /**
   * Maximum stream length
   * @default 100000
   */
  maxLen?: number;

  /**
   * Trim strategy
   * @default 'MAXLEN'
   */
  strategy?: 'MAXLEN' | 'MINID';

  /**
   * Use approximate trimming (~)
   * @default true
   */
  approximate?: boolean;
}
```
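For reference, the two strategies correspond to Redis' `XTRIM` modes: `MAXLEN` keeps at most N entries, while `MINID` drops entries with IDs below a threshold. A rough ioredis sketch (stream name and threshold are made up):

```typescript
import Redis from 'ioredis';

const redis = new Redis();

async function trimExamples() {
  // MAXLEN: keep roughly the newest 100000 entries ('~' = approximate)
  await redis.xtrim('orders', 'MAXLEN', '~', 100000);

  // MINID: drop entries whose ID is older than the given stream ID
  await redis.xtrim('orders', 'MINID', '1700000000000-0');
}
```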
## Full Configuration Example

```typescript
new StreamsPlugin({
  // Consumer configuration
  consumer: {
    batchSize: 50,
    blockTimeout: 10000,
    concurrency: 5,
    maxRetries: 5,
    claimIdleTimeout: 60000,
  },

  // Producer configuration
  producer: {
    maxLen: 500000,
    autoCreate: true,
  },

  // Dead Letter Queue
  dlq: {
    enabled: true,
    streamSuffix: ':failed',
    maxLen: 50000,
  },

  // Retry backoff
  retry: {
    maxRetries: 5,
    initialDelay: 2000,
    maxDelay: 30000,
    multiplier: 3,
  },

  // Stream trimming
  trim: {
    enabled: true,
    maxLen: 500000,
    strategy: 'MAXLEN',
    approximate: true,
  },
})
```

## Retry Behavior
When a message handler fails and `attempt < maxRetries`, the library applies exponential backoff and re-adds the message to the stream with `_attempt` incremented. After `maxRetries` failures, the message is moved to the DLQ. Use `claimIdle()` separately to recover orphaned messages (e.g., a consumer crashed without an ACK):
```typescript
new StreamsPlugin({
  consumer: {
    maxRetries: 3, // Max attempts before DLQ
  },
  retry: {
    initialDelay: 1000, // First retry delay: 1s
    maxDelay: 30000,    // Cap at 30s
    multiplier: 2,      // Double each attempt
  },
})
```

See Message Handling for details on the retry lifecycle.
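For a quick sanity check of the schedule this produces, here is a minimal sketch of the backoff formula described above (not the library's code):

```typescript
// delay(attempt) = min(initialDelay * multiplier ** (attempt - 1), maxDelay)
function retryDelay(
  attempt: number,
  { initialDelay = 1000, multiplier = 2, maxDelay = 30000 } = {},
): number {
  return Math.min(initialDelay * multiplier ** (attempt - 1), maxDelay);
}

// With the values above the successive delays are 1s, 2s, 4s, ... capped at 30s;
// after maxRetries failures the message goes to the DLQ.
console.log([1, 2, 3].map((attempt) => retryDelay(attempt))); // [1000, 2000, 4000]
```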
## Presets

### High Throughput
```typescript
new StreamsPlugin({
  consumer: {
    batchSize: 100,     // Large batches
    concurrency: 20,    // High parallelism
    blockTimeout: 1000, // Quick checks
  },
  producer: {
    maxLen: 1000000, // Large stream
  },
})
```

### Low-Latency Processing
```typescript
new StreamsPlugin({
  consumer: {
    batchSize: 1,      // Single message
    concurrency: 10,
    blockTimeout: 100, // Very short wait
  },
})
```

### Reliable Processing
```typescript
new StreamsPlugin({
  consumer: {
    maxRetries: 10,          // Many retries before DLQ
    claimIdleTimeout: 60000, // Long idle before claim
  },
  dlq: {
    enabled: true,
    maxLen: 100000, // Large DLQ
  },
})
```

### Event Sourcing
```typescript
new StreamsPlugin({
  trim: {
    enabled: false, // Keep all events!
  },
  producer: {
    maxLen: 0, // No limit
  },
})
```

## Environment Configuration
```typescript
// config/streams.config.ts
import { registerAs } from '@nestjs/config';

export default registerAs('streams', () => ({
  consumer: {
    batchSize: parseInt(process.env.STREAMS_BATCH_SIZE || '10', 10),
    concurrency: parseInt(process.env.STREAMS_CONCURRENCY || '1', 10),
    maxRetries: parseInt(process.env.STREAMS_MAX_RETRIES || '3', 10),
  },
  dlq: {
    enabled: process.env.STREAMS_DLQ_ENABLED !== 'false',
  },
}));
```

```bash
# .env
STREAMS_BATCH_SIZE=20
STREAMS_CONCURRENCY=5
STREAMS_MAX_RETRIES=5
STREAMS_DLQ_ENABLED=true
```
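One way to wire the namespace in (this wiring is an assumption, not the only option) is to load it through `ConfigModule` and reuse the same factory when constructing the plugin:

```typescript
import { Module } from '@nestjs/common';
import { ConfigModule } from '@nestjs/config';
import { RedisModule } from '@nestjs-redisx/core';
import { StreamsPlugin } from '@nestjs-redisx/streams';
import streamsConfig from './config/streams.config';

@Module({
  imports: [
    // Register the 'streams' namespace so other providers can also read it
    ConfigModule.forRoot({ load: [streamsConfig] }),
    RedisModule.forRoot({
      clients: { host: 'localhost', port: 6379 },
      // The plugin instance is constructed eagerly, so call the factory directly
      plugins: [new StreamsPlugin(streamsConfig())],
    }),
  ],
})
export class AppModule {}
```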