
Dead Letter Queue

Handle messages that fail after maximum retries.

What is a DLQ?

A Dead Letter Queue (DLQ) is a separate stream where failed messages are moved after exhausting all retry attempts.

Enable DLQ

typescript
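import { StreamsPlugin } from '@nestjs-redisx/streams';

new StreamsPlugin({
  dlq: {
    enabled: true,
    streamSuffix: ':dlq',  // DLQ stream name = source stream name + this suffix
    maxLen: 10000,         // Cap on the number of entries kept in each DLQ stream
  },
  consumer: {
    maxRetries: 3,  // Retry up to 3 times after the first attempt, then move to the DLQ
  },
})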

How It Works

Automatic DLQ

When the maximum number of retries is reached, the message is automatically moved to the DLQ:

typescript
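import { Injectable } from '@nestjs/common';
import { StreamConsumer, IStreamMessage } from '@nestjs-redisx/streams';
import { Order } from './types';

@Injectable()
export class OrderProcessor {
  @StreamConsumer({
    stream: 'orders',
    group: 'processors',
    maxRetries: 3,  // First attempt + up to 3 retries, then orders:dlq
  })
  async handle(message: IStreamMessage<Order>): Promise<void> {
    try {
      await this.processOrder(message.data);
      await message.ack();
    } catch (error) {
      // Rejecting schedules a retry; once retries are exhausted the message goes to the DLQ
      await message.reject(error as Error);
    }
  }

  private async processOrder(order: Order): Promise<void> {
    // Business logic for a single order
  }
}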

Flow with maxRetries: 3 (one initial attempt plus three retries):

Attempt 1: Fail → Retry
Attempt 2: Fail → Retry
Attempt 3: Fail → Retry
Attempt 4: Fail → Move to orders:dlq
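The flow above can be modelled as a small loop. The sketch below is a self-contained simulation, not the library's implementation: it assumes maxRetries counts retries after the first attempt (so maxRetries: 3 allows four attempts, matching the flow), runs the handler synchronously for brevity, and uses a hypothetical runWithRetries helper that reports where the message ended up.

```typescript
// Where a message ended up after its delivery attempts.
type DeliveryOutcome =
  | { status: 'acked'; attempts: number }
  | { status: 'dlq'; attempts: number; lastError: string };

// Hypothetical helper that simulates the retry-then-DLQ flow (synchronous for brevity).
// Assumption: maxRetries counts retries AFTER the first attempt,
// so the handler runs at most maxRetries + 1 times.
function runWithRetries(
  handler: (attempt: number) => void,
  maxRetries: number,
): DeliveryOutcome {
  const maxAttempts = maxRetries + 1;
  let lastError = '';
  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    try {
      handler(attempt);
      return { status: 'acked', attempts: attempt }; // success: message is acknowledged
    } catch (err) {
      // Keep the most recent failure; it becomes the DLQ entry's error field.
      lastError = err instanceof Error ? err.message : String(err);
    }
  }
  return { status: 'dlq', attempts: maxAttempts, lastError };
}
```

For example, runWithRetries(() => { throw new Error('Payment gateway timeout'); }, 3) returns { status: 'dlq', attempts: 4, lastError: 'Payment gateway timeout' }, while a handler that only fails on its first call is acked on attempt 2.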

DLQ Stream Name

The DLQ stream name is the original stream name followed by the configured streamSuffix:

Stream: orders
DLQ:    orders:dlq

Stream: notifications
DLQ:    notifications:dlq
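The monitoring code on this page builds DLQ names by hand with a hard-coded ':dlq'. A minimal sketch of hypothetical helpers that keep the naming rule in one place (the suffix parameter mirrors the streamSuffix option; these functions are not part of the library):

```typescript
// Hypothetical helpers mirroring the naming rule on this page:
// DLQ stream name = source stream name + streamSuffix (':dlq' in the example config).
function dlqStreamName(stream: string, streamSuffix = ':dlq'): string {
  return `${stream}${streamSuffix}`;
}

// Inverse: recover the source stream from a DLQ stream name,
// or null if the name does not end with the suffix.
function sourceStreamName(dlqStream: string, streamSuffix = ':dlq'): string | null {
  if (!streamSuffix || !dlqStream.endsWith(streamSuffix)) return null;
  return dlqStream.slice(0, dlqStream.length - streamSuffix.length);
}
```

If you later change streamSuffix in the plugin config, routing every DLQ name through one helper means the monitors and metrics follow automatically.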

DLQ Message Format

DLQ messages follow the DlqMessage<T> interface:

typescript
interface DlqMessage<T> {
  id: string;                    // DLQ message ID
  data: T;                       // Original message data
  originalId: string;            // Original stream message ID
  originalStream: string;        // Source stream name
  error: string;                 // Last error message
  failedAt: Date;                // When the message was moved to DLQ
}

// Example entry for a failed order:
const example: DlqMessage<{ orderId: string; customerId: string }> = {
  id: '1706200000000-0',
  data: {
    orderId: 'order-123',
    customerId: 'cust-456',
  },
  originalId: '1706123456789-0',
  originalStream: 'orders',
  error: 'Payment gateway timeout',
  failedAt: new Date('2024-01-25T16:26:40Z'),
};
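Messages often land in the DLQ precisely because their payload is malformed, so the T in DlqMessage<T> is a hope rather than a guarantee. A sketch of checking payloads at runtime before acting on them; OrderPayload, isOrderPayload and partitionOrders are hypothetical names for illustration, and the interface is repeated so the block is self-contained.

```typescript
// Copy of the DlqMessage<T> shape documented above.
interface DlqMessage<T> {
  id: string;
  data: T;
  originalId: string;
  originalStream: string;
  error: string;
  failedAt: Date;
}

// Hypothetical payload type used for illustration.
interface OrderPayload {
  orderId: string;
  customerId: string;
}

// Runtime type guard: don't trust the static type parameter for DLQ data.
function isOrderPayload(value: unknown): value is OrderPayload {
  if (typeof value !== 'object' || value === null) return false;
  const v = value as Record<string, unknown>;
  return typeof v.orderId === 'string' && typeof v.customerId === 'string';
}

// Split entries into well-formed orders and entries that need manual attention.
function partitionOrders(messages: DlqMessage<unknown>[]): {
  valid: DlqMessage<OrderPayload>[];
  malformed: DlqMessage<unknown>[];
} {
  const valid: DlqMessage<OrderPayload>[] = [];
  const malformed: DlqMessage<unknown>[] = [];
  for (const msg of messages) {
    if (isOrderPayload(msg.data)) {
      valid.push({ ...msg, data: msg.data }); // data is narrowed to OrderPayload here
    } else {
      malformed.push(msg);
    }
  }
  return { valid, malformed };
}
```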

Monitor DLQ

typescript
import { Injectable, Inject } from '@nestjs/common';
import { STREAM_PRODUCER, IStreamProducer } from '@nestjs-redisx/streams';
import { AlertService } from './types';

@Injectable()
export class DLQMonitor {
  constructor(
    @Inject(STREAM_PRODUCER) private readonly producer: IStreamProducer,
    private readonly alertService: AlertService,
  ) {}

  async getDLQCount(stream: string): Promise<number> {
    const info = await this.producer.getStreamInfo(`${stream}:dlq`);
    return info.length;
  }

  // @Cron('*/5 * * * *')  // Every 5 minutes
  async checkDLQ(): Promise<void> {
    const count = await this.getDLQCount('orders');

    if (count > 100) {
      await this.alertService.send({
        title: 'High DLQ Count',
        message: `${count} messages in orders:dlq`,
        severity: 'critical',
      });
    }
  }
}
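checkDLQ above watches a single stream against one fixed threshold. A sketch of a pure evaluation step that covers several streams with two severity levels; it has no Nest or Redis dependencies, so it is easy to unit-test. The DlqAlert shape, the 'warning' level and the default thresholds are hypothetical. You would feed it counts gathered with getDLQCount and forward each result to your alerting service.

```typescript
type Severity = 'warning' | 'critical';

interface DlqAlert {
  stream: string;
  count: number;
  severity: Severity;
}

// Hypothetical thresholds: above criticalAt is critical, above warnAt is a warning.
function evaluateDlqCounts(
  counts: Record<string, number>,
  warnAt = 10,
  criticalAt = 100,
): DlqAlert[] {
  const alerts: DlqAlert[] = [];
  for (const [stream, count] of Object.entries(counts)) {
    if (count > criticalAt) {
      alerts.push({ stream, count, severity: 'critical' });
    } else if (count > warnAt) {
      alerts.push({ stream, count, severity: 'warning' });
    }
    // Streams at or below warnAt produce no alert.
  }
  return alerts;
}
```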

Read DLQ Messages

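Manual Inspection

Inject the dead-letter service and log each DLQ entry for a given source stream:

typescript
import { Injectable, Inject } from '@nestjs/common';
import { DEAD_LETTER_SERVICE, IDeadLetterService } from '@nestjs-redisx/streams';

@Injectable()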
export class DLQReader {
  constructor(
    @Inject(DEAD_LETTER_SERVICE) private readonly dlq: IDeadLetterService,
  ) {}

  async inspectDLQ(stream: string): Promise<void> {
    const messages = await this.dlq.getMessages(stream);

    messages.forEach(msg => {
      console.log({
        originalStream: msg.originalStream,
        originalId: msg.originalId,
        error: msg.error,
        failedAt: msg.failedAt,
        data: msg.data,
      });
    });
  }
}
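Logging every entry gets noisy once the DLQ grows. A sketch that groups entries by their error text so recurring failures stand out; it relies only on the error and failedAt fields of DlqMessage, and summarizeByError is a hypothetical helper, not part of the library.

```typescript
// Only the DlqMessage fields this helper needs.
interface DlqEntry {
  error: string;
  failedAt: Date;
}

interface ErrorSummary {
  error: string;
  count: number;
  lastFailedAt: Date;
}

// Hypothetical helper: group DLQ entries by error text, most frequent first.
function summarizeByError(entries: DlqEntry[]): ErrorSummary[] {
  const byError = new Map<string, ErrorSummary>();
  for (const entry of entries) {
    const existing = byError.get(entry.error);
    if (!existing) {
      byError.set(entry.error, { error: entry.error, count: 1, lastFailedAt: entry.failedAt });
      continue;
    }
    existing.count += 1;
    // Track the most recent failure for each error text.
    if (entry.failedAt.getTime() > existing.lastFailedAt.getTime()) {
      existing.lastFailedAt = entry.failedAt;
    }
  }
  return Array.from(byError.values()).sort((a, b) => b.count - a.count);
}
```

Because DlqMessage has both fields, the result of this.dlq.getMessages(stream) can be passed straight in.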

Query DLQ via Service

typescript
import { Injectable, Inject } from '@nestjs/common';
import { DEAD_LETTER_SERVICE, IDeadLetterService } from '@nestjs-redisx/streams';

@Injectable()
export class DLQService {
  constructor(
    @Inject(DEAD_LETTER_SERVICE) private readonly dlq: IDeadLetterService,
  ) {}

  async getDLQMessages(stream: string, count: number = 100) {
    return this.dlq.getMessages(stream, count);
  }
}

Requeue and Purge

Move a message from the DLQ back to its original stream, or clear a stream's DLQ entirely:

typescript
import { Injectable, Inject } from '@nestjs/common';
import { DEAD_LETTER_SERVICE, IDeadLetterService } from '@nestjs-redisx/streams';

@Injectable()
export class DLQRequeue {
  constructor(
    @Inject(DEAD_LETTER_SERVICE) private readonly dlq: IDeadLetterService,
  ) {}

  async requeueMessage(
    dlqMessageId: string,
    stream: string,
  ): Promise<void> {
    // Requeue moves the message back to the original stream
    const newId = await this.dlq.requeue(dlqMessageId, stream);
    console.log(`Requeued message ${dlqMessageId}, new ID: ${newId}`);
  }

  async purgeDLQ(stream: string): Promise<void> {
    const count = await this.dlq.purge(stream);
    console.log(`Purged ${count} messages from DLQ: ${stream}`);
  }
}
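For unit tests you may not want a live Redis. A minimal in-memory stand-in, assuming the method shapes used on this page: getMessages(stream, count), requeue(dlqMessageId, stream) resolving to the new message ID, and purge(stream) resolving to the number of entries removed. It also assumes requeue removes the entry from the DLQ. InMemoryDeadLetterService and its add seeding helper are hypothetical; check them against the library's actual IDeadLetterService typings before relying on this.

```typescript
// Same shape as the DlqMessage<T> interface documented above.
interface DlqMessage<T = unknown> {
  id: string;
  data: T;
  originalId: string;
  originalStream: string;
  error: string;
  failedAt: Date;
}

// Hypothetical in-memory stand-in for the dead-letter service, for unit tests.
// Method shapes are inferred from the calls on this page, not from the library's typings.
class InMemoryDeadLetterService {
  private readonly dlqs = new Map<string, DlqMessage[]>();
  // Payloads "requeued" back to each source stream, so tests can inspect them.
  readonly republished = new Map<string, unknown[]>();
  private seq = 0;

  // Test-only helper: seed a DLQ entry under its source stream.
  add(entry: DlqMessage): void {
    const list = this.dlqs.get(entry.originalStream) ?? [];
    list.push(entry);
    this.dlqs.set(entry.originalStream, list);
  }

  async getMessages<T = unknown>(stream: string, count = 100): Promise<DlqMessage<T>[]> {
    return (this.dlqs.get(stream) ?? []).slice(0, count) as DlqMessage<T>[];
  }

  // Removes the entry from the DLQ, re-adds its payload to the source stream,
  // and resolves to a fake stream ID for the new message.
  async requeue(dlqMessageId: string, stream: string): Promise<string> {
    const list = this.dlqs.get(stream) ?? [];
    const index = list.findIndex((m) => m.id === dlqMessageId);
    if (index === -1) {
      throw new Error(`DLQ entry ${dlqMessageId} not found for stream ${stream}`);
    }
    const [entry] = list.splice(index, 1);
    const target = this.republished.get(stream) ?? [];
    target.push(entry.data);
    this.republished.set(stream, target);
    this.seq += 1;
    return `${Date.now()}-${this.seq}`;
  }

  // Drops every DLQ entry for the stream and resolves to how many were removed.
  async purge(stream: string): Promise<number> {
    const removed = this.dlqs.get(stream)?.length ?? 0;
    this.dlqs.delete(stream);
    return removed;
  }
}
```

In a Nest testing module you could register it with { provide: DEAD_LETTER_SERVICE, useValue: new InMemoryDeadLetterService() } so that services such as DLQRequeue receive the fake instead of a Redis-backed implementation.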

DLQ Strategies

1. Manual Review

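Expose DLQ entries through an admin API so an operator can inspect them, fix the root cause, and requeue:

typescript
import { Controller, Get, Post, Param } from '@nestjs/common';
import { DLQService } from './dlq.service';   // hypothetical paths for the
import { DLQRequeue } from './dlq-requeue';   // services defined above

@Controller()  // Protect these routes (e.g. with a guard): they expose and mutate queue data
export class DLQAdminController {
  constructor(
    private readonly dlqService: DLQService,
    private readonly dlqRequeue: DLQRequeue,
  ) {}

  @Get('admin/dlq/:stream')
  async getDLQMessages(@Param('stream') stream: string) {
    const messages = await this.dlqService.getDLQMessages(stream);

    return messages.map(msg => ({
      id: msg.id,                  // DLQ entry ID: pass this to the requeue endpoint
      originalId: msg.originalId,
      error: msg.error,
      failedAt: msg.failedAt,
      data: msg.data,
    }));
  }

  @Post('admin/dlq/:stream/requeue/:id')
  async requeueMessage(
    @Param('stream') stream: string,
    @Param('id') messageId: string,
  ) {
    await this.dlqRequeue.requeueMessage(messageId, stream);
    return { success: true };
  }
}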

2. Auto-Requeue After Delay

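Periodically requeue DLQ entries that have been parked for a while (six hours here), giving transient failures such as a downstream outage time to clear: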

typescript
import { Injectable, Inject } from '@nestjs/common';
import { DEAD_LETTER_SERVICE, IDeadLetterService } from '@nestjs-redisx/streams';

@Injectable()
export class DLQAutoRequeue {
  constructor(
    @Inject(DEAD_LETTER_SERVICE) private readonly dlq: IDeadLetterService,
  ) {}

  // @Cron('0 */6 * * *')  // Every 6 hours
  async autoRequeue(): Promise<void> {
    const messages = await this.dlq.getMessages('orders', 100);

    // Requeue entries that were moved to the DLQ more than 6 hours ago
    const sixHoursAgo = Date.now() - 6 * 60 * 60 * 1000;

    for (const msg of messages) {
      if (msg.failedAt.getTime() < sixHoursAgo) {
        await this.dlq.requeue(msg.id, 'orders');
      }
    }
  }
}
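A message that keeps failing will bounce between the stream and the DLQ every six hours under the auto-requeue job above. A sketch of a selection step that caps how often a message is requeued, assuming you keep a per-message requeue counter yourself (none of the service methods shown on this page track one). Note the key: a requeued message receives a new stream ID, so the counter is keyed by something stable in the payload, supplied through keyOf (for example an order ID), rather than by originalId. selectForRequeue is hypothetical.

```typescript
// Only the DlqMessage fields this policy needs.
interface DlqCandidate<T> {
  id: string;
  data: T;
  failedAt: Date;
}

// Hypothetical policy: requeue entries that have been in the DLQ longer than minAgeMs,
// but skip any message already requeued maxRequeues times so a poison message
// cannot cycle between the stream and the DLQ forever.
function selectForRequeue<T>(
  messages: DlqCandidate<T>[],
  keyOf: (data: T) => string,          // stable business key, e.g. an order ID
  requeueCounts: Map<string, number>,  // keep this somewhere durable in practice
  nowMs: number,
  minAgeMs = 6 * 60 * 60 * 1000,
  maxRequeues = 3,
): DlqCandidate<T>[] {
  return messages.filter((msg) => {
    const oldEnough = nowMs - msg.failedAt.getTime() > minAgeMs;
    const timesRequeued = requeueCounts.get(keyOf(msg.data)) ?? 0;
    return oldEnough && timesRequeued < maxRequeues;
  });
}
```

After each successful requeue, increment the counter for that key, e.g. requeueCounts.set(key, (requeueCounts.get(key) ?? 0) + 1); entries that hit the cap stay in the DLQ for manual review.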

3. Alternative Processing

Read DLQ messages and handle them differently depending on the recorded error:

typescript
import { Injectable, Inject } from '@nestjs/common';
import {
  DEAD_LETTER_SERVICE,
  IDeadLetterService,
  STREAM_PRODUCER,
  IStreamProducer,
} from '@nestjs-redisx/streams';
import { Order } from './types';

@Injectable()
export class DLQProcessor {
  constructor(
    @Inject(DEAD_LETTER_SERVICE) private readonly dlq: IDeadLetterService,
    @Inject(STREAM_PRODUCER) private readonly producer: IStreamProducer,
  ) {}

  async processFailedOrders(): Promise<void> {
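    // These entries stay in the DLQ after handling; track handled IDs
    // so the next run does not process the same entries again.
    const messages = await this.dlq.getMessages<Order>('orders', 50);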

    for (const msg of messages) {
      if (msg.error.includes('timeout')) {
        // Use backup payment processor
        await this.processWithBackup(msg.data);
      } else if (msg.error.includes('validation')) {
        // Fix validation and reprocess
        const fixed = await this.fixValidation(msg.data);
        await this.producer.publish(msg.originalStream, fixed);
      }
    }
  }

  private async processWithBackup(data: Order): Promise<void> {
    // Backup processing logic
  }

  private async fixValidation(data: Order): Promise<Order> {
    // Fix validation issues and return corrected data
    return data;
  }
}
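The includes checks above are case-sensitive ('Timeout' would not match 'timeout') and silently skip any other error. A sketch of the same routing idea as an ordered, case-insensitive table with an explicit fallback; the route names and patterns are hypothetical and should be adapted to the error messages your handlers actually produce.

```typescript
type DlqRoute = 'backup-processor' | 'fix-and-republish' | 'manual-review';

// Hypothetical routing table: the first matching pattern wins.
// No 'g' flag, so RegExp.test keeps no state between calls.
const ROUTES: Array<{ pattern: RegExp; route: DlqRoute }> = [
  { pattern: /timeout|timed out/i, route: 'backup-processor' },
  { pattern: /validation|invalid/i, route: 'fix-and-republish' },
];

// Anything that matches no pattern is sent to manual review instead of being ignored.
function classifyDlqError(error: string): DlqRoute {
  const match = ROUTES.find(({ pattern }) => pattern.test(error));
  return match ? match.route : 'manual-review';
}
```

A switch over classifyDlqError(msg.error) then replaces the if/else chain, and the manual-review branch makes unhandled cases visible.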

Monitoring

Prometheus Metrics

typescript
import { Injectable, Inject } from '@nestjs/common';
import { STREAM_PRODUCER, IStreamProducer } from '@nestjs-redisx/streams';
import * as promClient from 'prom-client';

@Injectable()
export class DLQMetrics {
  private readonly dlqCounter = new promClient.Counter({
    name: 'redisx_stream_dlq_total',
    help: 'Total messages moved to DLQ',
    labelNames: ['stream'],
  });

  private readonly dlqGauge = new promClient.Gauge({
    name: 'redisx_stream_dlq_size',
    help: 'Current DLQ size',
    labelNames: ['stream'],
  });

  constructor(
    @Inject(STREAM_PRODUCER) private readonly producer: IStreamProducer,
  ) {}

  async trackDLQMove(stream: string): Promise<void> {
    this.dlqCounter.inc({ stream });
  }

  // @Cron('*/1 * * * *')  // Every minute
  async updateDLQSizes(): Promise<void> {
    const streams = ['orders', 'notifications', 'emails'];

    for (const stream of streams) {
      const info = await this.producer.getStreamInfo(`${stream}:dlq`);
      this.dlqGauge.set({ stream }, info.length);
    }
  }
}

Grafana Dashboard

promql
# DLQ message rate
rate(redisx_stream_dlq_total[5m])

# Current DLQ size
redisx_stream_dlq_size

# Alert: High DLQ size
redisx_stream_dlq_size > 100

Best Practices

1. Keep DLQ size bounded:

typescript
dlq: {
  maxLen: 10000,  // Trim old DLQ messages
}

2. Monitor DLQ growth:

typescript
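// A method on DLQMonitor above; Cron is imported from '@nestjs/schedule'
@Cron('*/5 * * * *')  // Every 5 minutes
async alertOnDLQ(): Promise<void> {
  const threshold = 100;
  const size = await this.getDLQCount('orders');
  if (size > threshold) {
    await this.alertService.send({
      title: 'DLQ growing',
      message: `${size} messages in orders:dlq`,
      severity: 'critical',
    });
  }
}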

3. Review DLQ regularly:

Schedule manual review of DLQ messages to fix recurring issues.

4. Log DLQ moves:

typescript
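// Call this wherever you observe a message going to the DLQ (e.g. after the final failed attempt)
logDLQMove(message: { stream: string; id: string; data: unknown }, error: Error): void {
  this.logger.error({
    message: 'Message moved to DLQ',
    stream: message.stream,
    messageId: message.id,
    error: error.message,
    data: message.data,  // Consider redacting sensitive fields before logging
  });
}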
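Bounding the DLQ with maxLen trades completeness for memory: once the cap is reached, the oldest entries are dropped to make room, and they are gone for good. Assuming maxLen behaves like Redis's exact MAXLEN trimming on XADD (the library may instead use approximate ~ trimming, which lets the stream run slightly over the cap), a small simulation shows which entries are evicted. appendCapped is a hypothetical helper for illustration only.

```typescript
// Simulates exact MAXLEN-style trimming: append the new entry, then drop the
// oldest entries so at most maxLen remain. Returns what was evicted for inspection.
function appendCapped<T>(
  entries: readonly T[],
  entry: T,
  maxLen: number,
): { entries: T[]; evicted: T[] } {
  const next = entries.concat([entry]);
  const overflow = Math.max(0, next.length - maxLen);
  return { entries: next.slice(overflow), evicted: next.slice(0, overflow) };
}
```

If losing the oldest failures is unacceptable, alert well before the DLQ approaches maxLen rather than raising the cap indefinitely.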


Released under the MIT License.