Spring Kafka transaction causes producer offset to increase by two per message
I have a consume-transform-produce workflow in a microservice using Spring (Boot) Kafka. I need to achieve the exactly-once semantics provided by Kafka transactions.
Here are the code snippets:
Configuration
@Bean
public ProducerFactory<String, String> producerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
    props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 1024 * 1024);
    DefaultKafkaProducerFactory<String, String> defaultKafkaProducerFactory = new DefaultKafkaProducerFactory<>(props);
    defaultKafkaProducerFactory.setTransactionIdPrefix("kafka-trx-");
    return defaultKafkaProducerFactory;
}

@Bean
public ConsumerFactory<String, String> consumerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 5000);
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
    props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
    return new DefaultKafkaConsumerFactory<>(props);
}

@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
    return new KafkaTemplate<>(producerFactory());
}

@Bean
public KafkaTransactionManager<String, String> kafkaTransactionManager() {
    return new KafkaTransactionManager<>(producerFactory());
}

@Bean
@Qualifier("chainedKafkaTransactionManager")
public ChainedKafkaTransactionManager<String, Object> chainedKafkaTransactionManager(KafkaTransactionManager<String, String> kafkaTransactionManager) {
    return new ChainedKafkaTransactionManager<>(kafkaTransactionManager);
}

@Bean
public ConcurrentKafkaListenerContainerFactory<?, ?> concurrentKafkaListenerContainerFactory(ChainedKafkaTransactionManager<String, Object> chainedKafkaTransactionManager) {
    ConcurrentKafkaListenerContainerFactory<String, String> concurrentKafkaListenerContainerFactory = new ConcurrentKafkaListenerContainerFactory<>();
    concurrentKafkaListenerContainerFactory.setConsumerFactory(consumerFactory());
    concurrentKafkaListenerContainerFactory.setBatchListener(true);
    concurrentKafkaListenerContainerFactory.setConcurrency(nexusConsumerConcurrency);
    //concurrentKafkaListenerContainerFactory.setReplyTemplate(kafkaTemplate());
    concurrentKafkaListenerContainerFactory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.BATCH);
    concurrentKafkaListenerContainerFactory.getContainerProperties().setTransactionManager(chainedKafkaTransactionManager);
    return concurrentKafkaListenerContainerFactory;
}
Listener
@KafkaListener(topics = "${kafka.xxx.consumerTopic}", groupId = "${kafka.xxx.consumerGroup}", containerFactory = "concurrentKafkaListenerContainerFactory")
public void listen(@Payload List<String> msgs, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) List<Integer> partitions, @Header(KafkaHeaders.OFFSET) List<Long> offsets) {
    int i = -1;
    for (String msg : msgs) {
        ++i;
        LOGGER.debug("partition={}; offset={}; msg={}", partitions.get(i), offsets.get(i), msg);
        String json = transform(msg);
        kafkaTemplate.executeInTransaction(kt -> kt.send(producerTopic, json));
    }
}
But in the production environment I ran into a strange problem: the offset increases by two per message sent by the producer, and the consumer never commits the consuming offset.
[Screenshot: consumer offsets of topic1]
[Screenshot: topic1 consumer detail]
[Screenshot: producing to topic2]
Nevertheless, the count of messages sent by the producer equals the count consumed, and the producer's downstream keeps receiving messages from topic2 continuously. No error or exception shows up in the logs.
I wonder why the consume-transform-produce workflow appears fine (exactly-once semantics also guaranteed), yet the consumed offsets are never committed and the produced offsets increase by two rather than one per message.
How can I fix it? Thanks!
Pay attention to your auto-commit setting. As I can see, you have set it to false:
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
So, in that case, you need to commit the offsets "manually", or set auto-commit to true.
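For reference, in a non-transactional container that would look roughly like the sketch below. This is only an illustration, not the asker's code: it assumes the factory's ack mode is switched to MANUAL (the Acknowledgment parameter is only injected then), and process(...) stands in for the real work.

// Sketch: manual offset commits in a non-transactional batch listener.
// Assumes on the container factory:
//   factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
@KafkaListener(topics = "${kafka.xxx.consumerTopic}", groupId = "${kafka.xxx.consumerGroup}")
public void listen(@Payload List<String> msgs, Acknowledgment ack) {
    msgs.forEach(this::process);   // placeholder for the transform/produce work
    ack.acknowledge();             // commits the offsets of the whole polled batch
}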
That's the way it's designed. Kafka logs are immutable, so an extra "slot" is used at the end of each transaction to indicate whether the transaction was committed or rolled back. This allows consumers with the read_committed isolation level to skip over rolled-back transactions.
If you publish 10 records in a transaction, you will see the offset increase by 11. If you only publish one, it will increase by two.
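You can watch that extra slot directly with a throwaway probe like the sketch below. It is a sketch only: the broker address, topic and partition are placeholders, and it assumes the high watermark has caught up by the second call.

// Sketch: the partition's end offset moves by 2 for a single-record transaction
// (the data record plus the transaction commit marker).
Map<String, Object> probeProps = new HashMap<>();
probeProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
probeProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
probeProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
try (Consumer<String, String> probe = new KafkaConsumer<>(probeProps)) {
    TopicPartition tp = new TopicPartition("topic2", 0);
    long before = probe.endOffsets(Collections.singleton(tp)).get(tp);
    kafkaTemplate.executeInTransaction(kt -> kt.send("topic2", "one record"));
    long after = probe.endOffsets(Collections.singleton(tp)).get(tp);
    System.out.println(after - before);   // 2, not 1
}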
If you want the publishing to participate in the transaction started by the consumer (for exactly-once), you should not use executeInTransaction; that will start a new transaction.
/**
 * Execute some arbitrary operation(s) on the operations and return the result.
 * The operations are invoked within a local transaction and do not participate
 * in a global transaction (if present).
 * @param callback the callback.
 * @param <T> the result type.
 * @return the result.
 * @since 1.1
 */
<T> T executeInTransaction(OperationsCallback<K, V, T> callback);
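Applied to the listener above, the fix is to replace the executeInTransaction call with a plain send(), so the publish joins the transaction the container has already started. A minimal sketch of the corrected loop, reusing the names from the question:

@KafkaListener(topics = "${kafka.xxx.consumerTopic}", groupId = "${kafka.xxx.consumerGroup}", containerFactory = "concurrentKafkaListenerContainerFactory")
public void listen(@Payload List<String> msgs) {
    for (String msg : msgs) {
        String json = transform(msg);
        // send() participates in the transaction started by the listener container;
        // the container sends the consumed offsets to that same transaction on exit
        kafkaTemplate.send(producerTopic, json);
    }
}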
That said, I don't see why the consumed offsets would still not be sent to the consumer-initiated transaction. You should turn on DEBUG logging to see what's happening (if it still happens after you fix the template code).
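With Spring Boot, one way to turn that on is a logging property like the line below (the category is an assumption, chosen broadly enough to cover the listener container):

logging.level.org.springframework.kafka=DEBUG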
EDIT
The consumed offset (+1) is sent to the transaction by the listener container when the listener exits; turn on commit logging and you will see it...
@SpringBootApplication
public class So59152915Application {

    public static void main(String[] args) {
        SpringApplication.run(So59152915Application.class, args);
    }

    @Autowired
    private KafkaTemplate<String, String> template;

    @KafkaListener(id = "foo", topics = "so59152915-1", clientIdPrefix = "so59152915")
    public void listen1(String in, @Header(KafkaHeaders.OFFSET) long offset) throws InterruptedException {
        System.out.println(in + "@" + offset);
        this.template.send("so59152915-2", in.toUpperCase());
        Thread.sleep(2000);
    }

    @KafkaListener(id = "bar", topics = "so59152915-2")
    public void listen2(String in) {
        System.out.println(in);
    }

    @Bean
    public NewTopic topic1() {
        return new NewTopic("so59152915-1", 1, (short) 1);
    }

    @Bean
    public NewTopic topic2() {
        return new NewTopic("so59152915-2", 1, (short) 1);
    }

    @Bean
    public ApplicationRunner runner(KafkaListenerEndpointRegistry registry) {
        return args -> {
            this.template.executeInTransaction(t -> {
                IntStream.range(0, 11).forEach(i -> t.send("so59152915-1", "foo" + i));
                try {
                    System.out.println("Hit enter to commit sends");
                    System.in.read();
                }
                catch (IOException e) {
                    e.printStackTrace();
                }
                return null;
            });
        };
    }

}

@Component
class Configurer {

    Configurer(ConcurrentKafkaListenerContainerFactory<?, ?> factory) {
        factory.getContainerProperties().setCommitLogLevel(Level.INFO);
    }

}
and
spring.kafka.producer.transaction-id-prefix=tx-
spring.kafka.consumer.properties.isolation.level=read_committed
spring.kafka.consumer.auto-offset-reset=earliest
and
foo0@56
2019-12-04 10:07:18.551 INFO 55430 --- [ foo-0-C-1] essageListenerContainer$ListenerConsumer : Sending offsets to transaction: {so59152915-1-0=OffsetAndMetadata{offset=57, leaderEpoch=null, metadata=''}}
foo1@57
FOO0
2019-12-04 10:07:18.558 INFO 55430 --- [ bar-0-C-1] essageListenerContainer$ListenerConsumer : Sending offsets to transaction: {so59152915-2-0=OffsetAndMetadata{offset=63, leaderEpoch=null, metadata=''}}
2019-12-04 10:07:20.562 INFO 55430 --- [ foo-0-C-1] essageListenerContainer$ListenerConsumer : Sending offsets to transaction: {so59152915-1-0=OffsetAndMetadata{offset=58, leaderEpoch=null, metadata=''}}
foo2@58
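Note the pattern in that output: foo0 is consumed at offset 56 and the container sends OffsetAndMetadata{offset=57}, the record's offset + 1, to the transaction. No separate consumer commit ever appears, because committing the consumed offsets is part of the producer's transaction.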