
Monitoring

Monitor stream health and performance.

Key Metrics

| Metric | Description | Alert Threshold |
| --- | --- | --- |
| Stream Length | Total messages in stream | > 1,000,000 |
| Consumer Lag | Messages waiting to be processed | > 10,000 |
| Pending Messages | Messages delivered but not ACKed | > 1,000 |
| Processing Rate | Messages processed per second | < expected rate |
| Error Rate | Failed messages per second | > 1% |
| DLQ Size | Messages in dead letter queue | > 100 |

Prometheus Metrics

Setup

typescript
import { Injectable } from '@nestjs/common';
import * as promClient from 'prom-client';

@Injectable()
export class StreamMetrics {
  // Counter: Total messages published
  readonly messagesPublished = new promClient.Counter({
    name: 'stream_messages_published_total',
    help: 'Total messages published to streams',
    labelNames: ['stream'],
  });

  // Counter: Total messages processed
  readonly messagesProcessed = new promClient.Counter({
    name: 'stream_messages_processed_total',
    help: 'Total messages processed',
    labelNames: ['stream', 'group', 'status'],
  });

  // Gauge: Current stream length
  readonly streamLength = new promClient.Gauge({
    name: 'stream_length',
    help: 'Current number of messages in stream',
    labelNames: ['stream'],
  });

  // Gauge: Consumer lag
  readonly consumerLag = new promClient.Gauge({
    name: 'stream_consumer_lag',
    help: 'Number of messages waiting to be processed',
    labelNames: ['stream', 'group'],
  });

  // Gauge: Pending messages
  readonly pendingMessages = new promClient.Gauge({
    name: 'stream_pending_messages',
    help: 'Messages delivered but not acknowledged',
    labelNames: ['stream', 'group'],
  });

  // Histogram: Processing duration
  readonly processingDuration = new promClient.Histogram({
    name: 'stream_message_processing_duration_seconds',
    help: 'Message processing duration',
    labelNames: ['stream', 'group'],
    buckets: [0.1, 0.5, 1, 2, 5, 10, 30],
  });

  // Gauge: DLQ size
  readonly dlqSize = new promClient.Gauge({
    name: 'stream_dlq_size',
    help: 'Number of messages in DLQ',
    labelNames: ['stream'],
  });
}
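
These metrics land on prom-client's default registry, but Prometheus still needs an HTTP endpoint to scrape them. A minimal sketch of such an endpoint (the controller and route are illustrative, not part of the library):

typescript
import { Controller, Get, Header } from '@nestjs/common';
import * as promClient from 'prom-client';

@Controller('metrics')
export class MetricsController {
  // Serves every metric registered on the default prom-client registry
  // in the Prometheus text exposition format.
  @Get()
  @Header('Content-Type', promClient.register.contentType)
  async getMetrics(): Promise<string> {
    return promClient.register.metrics();
  }
}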

Collect Metrics

typescript
import { Injectable, Inject } from '@nestjs/common';
import { STREAM_PRODUCER, IStreamProducer, STREAM_CONSUMER, IStreamConsumer } from '@nestjs-redisx/streams';
import { StreamMetrics } from './monitoring-metrics.usage';

@Injectable()
export class StreamMonitor {
  constructor(
    @Inject(STREAM_PRODUCER) private readonly producer: IStreamProducer,
    @Inject(STREAM_CONSUMER) private readonly consumer: IStreamConsumer,
    private readonly metrics: StreamMetrics,
  ) {}

  // @Cron('*/30 * * * * *')  // Every 30 seconds
  async collectMetrics(): Promise<void> {
    const streams = ['orders', 'notifications', 'emails'];

    for (const stream of streams) {
      await this.collectStreamMetrics(stream);
    }
  }

  private async collectStreamMetrics(stream: string): Promise<void> {
    // Stream length
    const info = await this.producer.getStreamInfo(stream);
    this.metrics.streamLength.set({ stream }, info.length);

    // DLQ size
    const dlqInfo = await this.producer.getStreamInfo(`${stream}:dlq`);
    this.metrics.dlqSize.set({ stream }, dlqInfo.length);

    // IStreamConsumer has no getGroups() - track known group names
    const groupNames = ['processors', 'notifications', 'analytics'];

    for (const groupName of groupNames) {
      // Pending messages
      const pending = await this.consumer.getPending(stream, groupName);
      this.metrics.pendingMessages.set(
        { stream, group: groupName },
        pending.count
      );

      // Consumer lag (rough approximation: stream length minus pending;
      // already-acknowledged entries that have not been trimmed inflate this value)
      const lag = info.length - pending.count;
      this.metrics.consumerLag.set(
        { stream, group: groupName },
        Math.max(0, lag)
      );
    }
  }
}
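
To run the collector on a schedule, uncomment the @Cron decorator and enable @nestjs/schedule. A sketch of the wiring, assuming the providers above (module and file names are illustrative; the streams module itself must also be registered so the STREAM_PRODUCER and STREAM_CONSUMER tokens resolve):

typescript
import { Module } from '@nestjs/common';
import { ScheduleModule } from '@nestjs/schedule';
import { StreamMetrics } from './monitoring-metrics.usage';
import { StreamMonitor } from './stream-monitor';

@Module({
  // ScheduleModule.forRoot() activates @Cron-decorated methods
  imports: [ScheduleModule.forRoot()],
  providers: [StreamMetrics, StreamMonitor],
})
export class MonitoringModule {}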

Instrument Consumers

typescript
import { Injectable } from '@nestjs/common';
import { StreamConsumer, IStreamMessage } from '@nestjs-redisx/streams';
import { StreamMetrics } from './monitoring-metrics.usage';
import { Order } from './types';

@Injectable()
export class InstrumentedConsumer {
  constructor(private readonly metrics: StreamMetrics) {}

  @StreamConsumer({ stream: 'orders', group: 'processors' })
  async handle(message: IStreamMessage<Order>): Promise<void> {
    const timer = this.metrics.processingDuration.startTimer({
      stream: 'orders',
      group: 'processors',
    });

    try {
      await this.processOrder(message.data);
      await message.ack();

      this.metrics.messagesProcessed.inc({
        stream: 'orders',
        group: 'processors',
        status: 'success',
      });
    } catch (error) {
      await message.reject(error as Error);

      this.metrics.messagesProcessed.inc({
        stream: 'orders',
        group: 'processors',
        status: 'error',
      });
    } finally {
      timer();
    }
  }

  private async processOrder(data: Order): Promise<void> {
    // Process the order
  }
}
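
The messagesPublished counter defined in Setup is not incremented by any example above; a sketch of instrumenting the publish path in the same way (the service name is illustrative):

typescript
import { Injectable, Inject } from '@nestjs/common';
import { STREAM_PRODUCER, IStreamProducer } from '@nestjs-redisx/streams';
import { StreamMetrics } from './monitoring-metrics.usage';
import { Order } from './types';

@Injectable()
export class InstrumentedPublisher {
  constructor(
    @Inject(STREAM_PRODUCER) private readonly producer: IStreamProducer,
    private readonly metrics: StreamMetrics,
  ) {}

  async publishOrder(order: Order): Promise<void> {
    await this.producer.publish('orders', order);
    // Count every successful publish, labelled by stream
    this.metrics.messagesPublished.inc({ stream: 'orders' });
  }
}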

Grafana Dashboard

Built-in Metrics (redisx_*)

These metrics are automatically collected by the Metrics plugin — no extra code needed:

Processing Rate (success):

promql
rate(redisx_stream_messages_consumed_total{stream="orders",status="success"}[5m])

Error Rate:

promql
rate(redisx_stream_messages_consumed_total{stream="orders",status="error"}[5m])

Retry Rate:

promql
rate(redisx_stream_messages_consumed_total{stream="orders",status="retry"}[5m])

DLQ Rate:

promql
rate(redisx_stream_messages_consumed_total{stream="orders",status="dead_letter"}[5m])

P95 Processing Duration:

promql
histogram_quantile(0.95,
  rate(redisx_stream_processing_duration_seconds_bucket[5m])
)

Publish Rate:

promql
rate(redisx_stream_messages_published_total{stream="orders"}[5m])

Custom Metrics

The examples above (in "Collect Metrics" and "Instrument Consumers") show how to add your own application-level metrics for stream length, consumer lag, pending counts, and DLQ size. These are not built-in — you implement them using the IStreamProducer.getStreamInfo() and IStreamConsumer.getPending() APIs with your own Prometheus registry.
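A sketch of what a dedicated registry could look like with prom-client's registers option, if you prefer to keep these custom metrics separate from the default registry (the names mirror the examples above):

typescript
import * as promClient from 'prom-client';

// Dedicated registry for stream metrics, kept separate from the default registry
export const streamRegistry = new promClient.Registry();

export const streamLength = new promClient.Gauge({
  name: 'stream_length',
  help: 'Current number of messages in stream',
  labelNames: ['stream'],
  registers: [streamRegistry], // register here instead of on the default registry
});

// Expose with: await streamRegistry.metrics()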

Stream Length (custom):

promql
stream_length{stream="orders"}

Consumer Lag (custom):

promql
stream_consumer_lag{stream="orders",group="processors"}

DLQ Size (custom):

promql
increase(stream_dlq_size{stream="orders"}[1h])

Alert Rules (built-in)

Using built-in redisx_* metrics:

High Error Rate:

yaml
- alert: HighStreamErrorRate
  expr: |
    sum(rate(redisx_stream_messages_consumed_total{status="error"}[5m])) /
    sum(rate(redisx_stream_messages_consumed_total[5m])) > 0.05
  for: 5m
  labels:
    severity: warning
  annotations:
    summary: "High stream processing error rate"
    description: "{{ $value | humanizePercentage }} errors on {{ $labels.stream }}"

High DLQ Rate:

yaml
- alert: HighDLQRate
  expr: rate(redisx_stream_messages_consumed_total{status="dead_letter"}[5m]) > 0
  for: 5m
  labels:
    severity: critical
  annotations:
    summary: "Messages entering DLQ on {{ $labels.stream }}"

Alert Rules (custom)

Using custom metrics from your collector (see examples above):

High Consumer Lag:

yaml
- alert: HighConsumerLag
  expr: stream_consumer_lag > 10000
  for: 5m
  labels:
    severity: warning
  annotations:
    summary: "High consumer lag on {{ $labels.stream }}"
    description: "{{ $value }} messages waiting to be processed"

High Pending Messages:

yaml
- alert: HighPendingMessages
  expr: stream_pending_messages > 1000
  for: 10m
  labels:
    severity: warning
  annotations:
    summary: "High pending messages on {{ $labels.stream }}/{{ $labels.group }}"

DLQ Growing:

yaml
- alert: DLQGrowing
  expr: increase(stream_dlq_size[1h]) > 10
  for: 5m
  labels:
    severity: critical
  annotations:
    summary: "DLQ size growing on {{ $labels.stream }}"

Redis CLI Monitoring

Stream Info

bash
# Get stream info
redis-cli XINFO STREAM orders

# Output:
# 1) length
# 2) (integer) 12345
# 3) radix-tree-keys
# 4) (integer) 1
# 5) radix-tree-nodes
# 6) (integer) 2
# 7) groups
# 8) (integer) 3
# 9) first-entry
# 10) 1) "1706123456789-0"
#     2) 1) "field1" 2) "value1"

Consumer Group Info

bash
# List groups
redis-cli XINFO GROUPS orders

# Output per group:
# 1) name
# 2) "processors"
# 3) consumers
# 4) (integer) 3
# 5) pending
# 6) (integer) 150
# 7) last-delivered-id
# 8) "1706123456799-0"

Pending Messages

bash
# Get pending count
redis-cli XPENDING orders processors

# Output:
# 1) (integer) 150        # Total pending
# 2) "1706123456789-0"    # Oldest ID
# 3) "1706123456799-0"    # Newest ID
# 4) 1) 1) "worker-1"     # Consumer
#       2) "100"          # Pending count
#    2) 1) "worker-2"
#       2) "50"

Consumer Info

bash
# List consumers
redis-cli XINFO CONSUMERS orders processors

# Output per consumer:
# 1) name
# 2) "worker-1"
# 3) pending
# 4) (integer) 100
# 5) idle
# 6) (integer) 5432  # ms since last activity

Application Logging

Structured Logging

typescript
import { Injectable, Logger } from '@nestjs/common';
import { StreamConsumer, IStreamMessage } from '@nestjs-redisx/streams';
import { Order } from './types';

@Injectable()
export class LoggingConsumer {
  private readonly logger = new Logger('StreamConsumer');

  @StreamConsumer({ stream: 'orders', group: 'processors' })
  async handle(message: IStreamMessage<Order>): Promise<void> {
    this.logger.log({
      event: 'message_received',
      stream: 'orders',
      group: 'processors',
      messageId: message.id,
      attempt: message.attempt,
      timestamp: message.timestamp,
    });

    try {
      await this.processOrder(message.data);
      await message.ack();

      this.logger.log({
        event: 'message_processed',
        messageId: message.id,
        duration: Date.now() - message.timestamp.getTime(),
      });
    } catch (error) {
      this.logger.error({
        event: 'message_failed',
        messageId: message.id,
        attempt: message.attempt,
        error: (error as Error).message,
        stack: (error as Error).stack,
      });

      await message.reject(error as Error);
    }
  }

  private async processOrder(data: Order): Promise<void> {
    // Process the order
  }
}

Health Checks

typescript
import { Injectable, Inject } from '@nestjs/common';
import { HealthIndicator, HealthIndicatorResult } from '@nestjs/terminus';
import { STREAM_PRODUCER, IStreamProducer, STREAM_CONSUMER, IStreamConsumer } from '@nestjs-redisx/streams';

@Injectable()
export class StreamHealthIndicator extends HealthIndicator {
  constructor(
    @Inject(STREAM_PRODUCER) private readonly producer: IStreamProducer,
    @Inject(STREAM_CONSUMER) private readonly consumer: IStreamConsumer,
  ) {
    super();
  }

  async isHealthy(): Promise<HealthIndicatorResult> {
    try {
      // Check that we can publish
      await this.producer.publish('health-check', {
        timestamp: Date.now(),
      });

      // Check if we can read stream info
      const info = await this.producer.getStreamInfo('health-check');

      // Check consumer groups
      const pending = await this.consumer.getPending('orders', 'processors');

      const isHealthy = pending.count < 10000;  // Threshold

      return this.getStatus('streams', isHealthy, {
        streamLength: info.length,
        pendingMessages: pending.count,
      });
    } catch (error) {
      return this.getStatus('streams', false, {
        message: (error as Error).message,
      });
    }
  }
}
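
To expose the indicator, register it with a Terminus health endpoint; a minimal sketch (the controller, route, and import path are illustrative):

typescript
import { Controller, Get } from '@nestjs/common';
import { HealthCheck, HealthCheckService } from '@nestjs/terminus';
import { StreamHealthIndicator } from './stream-health.indicator';

@Controller('health')
export class HealthController {
  constructor(
    private readonly health: HealthCheckService,
    private readonly streams: StreamHealthIndicator,
  ) {}

  @Get()
  @HealthCheck()
  check() {
    // Aggregates the stream indicator into the standard Terminus response
    return this.health.check([() => this.streams.isHealthy()]);
  }
}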

Debug Tools

Inspect Messages

typescript
import { Injectable, Inject } from '@nestjs/common';
import { STREAM_PRODUCER, IStreamProducer, STREAM_CONSUMER, IStreamConsumer } from '@nestjs-redisx/streams';

@Injectable()
export class StreamDebugger {
  constructor(
    @Inject(STREAM_PRODUCER) private readonly producer: IStreamProducer,
    @Inject(STREAM_CONSUMER) private readonly consumer: IStreamConsumer,
  ) {}

  async inspectStream(stream: string, count: number = 10): Promise<void> {
    const info = await this.producer.getStreamInfo(stream);

    console.log({
      stream,
      length: info.length,
      firstEntry: info.firstEntry,
      lastEntry: info.lastEntry,
      groups: info.groups,
    });
  }

  async inspectPending(stream: string, group: string): Promise<void> {
    // getPending() returns IPendingInfo (summary, not individual messages)
    const pending = await this.consumer.getPending(stream, group);

    console.log({
      totalPending: pending.count,
      oldestId: pending.minId,
      newestId: pending.maxId,
    });

    // Per-consumer breakdown
    pending.consumers.forEach(c => {
      console.log({
        consumer: c.name,
        pending: c.pending,
      });
    });
  }
}

Real-time Monitor

typescript
import { Injectable, Inject } from '@nestjs/common';
import { Cron } from '@nestjs/schedule';
import { STREAM_PRODUCER, IStreamProducer, STREAM_CONSUMER, IStreamConsumer } from '@nestjs-redisx/streams';

@Injectable()
export class RealtimeMonitor {
  constructor(
    @Inject(STREAM_PRODUCER) private readonly producer: IStreamProducer,
    @Inject(STREAM_CONSUMER) private readonly consumer: IStreamConsumer,
  ) {}

  @Cron('*/10 * * * * *')  // Every 10 seconds
  async printStats(): Promise<void> {
    const streams = ['orders', 'notifications'];

    // IStreamConsumer has no getGroups() — track known group names
    const groupNames = ['processors', 'notifications'];

    for (const stream of streams) {
      const info = await this.producer.getStreamInfo(stream);

      console.log(`\n=== ${stream} ===`);
      console.log(`Length: ${info.length}`);
      console.log(`Groups: ${info.groups}`);

      for (const groupName of groupNames) {
        const pending = await this.consumer.getPending(stream, groupName);
        console.log(`  ${groupName}:`);
        console.log(`    Pending: ${pending.count}`);
        console.log(`    Consumers: ${pending.consumers.length}`);
      }
    }
  }
}

Best Practices

1. Monitor consumer lag continuously:

typescript
@Cron('*/1 * * * *')  // Every minute
async checkLag() {
  const LAG_THRESHOLD = 10_000;  // matches the alert threshold above
  const lag = await this.getConsumerLag('orders', 'processors');
  if (lag > LAG_THRESHOLD) {
    await this.alertService.send('High consumer lag');
  }
}

2. Set up alerts for DLQ growth:

typescript
@Cron('*/5 * * * *')
async checkDLQ() {
  const size = await this.getDLQSize('orders');
  if (size > 100) {
    await this.alertService.send('DLQ size critical');
  }
}

3. Track processing duration:

typescript
const timer = this.metrics.processingDuration.startTimer();
try {
  await this.process(message);
} finally {
  timer();
}

4. Log errors with context:

typescript
catch (error) {
  this.logger.error({
    message: 'Processing failed',
    messageId: message.id,
    stream: message.stream,
    attempt: message.attempt,
    error: (error as Error).message,
    data: message.data,
  });
}
