Kafka消费者轮询和重新连接

Kafka Consumer poll and reconnection

我们刚刚开始在我们的项目中使用 Kafka。我们正在使用 kafka_2.11-0.9.0.0。我有一些与 KafkaConsumer 相关的问题。

1) 我在启动 Zookeeper 和 Kafka 服务器之前启动了 Kafka Consumer,但我的 KafkaConsumer 客户端仍然能够连接。我有以下代码行

    Consumer<String, String> consumer =  new KafkaConsumer<String,String>(props);
    consumer.subscribe(getConsumerRegisteredTopics());
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
        for (ConsumerRecord<String, String> record : records){
           processRecord (record)
        }
   }  

2) 我读到,Zookeeper 通过使用 poll(long timeout) 方法调用来跟踪活动的 Consumer。如果我使用 Long.MAX_VALUE 在 poll() 中超时,zookeeper 将如何跟踪我的消费者。你能帮我理解 KafkaConsumer 轮询调用的行为吗?

提前致谢。

1) 如果您在启动消费者之前没有启动 zookeeper 和 kafka,它无法连接,但会尝试从 kafka 读取元数据。我的经验是 KafkaConsumer 'poll' 调用将不确定地阻塞,直到它能够连接和读取元数据。换句话说......你的消费者实际上并没有连接但正在等待kafka集群出现。

2) 轮询超时告诉消费者等待多长时间才能 return 任何数据。您必须确保在 poll returns 之后再次调用 poll 足够快以使您的消费者保持活跃。轮询调用的超时与 KafkaConsumer 的 keepalive 机制无关(这由 session.timeout.ms 属性 的消费者属性)。