KafkaConsumer 永远不会退出 .poll 方法 - GroupCoordinatorNotAvailableException
KafkaConsumer never exits .poll method - GroupCoordinatorNotAvailableException
我在 java 中实现了 KafkaConsumer
,目前它永远不会退出 .poll
方法。当我在调试模式下深入研究源代码时,我发现它卡在 AbstractCoordinator.ensureCoordinatorKnown()
的 while 循环中,因为从未找到协调器。
循环中从 sendGroupMetadataRequest()
返回的 future 第一次以 org.apache.kafka.clients.consumer.internals.SendFailedException
失败,随后每次都以 org.apache.kafka.common.errors.GroupCoordinatorNotAvailableException: The group coordinator is not available.
失败。有谁知道为什么会发生这种情况?
如果我使用控制台 producer/consumer 我能够成功发送和接收消息,只有当我使用我的 KafkaConsumer 实现时。此外,消费者确实在我的两台服务器上工作,所以我知道这不是消费者的实现。
以下是我的消费者创建时使用的属性:
Properties props = new Properties();
props.put("bootstrap.servers", "myserver:9000);
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("group.id", groupId);
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");
编辑:
topic肯定是在consumer启动前创建的
编辑 2:
我删除了集群中的所有代理并重新创建了它们,现在我在不同的地方失败了。在 AbstractCoordinator.ensureActiveGroup()
中尝试重新加入时,从 performGroupJoin()
返回的未来反复失败 org.apache.kafka.common.errors.NotCoordinatorForGroupException: This is not the correct coordinator for this group.
。仍然不确定发生了什么。
编辑 3:
我删除了代理并使用不同的 ID 重新创建了它们,现在 .poll()
方法正在返回并且它正在成功使用消息。我仍然想知道为什么它一开始就失败了,这样我就可以确保它不会再次发生。
删除代理并创建新代理解决了问题。不过仍然不确定经纪人出了什么问题。
我在 java 中实现了 KafkaConsumer
,目前它永远不会退出 .poll
方法。当我在调试模式下深入研究源代码时,我发现它卡在 AbstractCoordinator.ensureCoordinatorKnown()
的 while 循环中,因为从未找到协调器。
循环中从 sendGroupMetadataRequest()
返回的 future 第一次以 org.apache.kafka.clients.consumer.internals.SendFailedException
失败,随后每次都以 org.apache.kafka.common.errors.GroupCoordinatorNotAvailableException: The group coordinator is not available.
失败。有谁知道为什么会发生这种情况?
如果我使用控制台 producer/consumer 我能够成功发送和接收消息,只有当我使用我的 KafkaConsumer 实现时。此外,消费者确实在我的两台服务器上工作,所以我知道这不是消费者的实现。
以下是我的消费者创建时使用的属性:
Properties props = new Properties();
props.put("bootstrap.servers", "myserver:9000);
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("group.id", groupId);
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");
编辑:
topic肯定是在consumer启动前创建的
编辑 2:
我删除了集群中的所有代理并重新创建了它们,现在我在不同的地方失败了。在 AbstractCoordinator.ensureActiveGroup()
中尝试重新加入时,从 performGroupJoin()
返回的未来反复失败 org.apache.kafka.common.errors.NotCoordinatorForGroupException: This is not the correct coordinator for this group.
。仍然不确定发生了什么。
编辑 3:
我删除了代理并使用不同的 ID 重新创建了它们,现在 .poll()
方法正在返回并且它正在成功使用消息。我仍然想知道为什么它一开始就失败了,这样我就可以确保它不会再次发生。
删除代理并创建新代理解决了问题。不过仍然不确定经纪人出了什么问题。