KafkaConsumer.close() 为什么?
KafkaConsumer.close() Why?
我在这里询问有关关闭kafka消费者的问题。即使使用线程退出,我是否需要关闭 kafka 消费者?关闭它不会因任何更改而泄漏资源吗?
这是一个代码示例:
public class MyThread extends Thread{
private KafkaConsumer<String, Message> kafkaConsumer;
@Override
public void run() {
kafkaConsumer = initConsumer();
while(true){
kafkaconsumer.poll(1000000)
//Code goes here.
}
}
}
当 MyThread
使用 System.exit
退出时 kafkaConsumer
是否关闭?
引自 "Kafka - the definitive guide" 作者:Gwen Shapira、Neha Narkhede、Todd Palino(O' Reilly Media):
Always close() the consumer before exiting. This will close the network connections and sockets. It will also trigger a rebalance immediately rather than wait for the group coordinator to discover that the consumer stopped sending heartbeats and is likely dead, which will take longer and therefore result in a longer period of time in which consumers can't consume messages from a subset of the partitions.
我在这里询问有关关闭kafka消费者的问题。即使使用线程退出,我是否需要关闭 kafka 消费者?关闭它不会因任何更改而泄漏资源吗?
这是一个代码示例:
public class MyThread extends Thread{
private KafkaConsumer<String, Message> kafkaConsumer;
@Override
public void run() {
kafkaConsumer = initConsumer();
while(true){
kafkaconsumer.poll(1000000)
//Code goes here.
}
}
}
当 MyThread
使用 System.exit
退出时 kafkaConsumer
是否关闭?
引自 "Kafka - the definitive guide" 作者:Gwen Shapira、Neha Narkhede、Todd Palino(O' Reilly Media):
Always close() the consumer before exiting. This will close the network connections and sockets. It will also trigger a rebalance immediately rather than wait for the group coordinator to discover that the consumer stopped sending heartbeats and is likely dead, which will take longer and therefore result in a longer period of time in which consumers can't consume messages from a subset of the partitions.