检查 Kafka 队列是否为空
Check if Kafka Queue is Empty
现在我有将几百条消息写入 kafka 队列的功能。但是,当所有这些消息都已被使用时,我还需要执行其他功能。有没有办法在 kafka 队列上放置一个侦听器,以便在它被清空时得到通知?
我认为你可以通过两种方式解决这个问题:
- Kafka 的 Fetch Response 包含一个
HighwaterMarkOffset
,它本质上是分区中最后一条消息的偏移量。您可以检查您的消息是否具有该偏移量,如果有 - 您已经到达终点。但是,如果您让生产者和消费者同时工作,这将不起作用 - 消费者可以更快地使用消息,从而比您需要的更早停止。
- 发送 "poison pill" 消息 - 假设您需要生成 100 条消息。然后你的生产者发送这 100 条消息 + 1 条特殊消息(例如一些 UUID,但确保它在正常情况下不会出现在你的逻辑中)这意味着 "the end"。在消费者方面,您将检查收到的消息是否是毒丸,如果是则关闭。
现在我有将几百条消息写入 kafka 队列的功能。但是,当所有这些消息都已被使用时,我还需要执行其他功能。有没有办法在 kafka 队列上放置一个侦听器,以便在它被清空时得到通知?
我认为你可以通过两种方式解决这个问题:
- Kafka 的 Fetch Response 包含一个
HighwaterMarkOffset
,它本质上是分区中最后一条消息的偏移量。您可以检查您的消息是否具有该偏移量,如果有 - 您已经到达终点。但是,如果您让生产者和消费者同时工作,这将不起作用 - 消费者可以更快地使用消息,从而比您需要的更早停止。 - 发送 "poison pill" 消息 - 假设您需要生成 100 条消息。然后你的生产者发送这 100 条消息 + 1 条特殊消息(例如一些 UUID,但确保它在正常情况下不会出现在你的逻辑中)这意味着 "the end"。在消费者方面,您将检查收到的消息是否是毒丸,如果是则关闭。