Kafka 消费者异常和抵消提交
Kafka consumer exception and offset commits
我一直在尝试为 Spring Kafka 做一些 POC 工作。具体来说,我想尝试在 Kafka 中消费消息时处理错误的最佳实践。
我想知道是否有人能够提供帮助:
- 分享关于 Kafka 消费者应该做什么的最佳实践
出现故障时
- 帮我理解AckMode Record的工作原理,以及在监听方法抛出异常时如何防止提交到Kafka偏移队列。
2 的代码示例如下:
鉴于 AckMode 设置为 RECORD,根据 documentation:
commit the offset when the listener returns after processing the
record.
我原以为如果侦听器方法抛出异常,偏移量不会增加。但是,当我使用下面的 code/config/command 组合对其进行测试时,情况并非如此。偏移量仍会更新,并继续处理下一条消息。
我的配置:
private Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.0.1:9092");
props.put(ProducerConfig.RETRIES_CONFIG, 0);
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return props;
}
@Bean
ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerConfigs()));
factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.RECORD);
return factory;
}
我的代码:
@Component
public class KafkaMessageListener{
@KafkaListener(topicPartitions = {@TopicPartition( topic = "my-replicated-topic", partitionOffsets = @PartitionOffset(partition = "0", initialOffset = "0", relativeToCurrent = "true"))})
public void onReplicatedTopicMessage(ConsumerRecord<Integer, String> data) throws InterruptedException {
throw new RuntimeException("Oops!");
}
验证偏移量的命令:
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group test-group
我正在使用 kafka_2.12-0.10.2.0 和 org.springframework.kafka:spring-kafka:1.1.3.RELEASE
容器(通过 ContainerProperties
)有一个 属性、ackOnError
,默认为真...
/**
* Set whether or not the container should commit offsets (ack messages) where the
* listener throws exceptions. This works in conjunction with {@link #ackMode} and is
* effective only when the kafka property {@code enable.auto.commit} is {@code false};
* it is not applicable to manual ack modes. When this property is set to {@code true}
* (the default), all messages handled will have their offset committed. When set to
* {@code false}, offsets will be committed only for successfully handled messages.
* Manual acks will be always be applied. Bear in mind that, if the next message is
* successfully handled, its offset will be committed, effectively committing the
* offset of the failed message anyway, so this option has limited applicability.
* Perhaps useful for a component that starts throwing exceptions consistently;
* allowing it to resume when restarted from the last successfully processed message.
* @param ackOnError whether the container should acknowledge messages that throw
* exceptions.
*/
public void setAckOnError(boolean ackOnError) {
this.ackOnError = ackOnError;
}
不过请记住,如果下一条消息成功,它的偏移量无论如何都会被提交,这也有效地提交了失败的偏移量。
编辑
从 2.3 版开始,ackOnError
现在默认为 false
。
我一直在尝试为 Spring Kafka 做一些 POC 工作。具体来说,我想尝试在 Kafka 中消费消息时处理错误的最佳实践。
我想知道是否有人能够提供帮助:
- 分享关于 Kafka 消费者应该做什么的最佳实践 出现故障时
- 帮我理解AckMode Record的工作原理,以及在监听方法抛出异常时如何防止提交到Kafka偏移队列。
2 的代码示例如下:
鉴于 AckMode 设置为 RECORD,根据 documentation:
commit the offset when the listener returns after processing the record.
我原以为如果侦听器方法抛出异常,偏移量不会增加。但是,当我使用下面的 code/config/command 组合对其进行测试时,情况并非如此。偏移量仍会更新,并继续处理下一条消息。
我的配置:
private Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.0.1:9092");
props.put(ProducerConfig.RETRIES_CONFIG, 0);
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return props;
}
@Bean
ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerConfigs()));
factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.RECORD);
return factory;
}
我的代码:
@Component
public class KafkaMessageListener{
@KafkaListener(topicPartitions = {@TopicPartition( topic = "my-replicated-topic", partitionOffsets = @PartitionOffset(partition = "0", initialOffset = "0", relativeToCurrent = "true"))})
public void onReplicatedTopicMessage(ConsumerRecord<Integer, String> data) throws InterruptedException {
throw new RuntimeException("Oops!");
}
验证偏移量的命令:
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group test-group
我正在使用 kafka_2.12-0.10.2.0 和 org.springframework.kafka:spring-kafka:1.1.3.RELEASE
容器(通过 ContainerProperties
)有一个 属性、ackOnError
,默认为真...
/**
* Set whether or not the container should commit offsets (ack messages) where the
* listener throws exceptions. This works in conjunction with {@link #ackMode} and is
* effective only when the kafka property {@code enable.auto.commit} is {@code false};
* it is not applicable to manual ack modes. When this property is set to {@code true}
* (the default), all messages handled will have their offset committed. When set to
* {@code false}, offsets will be committed only for successfully handled messages.
* Manual acks will be always be applied. Bear in mind that, if the next message is
* successfully handled, its offset will be committed, effectively committing the
* offset of the failed message anyway, so this option has limited applicability.
* Perhaps useful for a component that starts throwing exceptions consistently;
* allowing it to resume when restarted from the last successfully processed message.
* @param ackOnError whether the container should acknowledge messages that throw
* exceptions.
*/
public void setAckOnError(boolean ackOnError) {
this.ackOnError = ackOnError;
}
不过请记住,如果下一条消息成功,它的偏移量无论如何都会被提交,这也有效地提交了失败的偏移量。
编辑
从 2.3 版开始,ackOnError
现在默认为 false
。