Recipes
Production-ready streaming patterns.
1. Order Processing Pipeline
Multi-step order fulfillment:
typescript
import { Injectable, Inject } from '@nestjs/common';
import { StreamConsumer, IStreamMessage, STREAM_PRODUCER, IStreamProducer } from '@nestjs-redisx/streams';
import {
OrderEvent,
CreateOrderDto,
Order,
OrderRepository,
PaymentService,
InventoryService,
ShippingService,
} from '../types';
@Injectable()
export class OrderPipeline {
// The stream producer is resolved through the STREAM_PRODUCER injection token;
// the repository and the payment/inventory/shipping services are declared as
// readonly constructor parameter properties.
constructor(
@Inject(STREAM_PRODUCER) private readonly producer: IStreamProducer,
private readonly orderRepo: OrderRepository,
private readonly paymentService: PaymentService,
private readonly inventoryService: InventoryService,
private readonly shippingService: ShippingService,
) {}
/**
 * Step 1: Create Order.
 *
 * Stores the order through the repository, then announces it by publishing
 * an `ORDER_CREATED` event on the `orders` stream.
 *
 * @param dto - Creation payload passed to the repository.
 * @returns The order as returned by the repository.
 */
async createOrder(dto: CreateOrderDto): Promise<Order> {
  const created = await this.orderRepo.create(dto);
  const { id, customerId, items, total } = created;

  await this.producer.publish('orders', {
    type: 'ORDER_CREATED',
    orderId: id,
    customerId,
    items,
    total,
  });

  return created;
}
/**
 * Step 2: Process Payment.
 *
 * Consumes the `orders` stream as the `payment-processor` group. Events other
 * than `ORDER_CREATED` are acknowledged and ignored. For `ORDER_CREATED` the
 * order total is charged and the outcome is published back to `orders` as
 * `PAYMENT_COMPLETED` or `PAYMENT_FAILED`.
 *
 * The charge and the follow-up publish are guarded separately, so a publish
 * failure that happens after a successful charge is never reported as
 * `PAYMENT_FAILED`.
 *
 * @param message - Stream message carrying the order event.
 */
@StreamConsumer({ stream: 'orders', group: 'payment-processor' })
async processPayment(message: IStreamMessage<OrderEvent>): Promise<void> {
  const event = message.data;
  if (event.type !== 'ORDER_CREATED') {
    await message.ack();
    return;
  }

  let payment: Awaited<ReturnType<PaymentService['charge']>>;
  try {
    payment = await this.paymentService.charge({
      orderId: event.orderId,
      amount: event.total,
    });
  } catch (error) {
    // Narrow instead of casting: a thrown value is not guaranteed to be an Error.
    const failure = error instanceof Error ? error : new Error(String(error));
    await this.producer.publish('orders', {
      type: 'PAYMENT_FAILED',
      orderId: event.orderId,
      error: failure.message,
    });
    await message.reject(failure);
    return;
  }

  try {
    await this.producer.publish('orders', {
      type: 'PAYMENT_COMPLETED',
      orderId: event.orderId,
      paymentId: payment.id,
    });
  } catch (error) {
    // The customer has already been charged, so PAYMENT_FAILED must not be
    // announced here; only the message itself is rejected.
    // NOTE(review): if a rejected message is redelivered, charge() runs
    // again — presumably it is idempotent per orderId; confirm.
    await message.reject(error instanceof Error ? error : new Error(String(error)));
    return;
  }

  await message.ack();
}
/**
 * Step 3: Reserve Inventory.
 *
 * Consumes the `orders` stream as the `inventory-manager` group. Events other
 * than `PAYMENT_COMPLETED` are acknowledged and ignored. On success an
 * `INVENTORY_RESERVED` event is published.
 *
 * When the reservation fails, the payment is refunded and `ORDER_FAILED` is
 * published. That outcome is final, so the message is acknowledged rather
 * than rejected: handling the same message again would try to reserve stock
 * for an order whose payment has already been refunded.
 *
 * @param message - Stream message carrying the order event.
 */
@StreamConsumer({ stream: 'orders', group: 'inventory-manager' })
async reserveInventory(message: IStreamMessage<OrderEvent>): Promise<void> {
  const event = message.data;
  if (event.type !== 'PAYMENT_COMPLETED') {
    await message.ack();
    return;
  }

  try {
    await this.inventoryService.reserve(event.orderId);
  } catch {
    // Compensate: refund the payment and fail the order.
    await this.paymentService.refund(event.paymentId);
    await this.producer.publish('orders', {
      type: 'ORDER_FAILED',
      orderId: event.orderId,
      error: 'Inventory unavailable',
    });
    // The failure has been fully handled; do not ask for the message again.
    await message.ack();
    return;
  }

  try {
    await this.producer.publish('orders', {
      type: 'INVENTORY_RESERVED',
      orderId: event.orderId,
    });
  } catch (error) {
    // Stock is reserved and the payment stands, so no refund is issued here;
    // only the message is rejected.
    // NOTE(review): a redelivery calls reserve() again — presumably it is
    // idempotent per orderId; confirm.
    await message.reject(error instanceof Error ? error : new Error(String(error)));
    return;
  }

  await message.ack();
}
/**
 * Step 4: Ship Order.
 *
 * Consumes the `orders` stream as the `shipping` group. Events other than
 * `INVENTORY_RESERVED` are acknowledged and ignored. Otherwise a shipment is
 * created and `ORDER_SHIPPED` is published.
 *
 * Failures are rejected explicitly, matching the payment and inventory
 * handlers of this class, instead of leaving the message neither
 * acknowledged nor rejected.
 *
 * @param message - Stream message carrying the order event.
 */
@StreamConsumer({ stream: 'orders', group: 'shipping' })
async shipOrder(message: IStreamMessage<OrderEvent>): Promise<void> {
  const event = message.data;
  if (event.type !== 'INVENTORY_RESERVED') {
    await message.ack();
    return;
  }

  try {
    await this.shippingService.createShipment(event.orderId);
    await this.producer.publish('orders', {
      type: 'ORDER_SHIPPED',
      orderId: event.orderId,
    });
    await message.ack();
  } catch (error) {
    await message.reject(error instanceof Error ? error : new Error(String(error)));
  }
}
}

