Skip to content

Patterns

Common architectural patterns with Redis Streams.

Fan-out Pattern

Distribute same message to multiple consumers.

Implementation

typescript
// Producer
@Injectable()
export class OrderService {
  async createOrder(dto: CreateOrderDto): Promise<Order> {
    const order = await this.orderRepo.create(dto);

    // Publish once
    await this.producer.publish('orders', {
      type: 'ORDER_CREATED',
      orderId: order.id,
      customerId: order.customerId,
      total: order.total,
    });

    return order;
  }
}

// Consumer 1: Processing
@StreamConsumer({ stream: 'orders', group: 'processing' })
async processOrder(message: IStreamMessage<OrderEvent>) {
  await this.fulfillmentService.fulfill(message.data);
  await message.ack();
}

// Consumer 2: Analytics
@StreamConsumer({ stream: 'orders', group: 'analytics' })
async trackMetrics(message: IStreamMessage<OrderEvent>) {
  await this.analytics.track(message.data);
  await message.ack();
}

// Consumer 3: Notifications
@StreamConsumer({ stream: 'orders', group: 'notifications' })
async sendNotification(message: IStreamMessage<OrderEvent>) {
  await this.emailService.sendConfirmation(message.data);
  await message.ack();
}

Priority Queue

Process high-priority messages first.

typescript
// Publish with priority
await this.producer.publish('tasks', {
  id: 'task-1',
  priority: 'high',
  data: {...},
});

await this.producer.publish('tasks', {
  id: 'task-2',
  priority: 'low',
  data: {...},
});

// Separate streams by priority
await this.producer.publish('tasks:high', highPriorityTask);
await this.producer.publish('tasks:low', lowPriorityTask);

Multiple Streams by Priority

typescript
import { Injectable } from '@nestjs/common';
import { StreamConsumer, IStreamMessage } from '@nestjs-redisx/streams';
import { Task } from './types';

@Injectable()
export class PriorityProcessor {
  // High priority - more resources
  @StreamConsumer({
    stream: 'tasks:high',
    group: 'processors',
    concurrency: 10,
    batchSize: 50,
  })
  async processHigh(message: IStreamMessage<Task>): Promise<void> {
    await this.processTask(message.data);
    await message.ack();
  }

  // Low priority - fewer resources
  @StreamConsumer({
    stream: 'tasks:low',
    group: 'processors',
    concurrency: 2,
    batchSize: 10,
  })
  async processLow(message: IStreamMessage<Task>): Promise<void> {
    await this.processTask(message.data);
    await message.ack();
  }

  private async processTask(data: Task): Promise<void> {
    // Process the task based on type
  }
}

Saga Pattern

Orchestrate multi-step distributed transactions.

Implementation

typescript
import { Injectable, Inject } from '@nestjs/common';
import { StreamConsumer, IStreamMessage, STREAM_PRODUCER, IStreamProducer } from '@nestjs-redisx/streams';
import {
  SagaEvent,
  CreateOrderDto,
  OrderRepository,
  PaymentService,
  InventoryService,
} from './types';

@Injectable()
export class OrderSaga {
  constructor(
    @Inject(STREAM_PRODUCER) private readonly producer: IStreamProducer,
    private readonly orderRepo: OrderRepository,
    private readonly paymentService: PaymentService,
    private readonly inventoryService: InventoryService,
  ) {}

  async createOrder(dto: CreateOrderDto): Promise<void> {
    const order = await this.orderRepo.create(dto);

    await this.producer.publish('orders', {
      type: 'ORDER_CREATED',
      orderId: order.id,
      sagaId: `saga-${Date.now()}`,
      step: 'PAYMENT',
      data: order,
    });
  }

  // Step 2: Process Payment
  @StreamConsumer({ stream: 'orders', group: 'payment-processor' })
  async processPayment(message: IStreamMessage<SagaEvent>): Promise<void> {
    if (message.data.step !== 'PAYMENT') return await message.ack();

    try {
      await this.paymentService.charge(message.data.data);

      await this.producer.publish('orders', {
        type: 'PAYMENT_COMPLETED',
        sagaId: message.data.sagaId,
        step: 'INVENTORY',
        data: message.data.data,
      });

      await message.ack();
    } catch (error) {
      await this.producer.publish('orders', {
        type: 'PAYMENT_FAILED',
        sagaId: message.data.sagaId,
        step: 'ROLLBACK',
        data: message.data.data,
      });

      await message.ack();
    }
  }

