Spring 具有至少一种语义的 Kafka 消费者
Spring Kafka Consumer with At Least One Semantics
这是我的要求
- 我需要读取 2 个主题的数据。
- 我需要一次读取 1 条消息并调用 DB。
- 一旦数据库调用完成(无论成功或失败),我需要提交 kafka 偏移量。
我已经完成了以下配置。那么这是在一次读取和处理 1 条消息时实现 AT LEAST ONCE 语义的正确方法吗?
我对 Kafka Consumer 读取的重复消息没有意见。(尽管我想减少重复)。
@EnableKafka
public class KafkaConsumerConfig {
@Bean
public ConsumerFactory<Integer, String> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "grpid-mytopic");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
return new DefaultKafkaConsumerFactory<>(props);
}
@Bean
KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(2);
return factory;
}
@KafkaListener(topics = "mytopic-1")
public void consumeFromTopic1(String message, ConsumerRecordMetadata meta)
{
dbservice.callDB(message);
}
@KafkaListener(topics = "mytopic-2")
public void consumeFromTopic2(String message, ConsumerRecordMetadata meta)
{
dbservice.saveInDB(message);
}
不,ENABLE_AUTO_COMMIT_CONFIG
应该为 false,当侦听器退出时,侦听器容器将以确定性的方式可靠地提交偏移量。
这是我的要求
- 我需要读取 2 个主题的数据。
- 我需要一次读取 1 条消息并调用 DB。
- 一旦数据库调用完成(无论成功或失败),我需要提交 kafka 偏移量。
我已经完成了以下配置。那么这是在一次读取和处理 1 条消息时实现 AT LEAST ONCE 语义的正确方法吗?
我对 Kafka Consumer 读取的重复消息没有意见。(尽管我想减少重复)。
@EnableKafka
public class KafkaConsumerConfig {
@Bean
public ConsumerFactory<Integer, String> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "grpid-mytopic");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
return new DefaultKafkaConsumerFactory<>(props);
}
@Bean
KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(2);
return factory;
}
@KafkaListener(topics = "mytopic-1")
public void consumeFromTopic1(String message, ConsumerRecordMetadata meta)
{
dbservice.callDB(message);
}
@KafkaListener(topics = "mytopic-2")
public void consumeFromTopic2(String message, ConsumerRecordMetadata meta)
{
dbservice.saveInDB(message);
}
不,ENABLE_AUTO_COMMIT_CONFIG
应该为 false,当侦听器退出时,侦听器容器将以确定性的方式可靠地提交偏移量。