2. Notification System
Fan-out notifications to multiple channels:
typescript
import { Injectable, Inject } from '@nestjs/common';
import { StreamConsumer, IStreamMessage, STREAM_PRODUCER, IStreamProducer } from '@nestjs-redisx/streams';
import { Notification, NotificationEvent, EmailService, SmsService, PushService } from '../types';
@Injectable()
export class NotificationSystem {
// The stream producer is resolved through the STREAM_PRODUCER injection token;
// the email, SMS and push services are declared as readonly constructor
// parameter properties.
constructor(
@Inject(STREAM_PRODUCER) private readonly producer: IStreamProducer,
private readonly emailService: EmailService,
private readonly smsService: SmsService,
private readonly pushService: PushService,
) {}
/**
 * Publishes a notification event on the `notifications` stream.
 *
 * @param notification - Notification whose id, user, type, title, message
 *   and channel list are copied into the event.
 */
async sendNotification(notification: Notification): Promise<void> {
  // `channels` is a list such as ['email', 'sms', 'push'].
  const { id, userId, type, title, message, channels } = notification;

  await this.producer.publish('notifications', {
    id,
    userId,
    type,
    title,
    message,
    channels,
  });
}
/**
 * Email channel.
 *
 * Consumes `notifications` as the `email-sender` group. Messages whose
 * channel list does not contain 'email' are acknowledged and skipped; the
 * rest are sent through the email service. A failure rejects the message.
 *
 * @param message - Stream message carrying the notification event.
 */
@StreamConsumer({
  stream: 'notifications',
  group: 'email-sender',
  concurrency: 10,
})
async sendEmail(message: IStreamMessage<NotificationEvent>): Promise<void> {
  const { channels, userId, title, message: body } = message.data;

  if (!channels.includes('email')) {
    await message.ack();
    return;
  }

  try {
    const to = await this.getUserEmail(userId);
    await this.emailService.send({ to, subject: title, body });
    await message.ack();
  } catch (error) {
    await message.reject(error as Error);
  }
}
/**
 * SMS channel.
 *
 * Consumes `notifications` as the `sms-sender` group. Messages whose channel
 * list does not contain 'sms' are acknowledged and skipped; the rest are sent
 * through the SMS service. A failure rejects the message.
 *
 * @param message - Stream message carrying the notification event.
 */
@StreamConsumer({
  stream: 'notifications',
  group: 'sms-sender',
  concurrency: 5,
})
async sendSMS(message: IStreamMessage<NotificationEvent>): Promise<void> {
  const { channels, userId, message: text } = message.data;

  if (!channels.includes('sms')) {
    await message.ack();
    return;
  }

  try {
    const to = await this.getUserPhone(userId);
    await this.smsService.send({ to, message: text });
    await message.ack();
  } catch (error) {
    await message.reject(error as Error);
  }
}
/**
 * Push notification channel.
 *
 * Consumes `notifications` as the `push-sender` group. Messages whose channel
 * list does not contain 'push' are acknowledged and skipped; the rest are
 * sent to the user's push tokens. A failure rejects the message.
 *
 * @param message - Stream message carrying the notification event.
 */
@StreamConsumer({
  stream: 'notifications',
  group: 'push-sender',
  concurrency: 20,
})
async sendPush(message: IStreamMessage<NotificationEvent>): Promise<void> {
  const { channels, userId, title, message: body } = message.data;

  if (!channels.includes('push')) {
    await message.ack();
    return;
  }

  try {
    const tokens = await this.getUserPushTokens(userId);
    await this.pushService.send({ tokens, title, body });
    await message.ack();
  } catch (error) {
    await message.reject(error as Error);
  }
}
/** Stub lookup: derives a placeholder address from the user id. */
private async getUserEmail(userId: string): Promise<string> {
  return userId + '@example.com';
}
/** Stub lookup: returns the same placeholder number for every user id. */
private async getUserPhone(userId: string): Promise<string> {
  const placeholderNumber = '+1234567890';
  return placeholderNumber;
}
/** Stub lookup: returns the same two placeholder tokens for every user id. */
private async getUserPushTokens(userId: string): Promise<string[]> {
  const placeholderTokens = ['token-1', 'token-2'];
  return placeholderTokens;
}
}

