Monitoring
Monitor stream health and performance.
Key Metrics
| Metric | Description | Alert Threshold |
|---|---|---|
| Stream Length | Total messages in stream | > 1,000,000 |
| Consumer Lag | Messages waiting to be processed | > 10,000 |
| Pending Messages | Messages delivered but not ACKed | > 1,000 |
| Processing Rate | Messages processed per second | < expected rate |
| Error Rate | Share of processed messages that fail | > 1% |
| DLQ Size | Messages in dead letter queue | > 100 |
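If you want these thresholds available to application code (for example, in the scheduled checks shown later on this page), a small constant keeps them in one place. This is only a sketch: the STREAM_ALERT_THRESHOLDS name and the values simply mirror the table and are starting points, not part of the library.
// Alert thresholds mirroring the table above; tune per workload.
export const STREAM_ALERT_THRESHOLDS = {
  streamLength: 1_000_000,
  consumerLag: 10_000,
  pendingMessages: 1_000,
  errorRate: 0.01, // 1% of processed messages
  dlqSize: 100,
} as const;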
Prometheus Metrics
Setup
import { Injectable } from '@nestjs/common';
import * as promClient from 'prom-client';
@Injectable()
export class StreamMetrics {
// Counter: Total messages published
readonly messagesPublished = new promClient.Counter({
name: 'stream_messages_published_total',
help: 'Total messages published to streams',
labelNames: ['stream'],
});
// Counter: Total messages processed
readonly messagesProcessed = new promClient.Counter({
name: 'stream_messages_processed_total',
help: 'Total messages processed',
labelNames: ['stream', 'group', 'status'],
});
// Gauge: Current stream length
readonly streamLength = new promClient.Gauge({
name: 'stream_length',
help: 'Current number of messages in stream',
labelNames: ['stream'],
});
// Gauge: Consumer lag
readonly consumerLag = new promClient.Gauge({
name: 'stream_consumer_lag',
help: 'Number of messages waiting to be processed',
labelNames: ['stream', 'group'],
});
// Gauge: Pending messages
readonly pendingMessages = new promClient.Gauge({
name: 'stream_pending_messages',
help: 'Messages delivered but not acknowledged',
labelNames: ['stream', 'group'],
});
// Histogram: Processing duration
readonly processingDuration = new promClient.Histogram({
name: 'stream_message_processing_duration_seconds',
help: 'Message processing duration',
labelNames: ['stream', 'group'],
buckets: [0.1, 0.5, 1, 2, 5, 10, 30],
});
// Gauge: DLQ size
readonly dlqSize = new promClient.Gauge({
name: 'stream_dlq_size',
help: 'Number of messages in DLQ',
labelNames: ['stream'],
});
}
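Prometheus also needs an HTTP endpoint to scrape these metrics from. A minimal sketch using prom-client's default registry follows; the MetricsController name and the /metrics route are illustrative, and if your deployment already exposes a metrics endpoint (for example via the Metrics plugin mentioned below) you can skip this.
import { Controller, Get, Header } from '@nestjs/common';
import * as promClient from 'prom-client';

@Controller('metrics')
export class MetricsController {
  // Serialize every metric registered in the default prom-client registry
  @Get()
  @Header('Content-Type', promClient.register.contentType)
  async metrics(): Promise<string> {
    return promClient.register.metrics();
  }
}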
Collect Metrics
import { Injectable, Inject } from '@nestjs/common';
import { STREAM_PRODUCER, IStreamProducer, STREAM_CONSUMER, IStreamConsumer } from '@nestjs-redisx/streams';
import { StreamMetrics } from './monitoring-metrics.usage';
@Injectable()
export class StreamMonitor {
constructor(
@Inject(STREAM_PRODUCER) private readonly producer: IStreamProducer,
@Inject(STREAM_CONSUMER) private readonly consumer: IStreamConsumer,
private readonly metrics: StreamMetrics,
) {}
// @Cron('*/30 * * * * *') // Every 30 seconds
async collectMetrics(): Promise<void> {
const streams = ['orders', 'notifications', 'emails'];
for (const stream of streams) {
await this.collectStreamMetrics(stream);
}
}
private async collectStreamMetrics(stream: string): Promise<void> {
// Stream length
const info = await this.producer.getStreamInfo(stream);
this.metrics.streamLength.set({ stream }, info.length);
// DLQ size
const dlqInfo = await this.producer.getStreamInfo(`${stream}:dlq`);
this.metrics.dlqSize.set({ stream }, dlqInfo.length);
// IStreamConsumer has no getGroups() - track known group names
const groupNames = ['processors', 'notifications', 'analytics'];
for (const groupName of groupNames) {
// Pending messages
const pending = await this.consumer.getPending(stream, groupName);
this.metrics.pendingMessages.set(
{ stream, group: groupName },
pending.count
);
// Consumer lag (rough approximation: stream length minus pending; acked entries not yet trimmed inflate it)
const lag = info.length - pending.count;
this.metrics.consumerLag.set(
{ stream, group: groupName },
Math.max(0, lag)
);
}
}
}
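The @Cron decorator on collectMetrics() is commented out above because scheduled collection requires @nestjs/schedule. A sketch of the wiring, assuming you register the providers in a module of your own (MonitoringModule and the import paths are illustrative); once ScheduleModule is registered, uncomment the decorator.
import { Module } from '@nestjs/common';
import { ScheduleModule } from '@nestjs/schedule';
import { StreamMetrics } from './monitoring-metrics.usage';
import { StreamMonitor } from './stream-monitor';

@Module({
  // Enables @Cron / @Interval decorators application-wide
  imports: [ScheduleModule.forRoot()],
  providers: [StreamMetrics, StreamMonitor],
})
export class MonitoringModule {}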
Instrument Consumers
import { Injectable } from '@nestjs/common';
import { StreamConsumer, IStreamMessage } from '@nestjs-redisx/streams';
import { StreamMetrics } from './monitoring-metrics.usage';
import { Order } from './types';
@Injectable()
export class InstrumentedConsumer {
constructor(private readonly metrics: StreamMetrics) {}
@StreamConsumer({ stream: 'orders', group: 'processors' })
async handle(message: IStreamMessage<Order>): Promise<void> {
const timer = this.metrics.processingDuration.startTimer({
stream: 'orders',
group: 'processors',
});
try {
await this.processOrder(message.data);
await message.ack();
this.metrics.messagesProcessed.inc({
stream: 'orders',
group: 'processors',
status: 'success',
});
} catch (error) {
await message.reject(error as Error);
this.metrics.messagesProcessed.inc({
stream: 'orders',
group: 'processors',
status: 'error',
});
} finally {
timer();
}
}
private async processOrder(data: Order): Promise<void> {
// Process the order
}
}
Grafana Dashboard
Built-in Metrics (redisx_*)
These metrics are automatically collected by the Metrics plugin — no extra code needed:
Processing Rate (success):
rate(redisx_stream_messages_consumed_total{stream="orders",status="success"}[5m])
Error Rate:
rate(redisx_stream_messages_consumed_total{stream="orders",status="error"}[5m])
Retry Rate:
rate(redisx_stream_messages_consumed_total{stream="orders",status="retry"}[5m])
DLQ Rate:
rate(redisx_stream_messages_consumed_total{stream="orders",status="dead_letter"}[5m])
P95 Processing Duration:
histogram_quantile(0.95,
rate(redisx_stream_processing_duration_seconds_bucket[5m])
)
Publish Rate:
rate(redisx_stream_messages_published_total{stream="orders"}[5m])
Custom Metrics
The examples above (in "Collect Metrics" and "Instrument Consumers") show how to add your own application-level metrics for stream length, consumer lag, pending counts, and DLQ size. These are not built-in — you implement them using the IStreamProducer.getStreamInfo() and IStreamConsumer.getPending() APIs with your own Prometheus registry.
Stream Length (custom):
stream_length{stream="orders"}
Consumer Lag (custom):
stream_consumer_lag{stream="orders",group="processors"}
DLQ Size (custom):
increase(stream_dlq_size{stream="orders"}[1h])
Alert Rules (built-in)
Using built-in redisx_* metrics:
High Error Rate:
- alert: HighStreamErrorRate
  expr: |
    sum by (stream) (rate(redisx_stream_messages_consumed_total{status="error"}[5m])) /
    sum by (stream) (rate(redisx_stream_messages_consumed_total[5m])) > 0.05
  for: 5m
  labels:
    severity: warning
  annotations:
    summary: "High stream processing error rate"
    description: "{{ $value | humanizePercentage }} errors on {{ $labels.stream }}"
High DLQ Rate:
- alert: HighDLQRate
  expr: rate(redisx_stream_messages_consumed_total{status="dead_letter"}[5m]) > 0
  for: 5m
  labels:
    severity: critical
  annotations:
    summary: "Messages entering DLQ on {{ $labels.stream }}"
Alert Rules (custom)
Using custom metrics from your collector (see examples above):
High Consumer Lag:
- alert: HighConsumerLag
  expr: stream_consumer_lag > 10000
  for: 5m
  labels:
    severity: warning
  annotations:
    summary: "High consumer lag on {{ $labels.stream }}"
    description: "{{ $value }} messages waiting to be processed"
High Pending Messages:
- alert: HighPendingMessages
  expr: stream_pending_messages > 1000
  for: 10m
  labels:
    severity: warning
  annotations:
    summary: "High pending messages on {{ $labels.stream }}/{{ $labels.group }}"
DLQ Growing:
- alert: DLQGrowing
  expr: increase(stream_dlq_size[1h]) > 10
  for: 5m
  labels:
    severity: critical
  annotations:
    summary: "DLQ size growing on {{ $labels.stream }}"
Redis CLI Monitoring
Stream Info
# Get stream info
redis-cli XINFO STREAM orders
# Output:
# 1) length
# 2) (integer) 12345
# 3) radix-tree-keys
# 4) (integer) 1
# 5) radix-tree-nodes
# 6) (integer) 2
# 7) groups
# 8) (integer) 3
# 9) first-entry
# 10) 1) "1706123456789-0"
# 2) 1) "field1" 2) "value1"
Consumer Group Info
# List groups
redis-cli XINFO GROUPS orders
# Output per group:
# 1) name
# 2) "processors"
# 3) consumers
# 4) (integer) 3
# 5) pending
# 6) (integer) 150
# 7) last-delivered-id
# 8) "1706123456799-0"
Pending Messages
# Get pending count
redis-cli XPENDING orders processors
# Output:
# 1) (integer) 150 # Total pending
# 2) "1706123456789-0" # Oldest ID
# 3) "1706123456799-0" # Newest ID
# 4) 1) 1) "worker-1" # Consumer
# 2) "100" # Pending count
# 2) 1) "worker-2"
# 2) "50"
Consumer Info
# List consumers
redis-cli XINFO CONSUMERS orders processors
# Output per consumer:
# 1) name
# 2) "worker-1"
# 3) pending
# 4) (integer) 100
# 5) idle
# 6) (integer) 5432 # ms since last activity
Application Logging
Structured Logging
import { Injectable, Logger } from '@nestjs/common';
import { StreamConsumer, IStreamMessage } from '@nestjs-redisx/streams';
import { Order } from './types';
@Injectable()
export class LoggingConsumer {
private readonly logger = new Logger('StreamConsumer');
@StreamConsumer({ stream: 'orders', group: 'processors' })
async handle(message: IStreamMessage<Order>): Promise<void> {
this.logger.log({
event: 'message_received',
stream: 'orders',
group: 'processors',
messageId: message.id,
attempt: message.attempt,
timestamp: message.timestamp,
});
try {
await this.processOrder(message.data);
await message.ack();
this.logger.log({
event: 'message_processed',
messageId: message.id,
duration: Date.now() - message.timestamp.getTime(),
});
} catch (error) {
this.logger.error({
event: 'message_failed',
messageId: message.id,
attempt: message.attempt,
error: (error as Error).message,
stack: (error as Error).stack,
});
await message.reject(error as Error);
}
}
private async processOrder(data: Order): Promise<void> {
// Process the order
}
}
Health Checks
import { Injectable, Inject } from '@nestjs/common';
import { HealthIndicator, HealthIndicatorResult } from '@nestjs/terminus';
import { STREAM_PRODUCER, IStreamProducer, STREAM_CONSUMER, IStreamConsumer } from '@nestjs-redisx/streams';
@Injectable()
export class StreamHealthIndicator extends HealthIndicator {
constructor(
@Inject(STREAM_PRODUCER) private readonly producer: IStreamProducer,
@Inject(STREAM_CONSUMER) private readonly consumer: IStreamConsumer,
) {
super();
}
async isHealthy(): Promise<HealthIndicatorResult> {
try {
// Check if we can publish
await this.producer.publish('health-check', {
timestamp: Date.now(),
});
// Check if we can read stream info
const info = await this.producer.getStreamInfo('health-check');
// Check consumer groups
const pending = await this.consumer.getPending('orders', 'processors');
const isHealthy = pending.count < 10000; // Threshold
return this.getStatus('streams', isHealthy, {
streamLength: info.length,
pendingMessages: pending.count,
});
} catch (error) {
return this.getStatus('streams', false, {
message: (error as Error).message,
});
}
}
}
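To expose this indicator, register it with a Terminus health check endpoint. A minimal sketch follows; HealthController, the /health route, and the import path are illustrative. The controller and StreamHealthIndicator must live in a module that imports TerminusModule.
import { Controller, Get } from '@nestjs/common';
import { HealthCheck, HealthCheckService } from '@nestjs/terminus';
import { StreamHealthIndicator } from './stream-health.indicator';

@Controller('health')
export class HealthController {
  constructor(
    private readonly health: HealthCheckService,
    private readonly streams: StreamHealthIndicator,
  ) {}

  @Get()
  @HealthCheck()
  check() {
    // Aggregates the stream indicator; add other indicators as needed
    return this.health.check([() => this.streams.isHealthy()]);
  }
}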
Debug Tools
Inspect Messages
import { Injectable, Inject } from '@nestjs/common';
import { STREAM_PRODUCER, IStreamProducer, STREAM_CONSUMER, IStreamConsumer } from '@nestjs-redisx/streams';
@Injectable()
export class StreamDebugger {
constructor(
@Inject(STREAM_PRODUCER) private readonly producer: IStreamProducer,
@Inject(STREAM_CONSUMER) private readonly consumer: IStreamConsumer,
) {}
async inspectStream(stream: string, count: number = 10): Promise<void> {
const info = await this.producer.getStreamInfo(stream);
console.log({
stream,
length: info.length,
firstEntry: info.firstEntry,
lastEntry: info.lastEntry,
groups: info.groups,
});
}
async inspectPending(stream: string, group: string): Promise<void> {
// getPending() returns IPendingInfo (summary, not individual messages)
const pending = await this.consumer.getPending(stream, group);
console.log({
totalPending: pending.count,
oldestId: pending.minId,
newestId: pending.maxId,
});
// Per-consumer breakdown
pending.consumers.forEach(c => {
console.log({
consumer: c.name,
pending: c.pending,
});
});
}
}
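The debugger can be driven from a one-off script or REPL through a Nest application context. A sketch, assuming your project layout; AppModule and the import paths are illustrative:
import { NestFactory } from '@nestjs/core';
import { AppModule } from './app.module';
import { StreamDebugger } from './stream-debugger';

async function inspect(): Promise<void> {
  // Application context only: providers are wired up, no HTTP server is started
  const app = await NestFactory.createApplicationContext(AppModule);
  const streamDebugger = app.get(StreamDebugger);

  await streamDebugger.inspectStream('orders');
  await streamDebugger.inspectPending('orders', 'processors');

  await app.close();
}

inspect().catch(console.error);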
Real-time Monitor
import { Injectable, Inject } from '@nestjs/common';
import { Cron } from '@nestjs/schedule';
import { STREAM_PRODUCER, IStreamProducer, STREAM_CONSUMER, IStreamConsumer } from '@nestjs-redisx/streams';
@Injectable()
export class RealtimeMonitor {
constructor(
@Inject(STREAM_PRODUCER) private readonly producer: IStreamProducer,
@Inject(STREAM_CONSUMER) private readonly consumer: IStreamConsumer,
) {}
@Cron('*/10 * * * * *') // Every 10 seconds
async printStats(): Promise<void> {
const streams = ['orders', 'notifications'];
// IStreamConsumer has no getGroups(), so track known group names
const groupNames = ['processors', 'notifications'];
for (const stream of streams) {
const info = await this.producer.getStreamInfo(stream);
console.log(`\n=== ${stream} ===`);
console.log(`Length: ${info.length}`);
console.log(`Groups: ${info.groups}`);
for (const groupName of groupNames) {
const pending = await this.consumer.getPending(stream, groupName);
console.log(` ${groupName}:`);
console.log(` Pending: ${pending.count}`);
console.log(` Consumers: ${pending.consumers.length}`);
}
}
}
}
Best Practices
1. Monitor consumer lag continuously (a sketch of the getConsumerLag()/getDLQSize() helpers used below follows this list):
@Cron('*/1 * * * *') // Every minute
async checkLag() {
const lag = await this.getConsumerLag('orders', 'processors');
if (lag > threshold) {
await this.alertService.send('High consumer lag');
}
}
2. Set up alerts for DLQ growth:
@Cron('*/5 * * * *')
async checkDLQ() {
const size = await this.getDLQSize('orders');
if (size > 100) {
await this.alertService.send('DLQ size critical');
}
}
3. Track processing duration:
const timer = this.metrics.processingDuration.startTimer();
try {
await this.process(message);
} finally {
timer();
}
4. Log errors with context:
catch (error) {
this.logger.error({
message: 'Processing failed',
messageId: message.id,
stream: message.stream,
attempt: message.attempt,
error: error.message,
data: message.data,
});
}
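The getConsumerLag() and getDLQSize() helpers referenced in items 1 and 2 are not part of the library. A sketch built from the same getStreamInfo()/getPending() calls used earlier on this page, using the same rough lag approximation as the collector; it is shown here as a standalone provider, whereas the snippets above call them as methods on the monitoring class itself.
import { Injectable, Inject } from '@nestjs/common';
import { STREAM_PRODUCER, IStreamProducer, STREAM_CONSUMER, IStreamConsumer } from '@nestjs-redisx/streams';

@Injectable()
export class StreamStatsService {
  constructor(
    @Inject(STREAM_PRODUCER) private readonly producer: IStreamProducer,
    @Inject(STREAM_CONSUMER) private readonly consumer: IStreamConsumer,
  ) {}

  // Rough lag estimate: total entries minus delivered-but-unacked entries
  async getConsumerLag(stream: string, group: string): Promise<number> {
    const info = await this.producer.getStreamInfo(stream);
    const pending = await this.consumer.getPending(stream, group);
    return Math.max(0, info.length - pending.count);
  }

  // DLQ size is the length of the companion <stream>:dlq stream
  async getDLQSize(stream: string): Promise<number> {
    const dlqInfo = await this.producer.getStreamInfo(`${stream}:dlq`);
    return dlqInfo.length;
  }
}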
Next Steps
- Testing — Test stream consumers
- Troubleshooting — Debug common issues