Skip to content

Recipes

Production-ready streaming patterns.

1. Order Processing Pipeline

Multi-step order fulfillment:

typescript
import { Injectable, Inject } from '@nestjs/common';
import { StreamConsumer, IStreamMessage, STREAM_PRODUCER, IStreamProducer } from '@nestjs-redisx/streams';
import {
  OrderEvent,
  CreateOrderDto,
  Order,
  OrderRepository,
  PaymentService,
  InventoryService,
  ShippingService,
} from '../types';

@Injectable()
export class OrderPipeline {
  constructor(
    @Inject(STREAM_PRODUCER) private readonly producer: IStreamProducer,
    private readonly orderRepo: OrderRepository,
    private readonly paymentService: PaymentService,
    private readonly inventoryService: InventoryService,
    private readonly shippingService: ShippingService,
  ) {}

  // Step 1: Create Order
  async createOrder(dto: CreateOrderDto): Promise<Order> {
    const order = await this.orderRepo.create(dto);

    await this.producer.publish('orders', {
      type: 'ORDER_CREATED',
      orderId: order.id,
      customerId: order.customerId,
      items: order.items,
      total: order.total,
    });

    return order;
  }

  // Step 2: Process Payment
  @StreamConsumer({ stream: 'orders', group: 'payment-processor' })
  async processPayment(message: IStreamMessage<OrderEvent>): Promise<void> {
    if (message.data.type !== 'ORDER_CREATED') {
      await message.ack();
      return;
    }

    try {
      const payment = await this.paymentService.charge({
        orderId: message.data.orderId,
        amount: message.data.total,
      });

      await this.producer.publish('orders', {
        type: 'PAYMENT_COMPLETED',
        orderId: message.data.orderId,
        paymentId: payment.id,
      });

      await message.ack();
    } catch (error) {
      await this.producer.publish('orders', {
        type: 'PAYMENT_FAILED',
        orderId: message.data.orderId,
        error: (error as Error).message,
      });

      await message.reject(error as Error);
    }
  }

  // Step 3: Reserve Inventory
  @StreamConsumer({ stream: 'orders', group: 'inventory-manager' })
  async reserveInventory(message: IStreamMessage<OrderEvent>): Promise<void> {
    if (message.data.type !== 'PAYMENT_COMPLETED') {
      await message.ack();
      return;
    }

    try {
      await this.inventoryService.reserve(message.data.orderId);

      await this.producer.publish('orders', {
        type: 'INVENTORY_RESERVED',
        orderId: message.data.orderId,
      });

      await message.ack();
    } catch (error) {
      // Refund payment and fail order
      await this.paymentService.refund(message.data.paymentId);

      await this.producer.publish('orders', {
        type: 'ORDER_FAILED',
        orderId: message.data.orderId,
        error: 'Inventory unavailable',
      });

      await message.reject(error as Error);
    }
  }

  // Step 4: Ship Order
  @StreamConsumer({ stream: 'orders', group: 'shipping' })
  async shipOrder(message: IStreamMessage<OrderEvent>): Promise<void> {
    if (message.data.type !== 'INVENTORY_RESERVED') {
      await message.ack();
      return;
    }

    await this.shippingService.createShipment(message.data.orderId);

    await this.producer.publish('orders', {
      type: 'ORDER_SHIPPED',
      orderId: message.data.orderId,
    });

    await message.ack();
  }
}

2. Notification System

Fan-out notifications to multiple channels:

typescript
import { Injectable, Inject } from '@nestjs/common';
import { StreamConsumer, IStreamMessage, STREAM_PRODUCER, IStreamProducer } from '@nestjs-redisx/streams';
import { Notification, NotificationEvent, EmailService, SmsService, PushService } from '../types';

@Injectable()
export class NotificationSystem {
  constructor(
    @Inject(STREAM_PRODUCER) private readonly producer: IStreamProducer,
    private readonly emailService: EmailService,
    private readonly smsService: SmsService,
    private readonly pushService: PushService,
  ) {}