3. Analytics Event Processing
Real-time analytics with multiple projections:
typescript
import { Injectable, Inject } from '@nestjs/common';
import { StreamConsumer, IStreamMessage, STREAM_PRODUCER, IStreamProducer } from '@nestjs-redisx/streams';
import { AnalyticsEvent, WarehouseService, TimeseriesDb } from '../types';
@Injectable()
export class AnalyticsEngine {
// The stream producer is resolved through the STREAM_PRODUCER injection token;
// the warehouse service and the time-series database are declared as readonly
// constructor parameter properties.
constructor(
@Inject(STREAM_PRODUCER) private readonly producer: IStreamProducer,
private readonly warehouseService: WarehouseService,
private readonly timeseriesDb: TimeseriesDb,
) {}
/**
 * Publishes an analytics event on the `analytics` stream.
 *
 * The event's `type` is published under the key `eventType`, and the current
 * time is attached as `timestamp`.
 *
 * @param event - Event to record.
 */
async track(event: AnalyticsEvent): Promise<void> {
  const { type, userId, properties } = event;

  await this.producer.publish('analytics', {
    eventType: type,
    userId,
    properties,
    timestamp: new Date(),
  });
}
/**
 * Real-time metrics.
 *
 * Consumes `analytics` as the `metrics` group, logs the event type and user
 * id, and acknowledges the message.
 *
 * @param message - Stream message carrying the analytics event.
 */
@StreamConsumer({
  stream: 'analytics',
  group: 'metrics',
  concurrency: 10,
})
async updateMetrics(message: IStreamMessage<AnalyticsEvent>): Promise<void> {
  const { eventType, userId } = message.data;

  // Placeholder for updating counters in memory or in an external store.
  console.log(`Tracking metric: ${eventType} for user ${userId}`);

  await message.ack();
}
/**
 * User behavior tracking.
 *
 * Consumes `analytics` as the `behavior` group and writes each event to the
 * time-series database as a `user_events` point, tagged with the user id and
 * event type and stamped with the stream message's timestamp.
 *
 * @param message - Stream message carrying the analytics event.
 */
@StreamConsumer({
  stream: 'analytics',
  group: 'behavior',
  concurrency: 5,
})
async trackBehavior(message: IStreamMessage<AnalyticsEvent>): Promise<void> {
  const { userId, eventType, properties } = message.data;

  await this.timeseriesDb.insert({
    measurement: 'user_events',
    tags: { userId, eventType },
    fields: properties,
    timestamp: message.timestamp,
  });

  await message.ack();
}
/**
 * Data warehouse sync.
 *
 * Consumes `analytics` as the `warehouse` group. The handler receives one
 * message per call and inserts that single event into the `events` table
 * before acknowledging it.
 *
 * @param message - Stream message carrying the analytics event.
 */
@StreamConsumer({
  stream: 'analytics',
  group: 'warehouse',
  // NOTE(review): presumably the number of messages fetched per read, not a
  // batched insert — confirm against the consumer options.
  batchSize: 100,
})
async syncWarehouse(message: IStreamMessage<AnalyticsEvent>): Promise<void> {
  const { data } = message;

  await this.warehouseService.insert('events', data);
  await message.ack();
}
}