  // Step 3: Reserve Inventory
  @StreamConsumer({ stream: 'orders', group: 'inventory-processor' })
  async reserveInventory(message: IStreamMessage<SagaEvent>): Promise<void> {
    if (message.data.step !== 'INVENTORY') return await message.ack();

    try {
      await this.inventoryService.reserve(message.data.data);

      await this.producer.publish('orders', {
        type: 'INVENTORY_RESERVED',
        sagaId: message.data.sagaId,
        step: 'NOTIFY',
        data: message.data.data,
      });

      await message.ack();
    } catch (error) {
      await this.producer.publish('orders', {
        type: 'INVENTORY_FAILED',
        sagaId: message.data.sagaId,
        step: 'ROLLBACK',
        data: message.data.data,
      });

      await message.ack();
    }
  }

  // Rollback Handler
  @StreamConsumer({ stream: 'orders', group: 'rollback-processor' })
  async handleRollback(message: IStreamMessage<SagaEvent>): Promise<void> {
    if (message.data.step !== 'ROLLBACK') return await message.ack();

    // Rollback based on how far we got
    if (message.data.type === 'PAYMENT_FAILED') {
      await this.orderRepo.cancel(message.data.data.orderId);
    } else if (message.data.type === 'INVENTORY_FAILED') {
      await this.paymentService.refund(message.data.data);
      await this.orderRepo.cancel(message.data.data.orderId);
    }

    await message.ack();
  }
}

Event Sourcing

Store all state changes as events.

typescript
// Aggregate Root
export class Order {
  private events: OrderEvent[] = [];

  create(dto: CreateOrderDto) {
    this.apply(new OrderCreatedEvent(dto));
  }

  confirm() {
    this.apply(new OrderConfirmedEvent(this.id));
  }

  cancel() {
    this.apply(new OrderCancelledEvent(this.id));
  }

  private apply(event: OrderEvent) {
    this.events.push(event);
    // Apply state change
    this.applyEvent(event);
  }

  getUncommittedEvents(): OrderEvent[] {
    return this.events;
  }
}

// Event Store
@Injectable()
export class OrderEventStore {
  async save(order: Order): Promise<void> {
    const events = order.getUncommittedEvents();

    for (const event of events) {
      await this.producer.publish('orders', event);
    }
  }

  async load(orderId: string): Promise<Order> {
    const order = new Order();

    // Read all events for this order
    const events = await this.getEventsForOrder(orderId);

    for (const event of events) {
      order.applyEvent(event);
    }

    return order;
  }

  private async getEventsForOrder(orderId: string): Promise<OrderEvent[]> {
    // Read from stream start
    const info = await this.producer.getStreamInfo('orders');

    // Fetch all messages and filter by orderId
    // In production, use secondary index or separate streams per aggregate
    return events;
  }
}

// Projection
@StreamConsumer({ stream: 'orders', group: 'order-projection' })
async projectOrder(message: IStreamMessage<OrderEvent>) {
  const event = message.data;

  switch (event.type) {
    case 'ORDER_CREATED':
      await this.orderView.create(event.data);
      break;
    case 'ORDER_CONFIRMED':
      await this.orderView.update(event.data.orderId, { status: 'confirmed' });
      break;
    case 'ORDER_CANCELLED':
      await this.orderView.update(event.data.orderId, { status: 'cancelled' });
      break;
  }

  await message.ack();
}

CQRS (Command Query Responsibility Segregation)

Separate read and write models.

typescript
// Command Side (Write)
@Injectable()
export class OrderCommandHandler {
  async createOrder(command: CreateOrderCommand): Promise<void> {
    // Validate command
    await this.validator.validate(command);

    // Create order
    const order = await this.orderRepo.create(command);

    // Publish event
    await this.producer.publish('orders', {
      type: 'ORDER_CREATED',
      orderId: order.id,
      data: order,
    });
  }
}

// Query Side (Read)
@Injectable()
export class OrderQueryHandler {
  async getOrder(orderId: string): Promise<OrderView> {
    // Read from optimized read model
    return await this.orderViewRepo.findOne(orderId);
  }

