Spring Kafka - 一段时间后重新读取偏移量
Spring Kafka - Re reading an offset after sometime
我正在使用@KafkaListener 和道具作为
max.poll.records 到 50。(处理每条记录需要 40-60 秒)
启用自动提交=false
确认模式为手动即时
逻辑如下
@KafkaListener(groupId=“ABC”, topic=“Data1” containerFactory=“myCustomContainerFactory”)
public void listen(ConsumerRecord<String, Object> record, Acknowledge ack) {
try{
process(record);
ack.acknowledge();
}
Catch(e){
reprocess() // pause container and seek
}
}
其他属性如max.poll.interval.ms、session.timeout.ms或心跳都是默认值
我不明白这里出了什么问题,
假设如果 500 条消息发布到 2 个分区
我不确定为什么消费者不按照 max.poll.records prop 轮询记录实际上它会在应用程序启动或生产者发布消息后立即轮询所有 500 条消息
据观察,在处理一些记录后大约 5-7 分钟,消费者再次读取偏移量..实际上读取经过精细处理和确认..
一个小时后,日志文件显示相同的消息被读取了多次。
感谢任何帮助
谢谢
默认 max.poll.interval.ms
是 300,000 毫秒(5 分钟)。
您需要减少 max.poll.records
或增加间隔 - 否则 Kafka 将由于消费者无响应而强制重新平衡。
这么长的处理时间,我会推荐max.poll.records=1
;您显然不需要更高的吞吐量。
我正在使用@KafkaListener 和道具作为
max.poll.records 到 50。(处理每条记录需要 40-60 秒)
启用自动提交=false
确认模式为手动即时
逻辑如下
@KafkaListener(groupId=“ABC”, topic=“Data1” containerFactory=“myCustomContainerFactory”)
public void listen(ConsumerRecord<String, Object> record, Acknowledge ack) {
try{
process(record);
ack.acknowledge();
}
Catch(e){
reprocess() // pause container and seek
}
}
其他属性如max.poll.interval.ms、session.timeout.ms或心跳都是默认值
我不明白这里出了什么问题,
假设如果 500 条消息发布到 2 个分区
我不确定为什么消费者不按照 max.poll.records prop 轮询记录实际上它会在应用程序启动或生产者发布消息后立即轮询所有 500 条消息
据观察,在处理一些记录后大约 5-7 分钟,消费者再次读取偏移量..实际上读取经过精细处理和确认..
一个小时后,日志文件显示相同的消息被读取了多次。
感谢任何帮助 谢谢
默认 max.poll.interval.ms
是 300,000 毫秒(5 分钟)。
您需要减少 max.poll.records
或增加间隔 - 否则 Kafka 将由于消费者无响应而强制重新平衡。
这么长的处理时间,我会推荐max.poll.records=1
;您显然不需要更高的吞吐量。