Kafka消费者轮询和重新连接
Kafka Consumer poll and reconnection
我们刚刚开始在我们的项目中使用 Kafka。我们正在使用 kafka_2.11-0.9.0.0。我有一些与 KafkaConsumer 相关的问题。
1) 我在启动 Zookeeper 和 Kafka 服务器之前启动了 Kafka Consumer,但我的 KafkaConsumer 客户端仍然能够连接。我有以下代码行
Consumer<String, String> consumer = new KafkaConsumer<String,String>(props);
consumer.subscribe(getConsumerRegisteredTopics());
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
for (ConsumerRecord<String, String> record : records){
processRecord (record)
}
}
2) 我读到,Zookeeper 通过使用 poll(long timeout) 方法调用来跟踪活动的 Consumer。如果我使用 Long.MAX_VALUE 在 poll() 中超时,zookeeper 将如何跟踪我的消费者。你能帮我理解 KafkaConsumer 轮询调用的行为吗?
提前致谢。
1) 如果您在启动消费者之前没有启动 zookeeper 和 kafka,它无法连接,但会尝试从 kafka 读取元数据。我的经验是 KafkaConsumer 'poll' 调用将不确定地阻塞,直到它能够连接和读取元数据。换句话说......你的消费者实际上并没有连接但正在等待kafka集群出现。
2) 轮询超时告诉消费者等待多长时间才能 return 任何数据。您必须确保在 poll returns 之后再次调用 poll 足够快以使您的消费者保持活跃。轮询调用的超时与 KafkaConsumer 的 keepalive 机制无关(这由 session.timeout.ms 属性 的消费者属性)。
我们刚刚开始在我们的项目中使用 Kafka。我们正在使用 kafka_2.11-0.9.0.0。我有一些与 KafkaConsumer 相关的问题。
1) 我在启动 Zookeeper 和 Kafka 服务器之前启动了 Kafka Consumer,但我的 KafkaConsumer 客户端仍然能够连接。我有以下代码行
Consumer<String, String> consumer = new KafkaConsumer<String,String>(props);
consumer.subscribe(getConsumerRegisteredTopics());
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
for (ConsumerRecord<String, String> record : records){
processRecord (record)
}
}
2) 我读到,Zookeeper 通过使用 poll(long timeout) 方法调用来跟踪活动的 Consumer。如果我使用 Long.MAX_VALUE 在 poll() 中超时,zookeeper 将如何跟踪我的消费者。你能帮我理解 KafkaConsumer 轮询调用的行为吗?
提前致谢。
1) 如果您在启动消费者之前没有启动 zookeeper 和 kafka,它无法连接,但会尝试从 kafka 读取元数据。我的经验是 KafkaConsumer 'poll' 调用将不确定地阻塞,直到它能够连接和读取元数据。换句话说......你的消费者实际上并没有连接但正在等待kafka集群出现。
2) 轮询超时告诉消费者等待多长时间才能 return 任何数据。您必须确保在 poll returns 之后再次调用 poll 足够快以使您的消费者保持活跃。轮询调用的超时与 KafkaConsumer 的 keepalive 机制无关(这由 session.timeout.ms 属性 的消费者属性)。