Spring kafka 消费者不遵守 auto-offset-reset = latest
Spring kafka consumer doesn't respect auto-offset-reset = latest
我们有 1 个 Kafka 主题和 1 个分区:
从 spring 引导 kafka 消费者看到一个相当奇怪的行为。 Spring kafka consumer 在重启时总是从主题开始消费。
我已经配置了 spring kafka listener 如下
kafka 监听器:
@KafkaListener(topics = "${application.kafkaInputTopic}", groupId = "${spring.kafka.consumer.group-id}")
public void listen(String message) {
log.debug("SG message received. Parsing...");
TransmissionMessage transmissionMessage;
SGTransmission transmission = parseMessage(message);
//Porcess Transmission......
}
消费者配置和 spring 消费者容器自动装配 bean
@Resource
public Environment env;
@Bean
KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory();
factory.setConsumerFactory(consumerFactory());
// I know this isnt right, should be run in 1 thread as there isonly
//partition in the topic
factory.setConcurrency(10);
factory.getContainerProperties().setPollTimeout(3000);
factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL);
factory.getContainerProperties().setSyncCommits(true);
return factory;
}
@Bean
public ConsumerFactory<String, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> propsMap = new HashMap();
propsMap.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, env.getProperty(Constants.SPRING_KAFKA_SECURITY_PROTOCOL));
propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, env.getProperty(Constants.SPRING_KAFKA_BOOTSTRAP_SERVERS));
propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, env.getProperty(Constants.SPRING_KAFKA_GROUP_ID));
return propsMap;
}
spring 应用程序 yaml
kafka:
bootstrap-servers: ${KAFKA_BOOTSTRAP_SERVERS}
properties:
consumer:
# If this consumer does not have an offset yet, start at latest offset.
# Be careful with `earliest`, this will use the first (available) offset in the topic, which is most likely not what you want.
auto-offset-reset: latest
group-id: ${KAFKA_GROUP_ID}
每次消费者崩溃并重新启动时,所有消息都会从头开始读取。正如您在 application.yaml
中看到的那样,情况不应该如此
auto-offset-reset: latest
我可能忽略了代理端或消费者端的一些其他配置,这导致消费者每次重新启动时都从头开始读取?
您一定已经以某种方式提交了初始偏移量,也许是在您完成此配置之前。
factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL);
这意味着您有责任提交偏移量。
使用 AckMode.BATCH
(默认值)或 AckMode.RECORD
。
或者,使用 kafka-consumer-groups
CLI 工具删除当前提交的偏移量(您也可以使用相同的工具列出当前偏移量)。
或者每次使用群的UUID获取一个新的群
编辑
您还可以让您的侦听器 class 实现 ConsumerSeekAware
并在 onPartitionsAssigned()
中调用 callback.seekToEnd(partitions)
。
我们有 1 个 Kafka 主题和 1 个分区:
从 spring 引导 kafka 消费者看到一个相当奇怪的行为。 Spring kafka consumer 在重启时总是从主题开始消费。 我已经配置了 spring kafka listener 如下
kafka 监听器:
@KafkaListener(topics = "${application.kafkaInputTopic}", groupId = "${spring.kafka.consumer.group-id}")
public void listen(String message) {
log.debug("SG message received. Parsing...");
TransmissionMessage transmissionMessage;
SGTransmission transmission = parseMessage(message);
//Porcess Transmission......
}
消费者配置和 spring 消费者容器自动装配 bean
@Resource
public Environment env;
@Bean
KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory();
factory.setConsumerFactory(consumerFactory());
// I know this isnt right, should be run in 1 thread as there isonly
//partition in the topic
factory.setConcurrency(10);
factory.getContainerProperties().setPollTimeout(3000);
factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL);
factory.getContainerProperties().setSyncCommits(true);
return factory;
}
@Bean
public ConsumerFactory<String, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> propsMap = new HashMap();
propsMap.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, env.getProperty(Constants.SPRING_KAFKA_SECURITY_PROTOCOL));
propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, env.getProperty(Constants.SPRING_KAFKA_BOOTSTRAP_SERVERS));
propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, env.getProperty(Constants.SPRING_KAFKA_GROUP_ID));
return propsMap;
}
spring 应用程序 yaml
kafka:
bootstrap-servers: ${KAFKA_BOOTSTRAP_SERVERS}
properties:
consumer:
# If this consumer does not have an offset yet, start at latest offset.
# Be careful with `earliest`, this will use the first (available) offset in the topic, which is most likely not what you want.
auto-offset-reset: latest
group-id: ${KAFKA_GROUP_ID}
每次消费者崩溃并重新启动时,所有消息都会从头开始读取。正如您在 application.yaml
中看到的那样,情况不应该如此auto-offset-reset: latest
我可能忽略了代理端或消费者端的一些其他配置,这导致消费者每次重新启动时都从头开始读取?
您一定已经以某种方式提交了初始偏移量,也许是在您完成此配置之前。
factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL);
这意味着您有责任提交偏移量。
使用 AckMode.BATCH
(默认值)或 AckMode.RECORD
。
或者,使用 kafka-consumer-groups
CLI 工具删除当前提交的偏移量(您也可以使用相同的工具列出当前偏移量)。
或者每次使用群的UUID获取一个新的群
编辑
您还可以让您的侦听器 class 实现 ConsumerSeekAware
并在 onPartitionsAssigned()
中调用 callback.seekToEnd(partitions)
。