在 kafka 中,当使用事务生成消息时,消费者偏移量加倍
In kafka, When producing message with transactional, Consumer offset doubled up
我正在使用 springboot 2、kafk 2.2.0、spring-kafka 2.2.5
制作项目
我制作了kafka exactly once
环境,消息的生产和消费都很好。
但是 kafka-consumer-groups.sh
是这样说的。
TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG
test_topic 0 23 24 1
test_topic 1 25 26 1
test_topic 2 21 22 1
我只向 kafka 发送了一条消息,但 LOG-END-OFFSET
加倍了,并且始终存在 1 个滞后。 (在我的 java 应用程序中,按预期生产和消费作品)
我不知道为什么 LOG-END-OFFSET 加倍了。
如果删除exactly once
配置,LOG-END-OFFSET
和CURRENT-OFFSET
计数没有问题。
这是我的 kafkaTemplate
设置代码。
@Bean
@Primary
public ProducerFactory<String, Object> producerFactory() {
Map<String, Object> producerProperties = new HashMap<>();
producerProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092";
producerProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
producerProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
// exactly once producer setup
producerProperties.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
DefaultKafkaProducerFactory factory = new DefaultKafkaProducerFactory<>(producerProperties, new StringSerializer(), new JsonSerializer<>(KafkaStaticOptions.OBJECT_MAPPER));
factory.setTransactionIdPrefix("my.transaction.");
return factory;
}
@Bean
@Primary
public KafkaTransactionManager<String, Object> kafkaTransactionManager(
ProducerFactory<String, Object> producerFactory) {
return new KafkaTransactionManager<>(producerFactory);
}
@Bean
@Primary
public KafkaTemplate<String, Object> kafkaTemplate(ProducerFactory<String, Object> producerFactory) {
return new KafkaTemplate<>(producerFactory);
}
我的生产者代码。
kafkaTemplate.executeInTransaction(kt -> kt.send("test_topic", "test data hahaha"));
我检查了 LOG-END-OFFSET 翻倍的时间,现在是 produce transaction commit
时机。
我哪里做错配置了?
使用事务时,Kafka 在日志中插入“control batches”以指示消息是否是事务的一部分。
这些批次也被分配了偏移量,所以这就是为什么您看到偏移量增加了 2,即使您只发送了一条记录。
如果您想自己检查,可以使用 DumpLogSegments 工具显示您的日志内容并查看控制批次:
./bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files /tmp/kafka-logs/mytopic-0/00000000000000000000.log
Dumping /tmp/kafka-logs/mytopic-0/00000000000000000000.log
Starting offset: 0
baseOffset: 0 lastOffset: 0 count: 1 baseSequence: 0 lastSequence: 0 producerId: 0 producerEpoch: 0 partitionLeaderEpoch: 0 isTransactional: true isControl: false position: 0 CreateTime: 1558083247264 size: 10315 magic: 2 compresscodec: NONE crc: 3531536908 isvalid: true
baseOffset: 1 lastOffset: 1 count: 1 baseSequence: -1 lastSequence: -1 producerId: 0 producerEpoch: 0 partitionLeaderEpoch: 0 isTransactional: true isControl: true position: 10315 CreateTime: 1558083247414 size: 78 magic: 2 compresscodec: NONE crc: 576574952 isvalid: true
我使用事务生产者发送了一条记录,你可以看到第二个条目有 isControl: true
。
我正在使用 springboot 2、kafk 2.2.0、spring-kafka 2.2.5
制作项目我制作了kafka exactly once
环境,消息的生产和消费都很好。
但是 kafka-consumer-groups.sh
是这样说的。
TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG
test_topic 0 23 24 1
test_topic 1 25 26 1
test_topic 2 21 22 1
我只向 kafka 发送了一条消息,但 LOG-END-OFFSET
加倍了,并且始终存在 1 个滞后。 (在我的 java 应用程序中,按预期生产和消费作品)
我不知道为什么 LOG-END-OFFSET 加倍了。
如果删除exactly once
配置,LOG-END-OFFSET
和CURRENT-OFFSET
计数没有问题。
这是我的 kafkaTemplate
设置代码。
@Bean
@Primary
public ProducerFactory<String, Object> producerFactory() {
Map<String, Object> producerProperties = new HashMap<>();
producerProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092";
producerProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
producerProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
// exactly once producer setup
producerProperties.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
DefaultKafkaProducerFactory factory = new DefaultKafkaProducerFactory<>(producerProperties, new StringSerializer(), new JsonSerializer<>(KafkaStaticOptions.OBJECT_MAPPER));
factory.setTransactionIdPrefix("my.transaction.");
return factory;
}
@Bean
@Primary
public KafkaTransactionManager<String, Object> kafkaTransactionManager(
ProducerFactory<String, Object> producerFactory) {
return new KafkaTransactionManager<>(producerFactory);
}
@Bean
@Primary
public KafkaTemplate<String, Object> kafkaTemplate(ProducerFactory<String, Object> producerFactory) {
return new KafkaTemplate<>(producerFactory);
}
我的生产者代码。
kafkaTemplate.executeInTransaction(kt -> kt.send("test_topic", "test data hahaha"));
我检查了 LOG-END-OFFSET 翻倍的时间,现在是 produce transaction commit
时机。
我哪里做错配置了?
使用事务时,Kafka 在日志中插入“control batches”以指示消息是否是事务的一部分。
这些批次也被分配了偏移量,所以这就是为什么您看到偏移量增加了 2,即使您只发送了一条记录。
如果您想自己检查,可以使用 DumpLogSegments 工具显示您的日志内容并查看控制批次:
./bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files /tmp/kafka-logs/mytopic-0/00000000000000000000.log
Dumping /tmp/kafka-logs/mytopic-0/00000000000000000000.log
Starting offset: 0
baseOffset: 0 lastOffset: 0 count: 1 baseSequence: 0 lastSequence: 0 producerId: 0 producerEpoch: 0 partitionLeaderEpoch: 0 isTransactional: true isControl: false position: 0 CreateTime: 1558083247264 size: 10315 magic: 2 compresscodec: NONE crc: 3531536908 isvalid: true
baseOffset: 1 lastOffset: 1 count: 1 baseSequence: -1 lastSequence: -1 producerId: 0 producerEpoch: 0 partitionLeaderEpoch: 0 isTransactional: true isControl: true position: 10315 CreateTime: 1558083247414 size: 78 magic: 2 compresscodec: NONE crc: 576574952 isvalid: true
我使用事务生产者发送了一条记录,你可以看到第二个条目有 isControl: true
。