Testing
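This guide shows how to test stream-based applications: unit tests with a mocked producer and consumer, integration tests against a real Redis instance, and reusable test helpers.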
Mock Producer
typescript
import { Test } from '@nestjs/testing';
import { STREAM_PRODUCER, IStreamProducer } from '@nestjs-redisx/streams';
import { vi, type MockedObject } from 'vitest';
/**
 * Unit tests for OrderService in which the STREAM_PRODUCER provider is
 * replaced by a Vitest mock object.
 */
describe('OrderService', () => {
  let service: OrderService;
  let mockProducer: MockedObject<IStreamProducer>;

  beforeEach(async () => {
    // Canned answer returned by the mocked getStreamInfo().
    const streamInfo = {
      length: 100,
      groups: 1,
      firstEntry: { id: '1706123456789-0', timestamp: new Date() },
      lastEntry: { id: '1706123456799-0', timestamp: new Date() },
    };

    // Rebuilt before every test so call history and per-test overrides
    // (e.g. mockRejectedValue) do not carry over to the next case.
    mockProducer = {
      publish: vi.fn().mockResolvedValue('1706123456789-0'),
      publishBatch: vi.fn().mockResolvedValue(['1706123456789-0']),
      getStreamInfo: vi.fn().mockResolvedValue(streamInfo),
      trim: vi.fn().mockResolvedValue(0),
    };

    const producerProvider = { provide: STREAM_PRODUCER, useValue: mockProducer };
    const testingModule = await Test.createTestingModule({
      providers: [OrderService, producerProvider],
    }).compile();

    service = testingModule.get(OrderService);
  });

  it('should publish order event when order created', async () => {
    // Given
    const orderDto = { items: [], total: 100 };

    // When
    await service.createOrder(orderDto);

    // Then
    const expectedEvent = expect.objectContaining({
      type: 'ORDER_CREATED',
      orderId: expect.any(String),
      total: 100,
    });
    expect(mockProducer.publish).toHaveBeenCalledWith('orders', expectedEvent);
  });

  it('should not fail if publish fails', async () => {
    // Given
    mockProducer.publish.mockRejectedValue(new Error('Redis down'));

    // When/Then - should not throw
    const creation = service.createOrder({ items: [], total: 100 });
    await expect(creation).resolves.not.toThrow();
  });
});

Mock Consumer
typescript
/**
 * Unit tests for OrderProcessor.handleOrder using a mocked OrderService and
 * mock messages built by createMockMessage.
 */
describe('OrderProcessor', () => {
  let processor: OrderProcessor;
  let mockOrderService: MockedObject<OrderService>;

  // Both tests feed the handler the same kind of message.
  const buildOrderMessage = () =>
    createMockMessage({
      id: '1706123456789-0',
      data: { orderId: 'order-123' },
    });

  beforeEach(() => {
    mockOrderService = {
      process: vi.fn().mockResolvedValue(undefined),
    };
    processor = new OrderProcessor(mockOrderService);
  });

  it('should process message and ack', async () => {
    // Given
    const message = buildOrderMessage();

    // When
    await processor.handleOrder(message);

    // Then
    expect(mockOrderService.process).toHaveBeenCalledWith(message.data);
    expect(message.ack).toHaveBeenCalled();
  });

  it('should reject on error', async () => {
    // Given
    const failure = new Error('Processing failed');
    mockOrderService.process.mockRejectedValue(failure);
    const message = buildOrderMessage();

    // When
    await processor.handleOrder(message);

    // Then
    expect(message.reject).toHaveBeenCalledWith(failure);
  });
});
// Helper function
/**
 * Builds a stand-in stream message for handler unit tests.
 *
 * The message always reports `'test-stream'` as its stream and the creation
 * time as its timestamp. `ack` and `reject` are Vitest mocks that resolve to
 * `undefined`, so tests can assert on how the handler settled the message.
 *
 * @param options.id      Message id to report.
 * @param options.data    Payload handed to the handler.
 * @param options.attempt Delivery attempt number; defaults to 1 when omitted.
 * @returns A message object shaped like `IStreamMessage<T>`.
 */
function createMockMessage<T>(options: {
  id: string;
  data: T;
  attempt?: number;
}): IStreamMessage<T> {
  return {
    id: options.id,
    stream: 'test-stream',
    data: options.data,
    // `??` instead of `||`: only a missing attempt falls back to 1, so an
    // explicitly supplied 0 is passed through unchanged.
    attempt: options.attempt ?? 1,
    timestamp: new Date(),
    ack: vi.fn().mockResolvedValue(undefined),
    reject: vi.fn().mockResolvedValue(undefined),
  };
}

Integration Tests
typescript
import { Test } from '@nestjs/testing';
import { RedisModule } from '@nestjs-redisx/core';
import { StreamsPlugin, STREAM_PRODUCER, STREAM_CONSUMER } from '@nestjs-redisx/streams';
import Redis from 'ioredis';
/**
 * Integration tests that publish to and consume from the Redis instance at
 * localhost:6379 through a fully initialised Nest application.
 */
describe('Streams (integration)', () => {
  let app: INestApplication;
  let redis: Redis;
  let producer: IStreamProducer;

  beforeAll(async () => {
    const module = await Test.createTestingModule({
      imports: [
        RedisModule.forRoot({
          clients: {
            host: 'localhost',
            port: 6379,
          },
          plugins: [
            new StreamsPlugin({
              consumer: {
                batchSize: 10,
                concurrency: 1,
              },
            }),
          ],
        }),
        OrderModule,
      ],
    }).compile();

    app = module.createNestApplication();
    await app.init();
    producer = app.get(STREAM_PRODUCER);

    // Separate raw ioredis client, used by this suite only for cleanup.
    redis = new Redis({ host: 'localhost', port: 6379 });
  });

  afterAll(async () => {
    // WARNING: FLUSHALL deletes every key in every database of the connected
    // Redis instance. Only run this suite against a dedicated test Redis.
    await redis.flushall();
    await redis.quit();
    await app.close();
  });

  afterEach(async () => {
    // Clean up test streams
    await redis.del('test-stream');
    await redis.del('test-stream:dlq');
  });

  describe('publish', () => {
    it('should publish message to stream', async () => {
      // When
      const messageId = await producer.publish('test-stream', {
        orderId: 'order-123',
        total: 100,
      });

      // Then
      expect(messageId).toMatch(/^\d+-\d+$/);
      const info = await producer.getStreamInfo('test-stream');
      expect(info.length).toBe(1);
    });

    it('should publish batch of messages', async () => {
      // Given
      const messages = [
        { orderId: 'order-1', total: 100 },
        { orderId: 'order-2', total: 200 },
        { orderId: 'order-3', total: 300 },
      ];

      // When
      const messageIds = await producer.publishBatch('test-stream', messages);

      // Then
      expect(messageIds).toHaveLength(3);
      const info = await producer.getStreamInfo('test-stream');
      expect(info.length).toBe(3);
    });
  });

  describe('consume', () => {
    it('should consume and acknowledge message', async () => {
      // Given
      const data = { orderId: 'order-123', total: 100 };
      await producer.publish('test-stream', data);
      // Explicit element type: an untyped `[]` that is captured by the handler
      // closure below is an implicit `any[]` error under `noImplicitAny`.
      const processedMessages: unknown[] = [];

      // When
      const consumer = app.get(STREAM_CONSUMER);
      await consumer.createGroup('test-stream', 'test-group', '0');
      const handle = consumer.consume(
        'test-stream',
        'test-group',
        'test-consumer',
        async (message) => {
          processedMessages.push(message.data);
          await message.ack();
        },
        { batchSize: 10 }
      );

      // Wait for processing (fixed delay: the test always takes at least 1s)
      await new Promise(resolve => setTimeout(resolve, 1000));
      await consumer.stop(handle);

      // Then
      expect(processedMessages).toHaveLength(1);
      expect(processedMessages[0]).toEqual(data);

      // Check pending count
      const pending = await consumer.getPending('test-stream', 'test-group');
      expect(pending.count).toBe(0);
    });

    it('should retry failed message', async () => {
      // Given
      await producer.publish('test-stream', { orderId: 'order-123' });
      let attempts = 0;

      // When
      const consumer = app.get(STREAM_CONSUMER);
      await consumer.createGroup('test-stream', 'test-group', '0');
      const handle = consumer.consume(
        'test-stream',
        'test-group',
        'test-consumer',
        async (message) => {
          attempts++;
          if (attempts < 3) {
            // Fail first 2 attempts
            await message.reject(new Error('Temporary failure'));
          } else {
            // Succeed on 3rd attempt
            await message.ack();
          }
        },
        { batchSize: 10, maxRetries: 5 }
      );

      // Wait for retries (fixed 5s delay; raise the test timeout if the
      // runner's default is not longer than this)
      await new Promise(resolve => setTimeout(resolve, 5000));
      await consumer.stop(handle);

      // Then
      expect(attempts).toBeGreaterThanOrEqual(3);
    });
  });
});

Test Patterns
Test Consumer with Spy
typescript
// Spy-based variant: observes the processor's own orderService instead of
// replacing it with a hand-written mock. `processor` comes from the
// surrounding test setup, which this snippet does not show.
describe('OrderProcessor', () => {
  it('should call service method', async () => {
    // Given
    const message = createMockMessage({
      id: 'msg-1',
      data: { orderId: 'order-123' },
    });
    // Bracket access reaches the `orderService` member from the test.
    const processSpy = vi.spyOn(processor['orderService'], 'process');

    // When
    await processor.handleOrder(message);

    // Then
    expect(processSpy).toHaveBeenCalledWith(message.data);
  });
});

Test Idempotency
typescript
// Delivers the same message object twice and expects the underlying
// processOrder call to happen only once. `processor` comes from the
// surrounding test setup, which this snippet does not show.
describe('IdempotentConsumer', () => {
  it('should process message only once', async () => {
    // Given
    const message = createMockMessage({
      id: 'msg-1',
      data: { orderId: 'order-123' },
    });
    const processOrderSpy = vi.spyOn(processor, 'processOrder');

    // When - process same message twice
    const deliveries = [message, message];
    for (const delivery of deliveries) {
      await processor.handle(delivery);
    }

    // Then - processed only once
    expect(processOrderSpy).toHaveBeenCalledTimes(1);
  });
});

Test Retry Logic
typescript
// Drives the handler three times with a service that throws on its first two
// calls, then checks the message was rejected twice and acked once.
describe('RetryLogic', () => {
  it('should retry on transient error', async () => {
    // Given
    let attempts = 0;
    const failTwiceThenSucceed = vi.fn().mockImplementation(() => {
      attempts += 1;
      if (attempts >= 3) {
        return;
      }
      throw new Error('Transient error');
    });
    const service = { process: failTwiceThenSucceed };
    const processor = new OrderProcessor(service);
    const message = createMockMessage({
      id: 'msg-1',
      data: { orderId: 'order-123' },
    });

    // When - attempts 1 and 2 fail, attempt 3 succeeds
    for (let delivery = 1; delivery <= 3; delivery++) {
      await processor.handle(message);
    }

    // Then
    expect(attempts).toBe(3);
    expect(message.ack).toHaveBeenCalledTimes(1);
    expect(message.reject).toHaveBeenCalledTimes(2);
  });
});

Test DLQ
typescript
// Feeds the handler a message whose attempt number is already past the retry
// limit and expects it to be rejected. `processor` comes from the surrounding
// test setup, which this snippet does not show.
describe('DeadLetterQueue', () => {
  it('should move to DLQ after max retries', async () => {
    // Given
    const maxRetries = 3;
    // Example DLQ service mock. This snippet only declares it; it is not
    // passed to the processor and nothing below asserts on it.
    const mockDlqService = {
      add: vi.fn().mockResolvedValue('dlq-msg-id'),
      getMessages: vi.fn().mockResolvedValue([]),
      requeue: vi.fn(),
      purge: vi.fn(),
    };
    const exhaustedAttempt = maxRetries + 1; // Exceeded max retries
    const message = createMockMessage({
      id: '1706123456789-0',
      data: { orderId: 'order-123' },
      attempt: exhaustedAttempt,
    });

    // When
    await processor.handle(message);

    // Then - message rejected, DLQ service handles storage
    expect(message.reject).toHaveBeenCalled();
  });
});

Test Utilities
typescript
// test/helpers/stream.helper.ts
/**
 * Static helpers for seeding, cleaning up and waiting on Redis streams in tests.
 */
export class StreamTestHelper {
  /**
   * Appends `count` entries to `stream`.
   *
   * Entries are added one XADD at a time, in index order, each with an
   * auto-generated id (`*`) and a single `data` field holding the JSON
   * string `{"id":<index>}`.
   *
   * @param redis  Client used to issue the XADD commands.
   * @param stream Key of the stream to append to.
   * @param count  Number of entries to add; nothing is added when it is 0 or less.
   */
  static async createTestStream(
    redis: Redis,
    stream: string,
    count: number
  ): Promise<void> {
    for (let i = 0; i < count; i++) {
      await redis.xadd(stream, '*', 'data', JSON.stringify({ id: i }));
    }
  }

  /**
   * Deletes `stream` and its companion `<stream>:dlq` key.
   *
   * @param redis  Client used to issue the DEL commands.
   * @param stream Key of the stream to remove.
   */
  static async clearStream(redis: Redis, stream: string): Promise<void> {
    await redis.del(stream);
    await redis.del(`${stream}:dlq`);
  }

  /**
   * Polls the group's pending count until it equals `targetPending`.
   *
   * The count is always checked at least once, even when `timeoutMs` is 0 or
   * negative, and is checked one final time when the deadline is reached, so
   * a condition that becomes true during the last wait is not reported as a
   * timeout. Waits between polls are at most 100 ms and never extend past
   * the deadline.
   *
   * NOTE(review): this returns as soon as the reported count equals the
   * target. If the count is already at the target before the consumer has
   * read anything, it returns immediately - confirm that callers only use it
   * after delivery has started.
   *
   * @param consumer      Consumer whose `getPending(stream, group)` is polled.
   * @param stream        Stream key to inspect.
   * @param group         Consumer group to inspect.
   * @param targetPending Pending count to wait for (default 0).
   * @param timeoutMs     Maximum time to wait in milliseconds (default 5000).
   * @throws Error `'Timeout waiting for consumption'` when the target is not
   *         reached before the deadline.
   */
  static async waitForConsumption(
    consumer: IStreamConsumer,
    stream: string,
    group: string,
    targetPending: number = 0,
    timeoutMs: number = 5000
  ): Promise<void> {
    const pollIntervalMs = 100;
    const deadline = Date.now() + timeoutMs;

    for (;;) {
      const pending = await consumer.getPending(stream, group);
      if (pending.count === targetPending) {
        return;
      }

      const remainingMs = deadline - Date.now();
      if (remainingMs <= 0) {
        break;
      }

      // Never sleep past the deadline; the next iteration is the final check.
      const delayMs = Math.min(pollIntervalMs, remainingMs);
      await new Promise(resolve => setTimeout(resolve, delayMs));
    }

    throw new Error('Timeout waiting for consumption');
  }
}

Testing Best Practices
1. Clean up after tests:
typescript
// Remove the stream and its dead-letter key after every test so state does
// not carry over into the next one.
afterEach(async () => {
  const testKeys = ['test-stream', 'test-stream:dlq'];
  for (const key of testKeys) {
    await redis.del(key);
  }
});

2. Use descriptive test data:
typescript
// ✅ Good - field names and values say what the data represents
const order = {
  orderId: 'test-order-123',
  total: 100,
};
// ❌ Bad - generic names and values give the reader no context
const order = {
  id: 'abc',
  value: 1,
};

3. Test error scenarios:
typescript
// Skeleton for a failure-path test: the mocked producer rejects every publish.
it('should handle Redis connection failure', async () => {
  const connectionError = new Error('Connection refused');
  mockProducer.publish.mockRejectedValue(connectionError);
  // Test graceful degradation
});

4. Test concurrent processing:
typescript
// Hands ten distinct messages to the handler at once and waits for all of
// them. `processor` and `processedCount` come from the surrounding test
// setup, which this snippet does not show.
it('should handle concurrent messages', async () => {
  const messages = Array.from({ length: 10 }, (_, i) =>
    createMockMessage({ id: `msg-${i}`, data: { orderId: `order-${i}` } })
  );

  const inFlight = messages.map(msg => processor.handle(msg));
  await Promise.all(inFlight);

  expect(processedCount).toBe(10);
});

5. Mock external dependencies:
typescript
// Mock external services
// Email sending resolves with no value.
const mockEmailService = { send: vi.fn().mockResolvedValue(undefined) };
// Charging resolves with a fixed charge record.
const mockPaymentGateway = { charge: vi.fn().mockResolvedValue({ id: 'charge-123' }) };

Next Steps
- Recipes — Real-world examples
- Troubleshooting — Debug test failures