Kafka consumer fails to consume if first broker is down

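I am using the latest version of Kafka (kafka_2.12-1.0.0.tgz). I have set up a simple cluster with 3 brokers (I only changed broker.id=1 and listeners=PLAINTEXT://:9092 in the properties file for each instance). After the cluster was up, I created a topic with the following command: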

./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 13 --topic demo

Then I started the Kafka console producer and consumer with the following commands:

./kafka-console-producer.sh --topic demo --broker-list localhost:9094,localhost:9093,localhost:9092

./kafka-console-consumer.sh --group test --bootstrap-server localhost:9094,localhost:9093,localhost:9092 --topic demo

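When all brokers are up, everything works fine. But if I kill the first broker (first in start order), messages are still sent to the brokers, yet the consumer cannot receive any of them. The messages are not lost: as soon as that broker is started again, the consumer receives them immediately.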

Consumer logs after shutting down the broker instance:

[2018-01-09 13:39:31,130] WARN [Consumer clientId=consumer-1, groupId=test] Connection to node 2147483646 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2018-01-09 13:39:31,132] WARN [Consumer clientId=consumer-1, groupId=test] Connection to node 1 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2018-01-09 13:39:31,344] WARN [Consumer clientId=consumer-1, groupId=test] Connection to node 2147483646 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2018-01-09 13:39:31,451] WARN [Consumer clientId=consumer-1, groupId=test] Connection to node 1 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2018-01-09 13:39:31,848] WARN [Consumer clientId=consumer-1, groupId=test] Connection to node 2147483646 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2018-01-09 13:39:31,950] WARN [Consumer clientId=consumer-1, groupId=test] Connection to node 1 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2018-01-09 13:39:32,363] WARN [Consumer clientId=consumer-1, groupId=test] Connection to node 2147483646 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2018-01-09 13:39:33,092] WARN [Consumer clientId=consumer-1, groupId=test] Connection to node 2147483646 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2018-01-09 13:39:34,216] WARN [Consumer clientId=consumer-1, groupId=test] Connection to node 2147483646 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2018-01-09 13:39:34,218] WARN [Consumer clientId=consumer-1, groupId=test] Asynchronous auto-commit of offsets {demo-0=OffsetAndMetadata{offset=3, metadata=''}, demo-1=OffsetAndMetadata{offset=3, metadata=''}, demo-2=OffsetAndMetadata{offset=2, metadata=''}, demo-3=OffsetAndMetadata{offset=2, metadata=''}, demo-4=OffsetAndMetadata{offset=1, metadata=''}, demo-5=OffsetAndMetadata{offset=1, metadata=''}, demo-6=OffsetAndMetadata{offset=3, metadata=''}, demo-7=OffsetAndMetadata{offset=2, metadata=''}, demo-8=OffsetAndMetadata{offset=3, metadata=''}, demo-9=OffsetAndMetadata{offset=2, metadata=''}, demo-10=OffsetAndMetadata{offset=3, metadata=''}, demo-11=OffsetAndMetadata{offset=2, metadata=''}, demo-12=OffsetAndMetadata{offset=2, metadata=''}} failed: Offset commit failed with a retriable exception. You should retry committing offsets. The underlying error was: The coordinator is not available. (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2018-01-09 13:39:34,219] WARN [Consumer clientId=consumer-1, groupId=test] Connection to node 1 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)

Consumer logs after starting the missing broker again:

[2018-01-09 13:41:21,739] ERROR [Consumer clientId=consumer-1, groupId=test] Offset commit failed on partition demo-0 at offset 3: This is not the correct coordinator. (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2018-01-09 13:41:21,739] WARN [Consumer clientId=consumer-1, groupId=test] Asynchronous auto-commit of offsets {demo-0=OffsetAndMetadata{offset=3, metadata=''}, demo-1=OffsetAndMetadata{offset=3, metadata=''}, demo-2=OffsetAndMetadata{offset=2, metadata=''}, demo-3=OffsetAndMetadata{offset=2, metadata=''}, demo-4=OffsetAndMetadata{offset=1, metadata=''}, demo-5=OffsetAndMetadata{offset=1, metadata=''}, demo-6=OffsetAndMetadata{offset=3, metadata=''}, demo-7=OffsetAndMetadata{offset=2, metadata=''}, demo-8=OffsetAndMetadata{offset=3, metadata=''}, demo-9=OffsetAndMetadata{offset=2, metadata=''}, demo-10=OffsetAndMetadata{offset=3, metadata=''}, demo-11=OffsetAndMetadata{offset=2, metadata=''}, demo-12=OffsetAndMetadata{offset=2, metadata=''}} failed: Offset commit failed with a retriable exception. You should retry committing offsets. The underlying error was: This is not the correct coordinator. (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2018-01-09 13:41:22,353] ERROR [Consumer clientId=consumer-1, groupId=test] Offset commit failed on partition demo-0 at offset 3: This is not the correct coordinator. (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2018-01-09 13:41:22,354] WARN [Consumer clientId=consumer-1, groupId=test] Asynchronous auto-commit of offsets {demo-0=OffsetAndMetadata{offset=3, metadata=''}, demo-1=OffsetAndMetadata{offset=3, metadata=''}, demo-2=OffsetAndMetadata{offset=2, metadata=''}, demo-3=OffsetAndMetadata{offset=2, metadata=''}, demo-4=OffsetAndMetadata{offset=1, metadata=''}, demo-5=OffsetAndMetadata{offset=1, metadata=''}, demo-6=OffsetAndMetadata{offset=3, metadata=''}, demo-7=OffsetAndMetadata{offset=2, metadata=''}, demo-8=OffsetAndMetadata{offset=3, metadata=''}, demo-9=OffsetAndMetadata{offset=2, metadata=''}, demo-10=OffsetAndMetadata{offset=3, metadata=''}, demo-11=OffsetAndMetadata{offset=3, metadata=''}, demo-12=OffsetAndMetadata{offset=2, metadata=''}} failed: Offset commit failed with a retriable exception. You should retry committing offsets. The underlying error was: This is not the correct coordinator. (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)

Thanks

Try checking "offsets.topic.replication.factor" in your server-*.properties files.

For example:

############################# Internal Topic Settings       
# The replication factor for the group metadata internal topics    
# For anything other than development testing, a value greater than 1 is  recommended for to ensure availability such as 3.
offsets.topic.replication.factor=3

http://kafka.apache.org/documentation/#brokerconfigs
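
To check whether this is what is happening on your cluster, here is a small sketch. It assumes the ZooKeeper address from the question and that it is run from Kafka's bin directory. As far as I know, the Java consumer opens its coordinator connection under the id Integer.MAX_VALUE - broker.id, so the node 2147483646 in the consumer log should map back to the broker that was killed.

```shell
# Node id copied from the consumer log in the question.
coordinator_node_id=2147483646

# Assumption: the consumer logs its coordinator connection under
# Integer.MAX_VALUE - broker.id, so subtracting recovers the broker id.
int_max=2147483647
broker_id=$((int_max - coordinator_node_id))
echo "group coordinator was broker.id=${broker_id}"

# Show where the replicas of the internal offsets topic live.
# Guarded so the snippet does nothing outside Kafka's bin directory.
if [ -x ./kafka-topics.sh ]; then
  ./kafka-topics.sh --describe --zookeeper localhost:2181 --topic __consumer_offsets
fi
```

If the describe output lists only broker 1 under Replicas for the __consumer_offsets partitions, the topic was created with a replication factor of 1, and the group coordinator cannot move to another broker when broker 1 goes down. Note that, to my knowledge, offsets.topic.replication.factor only applies when the topic is first created, so an already existing __consumer_offsets topic would need its partitions reassigned (for example with kafka-reassign-partitions.sh) or the test cluster's data wiped and recreated.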

Setting KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR in the yml file solves this issue.

For example, using 2 workers on docker-swarm:

environment:
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 2