Kafka 消费者在消息处理失败后恢复
Kafka consumer recover after failed message processing
我在我的一个项目中使用简单的 kafka Consumer,我想要的逻辑是当消费者无法处理某些消息时,它将提交最后正确处理的消息,然后在下一次轮询时它将从失败的消息继续。
我尝试使用以下代码手动提交每条消息:
public void fetchMessages() {
ConsumerRecords<String, MyObject> messages = kafkaConsumer.poll(10000);
for (ConsumerRecord message : messages) {
logger.info("Reading kafka message, topic ["+kafkaTopic+"], partition ["+message.partition()+"], offset ["+message.offset()+"]");
try {
MyObject myObject = (MyObject) message.value();
logger.info("Handling message," + myObject);
handleMessage(myObject);
commitMessage(message);
} catch (Exception e) {
logger.error("Error handling message"); throw e;
}
}
}
private void commitMessage(ConsumerRecord message) {
long nextOffset = message.offset() + 1;
TopicPartition topicPartition = new TopicPartition(kafkaTopic,message.partition());
OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(nextOffset);
Map<TopicPartition,OffsetAndMetadata> offsetAndMetadataMap = new HashMap<>();
offsetAndMetadataMap.put(topicPartition,offsetAndMetadata);
logger.info("Commiting processed kafka message, topic ["+kafkaTopic+"], partition ["+message.partition()+"], next offset ["+nextOffset+"]");
kafkaConsumer.commitSync(offsetAndMetadataMap);
}
但是例如,当我获取 3 条消息时,每条消息都来自不同的分区,我成功处理了第一条消息,然后未能处理第二条消息,我只是退出 ConsumerRecord
s for 循环,我希望得到在下一次 poll
迭代中我还没有提交相同的 2 条消息。相反,消费者只是继续接收新消息,永远不会返回失败的消息。
还尝试对失败的消息应用 seek
,然后退出循环,但它在 1 个分区上工作,对许多分区都不起作用。
kafkaConsumer.seek(new TopicPartition(kafkaTopic,message.partition()),message.offset());
一些细节:
- 主题有 12 个分区
- 所有分区一个消费者
- 消费者每分钟执行一次轮询循环
- enable.auto.commit: 假
我的代码或我的逻辑有什么问题?
我发现了查找的工作原理,并且在失败的消息上我必须查找当前消费者所有分区的所有偏移量。
private void seekAllPartitions() {
logger.info("Processing of some kafka message was failed, seeking all partitions to last committed");
List<PartitionInfo> partitionInfos = kafkaConsumer.partitionsFor(kafkaTopic);
for (PartitionInfo partitionInfo : partitionInfos) {
TopicPartition topicPartition = new TopicPartition(kafkaTopic, partitionInfo.partition());
OffsetAndMetadata committedForPartition = kafkaConsumer.committed(topicPartition);
if (committedForPartition != null) {
kafkaConsumer.seek(topicPartition,committedForPartition.offset());
}
}
}
当某个分区上某个消费者组的最后一个偏移量尚未设置时,需要对 committedForPartition 进行空检查(未知)
我在我的一个项目中使用简单的 kafka Consumer,我想要的逻辑是当消费者无法处理某些消息时,它将提交最后正确处理的消息,然后在下一次轮询时它将从失败的消息继续。
我尝试使用以下代码手动提交每条消息:
public void fetchMessages() {
ConsumerRecords<String, MyObject> messages = kafkaConsumer.poll(10000);
for (ConsumerRecord message : messages) {
logger.info("Reading kafka message, topic ["+kafkaTopic+"], partition ["+message.partition()+"], offset ["+message.offset()+"]");
try {
MyObject myObject = (MyObject) message.value();
logger.info("Handling message," + myObject);
handleMessage(myObject);
commitMessage(message);
} catch (Exception e) {
logger.error("Error handling message"); throw e;
}
}
}
private void commitMessage(ConsumerRecord message) {
long nextOffset = message.offset() + 1;
TopicPartition topicPartition = new TopicPartition(kafkaTopic,message.partition());
OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(nextOffset);
Map<TopicPartition,OffsetAndMetadata> offsetAndMetadataMap = new HashMap<>();
offsetAndMetadataMap.put(topicPartition,offsetAndMetadata);
logger.info("Commiting processed kafka message, topic ["+kafkaTopic+"], partition ["+message.partition()+"], next offset ["+nextOffset+"]");
kafkaConsumer.commitSync(offsetAndMetadataMap);
}
但是例如,当我获取 3 条消息时,每条消息都来自不同的分区,我成功处理了第一条消息,然后未能处理第二条消息,我只是退出 ConsumerRecord
s for 循环,我希望得到在下一次 poll
迭代中我还没有提交相同的 2 条消息。相反,消费者只是继续接收新消息,永远不会返回失败的消息。
还尝试对失败的消息应用 seek
,然后退出循环,但它在 1 个分区上工作,对许多分区都不起作用。
kafkaConsumer.seek(new TopicPartition(kafkaTopic,message.partition()),message.offset());
一些细节:
- 主题有 12 个分区
- 所有分区一个消费者
- 消费者每分钟执行一次轮询循环
- enable.auto.commit: 假
我的代码或我的逻辑有什么问题?
我发现了查找的工作原理,并且在失败的消息上我必须查找当前消费者所有分区的所有偏移量。
private void seekAllPartitions() {
logger.info("Processing of some kafka message was failed, seeking all partitions to last committed");
List<PartitionInfo> partitionInfos = kafkaConsumer.partitionsFor(kafkaTopic);
for (PartitionInfo partitionInfo : partitionInfos) {
TopicPartition topicPartition = new TopicPartition(kafkaTopic, partitionInfo.partition());
OffsetAndMetadata committedForPartition = kafkaConsumer.committed(topicPartition);
if (committedForPartition != null) {
kafkaConsumer.seek(topicPartition,committedForPartition.offset());
}
}
}
当某个分区上某个消费者组的最后一个偏移量尚未设置时,需要对 committedForPartition 进行空检查(未知)