Skip to content

Consumer API

Consume and process messages from streams.

@StreamConsumer Decorator

The easiest way to consume messages:

typescript
import { Injectable } from '@nestjs/common';
import { StreamConsumer, IStreamMessage } from '@nestjs-redisx/streams';
import { OrderEvent, FulfillmentService } from './types';

@Injectable()
export class OrderProcessor {
  constructor(private readonly fulfillmentService: FulfillmentService) {}

  @StreamConsumer({
    stream: 'orders',
    group: 'order-processors',
    batchSize: 10,
  })
  async handleOrder(message: IStreamMessage<OrderEvent>): Promise<void> {
    const { orderId } = message.data;

    try {
      await this.fulfillmentService.process(orderId);
      await message.ack();
    } catch (error) {
      await message.reject(error);
    }
  }
}

Decorator Options

typescript
interface StreamConsumerOptions {
  /**
   * Stream name
   */
  stream: string;

  /**
   * Consumer group name
   */
  group: string;

  /**
   * Consumer name (default: hostname-pid)
   */
  consumer?: string;

  /**
   * Messages per batch
   * @default 10
   */
  batchSize?: number;

  /**
   * Block timeout (ms)
   * @default 5000
   */
  blockTimeout?: number;

  /**
   * Max retries before DLQ
   * @default 3
   */
  maxRetries?: number;

  /**
   * Concurrent processing
   * @default 1
   */
  concurrency?: number;
}

IStreamMessage Interface

typescript
interface IStreamMessage<T> {
  /**
   * Message ID (timestamp-sequence)
   */
  readonly id: string;

  /**
   * Stream name
   */
  readonly stream: string;

  /**
   * Message data
   */
  readonly data: T;

  /**
   * Current attempt (1-based)
   */
  readonly attempt: number;

  /**
   * Message timestamp
   */
  readonly timestamp: Date;

  /**
   * Acknowledge success
   */
  ack(): Promise<void>;

  /**
   * Reject (retry or DLQ)
   */
  reject(error?: Error): Promise<void>;
}

Message Handling

Success Path

typescript
@StreamConsumer({ stream: 'orders', group: 'processors' })
async handle(message: IStreamMessage<Order>): Promise<void> {
  // Process
  await this.processOrder(message.data);

  // Acknowledge success
  await message.ack();
}

Error Path

typescript
@StreamConsumer({ stream: 'orders', group: 'processors' })
async handle(message: IStreamMessage<Order>): Promise<void> {
  try {
    await this.processOrder(message.data);
    await message.ack();
  } catch (error) {
    // Will retry or move to DLQ
    await message.reject(error);
  }
}

Conditional Retry

typescript
@StreamConsumer({ stream: 'orders', group: 'processors', maxRetries: 5 })
async handle(message: IStreamMessage<Order>): Promise<void> {
  try {
    await this.processOrder(message.data);
    await message.ack();
  } catch (error) {
    if (this.isTransientError(error)) {
      // Transient error - will retry
      console.log(`Attempt ${message.attempt}/5 failed, will retry`);
      await message.reject(error);
    } else {
      // Permanent error - skip retries, go to DLQ
      console.error('Permanent error, moving to DLQ');
      await message.reject(error);
    }
  }
}

Manual Consumer Service

For more control, use the service directly:

typescript
import { Injectable, Inject, OnModuleInit, OnModuleDestroy } from '@nestjs/common';
import {
  STREAM_CONSUMER,
  IStreamConsumer,
  IStreamMessage,
  ConsumerHandle,
} from '@nestjs-redisx/streams';
import { Order, OrderService } from './types';

@Injectable()
export class OrderConsumer implements OnModuleInit, OnModuleDestroy {
  private handle: ConsumerHandle;

  constructor(
    @Inject(STREAM_CONSUMER) private readonly consumer: IStreamConsumer,
    private readonly orderService: OrderService,
  ) {}

  async onModuleInit(): Promise<void> {
    // Create group if not exists
    await this.consumer.createGroup('orders', 'processors', '0');

    // Start consuming
    this.handle = this.consumer.consume<Order>(
      'orders',
      'processors',
      'worker-1',
      async (message) => {
        await this.processMessage(message);
      },
      {
        batchSize: 10,
        concurrency: 5,
      }
    );
  }

  async onModuleDestroy(): Promise<void> {
    if (this.handle) {
      await this.consumer.stop(this.handle);
    }
  }

  private async processMessage(message: IStreamMessage<Order>): Promise<void> {
    try {
      await this.orderService.process(message.data);
      await message.ack();
    } catch (error) {
      await message.reject(error);
    }
  }
}

Consumer Service Methods

createGroup()

Create a consumer group:

typescript
await consumer.createGroup(
  'orders',      // Stream
  'processors',  // Group name
  '0'            // Start ID: '0' = beginning, '$' = new only
);

consume()

Start consuming messages:

typescript
const handle = consumer.consume(
  'orders',      // Stream
  'processors',  // Group
  'worker-1',    // Consumer name
  handler,       // Message handler
  options        // ConsumeOptions
);

stop()

Stop a consumer:

typescript
await consumer.stop(handle);

getPending()

Get pending messages info:

typescript
const pending = await consumer.getPending('orders', 'processors');

console.log({
  count: pending.count,
  minId: pending.minId,
  maxId: pending.maxId,
  consumers: pending.consumers,
});

claimIdle()

Claim idle messages from dead consumers:

typescript
const claimed = await consumer.claimIdle(
  'orders',
  'processors',
  'worker-1',
  30000,  // Min idle time (ms)
);
// Returns: IStreamMessage<T>[]

Multiple Consumers

Same Group (Load Balancing)

Messages distributed between consumers:

typescript
// Consumer 1
@StreamConsumer({ stream: 'orders', group: 'processors', consumer: 'worker-1' })
async handle1(message: IStreamMessage<Order>) { }

// Consumer 2
@StreamConsumer({ stream: 'orders', group: 'processors', consumer: 'worker-2' })
async handle2(message: IStreamMessage<Order>) { }

Different Groups (Fan-out)

Each group gets all messages:

typescript
// Processing group
@StreamConsumer({ stream: 'orders', group: 'processors' })
async process(message: IStreamMessage<Order>) { }

// Analytics group
@StreamConsumer({ stream: 'orders', group: 'analytics' })
async analyze(message: IStreamMessage<Order>) { }

// Notifications group
@StreamConsumer({ stream: 'orders', group: 'notifications' })
async notify(message: IStreamMessage<Order>) { }

Next Steps

Released under the MIT License.