Kafka consumer keeps looping over a bunch of messages after CommitFailedException

I'm running a multithreaded Kafka 091 consumer [new].

I generate client.id as a combination of "hostname the consumer is running on" + "AtomicInt" + "the PID of the process".
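A minimal sketch of that client.id scheme, assuming Java 8+ and a `<hostname>-<counter>-<pid>` layout. The separator, the class name `ClientIdFactory` and the `unknown-host` fallback are hypothetical. The PID is taken from `RuntimeMXBean.getName()`, which on common JVMs returns `pid@hostname`; that format is a convention, not a guarantee.

```java
import java.lang.management.ManagementFactory;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.concurrent.atomic.AtomicInteger;

public class ClientIdFactory {
    // Shared by all consumer threads in this JVM, so each thread gets a distinct suffix.
    private static final AtomicInteger COUNTER = new AtomicInteger();

    // Host the consumer runs on; "unknown-host" is a hypothetical fallback.
    static String hostname() {
        try {
            return InetAddress.getLocalHost().getHostName();
        } catch (UnknownHostException e) {
            return "unknown-host";
        }
    }

    // Common JVMs report the runtime name as "pid@hostname"; if there is
    // no '@', the whole name is used as-is.
    static String pid() {
        String name = ManagementFactory.getRuntimeMXBean().getName();
        int at = name.indexOf('@');
        return at > 0 ? name.substring(0, at) : name;
    }

    // Hypothetical layout: <hostname>-<counter>-<pid>
    public static String nextClientId() {
        return hostname() + "-" + COUNTER.incrementAndGet() + "-" + pid();
    }

    public static void main(String[] args) {
        // One id per consumer thread, e.g. for props.put("client.id", nextClientId())
        System.out.println(nextClientId());
        System.out.println(nextClientId());
    }
}
```

Keep in mind that client.id mainly identifies the client in broker-side logs, metrics and quotas; group membership is tracked by the member id the coordinator assigns when the consumer joins the group, which is what the `UNKNOWN_MEMBER_ID` error refers to.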

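I ran into a problem when I had to stop the consumer and restart it. The consumer keeps trying to process the offsets (around 100 of them) that were not consumed in the previous run, but it keeps failing with this message: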

2016-10-21 14:22:55,293 [pool-3-thread-6] INFO  o.a.k.c.c.i.AbstractCoordinator  : Marking the coordinator 2147483647 dead.
2016-10-21 14:22:55,295 [pool-3-thread-6] ERROR o.a.k.c.c.i.ConsumerCoordinator  : Error UNKNOWN_MEMBER_ID occurred while committing offsets for group x.cg
2016-10-21 14:22:55,296 [pool-3-thread-6] ERROR o.a.k.c.c.i.ConsumerCoordinator  : Offset commit failed.
org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed due to group rebalance
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:552)
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:493)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:665)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:644)
        at org.apache.kafka.clients.consumer.internals.RequestFuture.onSuccess(RequestFuture.java:167)
        at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
        at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:380)
        at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:274)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:320)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:213)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:193)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.awaitMetadataUpdate(ConsumerNetworkClient.java:134)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorKnown(AbstractCoordinator.java:184)
        at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:886)
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:853)
        at com.kfc.kafka.consumer.KFCConsumer$KafkaConsumerRunner.run(KFCConsumer.java:102)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
2016-10-21 14:22:55,397 [pool-3-thread-6] INFO  o.a.k.c.c.i.AbstractCoordinator  : Attempt to join group x.cg failed due to unknown member id, resetting and retrying.
......... 
2016-10-21 14:22:58,124 [pool-3-thread-3] INFO  o.a.k.c.c.i.AbstractCoordinator  : Attempt to heart beat failed since the group is rebalancing, try to re-join group.

From the Kafka logs, I can see that a lot of rebalancing is happening:

[2016-10-21 21:28:18,196] INFO [GroupCoordinator 1]: Stabilized group x.cg generation 1 (kafka.coordinator.GroupCoordinator)
[2016-10-21 21:28:18,196] INFO [GroupCoordinator 1]: Stabilized group x.cg generation 1 (kafka.coordinator.GroupCoordinator)
[2016-10-21 21:28:18,200] INFO [GroupCoordinator 1]: Assignment received from leader for group x.cg for generation 1 (kafka.coordinator.GroupCoordinator)
[2016-10-21 21:28:18,200] INFO [GroupCoordinator 1]: Assignment received from leader for group x.cg for generation 1 (kafka.coordinator.GroupCoordinator)
[2016-10-21 21:28:18,952] INFO [GroupCoordinator 1]: Preparing to restabilize group x.cg with old generation 1 (kafka.coordinator.GroupCoordinator)
[2016-10-21 21:28:18,952] INFO [GroupCoordinator 1]: Preparing to restabilize group x.cg with old generation 1 (kafka.coordinator.GroupCoordinator)
[2016-10-21 21:28:48,233] INFO [GroupCoordinator 1]: Stabilized group x.cg generation 2 (kafka.coordinator.GroupCoordinator)
[2016-10-21 21:28:48,233] INFO [GroupCoordinator 1]: Stabilized group x.cg generation 2 (kafka.coordinator.GroupCoordinator)
[2016-10-21 21:28:48,243] INFO [GroupCoordinator 1]: Assignment received from leader for group x.cg for generation 2 (kafka.coordinator.GroupCoordinator)
[2016-10-21 21:28:48,243] INFO [GroupCoordinator 1]: Assignment received from leader for group x.cg for generation 2 (kafka.coordinator.GroupCoordin
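To see how often the group actually rebalanced, one option is to pull the generation numbers out of the `GroupCoordinator` lines. This sketch (hypothetical class `RebalanceCounter`) matches the `Stabilized group <group> generation <n>` messages shown above and collects the generations into a sorted set, so the doubled log lines are counted only once.

```java
import java.util.Arrays;
import java.util.List;
import java.util.TreeSet;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class RebalanceCounter {
    // Matches e.g. "Stabilized group x.cg generation 2"
    private static final Pattern STABILIZED =
            Pattern.compile("Stabilized group (\\S+) generation (\\d+)");

    // Distinct generations the coordinator stabilized for one group;
    // the TreeSet drops duplicated log lines and keeps generations in order.
    static TreeSet<Integer> generations(List<String> lines, String group) {
        TreeSet<Integer> gens = new TreeSet<>();
        for (String line : lines) {
            Matcher m = STABILIZED.matcher(line);
            if (m.find() && m.group(1).equals(group)) {
                gens.add(Integer.parseInt(m.group(2)));
            }
        }
        return gens;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList(
            "[2016-10-21 21:28:18,196] INFO [GroupCoordinator 1]: Stabilized group x.cg generation 1 (kafka.coordinator.GroupCoordinator)",
            "[2016-10-21 21:28:18,196] INFO [GroupCoordinator 1]: Stabilized group x.cg generation 1 (kafka.coordinator.GroupCoordinator)",
            "[2016-10-21 21:28:18,952] INFO [GroupCoordinator 1]: Preparing to restabilize group x.cg with old generation 1 (kafka.coordinator.GroupCoordinator)",
            "[2016-10-21 21:28:48,233] INFO [GroupCoordinator 1]: Stabilized group x.cg generation 2 (kafka.coordinator.GroupCoordinator)");
        System.out.println("generations: " + generations(lines, "x.cg")); // generations: [1, 2]
    }
}
```

Each new generation means the coordinator completed another rebalance, so a steadily climbing generation number over a short window is a sign that members keep getting evicted and rejoining.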

It turned out that the external components our consumer interacts with had long, recurring stalls [slow network, problems in the external components, etc.].
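A toy model of why the same ~100 offsets come back after every restart, under simplifying assumptions: in the 0.9 client, heartbeats are only sent while the application thread is inside the consumer (typically in `poll()`), so when processing a batch, including the calls to a stalled external component, takes longer than `session.timeout.ms`, the coordinator evicts the member, the commit is rejected with `CommitFailedException`, and after the rebalance the consumer resumes from the last committed offset. The class name, timings and batch size are hypothetical, and the model ignores partition reassignment.

```java
public class RedeliveryLoopModel {
    // Last committed offset and how many commits were rejected.
    static final class Outcome {
        final long committedOffset;
        final int failedCommits;

        Outcome(long committedOffset, int failedCommits) {
            this.committedOffset = committedOffset;
            this.failedCommits = failedCommits;
        }
    }

    static Outcome simulate(long sessionTimeoutMs, long processingMs, int batchSize, int rounds) {
        long committed = 0;
        int failed = 0;
        for (int round = 0; round < rounds; round++) {
            // After a rebalance the consumer resumes from the last committed offset,
            // so a failed commit means the same batch is fetched again next round.
            long endOfBatch = committed + batchSize;
            // Toy assumption: no heartbeats while processing, so a batch that takes
            // longer than the session timeout gets the member evicted and its commit rejected.
            if (processingMs < sessionTimeoutMs) {
                committed = endOfBatch;
            } else {
                failed++;
            }
        }
        return new Outcome(committed, failed);
    }

    public static void main(String[] args) {
        Outcome healthy = simulate(30_000, 5_000, 100, 3);
        Outcome stalled = simulate(30_000, 45_000, 100, 3);
        // healthy: committed=300 failed=0
        System.out.println("healthy: committed=" + healthy.committedOffset + " failed=" + healthy.failedCommits);
        // stalled: committed=0 failed=3
        System.out.println("stalled: committed=" + stalled.committedOffset + " failed=" + stalled.failedCommits);
    }
}
```

In the stalled case the committed offset never moves, which matches the symptom of the consumer looping over the same batch indefinitely.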

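The solution was to split our consumer into three consumers, each with its own consumer group and its own Kafka configuration (heartbeat.interval.ms, session.timeout.ms, request.timeout.ms, max.partition.fetch.bytes). Giving the three consumers custom values for these properties got rid of the problem.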
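A sketch of what three per-consumer profiles might look like, using plain `java.util.Properties` with the standard consumer config keys. The group names and all numeric values are hypothetical placeholders, not the values used in the post. The `looksConsistent` check encodes two rules of thumb: the Kafka docs suggest keeping `heartbeat.interval.ms` no higher than a third of `session.timeout.ms`, and `request.timeout.ms` should be larger than `session.timeout.ms`. Since 0.9 has no `max.poll.records` (it arrived in 0.10), lowering `max.partition.fetch.bytes` is one of the few ways to keep each batch small enough to process within the session timeout.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

public class ConsumerProfiles {
    // Builds the per-consumer overrides; every value passed in is a placeholder.
    static Properties profile(String groupId, int heartbeatMs, int sessionMs,
                              int requestMs, int maxPartitionFetchBytes) {
        Properties p = new Properties();
        p.setProperty("group.id", groupId);
        p.setProperty("heartbeat.interval.ms", Integer.toString(heartbeatMs));
        p.setProperty("session.timeout.ms", Integer.toString(sessionMs));
        p.setProperty("request.timeout.ms", Integer.toString(requestMs));
        p.setProperty("max.partition.fetch.bytes", Integer.toString(maxPartitionFetchBytes));
        return p;
    }

    // Rule-of-thumb checks only; the broker additionally bounds session.timeout.ms
    // with group.min.session.timeout.ms / group.max.session.timeout.ms.
    static boolean looksConsistent(Properties p) {
        int heartbeat = Integer.parseInt(p.getProperty("heartbeat.interval.ms"));
        int session = Integer.parseInt(p.getProperty("session.timeout.ms"));
        int request = Integer.parseInt(p.getProperty("request.timeout.ms"));
        return heartbeat * 3 <= session && request > session;
    }

    public static void main(String[] args) {
        List<Properties> profiles = Arrays.asList(
            // Fast external calls: short timeouts, default-sized (1 MiB) fetches.
            profile("x.cg-fast", 3_000, 10_000, 15_000, 1_048_576),
            // Moderately slow external calls.
            profile("x.cg-medium", 5_000, 20_000, 25_000, 262_144),
            // Slowest external component: longest session, smallest batches.
            profile("x.cg-slow", 9_000, 30_000, 40_000, 65_536));
        for (Properties p : profiles) {
            System.out.println(p.getProperty("group.id") + " consistent=" + looksConsistent(p));
        }
    }
}
```

In a real consumer each profile would be merged with the shared settings (bootstrap.servers, deserializers) before being passed to `new KafkaConsumer<>(props)`.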

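The general idea: don't do a lot of external communication inside the consumer, because it makes the Kafka consumer's behavior less predictable; and when you do need external communication, make sure the Kafka consumer configuration is consistent with the SLA of the external components.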
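One way to keep external calls from stretching a batch past the session timeout is to run each call on a separate pool and give up after a deadline derived from the consumer config. A minimal sketch using only `java.util.concurrent`; the class and helper names (`BoundedExternalCall`, `callWithDeadline`, `perCallBudgetMs`) and the 0.5 safety factor are hypothetical, and what to do with a timed-out record (retry later, dead-letter topic, etc.) is left to the application.

```java
import java.util.Optional;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class BoundedExternalCall {
    // Daemon threads so a stuck external call cannot keep the JVM alive.
    private static final ExecutorService IO_POOL = Executors.newCachedThreadPool(r -> {
        Thread t = new Thread(r, "external-io");
        t.setDaemon(true);
        return t;
    });

    // Time each external call may take so that a whole batch stays well inside
    // session.timeout.ms (safetyFactor < 1 leaves headroom for poll() and commit).
    static long perCallBudgetMs(long sessionTimeoutMs, int recordsPerPoll, double safetyFactor) {
        return (long) (sessionTimeoutMs * safetyFactor / recordsPerPoll);
    }

    // Runs the call on IO_POOL and gives up after timeoutMs.
    // Empty result = timed out or failed; the caller decides how to handle the record.
    static <T> Optional<T> callWithDeadline(Callable<T> call, long timeoutMs) {
        Future<T> future = IO_POOL.submit(call);
        try {
            return Optional.ofNullable(future.get(timeoutMs, TimeUnit.MILLISECONDS));
        } catch (TimeoutException e) {
            // cancel(true) only interrupts; non-interruptible I/O keeps its thread busy.
            future.cancel(true);
            return Optional.empty();
        } catch (ExecutionException e) {
            return Optional.empty();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return Optional.empty();
        }
    }

    public static void main(String[] args) {
        long budget = perCallBudgetMs(30_000, 100, 0.5);
        System.out.println("per-call budget: " + budget + " ms"); // per-call budget: 150 ms

        Optional<String> fast = callWithDeadline(() -> "ok", budget);
        Optional<String> slow = callWithDeadline(() -> {
            Thread.sleep(2_000); // stands in for a stalled external component
            return "late";
        }, budget);
        System.out.println("fast=" + fast + " slow=" + slow);
    }
}
```

The budget can also be read the other way round: given an external SLA per call and a batch size, it tells you roughly how large `session.timeout.ms` has to be.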