Spring Kafka: offset keeps incrementing even though enable.auto.commit is set to false
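I am trying to implement manual offset commit for the messages received on Kafka. I have set auto commit to false, but the offset value keeps increasing.
I don't know what the cause is and need help resolving the issue.
Here is the code: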
application.yml
    spring:
      application:
        name: kafka-consumer-sample
      resources:
        cache:
          period: 60m
    kafka:
      bootstrapServers: localhost:9092
      options:
        enable:
          auto:
            commit: false
KafkaConfig.java
    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> config = new HashMap<>();
        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        config.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        // Turn off the Kafka client's periodic auto-commit.
        config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        return new DefaultKafkaConsumerFactory<>(config);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        // Diamond operator instead of the raw type, to avoid an unchecked warning.
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
KafkaConsumer.java
    @Service
    public class KafkaConsumer {

        @KafkaListener(topics = "#{'${kafka-consumer.topics}'.split(',')}", groupId = "${kafka-consumer.groupId}")
        public void consume(ConsumerRecord<String, String> record) {
            System.out.println("Consumed Kafka Record: " + record);
            record.timestampType();
            System.out.println("record.timestamp() = " + record.timestamp());
            System.out.println("***********************************");
            System.out.println(record.timestamp());
            System.out.println("record.key() = " + record.key());
            System.out.println("Consumed String Message : " + record.value());
        }
    }
The output is as follows:
Consumed Kafka Record: ConsumerRecord(topic = test, partition = 0, offset = 31, CreateTime = 1573570989565, serialized key size = -1, serialized value size = 2, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = 10)
record.timestamp() = 1573570989565
***********************************
1573570989565
record.key() = null
Consumed String Message : 10
Consumed Kafka Record: ConsumerRecord(topic = test, partition = 0, offset = 32, CreateTime = 1573570991535, serialized key size = -1, serialized value size = 2, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = 11)
record.timestamp() = 1573570991535
***********************************
1573570991535
record.key() = null
Consumed String Message : 11
The consumer properties are as follows:
auto.commit.interval.ms = 100000000
auto.offset.reset = earliest
bootstrap.servers = [localhost:9092]
check.crcs = true
connections.max.idle.ms = 540000
enable.auto.commit = false
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = mygroup
heartbeat.interval.ms = 3000
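This is the output after I restarted the consumer. I expected the earlier records to be printed as well.
Is my understanding correct?
Please note that I am restarting my Spring Boot application and expecting the messages to be consumed from the beginning. My Kafka server and ZooKeeper are not terminated.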
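If auto-commit is disabled through the property ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, you must also set the container-level ack mode to MANUAL and then not acknowledge the offset, because the container ack mode defaults to BATCH. In BATCH mode the listener container itself commits the offsets once the records returned by a poll have been passed to the listener.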
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        // In Spring Kafka 2.3 and later, AckMode is nested in ContainerProperties
        // (older versions exposed it as AbstractMessageListenerContainer.AckMode).
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
        return factory;
    }
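This is needed because, when auto-commit is disabled, the container-level ack mode is set to BATCH. From the Javadoc:

public void setAckMode(ContainerProperties.AckMode ackMode)
Set the ack mode to use when auto ack (in the configuration properties) is false.
- RECORD: Ack after each record has been passed to the listener.
- BATCH: Ack after each batch of records received from the consumer has been passed to the listener.
- TIME: Ack after this number of milliseconds (should be greater than the #setPollTimeout(long) pollTimeout).
- COUNT: Ack after at least this number of records have been received.
- MANUAL: The listener is responsible for acking; use an AcknowledgingMessageListener.
Parameters:
ackMode - the ContainerProperties.AckMode; default BATCH.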
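The difference between BATCH and MANUAL can be sketched with a small JDK-only model. This is only an illustration under stated assumptions, not Spring Kafka's implementation: `AckModeSketch`, `Partition`, `Ack` and `deliver` are hypothetical names, and `Ack` merely stands in for Spring's `Acknowledgment`, whose `acknowledge()` method a real MANUAL-mode listener would call.

```java
import java.util.List;
import java.util.function.BiConsumer;

/** Toy model of container-managed commits; NOT Spring Kafka's implementation. */
final class AckModeSketch {

    enum AckMode { BATCH, MANUAL }

    /** Hypothetical stand-in for the committed offset of one partition. */
    static final class Partition {
        long committed = -1; // -1 means "nothing committed yet"
    }

    /** Hypothetical stand-in for Spring's Acknowledgment handle. */
    interface Ack {
        void acknowledge();
    }

    /** Delivers one polled batch to the listener and commits according to the ack mode. */
    static void deliver(Partition p, AckMode mode, List<Long> batchOffsets,
                        BiConsumer<Long, Ack> listener) {
        Ack noop = () -> { };
        for (long offset : batchOffsets) {
            // Kafka commits the NEXT offset to read, hence offset + 1.
            Ack commitThis = () -> p.committed = offset + 1;
            listener.accept(offset, mode == AckMode.MANUAL ? commitThis : noop);
        }
        if (mode == AckMode.BATCH && !batchOffsets.isEmpty()) {
            // In BATCH mode the container commits by itself after the batch is handled.
            p.committed = batchOffsets.get(batchOffsets.size() - 1) + 1;
        }
    }

    public static void main(String[] args) {
        List<Long> batch = List.of(31L, 32L);

        // The listener never acknowledges in either run.
        Partition batchMode = new Partition();
        deliver(batchMode, AckMode.BATCH, batch, (offset, ack) -> { });
        System.out.println("BATCH committed = " + batchMode.committed);   // prints 33

        Partition manualMode = new Partition();
        deliver(manualMode, AckMode.MANUAL, batch, (offset, ack) -> { });
        System.out.println("MANUAL committed = " + manualMode.committed); // prints -1
    }
}
```

In both runs the listener still sees offsets 31 and 32. The offset printed from a ConsumerRecord is the record's position in the partition log, so it increases whether or not anything is committed. What the ack mode changes is only the committed offset that a restarted consumer resumes from.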
Several options are provided for committing offsets. If the enable.auto.commit consumer property is true, Kafka auto-commits the offsets according to its configuration. If it is false, the containers support several AckMode settings (described in the next list). The default AckMode is BATCH. Starting with version 2.3, the framework sets enable.auto.commit to false unless explicitly set in the configuration. Previously, the Kafka default (true) was used if the property was not set.
If you always want to read from the beginning, you must set the property auto.offset.reset to earliest:
    config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
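Note: make sure the groupId is new, i.e. it has no committed offsets in Kafka. auto.offset.reset only takes effect when the group has no committed offset for the partition.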
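How the start position is chosen on restart can be sketched as follows. This is a simplified JDK-only model, not the Kafka client's code: `StartPositionSketch` and `startPosition` are hypothetical names, out-of-range committed offsets are ignored, and an unsupported reset value is reported here with a plain IllegalStateException.

```java
import java.util.OptionalLong;

/** Toy model of where a consumer starts reading after a restart. */
final class StartPositionSketch {

    /**
     * @param committed       offset committed for this group and partition, if any
     * @param autoOffsetReset value of auto.offset.reset ("earliest" or "latest")
     * @param logStart        first offset available in the partition
     * @param logEnd          offset that the next produced record will get
     */
    static long startPosition(OptionalLong committed, String autoOffsetReset,
                              long logStart, long logEnd) {
        if (committed.isPresent()) {
            // A committed offset always wins; auto.offset.reset is not consulted.
            return committed.getAsLong();
        }
        switch (autoOffsetReset) {
            case "earliest":
                return logStart;
            case "latest":
                return logEnd;
            default:
                throw new IllegalStateException("no committed offset and no usable reset policy");
        }
    }

    public static void main(String[] args) {
        // Partition holding offsets 0..32, as in the question's output.
        long logStart = 0;
        long logEnd = 33;

        // Group that already committed 33 (for example through BATCH ack mode).
        System.out.println(startPosition(OptionalLong.of(33), "earliest", logStart, logEnd));   // prints 33

        // Fresh group without committed offsets.
        System.out.println(startPosition(OptionalLong.empty(), "earliest", logStart, logEnd)); // prints 0
    }
}
```

Under this model, the group in the question replays nothing after a restart because offset 33 was already committed, while a new group id with earliest starts again from offset 0.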