  async listOrders(filter: OrderFilter): Promise<OrderView[]> {
    // Query from denormalized view
    return await this.orderViewRepo.find(filter);
  }
}

// Read Model Projector
@StreamConsumer({ stream: 'orders', group: 'read-model' })
async updateReadModel(message: IStreamMessage<OrderEvent>) {
  const event = message.data;

  // Update denormalized read model
  await this.orderViewRepo.upsert({
    id: event.orderId,
    ...event.data,
    lastUpdated: new Date(),
  });

  await message.ack();
}

Outbox Pattern

Ensure reliable event publishing with database transactions.

typescript
import { Injectable, Inject } from '@nestjs/common';
import { STREAM_PRODUCER, IStreamProducer } from '@nestjs-redisx/streams';
import {
  CreateOrderDto,
  Order,
  OrderRepository,
  OutboxRepo,
  DatabaseTransaction,
} from './types';

@Injectable()
export class OutboxService {
  constructor(
    @Inject(STREAM_PRODUCER) private readonly producer: IStreamProducer,
    private readonly orderRepo: OrderRepository,
    private readonly outboxRepo: OutboxRepo,
    private readonly db: DatabaseTransaction,
  ) {}

  async createOrderWithEvents(dto: CreateOrderDto): Promise<Order> {
    return await this.db.transaction(async (tx) => {
      // 1. Create order in database
      const order = await this.orderRepo.create(dto, { transaction: tx });

      // 2. Write event to outbox table
      await this.outboxRepo.create({
        aggregateId: order.id,
        eventType: 'ORDER_CREATED',
        payload: order,
        createdAt: new Date(),
      }, { transaction: tx });

      return order;
    });
  }

  // Background job to publish outbox events
  // @Cron('*/5 * * * * *')  // Every 5 seconds
  async publishOutboxEvents(): Promise<void> {
    const events = await this.outboxRepo.findUnpublished();

    for (const event of events) {
      try {
        // Publish to stream
        await this.producer.publish('orders', {
          type: event.eventType,
          data: event.payload,
        });

        // Mark as published
        await this.outboxRepo.markPublished(event.id);
      } catch (error) {
        console.error('Failed to publish outbox event', error);
      }
    }
  }
}

Work Queue Pattern

Distribute work items to workers.

typescript
import { Injectable, Inject, Logger } from '@nestjs/common';
import { StreamConsumer, IStreamMessage, STREAM_PRODUCER, IStreamProducer } from '@nestjs-redisx/streams';
import { Job, EmailService, ReportService, ImageService } from './types';

// Producer: Add work to queue
@Injectable()
export class JobQueueService {
  constructor(
    @Inject(STREAM_PRODUCER) private readonly producer: IStreamProducer,
  ) {}

  async addJob(job: Job): Promise<void> {
    await this.producer.publish('jobs', {
      id: job.id,
      type: job.type,
      data: job.data,
      priority: job.priority,
      createdAt: new Date(),
    });
  }
}

// Consumer: Process work items
@Injectable()
export class JobWorker {
  private readonly logger = new Logger(JobWorker.name);

  constructor(
    private readonly emailService: EmailService,
    private readonly reportService: ReportService,
    private readonly imageService: ImageService,
  ) {}

  @StreamConsumer({
    stream: 'jobs',
    group: 'workers',
    concurrency: 5,
  })
  async processJob(message: IStreamMessage<Job>): Promise<void> {
    const job = message.data;

    this.logger.log(`Processing job ${job.id} of type ${job.type}`);

    try {
      await this.executeJob(job);
      await message.ack();
    } catch (error) {
      this.logger.error(`Job ${job.id} failed:`, error);
      await message.reject(error);
    }
  }

  private async executeJob(job: Job): Promise<void> {
    switch (job.type) {
      case 'send-email':
        await this.emailService.send(job.data);
        break;
      case 'generate-report':
        await this.reportService.generate(job.data);
        break;
      case 'process-image':
        await this.imageService.process(job.data);
        break;
      default:
        throw new Error(`Unknown job type: ${job.type}`);
    }
  }
}

Next Steps

Released under the MIT License.