Patterns
Common architectural patterns with Redis Streams.
Fan-out Pattern
Distribute the same message to multiple consumer groups, each of which processes it independently.
Implementation
typescript
// Producer
/**
 * Producer side of the fan-out example: stores an order and announces it once
 * on the `orders` stream.
 *
 * NOTE(review): `orderRepo` and `producer` are used but not declared in this
 * snippet — presumably injected through a constructor omitted for brevity.
 */
@Injectable()
export class OrderService {
  /**
   * Creates the order via `orderRepo`, then publishes a single `ORDER_CREATED`
   * message carrying the order id, customer id and total.
   *
   * @param dto - Order creation payload forwarded to the repository.
   * @returns The order returned by the repository.
   */
  async createOrder(dto: CreateOrderDto): Promise<Order> {
    const created = await this.orderRepo.create(dto);
    const { id: orderId, customerId, total } = created;

    // Publish exactly once.
    await this.producer.publish('orders', {
      type: 'ORDER_CREATED',
      orderId,
      customerId,
      total,
    });

    return created;
  }
}
// Consumer 1: Processing
/**
 * Reads the `orders` stream as the `processing` group: passes the message
 * payload to the fulfillment service, then acknowledges the message.
 * The acknowledgement is only reached when fulfillment does not throw.
 */
@StreamConsumer({ stream: 'orders', group: 'processing' })
async processOrder(message: IStreamMessage<OrderEvent>) {
  const { data: orderEvent } = message;
  await this.fulfillmentService.fulfill(orderEvent);
  await message.ack();
}
// Consumer 2: Analytics
/**
 * Reads the `orders` stream as the `analytics` group: forwards the message
 * payload to the analytics tracker, then acknowledges the message.
 */
@StreamConsumer({ stream: 'orders', group: 'analytics' })
async trackMetrics(message: IStreamMessage<OrderEvent>) {
  const { data: orderEvent } = message;
  await this.analytics.track(orderEvent);
  await message.ack();
}
// Consumer 3: Notifications
/**
 * Reads the `orders` stream as the `notifications` group: asks the email
 * service to send a confirmation for the message payload, then acknowledges
 * the message.
 */
@StreamConsumer({ stream: 'orders', group: 'notifications' })
async sendNotification(message: IStreamMessage<OrderEvent>) {
  const { data: orderEvent } = message;
  await this.emailService.sendConfirmation(orderEvent);
  await message.ack();
}
Priority Queue
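Process high-priority messages ahead of low-priority ones by routing them to separate streams whose consumers are given different resources.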
typescript
// Publish with priority: both tasks go to the same 'tasks' stream and the
// priority travels as a field on the message.
await this.producer.publish('tasks', {
id: 'task-1',
priority: 'high',
data: {...}, // placeholder for the task payload
});
await this.producer.publish('tasks', {
id: 'task-2',
priority: 'low',
data: {...}, // placeholder for the task payload
});
// Separate streams by priority: the stream name ('tasks:high' / 'tasks:low')
// carries the priority instead of a message field.
await this.producer.publish('tasks:high', highPriorityTask);
await this.producer.publish('tasks:low', lowPriorityTask);
Multiple Streams by Priority
typescript
import { Injectable } from '@nestjs/common';
import { StreamConsumer, IStreamMessage } from '@nestjs-redisx/streams';
import { Task } from './types';
/**
 * Consumes the `tasks:high` and `tasks:low` streams with the same `processors`
 * group name but different `concurrency` / `batchSize` settings, so the
 * high-priority stream is given more resources.
 */
@Injectable()
export class PriorityProcessor {
  /** High-priority lane: `concurrency: 10`, `batchSize: 50`. */
  @StreamConsumer({
    stream: 'tasks:high',
    group: 'processors',
    concurrency: 10,
    batchSize: 50,
  })
  async processHigh(message: IStreamMessage<Task>): Promise<void> {
    const { data: task } = message;
    await this.processTask(task);
    await message.ack();
  }

  /** Low-priority lane: `concurrency: 2`, `batchSize: 10`. */
  @StreamConsumer({
    stream: 'tasks:low',
    group: 'processors',
    concurrency: 2,
    batchSize: 10,
  })
  async processLow(message: IStreamMessage<Task>): Promise<void> {
    const { data: task } = message;
    await this.processTask(task);
    await message.ack();
  }

  /** Shared handler used by both lanes; the body is left as a placeholder. */
  private async processTask(task: Task): Promise<void> {
    // Process the task based on type
  }
}
Saga Pattern
Orchestrate multi-step distributed transactions.
Implementation
typescript
import { Injectable, Inject } from '@nestjs/common';
import { StreamConsumer, IStreamMessage, STREAM_PRODUCER, IStreamProducer } from '@nestjs-redisx/streams';
import {
SagaEvent,
CreateOrderDto,
OrderRepository,
PaymentService,
InventoryService,
} from './types';
/**
 * Order saga driven by messages on the `orders` stream.
 *
 * Every message carries a `step` field. Each handler below consumes the stream
 * through its own consumer group, acts only on the step it owns, and
 * acknowledges any other message without processing it.
 */
@Injectable()
export class OrderSaga {
  constructor(
    @Inject(STREAM_PRODUCER) private readonly producer: IStreamProducer,
    private readonly orderRepo: OrderRepository,
    private readonly paymentService: PaymentService,
    private readonly inventoryService: InventoryService,
  ) {}

  /**
   * Step 1: persist the order and start the saga by publishing
   * `ORDER_CREATED` with `step: 'PAYMENT'`.
   *
   * @param dto - Order creation payload forwarded to the repository.
   */
  async createOrder(dto: CreateOrderDto): Promise<void> {
    const order = await this.orderRepo.create(dto);

    await this.producer.publish('orders', {
      type: 'ORDER_CREATED',
      orderId: order.id,
      // NOTE(review): a millisecond timestamp repeats when two sagas start in
      // the same millisecond — confirm whether sagaId must be unique.
      sagaId: `saga-${Date.now()}`,
      step: 'PAYMENT',
      data: order,
    });
  }

  /**
   * Step 2: charge the payment.
   *
   * Only a failed `charge` is reported as `PAYMENT_FAILED`. The `try` covers
   * the charge alone, so an error while publishing `PAYMENT_COMPLETED` can no
   * longer be mistaken for a payment failure and trigger a rollback of a
   * payment that actually went through. Such a publish error now propagates
   * and the message is left unacknowledged.
   */
  @StreamConsumer({ stream: 'orders', group: 'payment-processor' })
  async processPayment(message: IStreamMessage<SagaEvent>): Promise<void> {
    if (message.data.step !== 'PAYMENT') return await message.ack();

    const { sagaId, data } = message.data;

    try {
      await this.paymentService.charge(data);
    } catch {
      // The charge itself failed: hand over to the rollback handler.
      await this.producer.publish('orders', {
        type: 'PAYMENT_FAILED',
        sagaId,
        step: 'ROLLBACK',
        data,
      });
      await message.ack();
      return;
    }

    await this.producer.publish('orders', {
      type: 'PAYMENT_COMPLETED',
      sagaId,
      step: 'INVENTORY',
      data,
    });
    await message.ack();
  }

  /**
   * Step 3: reserve inventory.
   *
   * Mirrors the payment step: only a failed `reserve` is reported as
   * `INVENTORY_FAILED`; an error while publishing `INVENTORY_RESERVED`
   * propagates and leaves the message unacknowledged.
   */
  @StreamConsumer({ stream: 'orders', group: 'inventory-processor' })
  async reserveInventory(message: IStreamMessage<SagaEvent>): Promise<void> {
    if (message.data.step !== 'INVENTORY') return await message.ack();

    const { sagaId, data } = message.data;

    try {
      await this.inventoryService.reserve(data);
    } catch {
      // The reservation itself failed: hand over to the rollback handler.
      await this.producer.publish('orders', {
        type: 'INVENTORY_FAILED',
        sagaId,
        step: 'ROLLBACK',
        data,
      });
      await message.ack();
      return;
    }

    await this.producer.publish('orders', {
      type: 'INVENTORY_RESERVED',
      sagaId,
      step: 'NOTIFY',
      data,
    });
    await message.ack();
  }

  /**
   * Rollback handler: compensates according to which step failed.
   *
   * - `PAYMENT_FAILED`: cancel the order.
   * - `INVENTORY_FAILED`: refund the payment, then cancel the order.
   *
   * Any other `ROLLBACK` message is acknowledged without action.
   *
   * NOTE(review): the payload published in step 1 is the repository's order
   * object, whose identifier is read there as `order.id`; confirm that it also
   * exposes `orderId`, which is what is read here.
   */
  @StreamConsumer({ stream: 'orders', group: 'rollback-processor' })
  async handleRollback(message: IStreamMessage<SagaEvent>): Promise<void> {
    if (message.data.step !== 'ROLLBACK') return await message.ack();

    // Rollback based on how far we got
    if (message.data.type === 'PAYMENT_FAILED') {
      await this.orderRepo.cancel(message.data.data.orderId);
    } else if (message.data.type === 'INVENTORY_FAILED') {
      await this.paymentService.refund(message.data.data);
      await this.orderRepo.cancel(message.data.data.orderId);
    }

    await message.ack();
  }
}
Event Sourcing
Store all state changes as events.
typescript
// Aggregate Root
/**
 * Order aggregate that records every state change as an event.
 *
 * NOTE(review): `this.id` and `applyEvent` are used but not declared in this
 * snippet — presumably omitted for brevity. `applyEvent` is also called from
 * `OrderEventStore.load`, so it has to be accessible from outside the class.
 */
export class Order {
  /** Events recorded since the last call to `markEventsCommitted`. */
  private events: OrderEvent[] = [];

  create(dto: CreateOrderDto) {
    this.apply(new OrderCreatedEvent(dto));
  }

  confirm() {
    this.apply(new OrderConfirmedEvent(this.id));
  }

  cancel() {
    this.apply(new OrderCancelledEvent(this.id));
  }

  /** Records a new event as uncommitted, then applies its state change. */
  private apply(event: OrderEvent) {
    this.events.push(event);
    // Apply state change
    this.applyEvent(event);
  }

  /**
   * Returns the events recorded since the last commit.
   * A copy is returned so callers cannot modify the aggregate's own list.
   */
  getUncommittedEvents(): OrderEvent[] {
    return [...this.events];
  }

  /** Forgets the recorded events once they have been persisted. */
  markEventsCommitted(): void {
    this.events = [];
  }
}
// Event Store
/**
 * Persists aggregate events to the `orders` stream and rebuilds aggregates by
 * replaying them.
 *
 * NOTE(review): `producer` is used but not declared in this snippet —
 * presumably injected through a constructor omitted for brevity.
 */
@Injectable()
export class OrderEventStore {
  /**
   * Publishes the aggregate's uncommitted events one by one, in order, then
   * marks them committed so that a later `save` of the same instance does not
   * publish them a second time.
   *
   * If a publish throws, the events stay uncommitted on the aggregate.
   */
  async save(order: Order): Promise<void> {
    const events = order.getUncommittedEvents();
    for (const event of events) {
      await this.producer.publish('orders', event);
    }
    order.markEventsCommitted();
  }

  /**
   * Rebuilds an order by applying its stored events to a fresh instance.
   * Replayed events go through `applyEvent` directly, so they are not recorded
   * as uncommitted.
   */
  async load(orderId: string): Promise<Order> {
    const order = new Order();
    // Read all events for this order
    const events = await this.getEventsForOrder(orderId);
    for (const event of events) {
      order.applyEvent(event);
    }
    return order;
  }

  /**
   * Placeholder for reading one order's events back from the stream.
   *
   * NOTE(review): `events` is never defined in this method and `info` is
   * unused; the fetch-and-filter step described in the comments is left to the
   * reader.
   */
  private async getEventsForOrder(orderId: string): Promise<OrderEvent[]> {
    // Read from stream start
    const info = await this.producer.getStreamInfo('orders');
    // Fetch all messages and filter by orderId
    // In production, use secondary index or separate streams per aggregate
    return events;
  }
}
// Projection
/**
 * Keeps the order view in sync with the `orders` stream (group
 * `order-projection`).
 *
 * `ORDER_CREATED` creates a view row; `ORDER_CONFIRMED` and `ORDER_CANCELLED`
 * update its status. Any other event type changes nothing. The message is
 * acknowledged in every case, provided the view call did not throw.
 */
@StreamConsumer({ stream: 'orders', group: 'order-projection' })
async projectOrder(message: IStreamMessage<OrderEvent>) {
  const evt = message.data;

  if (evt.type === 'ORDER_CREATED') {
    await this.orderView.create(evt.data);
  } else if (evt.type === 'ORDER_CONFIRMED') {
    await this.orderView.update(evt.data.orderId, { status: 'confirmed' });
  } else if (evt.type === 'ORDER_CANCELLED') {
    await this.orderView.update(evt.data.orderId, { status: 'cancelled' });
  }

  await message.ack();
}
CQRS (Command Query Responsibility Segregation)
Separate read and write models.
typescript
// Command Side (Write)
/**
 * Write side of the CQRS example.
 *
 * NOTE(review): `validator`, `orderRepo` and `producer` are used but not
 * declared in this snippet — presumably injected through a constructor omitted
 * for brevity.
 */
@Injectable()
export class OrderCommandHandler {
  /**
   * Validates the command, creates the order, and publishes `ORDER_CREATED`
   * on the `orders` stream — in that order, so nothing is stored or published
   * if validation throws.
   *
   * @param command - The create-order command.
   */
  async createOrder(command: CreateOrderCommand): Promise<void> {
    await this.validator.validate(command);

    const created = await this.orderRepo.create(command);

    await this.producer.publish('orders', {
      type: 'ORDER_CREATED',
      orderId: created.id,
      data: created,
    });
  }
}
// Query Side (Read)
/**
 * Read side of the CQRS example: every query goes to the order view
 * repository.
 *
 * NOTE(review): `orderViewRepo` is used but not declared in this snippet —
 * presumably injected through a constructor omitted for brevity.
 */
@Injectable()
export class OrderQueryHandler {
  /**
   * Fetches a single order from the read model.
   *
   * @param orderId - Identifier passed to `orderViewRepo.findOne`.
   */
  async getOrder(orderId: string): Promise<OrderView> {
    const view = await this.orderViewRepo.findOne(orderId);
    return view;
  }

  /**
   * Lists orders from the denormalized view.
   *
   * @param filter - Filter passed to `orderViewRepo.find`.
   */
  async listOrders(filter: OrderFilter): Promise<OrderView[]> {
    const views = await this.orderViewRepo.find(filter);
    return views;
  }
}
// Read Model Projector
/**
 * Projects every message on the `orders` stream (group `read-model`) into the
 * denormalized order view, then acknowledges it.
 *
 * The row is assembled in a fixed order: `id` from the event's `orderId`, then
 * the event's `data` fields (which override `id` if `data` has one), then a
 * fresh `lastUpdated` timestamp.
 */
@StreamConsumer({ stream: 'orders', group: 'read-model' })
async updateReadModel(message: IStreamMessage<OrderEvent>) {
  const evt = message.data;

  const row = {
    id: evt.orderId,
    ...evt.data,
    lastUpdated: new Date(),
  };
  await this.orderViewRepo.upsert(row);

  await message.ack();
}
Outbox Pattern
Ensure reliable event publishing with database transactions.
typescript
import { Injectable, Inject } from '@nestjs/common';
import { STREAM_PRODUCER, IStreamProducer } from '@nestjs-redisx/streams';
import {
CreateOrderDto,
Order,
OrderRepository,
OutboxRepo,
DatabaseTransaction,
} from './types';
/**
 * Transactional outbox: the order and its event record are written in the same
 * database transaction, and a separate job later relays the recorded events to
 * the `orders` stream.
 */
@Injectable()
export class OutboxService {
  constructor(
    @Inject(STREAM_PRODUCER) private readonly producer: IStreamProducer,
    private readonly orderRepo: OrderRepository,
    private readonly outboxRepo: OutboxRepo,
    private readonly db: DatabaseTransaction,
  ) {}

  /**
   * Creates the order and its `ORDER_CREATED` outbox record inside one
   * transaction. Nothing is published to the stream here.
   *
   * @param dto - Order creation payload forwarded to the repository.
   * @returns The order created inside the transaction.
   */
  async createOrderWithEvents(dto: CreateOrderDto): Promise<Order> {
    return await this.db.transaction(async (tx) => {
      // 1. Create order in database
      const order = await this.orderRepo.create(dto, { transaction: tx });

      // 2. Write event to outbox table
      await this.outboxRepo.create({
        aggregateId: order.id,
        eventType: 'ORDER_CREATED',
        payload: order,
        createdAt: new Date(),
      }, { transaction: tx });

      return order;
    });
  }

  /**
   * Background job that relays unpublished outbox records to the `orders`
   * stream and marks each one published afterwards.
   *
   * The published message includes `orderId` (taken from the record's
   * `aggregateId`), matching the shape of the other `ORDER_CREATED` messages
   * published to this stream.
   *
   * A failure on one record is logged and the loop moves on to the next. A
   * record whose publish succeeded but whose `markPublished` failed is not
   * marked, so it will presumably be picked up and published again on a later
   * run.
   */
  // @Cron('*/5 * * * * *') // Every 5 seconds
  async publishOutboxEvents(): Promise<void> {
    const events = await this.outboxRepo.findUnpublished();

    for (const event of events) {
      try {
        // Publish to stream
        await this.producer.publish('orders', {
          type: event.eventType,
          orderId: event.aggregateId,
          data: event.payload,
        });

        // Mark as published
        await this.outboxRepo.markPublished(event.id);
      } catch (error) {
        console.error('Failed to publish outbox event', error);
      }
    }
  }
}
Work Queue Pattern
Distribute work items to workers.
typescript
import { Injectable, Inject, Logger } from '@nestjs/common';
import { StreamConsumer, IStreamMessage, STREAM_PRODUCER, IStreamProducer } from '@nestjs-redisx/streams';
import { Job, EmailService, ReportService, ImageService } from './types';
// Producer: Add work to queue
/** Producer side of the work queue: puts jobs on the `jobs` stream. */
@Injectable()
export class JobQueueService {
  constructor(
    @Inject(STREAM_PRODUCER) private readonly producer: IStreamProducer,
  ) {}

  /**
   * Publishes the job's `id`, `type`, `data` and `priority`, stamped with the
   * time of enqueueing as `createdAt`.
   *
   * @param job - The job to enqueue.
   */
  async addJob(job: Job): Promise<void> {
    const { id, type, data, priority } = job;

    await this.producer.publish('jobs', {
      id,
      type,
      data,
      priority,
      createdAt: new Date(),
    });
  }
}
// Consumer: Process work items
/**
 * Consumer side of the work queue: reads the `jobs` stream as the `workers`
 * group and dispatches each job to the matching service.
 */
@Injectable()
export class JobWorker {
  private readonly logger = new Logger(JobWorker.name);

  constructor(
    private readonly emailService: EmailService,
    private readonly reportService: ReportService,
    private readonly imageService: ImageService,
  ) {}

  /**
   * Runs one job and acknowledges the message on success. If running the job
   * or acknowledging it throws, the error is logged and the message is
   * rejected with that error.
   */
  @StreamConsumer({
    stream: 'jobs',
    group: 'workers',
    concurrency: 5,
  })
  async processJob(message: IStreamMessage<Job>): Promise<void> {
    const { data: job } = message;
    this.logger.log(`Processing job ${job.id} of type ${job.type}`);

    try {
      await this.executeJob(job);
      await message.ack();
    } catch (error) {
      this.logger.error(`Job ${job.id} failed:`, error);
      await message.reject(error);
    }
  }

  /**
   * Routes a job to its service based on `job.type`.
   *
   * @throws Error when the job type is not one of `send-email`,
   *   `generate-report` or `process-image`.
   */
  private async executeJob(job: Job): Promise<void> {
    if (job.type === 'send-email') {
      await this.emailService.send(job.data);
      return;
    }
    if (job.type === 'generate-report') {
      await this.reportService.generate(job.data);
      return;
    }
    if (job.type === 'process-image') {
      await this.imageService.process(job.data);
      return;
    }
    throw new Error(`Unknown job type: ${job.type}`);
  }
}
Next Steps
- Monitoring — Track streaming metrics
- Recipes — Real-world examples