Skip to content

Producer API

Publish messages to streams.

Inject Producer

typescript
import { Injectable, Inject } from '@nestjs/common';
import { STREAM_PRODUCER, IStreamProducer } from '@nestjs-redisx/streams';

@Injectable()
export class OrderService {
  constructor(
    @Inject(STREAM_PRODUCER) private readonly producer: IStreamProducer,
  ) {}
}

Methods Overview

MethodDescriptionReturns
publish()Publish single messageMessage ID
publishBatch()Publish multiple messagesMessage IDs
getStreamInfo()Get stream metadataStreamInfo
trim()Remove old messagesnumber (trimmed count)

publish()

Publish a single message:

typescript
async publish<T>(
  stream: string,
  data: T,
  options?: PublishOptions
): Promise<string>

Basic Usage

typescript
const messageId = await this.producer.publish('orders', {
  id: 'order-123',
  customerId: 'cust-456',
  total: 99.99,
});

console.log(`Published: ${messageId}`);
// Published: 1706123456789-0

With Options

typescript
const messageId = await this.producer.publish(
  'orders',
  { id: 'order-123' },
  {
    maxLen: 50000,  // Trim stream to 50K messages
  }
);

PublishOptions

typescript
interface PublishOptions {
  /**
   * Maximum stream length (approximate)
   */
  maxLen?: number;

  /**
   * Custom message ID (rarely needed)
   */
  id?: string;
}

publishBatch()

Publish multiple messages efficiently:

typescript
async publishBatch<T>(
  stream: string,
  messages: T[],
  options?: PublishOptions
): Promise<string[]>

Usage

typescript
const orders = [
  { id: 'order-1', total: 100 },
  { id: 'order-2', total: 200 },
  { id: 'order-3', total: 300 },
];

const messageIds = await this.producer.publishBatch('orders', orders);

console.log(messageIds);
// ['1706123456789-0', '1706123456789-1', '1706123456789-2']

Performance

Batch publishing uses Redis pipeline:

Single publish (3 messages): 3 round trips
Batch publish (3 messages): 1 round trip

getStreamInfo()

Get stream metadata:

typescript
async getStreamInfo(stream: string): Promise<StreamInfo>

Usage

typescript
const info = await this.producer.getStreamInfo('orders');

console.log({
  length: info.length,           // 12345
  groups: info.groups,           // 2
  firstEntry: info.firstEntry,   // { id: '...', timestamp: Date }
  lastEntry: info.lastEntry,     // { id: '...', timestamp: Date }
});

StreamInfo Interface

typescript
interface StreamInfo {
  length: number;
  firstEntry?: {
    id: string;
    timestamp: Date;
  };
  lastEntry?: {
    id: string;
    timestamp: Date;
  };
  groups: number;
}

trim()

Remove old messages:

typescript
async trim(stream: string, maxLen: number): Promise<number>

Usage

typescript
// Keep only last 10,000 messages
const trimmed = await this.producer.trim('orders', 10000);
console.log(`Trimmed ${trimmed} messages`);

Automatic Trimming

Use maxLen in publish options for automatic trimming:

typescript
// Automatically trims while publishing
await this.producer.publish('orders', data, { maxLen: 100000 });

Error Handling

typescript
import { StreamPublishError } from '@nestjs-redisx/streams';

try {
  await this.producer.publish('orders', data);
} catch (error) {
  if (error instanceof StreamPublishError) {
    console.error('Publish failed:', error.message);
    console.error('Stream:', error.stream);

    // Retry or fallback
    await this.retryQueue.add(data);
  }
  throw error;
}

Real-World Example

typescript
import { Injectable, Inject } from '@nestjs/common';
import { STREAM_PRODUCER, IStreamProducer } from '@nestjs-redisx/streams';
import { CreateOrderDto, Order, OrderStats, OrderRepository } from './types';

@Injectable()
export class OrderService {
  constructor(
    @Inject(STREAM_PRODUCER) private readonly producer: IStreamProducer,
    private readonly orderRepo: OrderRepository,
  ) {}

  async createOrder(dto: CreateOrderDto): Promise<Order> {
    // 1. Save to database
    const order = await this.orderRepo.create(dto);

    // 2. Publish event
    try {
      await this.producer.publish('orders', {
        type: 'ORDER_CREATED',
        orderId: order.id,
        customerId: order.customerId,
        items: order.items,
        total: order.total,
        createdAt: order.createdAt,
      }, {
        maxLen: 100000,
      });
    } catch (error) {
      // Log but don't fail order creation
      console.error('Failed to publish order event:', error);
    }

    return order;
  }

  async getOrderStats(): Promise<OrderStats> {
    const info = await this.producer.getStreamInfo('orders');

    return {
      totalEvents: info.length,
      oldestEvent: info.firstEntry?.timestamp,
      newestEvent: info.lastEntry?.timestamp,
    };
  }
}

Next Steps

Released under the MIT License.