4. Audit Logging
Immutable audit trail:
typescript
import { Injectable, Inject } from '@nestjs/common';
import { StreamConsumer, IStreamMessage, STREAM_PRODUCER, IStreamProducer } from '@nestjs-redisx/streams';
import { AuditEvent, AuditRepo, AlertService } from '../types';
@Injectable()
export class AuditLogger {
// The stream producer is resolved through the STREAM_PRODUCER injection token;
// the audit repository and the alert service are declared as readonly
// constructor parameter properties.
constructor(
@Inject(STREAM_PRODUCER) private readonly producer: IStreamProducer,
private readonly auditRepo: AuditRepo,
private readonly alertService: AlertService,
) {}
/**
 * Publishes an audit event on the `audit` stream, attaching the current time
 * as `timestamp`.
 *
 * @param event - Audit details (action, actor, resource, changes, client info).
 */
async log(event: AuditEvent): Promise<void> {
  const { action, userId, resource, resourceId, changes, ipAddress, userAgent } = event;

  await this.producer.publish('audit', {
    action,
    userId,
    resource,
    resourceId,
    changes,
    timestamp: new Date(),
    ipAddress,
    userAgent,
  });
}
/**
 * Store in database.
 *
 * Consumes `audit` as the `storage` group and persists each event together
 * with the stream message id, then acknowledges the message.
 *
 * @param message - Stream message carrying the audit event.
 */
@StreamConsumer({
  stream: 'audit',
  group: 'storage',
})
async storeAuditLog(message: IStreamMessage<AuditEvent>): Promise<void> {
  // Event fields are spread after `messageId`, so they win on a key clash.
  const record = { messageId: message.id, ...message.data };

  await this.auditRepo.create(record);
  await message.ack();
}
/**
 * Alert on sensitive actions.
 *
 * Consumes `audit` as the `alerts` group. A high-severity alert is sent when
 * the action is USER_DELETED, PERMISSIONS_CHANGED or DATA_EXPORTED; every
 * message is acknowledged afterwards.
 *
 * @param message - Stream message carrying the audit event.
 */
@StreamConsumer({
  stream: 'audit',
  group: 'alerts',
})
async checkAlerts(message: IStreamMessage<AuditEvent>): Promise<void> {
  const { action, userId } = message.data;
  const sensitive = new Set<string>(['USER_DELETED', 'PERMISSIONS_CHANGED', 'DATA_EXPORTED']);

  if (sensitive.has(action)) {
    await this.alertService.send({
      title: 'Sensitive Action Performed',
      message: `${userId} performed ${action}`,
      severity: 'high',
    });
  }

  await message.ack();
}
/**
 * Query audit log.
 *
 * Reads the stored audit events of one resource from the repository, newest
 * first.
 *
 * @param resourceId - Identifier of the resource to look up.
 * @returns The matching audit events ordered by timestamp, descending.
 */
async getAuditTrail(resourceId: string): Promise<AuditEvent[]> {
  const query = {
    where: { resourceId },
    order: { timestamp: 'DESC' },
  };

  return this.auditRepo.find(query);
}
}

