如何设置 NestJS 微服务 integration/e2e 测试 KafkaJS 消费者(MessagePattern/EventPattern 处理程序)
How to set up NestJS microservices integration/e2e testing KafkaJS consumer (MessagePattern/EventPattern handler)
我正在使用 NestJS/KafkaJS 验证集成测试。
我已经实现了所有功能,除了我正在发送的主题的事件侦听器(消费者)上的函数没有被调用。
我在某处读到在消费者事件 GROUP_JOIN 完成之前您不能使用消息,不确定这是否正确,and/or 我如何让我的 e2e 测试等待这个会发生吗?
这是 e2e 测试的设置 -
describe('InventoryController(e2e)', () => {
let app: INestApplication
let client: ClientKafka
beforeAll(async () => {
const moduleFixture: TestingModule = await Test.createTestingModule({
imports: [InventoryModule, KafkaClientModule],
}).compile()
app = moduleFixture.createNestApplication()
await app.connectMicroservice({
transport: Transport.KAFKA,
options: {
client: {
clientId: 'test_clientid',
brokers: process.env.KAFKA_BROKERS.split(' '), // []
ssl: true,
sasl: {
mechanism: 'plain',
username: process.env.KAFKA_CLUSTER_APIKEY,
password: process.env.KAFKA_CLUSTER_SECRET,
},
},
consumer: {
groupId: 'test_consumerids',
},
},
})
await app.startAllMicroservices()
await app.init()
client = moduleFixture.get<ClientKafka>('test_name')
await client.connect()
await app.listen(process.env.port || 3000)
})
afterAll(async () => {
await app.close()
await client.close()
})
it('/ (GET)', async () => {
return request(app.getHttpServer()).get('/inventory/kafka-inventory-test')
})
it('Emits a message to a topic', async () => {
await client.emit('inventory-test', { foo: 'bar' })
})
客户端发送消息正常,
在我的控制器中,我有主题 'inventory-test'
的事件处理程序
@EventPattern('inventory-test')
async consumeInventoryTest(
// eslint-disable-next-line
@Payload() inventoryMessage: any,
@Ctx() context: KafkaContext,
): Promise<void> {
console.log('inventory-test consumer')
}
我还使用 app.getMicroservices() 方法记录了微服务,可以在 messageHandlers 对象下看到它有 'inventory-test' 其中 returns 一个函数
server: ServerKafka {
messageHandlers: Map(1) { 'inventory-test' => [Function] }
当我在本地 运行 应用程序时,消息处理程序也在工作。
我一直在搜索 google 很多关于 kafkajs 和 nest 的文档,那里没有很多信息
感谢任何advice/help!
我终于解决了这个问题,您需要在客户端发出消息后等待新的承诺,以便处理程序有时间阅读它。
我正在使用 NestJS/KafkaJS 验证集成测试。
我已经实现了所有功能,除了我正在发送的主题的事件侦听器(消费者)上的函数没有被调用。
我在某处读到在消费者事件 GROUP_JOIN 完成之前您不能使用消息,不确定这是否正确,and/or 我如何让我的 e2e 测试等待这个会发生吗?
这是 e2e 测试的设置 -
describe('InventoryController(e2e)', () => {
let app: INestApplication
let client: ClientKafka
beforeAll(async () => {
const moduleFixture: TestingModule = await Test.createTestingModule({
imports: [InventoryModule, KafkaClientModule],
}).compile()
app = moduleFixture.createNestApplication()
await app.connectMicroservice({
transport: Transport.KAFKA,
options: {
client: {
clientId: 'test_clientid',
brokers: process.env.KAFKA_BROKERS.split(' '), // []
ssl: true,
sasl: {
mechanism: 'plain',
username: process.env.KAFKA_CLUSTER_APIKEY,
password: process.env.KAFKA_CLUSTER_SECRET,
},
},
consumer: {
groupId: 'test_consumerids',
},
},
})
await app.startAllMicroservices()
await app.init()
client = moduleFixture.get<ClientKafka>('test_name')
await client.connect()
await app.listen(process.env.port || 3000)
})
afterAll(async () => {
await app.close()
await client.close()
})
it('/ (GET)', async () => {
return request(app.getHttpServer()).get('/inventory/kafka-inventory-test')
})
it('Emits a message to a topic', async () => {
await client.emit('inventory-test', { foo: 'bar' })
})
客户端发送消息正常,
在我的控制器中,我有主题 'inventory-test'
的事件处理程序 @EventPattern('inventory-test')
async consumeInventoryTest(
// eslint-disable-next-line
@Payload() inventoryMessage: any,
@Ctx() context: KafkaContext,
): Promise<void> {
console.log('inventory-test consumer')
}
我还使用 app.getMicroservices() 方法记录了微服务,可以在 messageHandlers 对象下看到它有 'inventory-test' 其中 returns 一个函数
server: ServerKafka {
messageHandlers: Map(1) { 'inventory-test' => [Function] }
当我在本地 运行 应用程序时,消息处理程序也在工作。
我一直在搜索 google 很多关于 kafkajs 和 nest 的文档,那里没有很多信息
感谢任何advice/help!
我终于解决了这个问题,您需要在客户端发出消息后等待新的承诺,以便处理程序有时间阅读它。