如何在 KafkaJS 的 Jest 测试中等待 Kafka 响应?

How to wait for Kafka respone in a Jest test with KafkaJS?

考虑这个测试,其中一条消息从测试发送到主题 'out',被测试的代码应该使用它并通过发送消息到主题 'in' 来回复。为了通过,我想确保已将消息发送到主题 'in'。

it('...', async () => {
  /* initialize kafkaConsumer and kafkaProducer here */

  async function someCallback() {
    // ...
  }

  await kafkaConsumer.subscribe({ topic: 'in', fromBeginning: true })
  await kafkaConsumer.run({ eachMessage: someCallback })

  await kafkaProducer.send({ topic: 'out', messages: [{ key: '1', value: '2' }] })

  // How do I block here until someCallback is called?
})

我阅读了有关使用 done 的信息,但我无法使用它,因为测试本身已定义 async,我需要它才能使用 await。有没有其他我不知道的方法?

您可以看看我们如何测试 KafkaJS 本身以获取一些启发。例如,here's a basic consumer test.

我们真的没有做任何花哨的事情,只是从 eachMessage 回调中将消息添加到一个数组,然后等待一个承诺,该承诺会定期检查我们是否达到了预期的消息数量。像这样:

it('consumes messages', async () => {
  const messages = [{ value: 'hello world' }]
  const consumedMessages = []

  consumer.run({
    eachMessage: ({ message }) => {
      consumedMessages.push(message);
    }
  })

  await producer.send({ topic, messages })

  await waitFor(() => consumedMessages.length === messages.length)
})

其中 waitFor 本质上是一个函数,它 returns 一个承诺并启动一个 setTimeout 来检查谓词并在谓词为真时解析承诺(或者如果遇到超时则拒绝)。

需要牢记的一些陷阱:

  • 在每个 运行 上使用一个新的 groupId,这样多个 运行 就不会相互干扰。
  • 出于同样的原因,在每次测试中使用一个新主题 运行。
  • 如果您在消费者加入群组并订阅主题之前生成消息,则默认情况下不会显示这些消息。使用 fromBeginning: true 订阅或等待您的消费者订阅并加入群组,然后再生成(检测事件会在群组加入时发出一个事件,您可以像我们等待消息被消费一样等待).

经过 Tommy Brunn 的回答一段时间后,我发现了一些错误,最后我得到了这个:

export const waitForKafkaMessages = async (
  kafka: Kafka,
  messagesAmount: number,
  topic: string,
  fromBeginning: boolean,
  groupId: string,
): Promise<KafkaMessage[]> => {
  const consumer: Consumer = kafka.consumer({ groupId })
  await consumer.connect()
  await consumer.subscribe({ topic, fromBeginning })

  let resolveOnConsumption: (messages: KafkaMessage[]) => void
  let rejectOnError: (e: Error) => void

  const returnThisPromise = new Promise<KafkaMessage[]>((resolve, reject) => {
    resolveOnConsumption = resolve
    rejectOnError = reject
  }).finally(() => consumer.disconnect()) // disconnection is done here, reason why is explained below

  const messages: KafkaMessage[] = []
  await consumer.run({
    autoCommit: false,
    eachMessage: async ({ message, partition, topic }) => {
      try {
        // eachMessage is called by eachBatch which can consume more than messagesAmount.
        // This is why we manually commit only messagesAmount messages.
        if (messages.length < messagesAmount) {
          messages.push(message)

          // +1 because we need to commit the next assigned offset.
          await consumer.commitOffsets([{ topic, partition, offset: (Number(message.offset) + 1).toString() }])
        }

        if (messages.length === messagesAmount) {
          // I think we should be able to close the connection here, but kafkajs has a bug which makes it hang if consumer.disconnect is called too soon after consumer.run .
          // This is why we close it in the promise's finally block

          resolveOnConsumption(messages)
        }
      } catch (e) {
        rejectOnError(e)
      }
    },
  })

  return returnThisPromise
}