5. Webhook Delivery
Reliable webhook delivery with retries:
typescript
import { Injectable, Inject } from '@nestjs/common';
import { StreamConsumer, IStreamMessage, STREAM_PRODUCER, IStreamProducer } from '@nestjs-redisx/streams';
import { Webhook, WebhookEvent, WebhookRepo, AlertService } from '../types';
@Injectable()
export class WebhookDelivery {
// The stream producer is resolved through the STREAM_PRODUCER injection token;
// the webhook repository and the alert service are declared as readonly
// constructor parameter properties.
constructor(
@Inject(STREAM_PRODUCER) private readonly producer: IStreamProducer,
private readonly webhookRepo: WebhookRepo,
private readonly alertService: AlertService,
) {}
/**
 * Queues a webhook for delivery by publishing it, together with a signature
 * of its payload, on the `webhooks` stream.
 *
 * @param webhook - Webhook to deliver.
 */
async send(webhook: Webhook): Promise<void> {
  const { id, url, event, payload } = webhook;
  const signature = this.generateSignature(payload);

  await this.producer.publish('webhooks', { id, url, event, payload, signature });
}
/**
 * Delivers one webhook by POSTing its payload as JSON to the target URL with
 * a 30 second timeout.
 *
 * A 2xx response marks the webhook `delivered` and acknowledges the message.
 * Any failure is logged as an attempt and the message is rejected. On the
 * final attempt the webhook is also marked `failed` and an alert is sent.
 *
 * The reject runs in a `finally`, so an error thrown by the failure
 * bookkeeping cannot leave the message neither acknowledged nor rejected;
 * that bookkeeping error still propagates to the caller afterwards.
 *
 * @param message - Stream message carrying the webhook event.
 */
@StreamConsumer({
  stream: 'webhooks',
  group: 'delivery',
  concurrency: 10,
  maxRetries: 5, // Retry up to 5 times
})
async deliver(message: IStreamMessage<WebhookEvent>): Promise<void> {
  const webhook = message.data;
  // Must stay in sync with `maxRetries` in the decorator above.
  const finalAttempt = 5;

  try {
    const response = await fetch(webhook.url, {
      method: 'POST',
      headers: {
        'Content-Type': 'application/json',
        'X-Webhook-Signature': webhook.signature,
        'X-Webhook-ID': webhook.id,
        'X-Webhook-Attempt': message.attempt.toString(),
      },
      body: JSON.stringify(webhook.payload),
      signal: AbortSignal.timeout(30000), // 30s timeout
    });

    if (!response.ok) {
      throw new Error(`HTTP ${response.status}: ${response.statusText}`);
    }

    // Log successful delivery
    await this.webhookRepo.updateStatus(webhook.id, 'delivered');
    await message.ack();
  } catch (error) {
    // Narrow instead of casting: a thrown value is not guaranteed to be an Error.
    const failure = error instanceof Error ? error : new Error(String(error));

    try {
      // Log failed attempt
      await this.webhookRepo.logAttempt(webhook.id, {
        attempt: message.attempt,
        error: failure.message,
        timestamp: new Date(),
      });

      if (message.attempt >= finalAttempt) {
        // Final failure.
        // NOTE(review): presumably the consumer moves the message to the DLQ
        // once it is rejected below — confirm.
        await this.webhookRepo.updateStatus(webhook.id, 'failed');
        await this.alertService.send({
          title: 'Webhook Delivery Failed',
          message: `Failed to deliver webhook ${webhook.id} after ${finalAttempt} attempts`,
        });
      }
    } finally {
      await message.reject(failure);
    }
  }
}
/**
 * Stub: returns a fixed placeholder instead of a real signature.
 * Use crypto.createHmac in production.
 */
private generateSignature(payload: unknown): string {
  const placeholder = 'sha256-signature';
  return placeholder;
}
}

