如何在 spring 启动应用程序中使用 spring 事务配置 kafka 事务管理器
How to configure kafka transaction manager with spring transaction in spring boot app
我在 spring 引导应用程序中使用 Kafka。我想在一笔交易中执行操作,如下所示。
listen(){
produce()
saveInDb()
}
和
operation(){
saveInDB()
produce()
}
我使用以下配置启用了 Kafka 事务
spring:
kafka:
bootstrap-servers: localhost:19092,localhost:29092,localhost:39092
producer:
transaction-id-prefix: tx-
consumer:
enable-auto-commit: false
isolation-level: read_committed
并使用此配置
@Bean
public ProducerFactory<String, Object> producerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
DefaultKafkaProducerFactory<String, Object> factory = new DefaultKafkaProducerFactory<>(props);
factory.setTransactionIdPrefix("tx-");
return factory;
}
@Bean
public KafkaTransactionManager kafkaTransactionManager() {
KafkaTransactionManager manager = new KafkaTransactionManager(producerFactory());
return manager;
}
@Bean
public KafkaTemplate<String, Object> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
但是我在尝试使用 spring @Transactional 注释时出现错误
@Transactional
operation(){
saveInDB()
produce()
}
No bean named 'transactionManager' available: No matching TransactionManager bean found for qualifier 'transactionManager' - neither qualifier match nor bean name match!
我在这里遵循了 spring 文档
https://docs.spring.io/spring-kafka/reference/html/#using-kafkatransactionmanager
我在配置中缺少什么?
我没有在配置中定义 transactionManager bean。
Spring 无法找到它,因为 KafkaTransactionManager
扩展了 AbstractPlatformTransactionManager
并且 JpaTransactionManager
也扩展了相同的 class.
将这个 bean 定义为主 bean 解决了这个问题。
@Bean
@Primary
public JpaTransactionManager transactionManager(EntityManagerFactory entityManagerFactory) {
return new JpaTransactionManager(entityManagerFactory);
}
Kafka 事务与其链接,KafkaTemplate
将事务与事务管理器同步。
引用https://docs.spring.io/spring-kafka/reference/html/#transactions
我在 spring 引导应用程序中使用 Kafka。我想在一笔交易中执行操作,如下所示。
listen(){
produce()
saveInDb()
}
和
operation(){
saveInDB()
produce()
}
我使用以下配置启用了 Kafka 事务
spring:
kafka:
bootstrap-servers: localhost:19092,localhost:29092,localhost:39092
producer:
transaction-id-prefix: tx-
consumer:
enable-auto-commit: false
isolation-level: read_committed
并使用此配置
@Bean
public ProducerFactory<String, Object> producerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
DefaultKafkaProducerFactory<String, Object> factory = new DefaultKafkaProducerFactory<>(props);
factory.setTransactionIdPrefix("tx-");
return factory;
}
@Bean
public KafkaTransactionManager kafkaTransactionManager() {
KafkaTransactionManager manager = new KafkaTransactionManager(producerFactory());
return manager;
}
@Bean
public KafkaTemplate<String, Object> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
但是我在尝试使用 spring @Transactional 注释时出现错误
@Transactional
operation(){
saveInDB()
produce()
}
No bean named 'transactionManager' available: No matching TransactionManager bean found for qualifier 'transactionManager' - neither qualifier match nor bean name match!
我在这里遵循了 spring 文档 https://docs.spring.io/spring-kafka/reference/html/#using-kafkatransactionmanager
我在配置中缺少什么?
我没有在配置中定义 transactionManager bean。
Spring 无法找到它,因为 KafkaTransactionManager
扩展了 AbstractPlatformTransactionManager
并且 JpaTransactionManager
也扩展了相同的 class.
将这个 bean 定义为主 bean 解决了这个问题。
@Bean
@Primary
public JpaTransactionManager transactionManager(EntityManagerFactory entityManagerFactory) {
return new JpaTransactionManager(entityManagerFactory);
}
Kafka 事务与其链接,KafkaTemplate
将事务与事务管理器同步。
引用https://docs.spring.io/spring-kafka/reference/html/#transactions