  // Publish notification event
  async sendNotification(notification: Notification): Promise<void> {
    await this.producer.publish('notifications', {
      id: notification.id,
      userId: notification.userId,
      type: notification.type,
      title: notification.title,
      message: notification.message,
      channels: notification.channels,  // ['email', 'sms', 'push']
    });
  }

  // Email channel
  @StreamConsumer({
    stream: 'notifications',
    group: 'email-sender',
    concurrency: 10,
  })
  async sendEmail(message: IStreamMessage<NotificationEvent>): Promise<void> {
    if (!message.data.channels.includes('email')) {
      await message.ack();
      return;
    }

    try {
      await this.emailService.send({
        to: await this.getUserEmail(message.data.userId),
        subject: message.data.title,
        body: message.data.message,
      });

      await message.ack();
    } catch (error) {
      await message.reject(error as Error);
    }
  }

  // SMS channel
  @StreamConsumer({
    stream: 'notifications',
    group: 'sms-sender',
    concurrency: 5,
  })
  async sendSMS(message: IStreamMessage<NotificationEvent>): Promise<void> {
    if (!message.data.channels.includes('sms')) {
      await message.ack();
      return;
    }

    try {
      await this.smsService.send({
        to: await this.getUserPhone(message.data.userId),
        message: message.data.message,
      });

      await message.ack();
    } catch (error) {
      await message.reject(error as Error);
    }
  }

  // Push notification channel
  @StreamConsumer({
    stream: 'notifications',
    group: 'push-sender',
    concurrency: 20,
  })
  async sendPush(message: IStreamMessage<NotificationEvent>): Promise<void> {
    if (!message.data.channels.includes('push')) {
      await message.ack();
      return;
    }

    try {
      const tokens = await this.getUserPushTokens(message.data.userId);

      await this.pushService.send({
        tokens,
        title: message.data.title,
        body: message.data.message,
      });

      await message.ack();
    } catch (error) {
      await message.reject(error as Error);
    }
  }

  private async getUserEmail(userId: string): Promise<string> {
    return `${userId}@example.com`;
  }

  private async getUserPhone(userId: string): Promise<string> {
    return '+1234567890';
  }

  private async getUserPushTokens(userId: string): Promise<string[]> {
    return ['token-1', 'token-2'];
  }
}

3. Analytics Event Processing

Real-time analytics with multiple projections:

typescript
import { Injectable, Inject } from '@nestjs/common';
import { StreamConsumer, IStreamMessage, STREAM_PRODUCER, IStreamProducer } from '@nestjs-redisx/streams';
import { AnalyticsEvent, WarehouseService, TimeseriesDb } from '../types';

@Injectable()
export class AnalyticsEngine {
  constructor(
    @Inject(STREAM_PRODUCER) private readonly producer: IStreamProducer,
    private readonly warehouseService: WarehouseService,
    private readonly timeseriesDb: TimeseriesDb,
  ) {}

  // Track event
  async track(event: AnalyticsEvent): Promise<void> {
    await this.producer.publish('analytics', {
      eventType: event.type,
      userId: event.userId,
      properties: event.properties,
      timestamp: new Date(),
    });
  }

  // Real-time metrics
  @StreamConsumer({
    stream: 'analytics',
    group: 'metrics',
    concurrency: 10,
  })
  async updateMetrics(message: IStreamMessage<AnalyticsEvent>): Promise<void> {
    const event = message.data;

    // Update counters in-memory or via external store
    console.log(`Tracking metric: ${event.eventType} for user ${event.userId}`);

    await message.ack();
  }

  // User behavior tracking
  @StreamConsumer({
    stream: 'analytics',
    group: 'behavior',
    concurrency: 5,
  })
  async trackBehavior(message: IStreamMessage<AnalyticsEvent>): Promise<void> {
    const event = message.data;

    // Store in time-series database
    await this.timeseriesDb.insert({
      measurement: 'user_events',
      tags: {
        userId: event.userId,
        eventType: event.eventType,
      },
      fields: event.properties,
      timestamp: message.timestamp,
    });

    await message.ack();
  }