6. Image Processing Queue
Process uploaded images asynchronously:
typescript
import { Injectable, Inject, Logger } from '@nestjs/common';
import { StreamConsumer, IStreamMessage, STREAM_PRODUCER, IStreamProducer } from '@nestjs-redisx/streams';
import { ImageUpload, ImageJob, ImageRepo } from '../types';
@Injectable()
export class ImageProcessor {
// Logger labelled with this class's name; used to report processing failures.
private readonly logger = new Logger(ImageProcessor.name);
// The stream producer is resolved through the STREAM_PRODUCER injection token;
// the image repository is declared as a readonly constructor parameter property.
constructor(
@Inject(STREAM_PRODUCER) private readonly producer: IStreamProducer,
private readonly imageRepo: ImageRepo,
) {}
/**
 * Queues an uploaded image for processing by publishing a job on the
 * `images` stream. Every job carries the same three operations: resize,
 * thumbnail and watermark.
 *
 * @param upload - The uploaded image (id, owner and source URL).
 */
async queueImage(upload: ImageUpload): Promise<void> {
  const { id, userId, url } = upload;
  const operations = [
    { type: 'resize', width: 800, height: 600 },
    { type: 'thumbnail', width: 200, height: 200 },
    { type: 'watermark', text: 'Copyright 2025' },
  ];

  await this.producer.publish('images', { id, userId, url, operations });
}
/**
 * Processes one image job: downloads the image, applies the job's operations
 * in order, uploads the result and marks the image `processed` in the
 * repository. Any failure is logged and the message is rejected.
 *
 * Operation types other than resize, thumbnail and watermark are skipped.
 *
 * @param message - Stream message carrying the image job.
 */
@StreamConsumer({
  stream: 'images',
  group: 'processors',
  concurrency: 3, // CPU-intensive, limit concurrency
})
async process(message: IStreamMessage<ImageJob>): Promise<void> {
  const job = message.data;

  try {
    const image = await this.downloadImage(job.url);

    // Operations run one after another, in the order listed on the job.
    for (const step of job.operations) {
      if (step.type === 'resize') {
        await this.resizeImage(image, step.width, step.height);
      } else if (step.type === 'thumbnail') {
        await this.createThumbnail(image, step.width, step.height);
      } else if (step.type === 'watermark') {
        await this.addWatermark(image, step.text);
      }
    }

    const processedUrls = await this.uploadImages(job.id, image);

    await this.imageRepo.update(job.id, {
      status: 'processed',
      processedUrls,
    });
    await message.ack();
  } catch (error) {
    this.logger.error(`Image processing failed: ${job.id}`, error);
    await message.reject(error as Error);
  }
}
/** Stub: returns an empty buffer instead of fetching the URL. */
private async downloadImage(url: string): Promise<Buffer> {
  return Buffer.alloc(0);
}
// Stub: placeholder for the resize step; the body is empty and resolves
// without touching the image.
private async resizeImage(image: Buffer, width?: number, height?: number): Promise<void> {
// Resize logic
}
// Stub: placeholder for thumbnail generation; the body is empty and resolves
// without touching the image.
private async createThumbnail(image: Buffer, width?: number, height?: number): Promise<void> {
// Thumbnail logic
}
// Stub: placeholder for the watermark step; the body is empty and resolves
// without touching the image.
private async addWatermark(image: Buffer, text?: string): Promise<void> {
// Watermark logic
}
/** Stub: uploads nothing and returns an empty URL list. */
private async uploadImages(id: string, image: Buffer): Promise<string[]> {
  const uploadedUrls: string[] = [];
  return uploadedUrls;
}
}

