Configuration

Configure the Streams plugin.

Basic Configuration

typescript
import { Module } from '@nestjs/common';
import { RedisModule } from '@nestjs-redisx/core';
import { StreamsPlugin } from '@nestjs-redisx/streams';

@Module({
  imports: [
    RedisModule.forRoot({
      clients: {
        host: 'localhost',
        port: 6379,
      },
      plugins: [
        new StreamsPlugin({
          consumer: {
            batchSize: 10,
            concurrency: 1,
            blockTimeout: 5000,
            maxRetries: 3,
          },
          dlq: {
            enabled: true,
          },
        }),
      ],
    }),
  ],
})
export class AppModule {}

Async Configuration

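Load the connection settings from ConfigService. The plugin options sit outside useFactory, so this example reads them from process.env directly: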

typescript
import { Module } from '@nestjs/common';
import { ConfigModule, ConfigService } from '@nestjs/config';
import { RedisModule } from '@nestjs-redisx/core';
import { StreamsPlugin } from '@nestjs-redisx/streams';

@Module({
  imports: [
    ConfigModule.forRoot(),
    RedisModule.forRootAsync({
      imports: [ConfigModule],
      inject: [ConfigService],
      plugins: [
        new StreamsPlugin({
          consumer: {
            batchSize: parseInt(process.env.STREAMS_BATCH_SIZE || '10', 10),
            concurrency: parseInt(process.env.STREAMS_CONCURRENCY || '1', 10),
            maxRetries: parseInt(process.env.STREAMS_MAX_RETRIES || '3', 10),
          },
          dlq: {
            enabled: process.env.STREAMS_DLQ_ENABLED !== 'false',
          },
        }),
      ],
      useFactory: (config: ConfigService) => ({
        clients: {
          host: config.get<string>('REDIS_HOST', 'localhost'),
          // Environment values arrive as strings, so parse the port explicitly
          port: parseInt(config.get<string>('REDIS_PORT', '6379'), 10),
        },
      }),
    }),
  ],
})
export class AppModule {}

Configuration Options

Consumer Options

typescript
interface ConsumerOptions {
  /**
   * Messages per batch
   * @default 10
   */
  batchSize?: number;

  /**
   * Block timeout when waiting for messages (ms)
   * @default 5000
   */
  blockTimeout?: number;

  /**
   * Maximum concurrent message processing
   * @default 1
   */
  concurrency?: number;

  /**
   * Maximum retry attempts before DLQ
   * @default 3
   */
  maxRetries?: number;

  /**
   * Idle time before claiming messages (ms)
   * @default 30000
   */
  claimIdleTimeout?: number;
}
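
To make batchSize and blockTimeout more concrete, here is a minimal sketch of the Redis XREADGROUP command they would most plausibly translate to. The mapping (batchSize to COUNT, blockTimeout to BLOCK) is an assumption, not something this page confirms, and the helper, group, consumer, and stream names are hypothetical.

```typescript
// Hypothetical helper, not part of @nestjs-redisx/streams.
interface ReadOptions {
  batchSize: number;    // assumed to map to COUNT
  blockTimeout: number; // assumed to map to BLOCK (ms)
}

// Builds the argument list for one blocking consumer-group read.
function xreadgroupArgs(
  group: string,
  consumer: string,
  stream: string,
  opts: ReadOptions,
): string[] {
  return [
    'XREADGROUP', 'GROUP', group, consumer,
    'COUNT', String(opts.batchSize),
    'BLOCK', String(opts.blockTimeout),
    'STREAMS', stream, '>', // '>' asks for entries never delivered to this group
  ];
}

const readCommand = xreadgroupArgs('workers', 'worker-1', 'orders', {
  batchSize: 10,
  blockTimeout: 5000,
});
```

Under this reading, a larger batchSize means fewer round trips per message, while a shorter blockTimeout means the consumer loop wakes up more often when the stream is idle.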

Producer Options

typescript
interface ProducerOptions {
  /**
   * Maximum stream length (auto-trim)
   * @default 100000
   */
  maxLen?: number;

  /**
   * Auto-create stream on first publish
   * @default true
   */
  autoCreate?: boolean;
}

Dead Letter Queue Options

typescript
interface DLQOptions {
  /**
   * Enable Dead Letter Queue
   * @default true
   */
  enabled?: boolean;

  /**
   * DLQ stream suffix
   * @default ':dlq'
   */
  streamSuffix?: string;

  /**
   * Maximum DLQ stream length
   * @default 10000
   */
  maxLen?: number;
}
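
As a small illustration of streamSuffix, the sketch below assumes the DLQ stream name is the source stream name with the suffix appended. The page only calls it a "suffix", so the exact naming rule is an assumption, and dlqStreamName is a hypothetical helper.

```typescript
// Hypothetical helper: derive the DLQ stream name from the source stream.
// Assumes simple concatenation of stream name and suffix.
function dlqStreamName(stream: string, suffix: string = ':dlq'): string {
  return `${stream}${suffix}`;
}

const defaultDlq = dlqStreamName('orders');            // default suffix
const customDlq = dlqStreamName('orders', ':failed');  // custom suffix
```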

Retry Options

INFO

When a message is rejected and attempt < maxRetries, the library waits using exponential backoff (initialDelay × multiplier^(attempt-1), capped at maxDelay), then re-adds the message to the stream with _attempt incremented. See Message Handling for details.

typescript
interface RetryOptions {
  /**
   * Maximum retry attempts
   * @default 3
   */
  maxRetries?: number;

  /**
   * Initial backoff delay (ms)
   * @default 1000
   */
  initialDelay?: number;

  /**
   * Maximum backoff delay (ms)
   * @default 30000
   */
  maxDelay?: number;

  /**
   * Backoff multiplier
   * @default 2
   */
  multiplier?: number;
}
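
The backoff formula from the note above (initialDelay × multiplier^(attempt-1), capped at maxDelay) can be worked through with a short sketch. backoffDelay is a hypothetical helper written for illustration only; it is not the library's implementation.

```typescript
// Hypothetical helper, not part of @nestjs-redisx/streams.
interface BackoffOptions {
  initialDelay: number; // ms
  maxDelay: number;     // ms
  multiplier: number;
}

// Delay before retry number `attempt` (1-based), following the documented formula.
function backoffDelay(attempt: number, opts: BackoffOptions): number {
  const raw = opts.initialDelay * Math.pow(opts.multiplier, attempt - 1);
  return Math.min(raw, opts.maxDelay);
}

// Documented defaults: 1000 ms initial delay, 30000 ms cap, multiplier 2.
const defaults: BackoffOptions = { initialDelay: 1000, maxDelay: 30000, multiplier: 2 };

// Delays for attempts 1..6: 1000, 2000, 4000, 8000, 16000, 30000 (capped)
const schedule = [1, 2, 3, 4, 5, 6].map((attempt) => backoffDelay(attempt, defaults));
```

With the defaults, the cap only takes effect from the sixth attempt onward, so with maxRetries set to 3 the delays never reach maxDelay.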

Trim Options

typescript
interface TrimOptions {
  /**
   * Enable auto-trimming
   * @default true
   */
  enabled?: boolean;

  /**
   * Maximum stream length
   * @default 100000
   */
  maxLen?: number;

  /**
   * Trim strategy
   * @default 'MAXLEN'
   */
  strategy?: 'MAXLEN' | 'MINID';

  /**
   * Use approximate trimming (~)
   * @default true
   */
  approximate?: boolean;
}
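
For orientation, here is a sketch of how the MAXLEN options correspond to the trimming arguments of the Redis XADD command. How the plugin issues the command internally is not described here, so treat the mapping as an assumption; maxLenArgs and the stream name are hypothetical. MINID is left out because Redis expects an entry ID rather than a length as its threshold, and this page does not say how the plugin derives one.

```typescript
// Hypothetical helper; field names mirror TrimOptions above.
interface MaxLenTrim {
  enabled: boolean;
  maxLen: number;
  approximate: boolean;
}

// Returns the trimming arguments that sit between the key and the entry ID in XADD.
function maxLenArgs(trim: MaxLenTrim): string[] {
  if (!trim.enabled) return [];
  return trim.approximate
    ? ['MAXLEN', '~', String(trim.maxLen)] // '~' lets Redis trim lazily
    : ['MAXLEN', String(trim.maxLen)];     // exact trimming
}

// Builds: XADD orders MAXLEN ~ 100000 * type created
const addCommand = [
  'XADD', 'orders',
  ...maxLenArgs({ enabled: true, maxLen: 100000, approximate: true }),
  '*', 'type', 'created',
];
```

With approximate trimming, Redis removes entries only in whole internal nodes, so the stream can temporarily hold somewhat more than maxLen entries in exchange for cheaper writes.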

Full Configuration Example

typescript
new StreamsPlugin({
  // Consumer configuration
  consumer: {
    batchSize: 50,
    blockTimeout: 10000,
    concurrency: 5,
    maxRetries: 5,
    claimIdleTimeout: 60000,
  },

  // Producer configuration
  producer: {
    maxLen: 500000,
    autoCreate: true,
  },

  // Dead Letter Queue
  dlq: {
    enabled: true,
    streamSuffix: ':failed',
    maxLen: 50000,
  },

  // Retry backoff
  retry: {
    maxRetries: 5,
    initialDelay: 2000,
    maxDelay: 30000,
    multiplier: 3,
  },

  // Stream trimming
  trim: {
    enabled: true,
    maxLen: 500000,
    strategy: 'MAXLEN',
    approximate: true,
  },
})

Retry Behavior

When a message handler fails and attempt < maxRetries, the library waits for the exponential backoff delay and re-adds the message to the stream with _attempt incremented. After maxRetries failed attempts, the message moves to the DLQ. Orphaned messages (e.g., a consumer that crashed before sending an ACK) are not covered by this retry path; recover them separately with claimIdle(). The options that control retries:

typescript
new StreamsPlugin({
  consumer: {
    maxRetries: 3,              // Max attempts before DLQ
  },
  retry: {
    initialDelay: 1000,         // First retry delay: 1s
    maxDelay: 30000,            // Cap at 30s
    multiplier: 2,              // Double each attempt
  },
})

See Message Handling for details on the retry lifecycle.

Presets

High Throughput

typescript
new StreamsPlugin({
  consumer: {
    batchSize: 100,      // Large batches
    concurrency: 20,     // High parallelism
    blockTimeout: 1000,  // Quick checks
  },
  producer: {
    maxLen: 1000000,     // Large stream
  },
})

Low-Latency Processing

typescript
new StreamsPlugin({
  consumer: {
    batchSize: 1,        // Single message
    concurrency: 10,
    blockTimeout: 100,   // Very short wait
  },
})

Reliable Processing

typescript
new StreamsPlugin({
  consumer: {
    maxRetries: 10,           // Many retries before DLQ
    claimIdleTimeout: 60000,  // Long idle before claim
  },
  dlq: {
    enabled: true,
    maxLen: 100000,      // Large DLQ
  },
})

Event Sourcing

typescript
new StreamsPlugin({
  trim: {
    enabled: false,      // Keep all events!
  },
  producer: {
    maxLen: 0,           // No limit
  },
})
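
A preset rarely fits exactly, so one option is to keep presets as plain objects and layer a few overrides on top before passing the result to the plugin. The sketch below assumes the plugin accepts a plain options object shaped like the interfaces above; withOverrides and the locally declared types are hypothetical and cover only the consumer section.

```typescript
// Local stand-ins for the option types documented above (consumer section only).
interface ConsumerOptions {
  batchSize?: number;
  concurrency?: number;
  blockTimeout?: number;
  maxRetries?: number;
  claimIdleTimeout?: number;
}
interface StreamsOptions {
  consumer?: ConsumerOptions;
}

// The "High Throughput" consumer settings from this page, as a plain object.
const highThroughput: StreamsOptions = {
  consumer: { batchSize: 100, concurrency: 20, blockTimeout: 1000 },
};

// Hypothetical helper: merge overrides into a preset one level deep,
// so overriding one consumer field keeps the rest of the preset.
function withOverrides(preset: StreamsOptions, overrides: StreamsOptions): StreamsOptions {
  return {
    ...preset,
    ...overrides,
    consumer: { ...preset.consumer, ...overrides.consumer },
  };
}

const tuned = withOverrides(highThroughput, { consumer: { concurrency: 8 } });
```

A plain object spread at the top level alone would replace the whole consumer section, which is why the helper merges that section separately.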

Environment Configuration

typescript
// config/streams.config.ts
import { registerAs } from '@nestjs/config';

export default registerAs('streams', () => ({
  consumer: {
    batchSize: parseInt(process.env.STREAMS_BATCH_SIZE || '10', 10),
    concurrency: parseInt(process.env.STREAMS_CONCURRENCY || '1', 10),
    maxRetries: parseInt(process.env.STREAMS_MAX_RETRIES || '3', 10),
  },
  dlq: {
    enabled: process.env.STREAMS_DLQ_ENABLED !== 'false',
  },
}));

bash
# .env
STREAMS_BATCH_SIZE=20
STREAMS_CONCURRENCY=5
STREAMS_MAX_RETRIES=5
STREAMS_DLQ_ENABLED=true
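
One caveat with the parseInt(... || '10', 10) pattern: the fallback only applies when the variable is unset or empty. A value such as STREAMS_BATCH_SIZE=abc parses to NaN and would be passed on as-is. A minimal sketch of a stricter parser follows; envInt is a hypothetical helper, and it takes the environment as a parameter so it can be exercised without touching process.env.

```typescript
// Hypothetical helper: read an integer setting, falling back when the
// variable is missing, empty, or not a number.
function envInt(
  env: Record<string, string | undefined>,
  key: string,
  fallback: number,
): number {
  const parsed = Number.parseInt(env[key] ?? '', 10);
  return Number.isNaN(parsed) ? fallback : parsed;
}

const sampleEnv = { STREAMS_BATCH_SIZE: '20', STREAMS_CONCURRENCY: 'abc' };

const batchSize = envInt(sampleEnv, 'STREAMS_BATCH_SIZE', 10);    // valid value is used
const concurrency = envInt(sampleEnv, 'STREAMS_CONCURRENCY', 1);  // invalid, falls back
const maxRetries = envInt(sampleEnv, 'STREAMS_MAX_RETRIES', 3);   // unset, falls back
```

In the config file above, the same helper could be called with process.env as the first argument.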

Released under the MIT License.