防止kafka consumer长时间进程超时
Prevent kafka consumer from timing out for long process
我需要防止 kafka 消费者在应用程序等待特定进程完成时超时。我的方法是暂停分区,然后在该过程完成后恢复分区。
List<TopicPartition> partitionList = new ArrayList<>();
partitionList.addAll(kafkaConsumer.assignment());
kafkaConsumer.pause(partitionList);
while(//waiting for the process to complete){
Thread.sleep(10000);
kafkaConsumer.poll(0);
}
kafkaConsumer.resume(partitionList);
问题
pause是自动发送heartbeat到kafka还是需要定时轮询发送heartbeat?
我的方法是最好的吗?或者有更好的方法吗?
从Kafka 0.10.1开始,消费者确实有一个后台线程来发送心跳:https://cwiki.apache.org/confluence/display/KAFKA/KIP-62%3A+Allow+consumer+to+send+heartbeats+from+a+background+thread
因此,您无需调用 poll()
向代理发送心跳。但是,还有第二次超时 max.poll.interval.ms
-- 您必须在这段时间内调用 poll()
以避免第二次超时。默认值为 5 分钟。如果您的等待时间比这更长,您可以增加此超时。如果这样做,您也不需要暂停任何分区等。
如果您使用的是旧版本,您可以暂停并定期调用 poll()
的方法是发送定期心跳以避免超时的唯一方法。
我需要防止 kafka 消费者在应用程序等待特定进程完成时超时。我的方法是暂停分区,然后在该过程完成后恢复分区。
List<TopicPartition> partitionList = new ArrayList<>();
partitionList.addAll(kafkaConsumer.assignment());
kafkaConsumer.pause(partitionList);
while(//waiting for the process to complete){
Thread.sleep(10000);
kafkaConsumer.poll(0);
}
kafkaConsumer.resume(partitionList);
问题
pause是自动发送heartbeat到kafka还是需要定时轮询发送heartbeat?
我的方法是最好的吗?或者有更好的方法吗?
从Kafka 0.10.1开始,消费者确实有一个后台线程来发送心跳:https://cwiki.apache.org/confluence/display/KAFKA/KIP-62%3A+Allow+consumer+to+send+heartbeats+from+a+background+thread
因此,您无需调用 poll()
向代理发送心跳。但是,还有第二次超时 max.poll.interval.ms
-- 您必须在这段时间内调用 poll()
以避免第二次超时。默认值为 5 分钟。如果您的等待时间比这更长,您可以增加此超时。如果这样做,您也不需要暂停任何分区等。
如果您使用的是旧版本,您可以暂停并定期调用 poll()
的方法是发送定期心跳以避免超时的唯一方法。