如何在 KafkaJS 的 Jest 测试中等待 Kafka 响应?
How to wait for Kafka respone in a Jest test with KafkaJS?
考虑这个测试,其中一条消息从测试发送到主题 'out',被测试的代码应该使用它并通过发送消息到主题 'in' 来回复。为了通过,我想确保已将消息发送到主题 'in'。
it('...', async () => {
/* initialize kafkaConsumer and kafkaProducer here */
async function someCallback() {
// ...
}
await kafkaConsumer.subscribe({ topic: 'in', fromBeginning: true })
await kafkaConsumer.run({ eachMessage: someCallback })
await kafkaProducer.send({ topic: 'out', messages: [{ key: '1', value: '2' }] })
// How do I block here until someCallback is called?
})
我阅读了有关使用 done
的信息,但我无法使用它,因为测试本身已定义 async
,我需要它才能使用 await
。有没有其他我不知道的方法?
您可以看看我们如何测试 KafkaJS 本身以获取一些启发。例如,here's a basic consumer test.
我们真的没有做任何花哨的事情,只是从 eachMessage
回调中将消息添加到一个数组,然后等待一个承诺,该承诺会定期检查我们是否达到了预期的消息数量。像这样:
it('consumes messages', async () => {
const messages = [{ value: 'hello world' }]
const consumedMessages = []
consumer.run({
eachMessage: ({ message }) => {
consumedMessages.push(message);
}
})
await producer.send({ topic, messages })
await waitFor(() => consumedMessages.length === messages.length)
})
其中 waitFor
本质上是一个函数,它 returns 一个承诺并启动一个 setTimeout 来检查谓词并在谓词为真时解析承诺(或者如果遇到超时则拒绝)。
需要牢记的一些陷阱:
- 在每个 运行 上使用一个新的
groupId
,这样多个 运行 就不会相互干扰。
- 出于同样的原因,在每次测试中使用一个新主题 运行。
- 如果您在消费者加入群组并订阅主题之前生成消息,则默认情况下不会显示这些消息。使用
fromBeginning: true
订阅或等待您的消费者订阅并加入群组,然后再生成(检测事件会在群组加入时发出一个事件,您可以像我们等待消息被消费一样等待).
经过 Tommy Brunn 的回答一段时间后,我发现了一些错误,最后我得到了这个:
export const waitForKafkaMessages = async (
kafka: Kafka,
messagesAmount: number,
topic: string,
fromBeginning: boolean,
groupId: string,
): Promise<KafkaMessage[]> => {
const consumer: Consumer = kafka.consumer({ groupId })
await consumer.connect()
await consumer.subscribe({ topic, fromBeginning })
let resolveOnConsumption: (messages: KafkaMessage[]) => void
let rejectOnError: (e: Error) => void
const returnThisPromise = new Promise<KafkaMessage[]>((resolve, reject) => {
resolveOnConsumption = resolve
rejectOnError = reject
}).finally(() => consumer.disconnect()) // disconnection is done here, reason why is explained below
const messages: KafkaMessage[] = []
await consumer.run({
autoCommit: false,
eachMessage: async ({ message, partition, topic }) => {
try {
// eachMessage is called by eachBatch which can consume more than messagesAmount.
// This is why we manually commit only messagesAmount messages.
if (messages.length < messagesAmount) {
messages.push(message)
// +1 because we need to commit the next assigned offset.
await consumer.commitOffsets([{ topic, partition, offset: (Number(message.offset) + 1).toString() }])
}
if (messages.length === messagesAmount) {
// I think we should be able to close the connection here, but kafkajs has a bug which makes it hang if consumer.disconnect is called too soon after consumer.run .
// This is why we close it in the promise's finally block
resolveOnConsumption(messages)
}
} catch (e) {
rejectOnError(e)
}
},
})
return returnThisPromise
}
考虑这个测试,其中一条消息从测试发送到主题 'out',被测试的代码应该使用它并通过发送消息到主题 'in' 来回复。为了通过,我想确保已将消息发送到主题 'in'。
it('...', async () => {
/* initialize kafkaConsumer and kafkaProducer here */
async function someCallback() {
// ...
}
await kafkaConsumer.subscribe({ topic: 'in', fromBeginning: true })
await kafkaConsumer.run({ eachMessage: someCallback })
await kafkaProducer.send({ topic: 'out', messages: [{ key: '1', value: '2' }] })
// How do I block here until someCallback is called?
})
我阅读了有关使用 done
的信息,但我无法使用它,因为测试本身已定义 async
,我需要它才能使用 await
。有没有其他我不知道的方法?
您可以看看我们如何测试 KafkaJS 本身以获取一些启发。例如,here's a basic consumer test.
我们真的没有做任何花哨的事情,只是从 eachMessage
回调中将消息添加到一个数组,然后等待一个承诺,该承诺会定期检查我们是否达到了预期的消息数量。像这样:
it('consumes messages', async () => {
const messages = [{ value: 'hello world' }]
const consumedMessages = []
consumer.run({
eachMessage: ({ message }) => {
consumedMessages.push(message);
}
})
await producer.send({ topic, messages })
await waitFor(() => consumedMessages.length === messages.length)
})
其中 waitFor
本质上是一个函数,它 returns 一个承诺并启动一个 setTimeout 来检查谓词并在谓词为真时解析承诺(或者如果遇到超时则拒绝)。
需要牢记的一些陷阱:
- 在每个 运行 上使用一个新的
groupId
,这样多个 运行 就不会相互干扰。 - 出于同样的原因,在每次测试中使用一个新主题 运行。
- 如果您在消费者加入群组并订阅主题之前生成消息,则默认情况下不会显示这些消息。使用
fromBeginning: true
订阅或等待您的消费者订阅并加入群组,然后再生成(检测事件会在群组加入时发出一个事件,您可以像我们等待消息被消费一样等待).
经过 Tommy Brunn 的回答一段时间后,我发现了一些错误,最后我得到了这个:
export const waitForKafkaMessages = async (
kafka: Kafka,
messagesAmount: number,
topic: string,
fromBeginning: boolean,
groupId: string,
): Promise<KafkaMessage[]> => {
const consumer: Consumer = kafka.consumer({ groupId })
await consumer.connect()
await consumer.subscribe({ topic, fromBeginning })
let resolveOnConsumption: (messages: KafkaMessage[]) => void
let rejectOnError: (e: Error) => void
const returnThisPromise = new Promise<KafkaMessage[]>((resolve, reject) => {
resolveOnConsumption = resolve
rejectOnError = reject
}).finally(() => consumer.disconnect()) // disconnection is done here, reason why is explained below
const messages: KafkaMessage[] = []
await consumer.run({
autoCommit: false,
eachMessage: async ({ message, partition, topic }) => {
try {
// eachMessage is called by eachBatch which can consume more than messagesAmount.
// This is why we manually commit only messagesAmount messages.
if (messages.length < messagesAmount) {
messages.push(message)
// +1 because we need to commit the next assigned offset.
await consumer.commitOffsets([{ topic, partition, offset: (Number(message.offset) + 1).toString() }])
}
if (messages.length === messagesAmount) {
// I think we should be able to close the connection here, but kafkajs has a bug which makes it hang if consumer.disconnect is called too soon after consumer.run .
// This is why we close it in the promise's finally block
resolveOnConsumption(messages)
}
} catch (e) {
rejectOnError(e)
}
},
})
return returnThisPromise
}