  // Data warehouse sync
  @StreamConsumer({
    stream: 'analytics',
    group: 'warehouse',
    batchSize: 100,  // Batch for efficiency
  })
  async syncWarehouse(message: IStreamMessage<AnalyticsEvent>): Promise<void> {
    // Batch insert to data warehouse
    await this.warehouseService.insert('events', message.data);
    await message.ack();
  }
}

4. Audit Logging

Immutable audit trail:

typescript
import { Injectable, Inject } from '@nestjs/common';
import { StreamConsumer, IStreamMessage, STREAM_PRODUCER, IStreamProducer } from '@nestjs-redisx/streams';
import { AuditEvent, AuditRepo, AlertService } from '../types';

@Injectable()
export class AuditLogger {
  constructor(
    @Inject(STREAM_PRODUCER) private readonly producer: IStreamProducer,
    private readonly auditRepo: AuditRepo,
    private readonly alertService: AlertService,
  ) {}

  async log(event: AuditEvent): Promise<void> {
    await this.producer.publish('audit', {
      action: event.action,
      userId: event.userId,
      resource: event.resource,
      resourceId: event.resourceId,
      changes: event.changes,
      timestamp: new Date(),
      ipAddress: event.ipAddress,
      userAgent: event.userAgent,
    });
  }

  // Store in database
  @StreamConsumer({
    stream: 'audit',
    group: 'storage',
  })
  async storeAuditLog(message: IStreamMessage<AuditEvent>): Promise<void> {
    await this.auditRepo.create({
      messageId: message.id,
      ...message.data,
    });

    await message.ack();
  }

  // Alert on sensitive actions
  @StreamConsumer({
    stream: 'audit',
    group: 'alerts',
  })
  async checkAlerts(message: IStreamMessage<AuditEvent>): Promise<void> {
    const event = message.data;

    const sensitiveActions = [
      'USER_DELETED',
      'PERMISSIONS_CHANGED',
      'DATA_EXPORTED',
    ];

    if (sensitiveActions.includes(event.action)) {
      await this.alertService.send({
        title: 'Sensitive Action Performed',
        message: `${event.userId} performed ${event.action}`,
        severity: 'high',
      });
    }

    await message.ack();
  }

  // Query audit log
  async getAuditTrail(resourceId: string): Promise<AuditEvent[]> {
    // Read from stream (for recent events) or database (for historical)
    const events = await this.auditRepo.find({
      where: { resourceId },
      order: { timestamp: 'DESC' },
    });

    return events;
  }
}

5. Webhook Delivery

Reliable webhook delivery with retries:

typescript
import { Injectable, Inject } from '@nestjs/common';
import { StreamConsumer, IStreamMessage, STREAM_PRODUCER, IStreamProducer } from '@nestjs-redisx/streams';
import { Webhook, WebhookEvent, WebhookRepo, AlertService } from '../types';

@Injectable()
export class WebhookDelivery {
  constructor(
    @Inject(STREAM_PRODUCER) private readonly producer: IStreamProducer,
    private readonly webhookRepo: WebhookRepo,
    private readonly alertService: AlertService,
  ) {}

  async send(webhook: Webhook): Promise<void> {
    await this.producer.publish('webhooks', {
      id: webhook.id,
      url: webhook.url,
      event: webhook.event,
      payload: webhook.payload,
      signature: this.generateSignature(webhook.payload),
    });
  }

