即使从未调用过 acknowledge() 也正在提交偏移量
Offset is being committed even though acknowledge() is never called
我正在实现一个微服务,它从 Kafka 队列中读取消息并将它们写入数据库。我正在使用 spring-boot 1.5.6.RELEASE
和 spring-kafka 1.3.0.RELEASE
。为了避免丢失数据,我需要确保消息在提交偏移量之前已保存在数据库中,因此我将 enable.auto.commit
设置为 false 和 AckMode
到 MANUAL_IMMEDIATE。这是我的 Kafka 配置:
@Configuration
@EnableKafka
public class KafkaConfiguration {
...
@Bean
public Map<String, Object> consumerConfigs() {
return new HashMap<String, Object>() {
{
put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
put(ConsumerConfig.GROUP_ID_CONFIG, groupIdConfig);
put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
}
};
}
@Bean
public ConsumerFactory<String, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL_IMMEDIATE);
return factory;
}
...
}
对于监听器的实现,我使用了 @KafkaListener
注释。消息在数据库中持久化后,我使用 acknowledge()
方法提交偏移量。这是我的听众的样子:
@KafkaListener(topics = "${kafka.myTopic}")
public void receive(ConsumerRecord<String, String> payload, Acknowledgment acknowledgment) {
// persist message here
acknowledgment.acknowledge();
latch.countDown();
}
为了测试我的应用程序,我停止了数据库,以便当业务逻辑尝试保留消息时,在 acknowledge()
方法提交偏移量之前会抛出运行时异常:
1) 停止了数据库。
2) 发送内容为MESSAGE_1.
的消息
3) 启动数据库。
4) 再发送一条内容为MESSAGE_2.
的消息
最终结果是数据库只包含MESSAGE_2,所以第一条消息丢失了。我可以在数据库中获取两条消息的唯一方法是在启动数据库后重新启动微服务时:
1) 停止了数据库。
2) 发送内容为MESSAGE_1.
的消息
3) 启动数据库。
4) 重启微服务
5) 再发送一条内容为MESSAGE_2.
的消息
这次两条消息都在数据库中。我的问题是为什么在第一种情况下偏移量是提交事件,尽管抛出了运行时异常并且从未调用过 acknowledge()
?实现我的 kafka 侦听器的正确方法是什么,这样如果在处理接收到的消息期间发生某些事情我就不会丢失数据?
提前致谢!
你必须研究 Apache Kafka 是如何工作的。
commit offset是针对同组的新消费者或者同一个重启的。对于当前 运行 消费者来说,这没有意义,Broker 会跟踪内存中的当前偏移量,因此所有这些提交都与记录获取过程无关。
你要考虑seek
消费回你感兴趣的位置:https://docs.spring.io/spring-kafka/docs/2.0.0.RELEASE/reference/html/_reference.html#seek
另请参阅此 GH 问题:https://github.com/spring-projects/spring-kafka/issues/470
默认情况下,kafka 将在指定的时间间隔后提交偏移量,当使用手动确认时,您应该始终 acknowledge 处理记录和 nack 失败的记录。
下面是示例代码
@KafkaListener(id = "baz", topics = "${message.topic.name}", containerFactory = "containerFactory")
public void listenPEN_RE(@Payload String message,
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
@Header(KafkaHeaders.OFFSET) int offsets,
Acknowledgment acknowledgment) {
if (value%2==0){
acknowledgment.acknowledge();
} else {
acknowledgment.nack(10); //sleep time should be less than max.poll.interval.ms
}
value++;
}
在这里我确认记录是否为偶数,否则我正在寻找位置回到当前记录并在 10 毫秒后重试。
我正在实现一个微服务,它从 Kafka 队列中读取消息并将它们写入数据库。我正在使用 spring-boot 1.5.6.RELEASE
和 spring-kafka 1.3.0.RELEASE
。为了避免丢失数据,我需要确保消息在提交偏移量之前已保存在数据库中,因此我将 enable.auto.commit
设置为 false 和 AckMode
到 MANUAL_IMMEDIATE。这是我的 Kafka 配置:
@Configuration
@EnableKafka
public class KafkaConfiguration {
...
@Bean
public Map<String, Object> consumerConfigs() {
return new HashMap<String, Object>() {
{
put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
put(ConsumerConfig.GROUP_ID_CONFIG, groupIdConfig);
put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
}
};
}
@Bean
public ConsumerFactory<String, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL_IMMEDIATE);
return factory;
}
...
}
对于监听器的实现,我使用了 @KafkaListener
注释。消息在数据库中持久化后,我使用 acknowledge()
方法提交偏移量。这是我的听众的样子:
@KafkaListener(topics = "${kafka.myTopic}")
public void receive(ConsumerRecord<String, String> payload, Acknowledgment acknowledgment) {
// persist message here
acknowledgment.acknowledge();
latch.countDown();
}
为了测试我的应用程序,我停止了数据库,以便当业务逻辑尝试保留消息时,在 acknowledge()
方法提交偏移量之前会抛出运行时异常:
1) 停止了数据库。
2) 发送内容为MESSAGE_1.
的消息3) 启动数据库。
4) 再发送一条内容为MESSAGE_2.
的消息最终结果是数据库只包含MESSAGE_2,所以第一条消息丢失了。我可以在数据库中获取两条消息的唯一方法是在启动数据库后重新启动微服务时:
1) 停止了数据库。
2) 发送内容为MESSAGE_1.
的消息3) 启动数据库。
4) 重启微服务
5) 再发送一条内容为MESSAGE_2.
的消息这次两条消息都在数据库中。我的问题是为什么在第一种情况下偏移量是提交事件,尽管抛出了运行时异常并且从未调用过 acknowledge()
?实现我的 kafka 侦听器的正确方法是什么,这样如果在处理接收到的消息期间发生某些事情我就不会丢失数据?
提前致谢!
你必须研究 Apache Kafka 是如何工作的。
commit offset是针对同组的新消费者或者同一个重启的。对于当前 运行 消费者来说,这没有意义,Broker 会跟踪内存中的当前偏移量,因此所有这些提交都与记录获取过程无关。
你要考虑seek
消费回你感兴趣的位置:https://docs.spring.io/spring-kafka/docs/2.0.0.RELEASE/reference/html/_reference.html#seek
另请参阅此 GH 问题:https://github.com/spring-projects/spring-kafka/issues/470
默认情况下,kafka 将在指定的时间间隔后提交偏移量,当使用手动确认时,您应该始终 acknowledge 处理记录和 nack 失败的记录。
下面是示例代码
@KafkaListener(id = "baz", topics = "${message.topic.name}", containerFactory = "containerFactory")
public void listenPEN_RE(@Payload String message,
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
@Header(KafkaHeaders.OFFSET) int offsets,
Acknowledgment acknowledgment) {
if (value%2==0){
acknowledgment.acknowledge();
} else {
acknowledgment.nack(10); //sleep time should be less than max.poll.interval.ms
}
value++;
}
在这里我确认记录是否为偶数,否则我正在寻找位置回到当前记录并在 10 毫秒后重试。