Kafka 侦听器中的线程休眠

Thread Sleep in the Kafka Listener

我正在尝试 pause/resume Kafka 容器。使用以下代码片段执行此操作:

kafkaListenerEndpointRegistry.getListenerContainer("MAIN").pause();

当我调用暂停时,我还需要做一个thread.sleep,这样批处理中的消息就不会被处理。对于批处理中的每条消息,我正在调用另一个具有速率限制的 API。要维持此速率限制,我需要停止对消息的处理。

如果Main线程休眠,是否会阻止Listener发送心跳?它是否也停止了后台的心跳线程? 文档说,“当容器暂停时,它会继续轮询()消费者,如果正在使用组管理,则避免重新平衡,但它不会检索任何记录。” 但是我正在暂停容器并让线程休眠。这将如何影响流量?

您绝不能休眠使用者线程,以避免重新平衡。

相反,减少 max.poll.records 以便暂停更快生效(消费者不会真正暂停,直到处理完上一个轮询收到的记录)。

您可以在暂停消费者后抛出异常,但您需要以某种方式恢复容器。

我开了一个新问题来改进这个行为https://github.com/spring-projects/spring-kafka/issues/2280

如果您受到速率限制,请考虑使用 KafkaTemplate.receive() 方法、按计划或轮询 Spring Integration adapter,而不是使用 message-driven 方法。