  @StreamConsumer({
    stream: 'webhooks',
    group: 'delivery',
    concurrency: 10,
    maxRetries: 5,  // Retry up to 5 times
  })
  async deliver(message: IStreamMessage<WebhookEvent>): Promise<void> {
    const webhook = message.data;

    try {
      const response = await fetch(webhook.url, {
        method: 'POST',
        headers: {
          'Content-Type': 'application/json',
          'X-Webhook-Signature': webhook.signature,
          'X-Webhook-ID': webhook.id,
          'X-Webhook-Attempt': message.attempt.toString(),
        },
        body: JSON.stringify(webhook.payload),
        signal: AbortSignal.timeout(30000),  // 30s timeout
      });

      if (!response.ok) {
        throw new Error(`HTTP ${response.status}: ${response.statusText}`);
      }

      // Log successful delivery
      await this.webhookRepo.updateStatus(webhook.id, 'delivered');
      await message.ack();
    } catch (error) {
      // Log failed attempt
      await this.webhookRepo.logAttempt(webhook.id, {
        attempt: message.attempt,
        error: (error as Error).message,
        timestamp: new Date(),
      });

      if (message.attempt >= 5) {
        // Final failure - move to DLQ
        await this.webhookRepo.updateStatus(webhook.id, 'failed');
        await this.alertService.send({
          title: 'Webhook Delivery Failed',
          message: `Failed to deliver webhook ${webhook.id} after 5 attempts`,
        });
      }

      await message.reject(error as Error);
    }
  }

  private generateSignature(payload: unknown): string {
    return 'sha256-signature'; // Stub: use crypto.createHmac in production
  }
}

6. Image Processing Queue

Process uploaded images asynchronously:

typescript
import { Injectable, Inject, Logger } from '@nestjs/common';
import { StreamConsumer, IStreamMessage, STREAM_PRODUCER, IStreamProducer } from '@nestjs-redisx/streams';
import { ImageUpload, ImageJob, ImageRepo } from '../types';

@Injectable()
export class ImageProcessor {
  private readonly logger = new Logger(ImageProcessor.name);

  constructor(
    @Inject(STREAM_PRODUCER) private readonly producer: IStreamProducer,
    private readonly imageRepo: ImageRepo,
  ) {}

  async queueImage(upload: ImageUpload): Promise<void> {
    await this.producer.publish('images', {
      id: upload.id,
      userId: upload.userId,
      url: upload.url,
      operations: [
        { type: 'resize', width: 800, height: 600 },
        { type: 'thumbnail', width: 200, height: 200 },
        { type: 'watermark', text: 'Copyright 2025' },
      ],
    });
  }

  @StreamConsumer({
    stream: 'images',
    group: 'processors',
    concurrency: 3,  // CPU-intensive, limit concurrency
  })
  async process(message: IStreamMessage<ImageJob>): Promise<void> {
    const job = message.data;

    try {
      // Download image
      const image = await this.downloadImage(job.url);

      // Process each operation
      for (const operation of job.operations) {
        switch (operation.type) {
          case 'resize':
            await this.resizeImage(image, operation.width, operation.height);
            break;
          case 'thumbnail':
            await this.createThumbnail(image, operation.width, operation.height);
            break;
          case 'watermark':
            await this.addWatermark(image, operation.text);
            break;
        }
      }

      // Upload processed images
      const urls = await this.uploadImages(job.id, image);

      // Update database
      await this.imageRepo.update(job.id, {
        status: 'processed',
        processedUrls: urls,
      });

      await message.ack();
    } catch (error) {
      this.logger.error(`Image processing failed: ${job.id}`, error);
      await message.reject(error as Error);
    }
  }

  private async downloadImage(url: string): Promise<Buffer> {
    return Buffer.from(''); // Stub
  }

  private async resizeImage(image: Buffer, width?: number, height?: number): Promise<void> {
    // Resize logic
  }

  private async createThumbnail(image: Buffer, width?: number, height?: number): Promise<void> {
    // Thumbnail logic
  }

  private async addWatermark(image: Buffer, text?: string): Promise<void> {
    // Watermark logic
  }

  private async uploadImages(id: string, image: Buffer): Promise<string[]> {
    return []; // Stub
  }
}

7. Email Campaign

Send emails in batches with rate limiting:

typescript
import { Injectable, Inject } from '@nestjs/common';
import { StreamConsumer, IStreamMessage, STREAM_PRODUCER, IStreamProducer } from '@nestjs-redisx/streams';
import { Campaign, CampaignEmail, EmailService, CampaignRepo } from '../types';

