kafka 从组中删除连接
kafka remove connection from group
我使用 kafka 和 spring 启动应用程序来消费消息
阅读主题中的一些消息后,我将收到此错误
[2020-07-07 17:05:32,265] INFO [GroupCoordinator 1001]: Member consumer-1-708fe639-905d-432a-b6ba-17bf58adcf03 in group biker-service-prod has left, removing it from the group (kafka.coordinator.group.GroupCoordinator)
[2020-07-07 17:05:32,267] INFO [GroupCoordinator 1001]: Preparing to rebalance group biker-service-prod in state PreparingRebalance with old generation 460 (__consumer_offsets-7) (reason: removing member consumer-1-708fe639-905d-432a-b6ba-17bf58adcf03 on LeaveGroup) (kafka.coordinator.group.GroupCoordinator)
[2020-07-07 17:05:32,267] INFO [GroupCoordinator 1001]: Group biker-service-prod with generation 461 is now empty (__consumer_offsets-7) (kafka.coordinator.group.GroupCoordinator)
这是我在消费者端的配置
spring.kafka.listener.poll-timeout=3000000
spring.kafka.consumer.heartbeat-interval=500
spring.kafka.consumer.fetch-max-wait=3000000
spring.kafka.consumer.auto-commit-interval=1000
我该如何解决?
经纪人已确定您的消费者已死亡,因为它没有在所需的时间间隔内调用 poll()
。
查看这些 consumer-side 参数:
max.poll.interval.ms
session.timeout.ms
这些将决定您的消费者何时被视为已死亡并抛出您显示的异常。注意你需要处理消息的时间,如果它大于定义的max.poll.interval.ms
,你应该:
- 减少处理消息所需的时间并再次调用
poll()
。
或
- 增加
max.poll.interval.ms
和 session.timeout.ms
间隔。
我使用 kafka 和 spring 启动应用程序来消费消息 阅读主题中的一些消息后,我将收到此错误
[2020-07-07 17:05:32,265] INFO [GroupCoordinator 1001]: Member consumer-1-708fe639-905d-432a-b6ba-17bf58adcf03 in group biker-service-prod has left, removing it from the group (kafka.coordinator.group.GroupCoordinator) [2020-07-07 17:05:32,267] INFO [GroupCoordinator 1001]: Preparing to rebalance group biker-service-prod in state PreparingRebalance with old generation 460 (__consumer_offsets-7) (reason: removing member consumer-1-708fe639-905d-432a-b6ba-17bf58adcf03 on LeaveGroup) (kafka.coordinator.group.GroupCoordinator) [2020-07-07 17:05:32,267] INFO [GroupCoordinator 1001]: Group biker-service-prod with generation 461 is now empty (__consumer_offsets-7) (kafka.coordinator.group.GroupCoordinator)
这是我在消费者端的配置
spring.kafka.listener.poll-timeout=3000000
spring.kafka.consumer.heartbeat-interval=500
spring.kafka.consumer.fetch-max-wait=3000000
spring.kafka.consumer.auto-commit-interval=1000
我该如何解决?
经纪人已确定您的消费者已死亡,因为它没有在所需的时间间隔内调用 poll()
。
查看这些 consumer-side 参数:
max.poll.interval.ms
session.timeout.ms
这些将决定您的消费者何时被视为已死亡并抛出您显示的异常。注意你需要处理消息的时间,如果它大于定义的max.poll.interval.ms
,你应该:
- 减少处理消息所需的时间并再次调用
poll()
。
或
- 增加
max.poll.interval.ms
和session.timeout.ms
间隔。