Kafka 侦听器中的线程休眠
Thread Sleep in the Kafka Listener
我正在尝试 pause/resume Kafka 容器。使用以下代码片段执行此操作:
kafkaListenerEndpointRegistry.getListenerContainer("MAIN").pause();
当我调用暂停时,我还需要做一个thread.sleep,这样批处理中的消息就不会被处理。对于批处理中的每条消息,我正在调用另一个具有速率限制的 API。要维持此速率限制,我需要停止对消息的处理。
如果Main线程休眠,是否会阻止Listener发送心跳?它是否也停止了后台的心跳线程?
文档说,“当容器暂停时,它会继续轮询()消费者,如果正在使用组管理,则避免重新平衡,但它不会检索任何记录。”
但是我正在暂停容器并让线程休眠。这将如何影响流量?
您绝不能休眠使用者线程,以避免重新平衡。
相反,减少 max.poll.records
以便暂停更快生效(消费者不会真正暂停,直到处理完上一个轮询收到的记录)。
您可以在暂停消费者后抛出异常,但您需要以某种方式恢复容器。
我开了一个新问题来改进这个行为https://github.com/spring-projects/spring-kafka/issues/2280
如果您受到速率限制,请考虑使用 KafkaTemplate.receive()
方法、按计划或轮询 Spring Integration adapter,而不是使用 message-driven 方法。
我正在尝试 pause/resume Kafka 容器。使用以下代码片段执行此操作:
kafkaListenerEndpointRegistry.getListenerContainer("MAIN").pause();
当我调用暂停时,我还需要做一个thread.sleep,这样批处理中的消息就不会被处理。对于批处理中的每条消息,我正在调用另一个具有速率限制的 API。要维持此速率限制,我需要停止对消息的处理。
如果Main线程休眠,是否会阻止Listener发送心跳?它是否也停止了后台的心跳线程? 文档说,“当容器暂停时,它会继续轮询()消费者,如果正在使用组管理,则避免重新平衡,但它不会检索任何记录。” 但是我正在暂停容器并让线程休眠。这将如何影响流量?
您绝不能休眠使用者线程,以避免重新平衡。
相反,减少 max.poll.records
以便暂停更快生效(消费者不会真正暂停,直到处理完上一个轮询收到的记录)。
您可以在暂停消费者后抛出异常,但您需要以某种方式恢复容器。
我开了一个新问题来改进这个行为https://github.com/spring-projects/spring-kafka/issues/2280
如果您受到速率限制,请考虑使用 KafkaTemplate.receive()
方法、按计划或轮询 Spring Integration adapter,而不是使用 message-driven 方法。