如何在由 KafkaTemplate 创建的事务中发送 Kafka 偏移量?
How to send Kafka offsets in transaction, which is created by KafkaTemplate?
我想实施 read-process-write
模式 - https://www.confluent.io/blog/transactions-apache-kafka/。所以,我需要消费记录,处理它们,然后提交消耗的偏移量。
我使用 org.apache.kafka.clients.consumer.KafkaConsumer
来消费消息。我的意思是,它不是 spring 相关的消费者。
我使用 org.springframework.kafka.core.KafkaTemplate
来生成消息。我这样创建它的 bean:
@Bean
public Map<String, Object> producerConfigs() {
final Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "bootstrapServers");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.CLIENT_ID_CONFIG, UUID.randomUUID().toString());
props.put(ProducerConfig.ACKS_CONFIG, "all");
return props;
}
@Bean
public DefaultKafkaProducerFactory<String, String> defaultKafkaProducerFactory() {
DefaultKafkaProducerFactory<String, String> kafkaProducerFactory = new DefaultKafkaProducerFactory<>(producerConfigs());
kafkaProducerFactory.setTransactionIdPrefix("transaction-id-prefix");
return kafkaProducerFactory;
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate(DefaultKafkaProducerFactory<String, String> defaultKafkaProducerFactory) {
return new KafkaTemplate<>(defaultKafkaProducerFactory);
}
我生成这样的结果消息:
ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMillis(POLL_INTERVAL_IN_MS));
List<List<String>> outputMessages = produceOutput(consumerRecords);
kafkaTemplate.executeInTransaction(kafkaProducer -> {
for (List<String> resultTasks : outputMessages) {
for (String resultTask : resultTasks) {
kafkaProducer.send("topic", "key", resultTask);
}
}
kafkaProducer.sendOffsetsToTransaction(getOffsetsForCommit(consumerRecords), "consumerGroupId");
return true;
});
最后,我有这个错误:
java.lang.IllegalArgumentException: No transaction in process
at org.springframework.util.Assert.isTrue(Assert.java:118)
at org.springframework.kafka.core.KafkaTemplate.sendOffsetsToTransaction(KafkaTemplate.java:345)
此方法抛出异常:
@Override
public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets, String consumerGroupId) {
@SuppressWarnings("unchecked")
KafkaResourceHolder<K, V> resourceHolder = (KafkaResourceHolder<K, V>) TransactionSynchronizationManager
.getResource(this.producerFactory);
Assert.isTrue(resourceHolder != null, "No transaction in process"); // here
if (resourceHolder.getProducer() != null) {
resourceHolder.getProducer().sendOffsetsToTransaction(offsets, consumerGroupId);
}
}
那么,如何正确地提交这些偏移量呢?
这是一个错误; sendOffsetsToTransaction()
在 executeInTransaction
中不起作用 - 它假定 Spring 事务绑定到线程。
作为 work-around,您可以在方法上使用 @Transactional
或使用带有 KafkaTransactionManager
的交易模板来启动 Spring 交易,而不是使用 executeInTransaction()
.
TransactionTemplate tt = new TransactionTemplate(tm);
...
this.tt.execute(s -> {
template.send(...);
template.sendOffsetsToTransaction(...);
return null;
});
请打开 GitHub Issue,我们会解决这个问题。
我想实施 read-process-write
模式 - https://www.confluent.io/blog/transactions-apache-kafka/。所以,我需要消费记录,处理它们,然后提交消耗的偏移量。
我使用 org.apache.kafka.clients.consumer.KafkaConsumer
来消费消息。我的意思是,它不是 spring 相关的消费者。
我使用 org.springframework.kafka.core.KafkaTemplate
来生成消息。我这样创建它的 bean:
@Bean
public Map<String, Object> producerConfigs() {
final Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "bootstrapServers");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.CLIENT_ID_CONFIG, UUID.randomUUID().toString());
props.put(ProducerConfig.ACKS_CONFIG, "all");
return props;
}
@Bean
public DefaultKafkaProducerFactory<String, String> defaultKafkaProducerFactory() {
DefaultKafkaProducerFactory<String, String> kafkaProducerFactory = new DefaultKafkaProducerFactory<>(producerConfigs());
kafkaProducerFactory.setTransactionIdPrefix("transaction-id-prefix");
return kafkaProducerFactory;
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate(DefaultKafkaProducerFactory<String, String> defaultKafkaProducerFactory) {
return new KafkaTemplate<>(defaultKafkaProducerFactory);
}
我生成这样的结果消息:
ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMillis(POLL_INTERVAL_IN_MS));
List<List<String>> outputMessages = produceOutput(consumerRecords);
kafkaTemplate.executeInTransaction(kafkaProducer -> {
for (List<String> resultTasks : outputMessages) {
for (String resultTask : resultTasks) {
kafkaProducer.send("topic", "key", resultTask);
}
}
kafkaProducer.sendOffsetsToTransaction(getOffsetsForCommit(consumerRecords), "consumerGroupId");
return true;
});
最后,我有这个错误:
java.lang.IllegalArgumentException: No transaction in process
at org.springframework.util.Assert.isTrue(Assert.java:118)
at org.springframework.kafka.core.KafkaTemplate.sendOffsetsToTransaction(KafkaTemplate.java:345)
此方法抛出异常:
@Override
public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets, String consumerGroupId) {
@SuppressWarnings("unchecked")
KafkaResourceHolder<K, V> resourceHolder = (KafkaResourceHolder<K, V>) TransactionSynchronizationManager
.getResource(this.producerFactory);
Assert.isTrue(resourceHolder != null, "No transaction in process"); // here
if (resourceHolder.getProducer() != null) {
resourceHolder.getProducer().sendOffsetsToTransaction(offsets, consumerGroupId);
}
}
那么,如何正确地提交这些偏移量呢?
这是一个错误; sendOffsetsToTransaction()
在 executeInTransaction
中不起作用 - 它假定 Spring 事务绑定到线程。
作为 work-around,您可以在方法上使用 @Transactional
或使用带有 KafkaTransactionManager
的交易模板来启动 Spring 交易,而不是使用 executeInTransaction()
.
TransactionTemplate tt = new TransactionTemplate(tm);
...
this.tt.execute(s -> {
template.send(...);
template.sendOffsetsToTransaction(...);
return null;
});
请打开 GitHub Issue,我们会解决这个问题。