7. Email Campaign
Send emails in batches with rate limiting:
typescript
import { Injectable, Inject } from '@nestjs/common';
import { StreamConsumer, IStreamMessage, STREAM_PRODUCER, IStreamProducer } from '@nestjs-redisx/streams';
import { Campaign, CampaignEmail, EmailService, CampaignRepo } from '../types';
@Injectable()
export class EmailCampaign {
// The stream producer is resolved through the STREAM_PRODUCER injection token;
// the email service and the campaign repository are declared as readonly
// constructor parameter properties.
constructor(
@Inject(STREAM_PRODUCER) private readonly producer: IStreamProducer,
private readonly emailService: EmailService,
private readonly campaignRepo: CampaignRepo,
) {}
/**
 * Fans a campaign out to its recipients: one message per recipient is
 * published on the `campaigns` stream, one after another.
 *
 * @param campaign - Campaign to send.
 */
async sendCampaign(campaign: Campaign): Promise<void> {
  const audience = await this.getRecipients(campaign.id);

  for (const person of audience) {
    const payload = {
      campaignId: campaign.id,
      recipientId: person.id,
      email: person.email,
      template: campaign.template,
      variables: this.getVariables(person),
    };
    await this.producer.publish('campaigns', payload);
  }
}
/**
 * Sends one campaign email.
 *
 * - Send succeeds: the sent counter is incremented and the message is
 *   acknowledged.
 * - Send fails with a message containing 'rate limit': waits one second,
 *   then rejects the message.
 * - Send fails for any other reason: the bounce counter is incremented and
 *   the message is acknowledged so it is not retried.
 *
 * Only the send itself is guarded by the error handler, so a failure while
 * updating the sent counter is not counted as a bounce for an email that
 * was actually delivered.
 *
 * @param message - Stream message carrying the campaign email.
 */
@StreamConsumer({
  stream: 'campaigns',
  group: 'senders',
  concurrency: 50, // High concurrency for I/O
  batchSize: 100,
})
async sendEmail(message: IStreamMessage<CampaignEmail>): Promise<void> {
  const email = message.data;

  try {
    await this.emailService.send({
      to: email.email,
      template: email.template,
      variables: email.variables,
    });
  } catch (error) {
    // Narrow instead of casting: a thrown value is not guaranteed to be an Error.
    const failure = error instanceof Error ? error : new Error(String(error));

    if (failure.message.includes('rate limit')) {
      // Rate limited - retry after delay
      await new Promise<void>((resolve) => setTimeout(resolve, 1000));
      await message.reject(failure);
    } else {
      // Other error - mark as bounced
      await this.campaignRepo.incrementBounced(email.campaignId);
      await message.ack(); // Don't retry
    }
    return;
  }

  // The email has gone out, so the message is acknowledged even when the
  // counter update fails; that failure still propagates to the caller.
  try {
    await this.campaignRepo.incrementSent(email.campaignId);
  } finally {
    await message.ack();
  }
}
/** Stub: returns no recipients; a real implementation fetches them from the database. */
private async getRecipients(campaignId: string): Promise<Array<{ id: string; email: string }>> {
  const recipients: Array<{ id: string; email: string }> = [];
  return recipients;
}
/**
 * Stub: returns no template variables; a real implementation builds them
 * from the recipient.
 *
 * @param recipient - Recipient record, in the shape produced by `getRecipients`.
 * @returns Template variables keyed by name.
 */
private getVariables(recipient: { id: string; email: string }): Record<string, string> {
  return {};
}
}

