Kafka Consumer poll() 方法被阻塞

Kafka Consumer's poll() method gets blocked

我是 Kafka 0.9 的新手,在测试一些功能时我发现 Java 实现的消费者 (KafkaConsumer) 有一个奇怪的行为。

Kafka 代理位于 Ambari 外部机器中。

即使我可以实现一个生产者并开始向外部代理发送消息,我也不知道为什么当消费者试图读取事件(轮询)时,它会卡住。

我知道生产者运行良好,因为我确实可以通过控制台消费者(在 ambari 上本地工作)使用消息。但是当我执行 Java Consumer 时,没有任何反应,只是卡住了。调试代码我可以看到它在 poll() 行被阻止:

    ConsumerRecords<String, String> records = consumer.poll(100);

顺便说一句,超时没有任何作用。不管你输入 0、100 还是 1000 毫秒,消费者都会被阻塞在这一行中,不会超时也不会抛出异常。

我尝试了各种替代属性,例如advertised.host.nameadvertised.listener、...依此类推,运气为零。

如有任何帮助,我们将不胜感激。提前致谢!

原因可能是您的消费代码所在的机器运行无法连接到zookeeper。在安装 Kafka 的机器上尝试 运行 相同的消费者代码(我试过这个并为我工作)。我还通过在 server.properties 文件中提及以下属性解决了这个问题:

advertised.host.name="ip address which you want to expose"

// In my case, it is the public IP of the EC2 machine, I have kafka and zookeeper installed on the same EC2 machine.

advertised.port=9092

关于声明:

ConsumerRecords<String, String> records = consumer.poll(100);

上面的说法并不意味着消费者会在100毫秒后超时;相反,现在是投票期。它在 100 毫秒内捕获的任何数据都会被读入记录集合。

在我的案例中,poll() 方法最终卡在无限循环 ensureCoordinatorReady() 中,Coordinator 词提到我协调器在另一台主机上运行。(出于测试目的,我只将一个代理主机添加到我的/etc/hosts 而总共有三个经纪人)。所以消费者正确得到消费者协调员。

于是解决方案就出来了: 在 /etc/hosts 文件

中正确配置主机 运行 kafka 代理