Kafka Consumer poll() 方法被阻塞
Kafka Consumer's poll() method gets blocked
我是 Kafka 0.9 的新手,在测试一些功能时我发现 Java 实现的消费者 (KafkaConsumer
) 有一个奇怪的行为。
Kafka 代理位于 Ambari 外部机器中。
即使我可以实现一个生产者并开始向外部代理发送消息,我也不知道为什么当消费者试图读取事件(轮询)时,它会卡住。
我知道生产者运行良好,因为我确实可以通过控制台消费者(在 ambari 上本地工作)使用消息。但是当我执行 Java Consumer 时,没有任何反应,只是卡住了。调试代码我可以看到它在 poll()
行被阻止:
ConsumerRecords<String, String> records = consumer.poll(100);
顺便说一句,超时没有任何作用。不管你输入 0、100 还是 1000 毫秒,消费者都会被阻塞在这一行中,不会超时也不会抛出异常。
我尝试了各种替代属性,例如advertised.host.name、advertised.listener、...依此类推,运气为零。
如有任何帮助,我们将不胜感激。提前致谢!
原因可能是您的消费代码所在的机器运行无法连接到zookeeper。在安装 Kafka 的机器上尝试 运行 相同的消费者代码(我试过这个并为我工作)。我还通过在 server.properties
文件中提及以下属性解决了这个问题:
advertised.host.name="ip address which you want to expose"
// In my case, it is the public IP of the EC2 machine, I have kafka and zookeeper installed on the same EC2 machine.
advertised.port=9092
关于声明:
ConsumerRecords<String, String> records = consumer.poll(100);
上面的说法并不意味着消费者会在100毫秒后超时;相反,现在是投票期。它在 100 毫秒内捕获的任何数据都会被读入记录集合。
在我的案例中,poll() 方法最终卡在无限循环 ensureCoordinatorReady() 中,Coordinator 词提到我协调器在另一台主机上运行。(出于测试目的,我只将一个代理主机添加到我的/etc/hosts 而总共有三个经纪人)。所以消费者正确得到消费者协调员。
于是解决方案就出来了:
在 /etc/hosts 文件
中正确配置主机 运行 kafka 代理
我是 Kafka 0.9 的新手,在测试一些功能时我发现 Java 实现的消费者 (KafkaConsumer
) 有一个奇怪的行为。
Kafka 代理位于 Ambari 外部机器中。
即使我可以实现一个生产者并开始向外部代理发送消息,我也不知道为什么当消费者试图读取事件(轮询)时,它会卡住。
我知道生产者运行良好,因为我确实可以通过控制台消费者(在 ambari 上本地工作)使用消息。但是当我执行 Java Consumer 时,没有任何反应,只是卡住了。调试代码我可以看到它在 poll()
行被阻止:
ConsumerRecords<String, String> records = consumer.poll(100);
顺便说一句,超时没有任何作用。不管你输入 0、100 还是 1000 毫秒,消费者都会被阻塞在这一行中,不会超时也不会抛出异常。
我尝试了各种替代属性,例如advertised.host.name、advertised.listener、...依此类推,运气为零。
如有任何帮助,我们将不胜感激。提前致谢!
原因可能是您的消费代码所在的机器运行无法连接到zookeeper。在安装 Kafka 的机器上尝试 运行 相同的消费者代码(我试过这个并为我工作)。我还通过在 server.properties
文件中提及以下属性解决了这个问题:
advertised.host.name="ip address which you want to expose"
// In my case, it is the public IP of the EC2 machine, I have kafka and zookeeper installed on the same EC2 machine.
advertised.port=9092
关于声明:
ConsumerRecords<String, String> records = consumer.poll(100);
上面的说法并不意味着消费者会在100毫秒后超时;相反,现在是投票期。它在 100 毫秒内捕获的任何数据都会被读入记录集合。
在我的案例中,poll() 方法最终卡在无限循环 ensureCoordinatorReady() 中,Coordinator 词提到我协调器在另一台主机上运行。(出于测试目的,我只将一个代理主机添加到我的/etc/hosts 而总共有三个经纪人)。所以消费者正确得到消费者协调员。
于是解决方案就出来了: 在 /etc/hosts 文件
中正确配置主机 运行 kafka 代理