8. Data Export
Long-running export jobs:
typescript
import { randomUUID } from 'node:crypto';
import { Injectable, Inject } from '@nestjs/common';
import { StreamConsumer, IStreamMessage, STREAM_PRODUCER, IStreamProducer } from '@nestjs-redisx/streams';
import { ExportRequest, ExportJob, ExportRepo, S3Service } from '../types';
@Injectable()
export class DataExporter {
// The stream producer is resolved through the STREAM_PRODUCER injection token;
// the export repository and the S3 service are declared as readonly
// constructor parameter properties.
constructor(
@Inject(STREAM_PRODUCER) private readonly producer: IStreamProducer,
private readonly exportRepo: ExportRepo,
private readonly s3: S3Service,
) {}
/**
 * Queues an export job on the `exports` stream.
 *
 * @param request - What to export (owner, type, filters and output format).
 * @returns The id of the queued job.
 */
async exportData(request: ExportRequest): Promise<string> {
  // A millisecond timestamp alone repeats when two exports are requested in
  // the same millisecond; the UUID suffix keeps every job id unique.
  const jobId = `export-${Date.now()}-${randomUUID()}`;

  await this.producer.publish('exports', {
    jobId,
    userId: request.userId,
    type: request.type,
    filters: request.filters,
    format: request.format,
  });

  return jobId;
}
/**
 * Runs one export job: marks it `processing`, fetches the data, generates
 * the file, uploads it to S3, marks the job `completed` and notifies the
 * user with the resulting URL.
 *
 * On any failure the job is marked `failed`, the user is notified of the
 * error and the message is rejected.
 *
 * @param message - Stream message carrying the export job.
 */
@StreamConsumer({
  stream: 'exports',
  group: 'workers',
  concurrency: 2, // Limit concurrent exports
})
async processExport(message: IStreamMessage<ExportJob>): Promise<void> {
  const { jobId, userId, type, filters, format } = message.data;

  try {
    await this.exportRepo.updateStatus(jobId, 'processing');

    const rows = await this.fetchData(type, filters);
    const file = await this.generateFile(rows, format);
    const downloadUrl = await this.s3.upload(file);

    await this.exportRepo.updateStatus(jobId, 'completed');
    await this.notifyUser(userId, downloadUrl);
    await message.ack();
  } catch (error) {
    await this.exportRepo.updateStatus(jobId, 'failed');
    await this.notifyUserError(userId, (error as Error).message);
    await message.reject(error as Error);
  }
}
/** Stub: returns no rows; a real implementation fetches them from the database. */
private async fetchData(type: string, filters: Record<string, unknown>): Promise<unknown[]> {
  const rows: unknown[] = [];
  return rows;
}
/** Stub: returns an empty buffer; a real implementation generates CSV/Excel/JSON. */
private async generateFile(data: unknown[], format: string): Promise<Buffer> {
  return Buffer.alloc(0);
}
// Stub: placeholder for notifying the user that the export is ready; the body
// is empty and resolves without sending anything.
private async notifyUser(userId: string, url: string): Promise<void> {
// Stub: send notification with download link
}
// Stub: placeholder for notifying the user that the export failed; the body
// is empty and resolves without sending anything.
private async notifyUserError(userId: string, error: string): Promise<void> {
// Stub: send error notification
}
}

Next Steps
- Troubleshooting — Debug common issues
- Overview — Back to overview