@Injectable()
export class EmailCampaign {
  constructor(
    @Inject(STREAM_PRODUCER) private readonly producer: IStreamProducer,
    private readonly emailService: EmailService,
    private readonly campaignRepo: CampaignRepo,
  ) {}

  async sendCampaign(campaign: Campaign): Promise<void> {
    const recipients = await this.getRecipients(campaign.id);

    // Publish each recipient as separate message
    for (const recipient of recipients) {
      await this.producer.publish('campaigns', {
        campaignId: campaign.id,
        recipientId: recipient.id,
        email: recipient.email,
        template: campaign.template,
        variables: this.getVariables(recipient),
      });
    }
  }

  @StreamConsumer({
    stream: 'campaigns',
    group: 'senders',
    concurrency: 50,  // High concurrency for I/O
    batchSize: 100,
  })
  async sendEmail(message: IStreamMessage<CampaignEmail>): Promise<void> {
    const email = message.data;

    try {
      // Send email
      await this.emailService.send({
        to: email.email,
        template: email.template,
        variables: email.variables,
      });

      // Track delivery
      await this.campaignRepo.incrementSent(email.campaignId);

      await message.ack();
    } catch (error) {
      if ((error as Error).message.includes('rate limit')) {
        // Rate limited - retry after delay
        await new Promise(resolve => setTimeout(resolve, 1000));
        await message.reject(error as Error);
      } else {
        // Other error - log and mark as bounced
        await this.campaignRepo.incrementBounced(email.campaignId);
        await message.ack();  // Don't retry
      }
    }
  }

  private async getRecipients(campaignId: string): Promise<Array<{ id: string; email: string }>> {
    return []; // Stub: fetch from database
  }

  private getVariables(recipient: any): Record<string, string> {
    return {}; // Stub: build template variables
  }
}

8. Data Export

Long-running export jobs:

typescript
import { Injectable, Inject } from '@nestjs/common';
import { StreamConsumer, IStreamMessage, STREAM_PRODUCER, IStreamProducer } from '@nestjs-redisx/streams';
import { ExportRequest, ExportJob, ExportRepo, S3Service } from '../types';

@Injectable()
export class DataExporter {
  constructor(
    @Inject(STREAM_PRODUCER) private readonly producer: IStreamProducer,
    private readonly exportRepo: ExportRepo,
    private readonly s3: S3Service,
  ) {}

  async exportData(request: ExportRequest): Promise<string> {
    const jobId = `export-${Date.now()}`;

    await this.producer.publish('exports', {
      jobId,
      userId: request.userId,
      type: request.type,
      filters: request.filters,
      format: request.format,
    });

    return jobId;
  }

  @StreamConsumer({
    stream: 'exports',
    group: 'workers',
    concurrency: 2,  // Limit concurrent exports
  })
  async processExport(message: IStreamMessage<ExportJob>): Promise<void> {
    const job = message.data;

    try {
      // Update status
      await this.exportRepo.updateStatus(job.jobId, 'processing');

      // Fetch data in chunks
      const data = await this.fetchData(job.type, job.filters);

      // Generate file
      const file = await this.generateFile(data, job.format);

      // Upload to S3
      const url = await this.s3.upload(file);

      // Update status and send notification
      await this.exportRepo.updateStatus(job.jobId, 'completed');
      await this.notifyUser(job.userId, url);

      await message.ack();
    } catch (error) {
      await this.exportRepo.updateStatus(job.jobId, 'failed');
      await this.notifyUserError(job.userId, (error as Error).message);
      await message.reject(error as Error);
    }
  }

  private async fetchData(type: string, filters: Record<string, unknown>): Promise<unknown[]> {
    return []; // Stub: fetch from database
  }

  private async generateFile(data: unknown[], format: string): Promise<Buffer> {
    return Buffer.from(''); // Stub: generate CSV/Excel/JSON
  }

  private async notifyUser(userId: string, url: string): Promise<void> {
    // Stub: send notification with download link
  }

  private async notifyUserError(userId: string, error: string): Promise<void> {
    // Stub: send error notification
  }
}

Next Steps

Released under the MIT License.