Kafka 领导人选举导致 Kafka Streams 崩溃

Kafka leader election causes Kafka Streams crash

我有一个 Kafka Streams 应用程序从 Kafka 集群消费和生产,该集群具有 3 个代理,复制因子为 3。除了消费者偏移主题(50 个分区),所有其他主题每个只有一个分区。

当代理尝试首选副本选举时,Streams 应用程序(运行 在与代理完全不同的实例上)失败并显示错误:

Caused by: org.apache.kafka.streams.errors.StreamsException: task [0_0] exception caught when producing
    at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.checkForException(RecordCollectorImpl.java:119)
    ...
    at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:197)
Caused by: org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is not the leader for that topic-partition.

考虑到它 运行 在不属于 Kafka 集群的服务器上,Streams 应用程序尝试成为分区的领导者是否正常?

我可以通过以下方式按需重现此行为:

  1. 杀死其中一个经纪人(如预期的那样,其他 2 个经纪人接管了所有以被杀死的经纪人为领导者的分区的领导者)
  2. 让被杀死的经纪人复活
  3. bin/kafka-preferred-replica-election.sh --zookeeper localhost
  4. 触发首选副本领导者选举

我的问题似乎与此类似reported failure, so I'm wondering if this is a new Kafka Streams bug. My full stack trace is literally exactly the same as the gist linked in the reported failure (here)。

另一个可能有趣的细节是,在领导者选举期间,我在代理的 controller.log 中收到这些消息:

[2017-04-12 11:07:50,940] WARN [Controller-3-to-broker-3-send-thread], Controller 3's connection to broker BROKER-3-HOSTNAME:9092 (id: 3 rack: null) was unsuccessful (kafka.controller.RequestSendThread)
java.io.IOException: Connection to BROKER-3-HOSTNAME:9092 (id: 3 rack: null) failed
    at kafka.utils.NetworkClientBlockingOps$.awaitReady(NetworkClientBlockingOps.scala:84)
    at kafka.utils.NetworkClientBlockingOps$.blockingReady$extension(NetworkClientBlockingOps.scala:94)
    at kafka.controller.RequestSendThread.brokerReady(ControllerChannelManager.scala:232)
    at kafka.controller.RequestSendThread.liftedTree1(ControllerChannelManager.scala:185)
    at kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:184)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)

我最初认为这个连接错误是罪魁祸首,但在领导者选举使 Streams 应用程序崩溃后,如果我重新启动 Streams 应用程序,它会正常工作直到下一次选举,而我根本不需要接触代理。

所有服务器(3 个 Kafka 代理和 Streams 应用程序)都在 运行 EC2 实例上。

此问题现已在 0.10.2.1 中修复。如果您无法接受,请确保您在流配置中按如下方式设置了这两个参数:

final Properties props = new Properties();
...
props.put(ProducerConfig.RETRIES_CONFIG, 10);  
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, Integer.toString(Integer.MAX_VALUE));