KafkaConsumer 永远不会退出 .poll 方法 - GroupCoordinatorNotAvailableException

KafkaConsumer never exits .poll method - GroupCoordinatorNotAvailableException

我在 java 中实现了 KafkaConsumer,目前它永远不会退出 .poll 方法。当我在调试模式下深入研究源代码时,我发现它卡在 AbstractCoordinator.ensureCoordinatorKnown() 的 while 循环中,因为从未找到协调器。

循环中从 sendGroupMetadataRequest() 返回的 future 第一次以 org.apache.kafka.clients.consumer.internals.SendFailedException 失败,随后每次都以 org.apache.kafka.common.errors.GroupCoordinatorNotAvailableException: The group coordinator is not available. 失败。有谁知道为什么会发生这种情况?

如果我使用控制台 producer/consumer 我能够成功发送和接收消息,只有当我使用我的 KafkaConsumer 实现时。此外,消费者确实在我的两台服务器上工作,所以我知道这不是消费者的实现。

以下是我的消费者创建时使用的属性:

Properties props = new Properties();
props.put("bootstrap.servers", "myserver:9000);
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("group.id", groupId);
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");

编辑:

topic肯定是在consumer启动前创建的

编辑 2: 我删除了集群中的所有代理并重新创建了它们,现在我在不同的地方失败了。在 AbstractCoordinator.ensureActiveGroup() 中尝试重新加入时,从 performGroupJoin() 返回的未来反复失败 org.apache.kafka.common.errors.NotCoordinatorForGroupException: This is not the correct coordinator for this group.。仍然不确定发生了什么。

编辑 3: 我删除了代理并使用不同的 ID 重新创建了它们,现在 .poll() 方法正在返回并且它正在成功使用消息。我仍然想知道为什么它一开始就失败了,这样我就可以确保它不会再次发生。

删除代理并创建新代理解决了问题。不过仍然不确定经纪人出了什么问题。