防止kafka consumer长时间进程超时

Prevent kafka consumer from timing out for long process

我需要防止 kafka 消费者在应用程序等待特定进程完成时超时。我的方法是暂停分区,然后在该过程完成后恢复分区。

List<TopicPartition> partitionList = new ArrayList<>(); 
partitionList.addAll(kafkaConsumer.assignment());
kafkaConsumer.pause(partitionList);

while(//waiting for the process to complete){
    Thread.sleep(10000);                            
    kafkaConsumer.poll(0);
}
kafkaConsumer.resume(partitionList);

问题

pause是自动发送heartbeat到kafka还是需要定时轮询发送heartbeat?

我的方法是最好的吗?或者有更好的方法吗?

从Kafka 0.10.1开始,消费者确实有一个后台线程来发送心跳:https://cwiki.apache.org/confluence/display/KAFKA/KIP-62%3A+Allow+consumer+to+send+heartbeats+from+a+background+thread

因此,您无需调用 poll() 向代理发送心跳。但是,还有第二次超时 max.poll.interval.ms -- 您必须在这段时间内调用 poll() 以避免第二次超时。默认值为 5 分钟。如果您的等待时间比这更长,您可以增加此超时。如果这样做,您也不需要暂停任何分区等。

如果您使用的是旧版本,您可以暂停并定期调用 poll() 的方法是发送定期心跳以避免超时的唯一方法。