如何使用@KafkaHandler 的 EntityManager/Hibernate 交易
How to use EntityManager/Hibernate Transaction with @KafkaHandler
假设我有一个 @KafkaListener
class,里面有一个 @KafkaHandler
方法,用于处理任何收到的消息并执行一些数据库操作。
我想在 class 中精细控制如何以及何时提交(或回滚)数据库更改(即手动管理数据库事务)。无论DB事务结果如何,都可以提交消费的消息偏移量。
这是我所拥有的简化版本:
@Service
@RequiredArgsConstructor
@KafkaListener(
topics = "${kafka.topic.foo}",
groupId = "${spring.kafka.consumer.group-id-foo}",
containerFactory = "kafkaListenerContainerFactoryFoo")
public class FooMessageConsumer {
// ...
private final EntityManager entityManager;
@KafkaHandler
public void handleMessage(FooMessage msg) {
// ...
handleDBOperations(msg);
// ...
}
void handleDBOperations(msg) {
try {
entityManager.getTransaction().begin();
// ...
entityManager.getTransaction().commit();
} catch (Exception e) {
log.error(e.getLocalizedMessage(), e);
entityManager.getTransaction().rollback();
}
}
}
收到消息并调用 entityManager.getTransaction().begin();
时,会导致异常:
java.lang.IllegalStateException: Not allowed to create transaction on shared EntityManager - use Spring transactions or EJB CMT instead
为什么不允许我在这里创建交易?
如果我删除 EntityManager
并向具有数据库操作的方法添加 @Transactional
注释(尽管这不是我想要的),则会导致另一个异常:
TransactionRequiredException Executing an update/delete query
好像完全忽略了注解。这在某种程度上与拥有自己的事务管理的 Kafka 消费者有关吗?
简而言之,我在这里做错了什么以及如何在 @KafkaHandler
方法中管理数据库事务?
感谢任何帮助。
提前致谢。
尝试使用 Springs TransactionTemplate:
https://docs.spring.io/spring-framework/docs/3.0.0.M4/reference/html/ch10s06.html
如果您的用例很(那么)简单,Springs 声明式事务管理应该也能让您实现您要求的行为:
https://docs.spring.io/spring-framework/docs/3.0.0.M3/reference/html/ch11s05.html
假设我有一个 @KafkaListener
class,里面有一个 @KafkaHandler
方法,用于处理任何收到的消息并执行一些数据库操作。
我想在 class 中精细控制如何以及何时提交(或回滚)数据库更改(即手动管理数据库事务)。无论DB事务结果如何,都可以提交消费的消息偏移量。
这是我所拥有的简化版本:
@Service
@RequiredArgsConstructor
@KafkaListener(
topics = "${kafka.topic.foo}",
groupId = "${spring.kafka.consumer.group-id-foo}",
containerFactory = "kafkaListenerContainerFactoryFoo")
public class FooMessageConsumer {
// ...
private final EntityManager entityManager;
@KafkaHandler
public void handleMessage(FooMessage msg) {
// ...
handleDBOperations(msg);
// ...
}
void handleDBOperations(msg) {
try {
entityManager.getTransaction().begin();
// ...
entityManager.getTransaction().commit();
} catch (Exception e) {
log.error(e.getLocalizedMessage(), e);
entityManager.getTransaction().rollback();
}
}
}
收到消息并调用 entityManager.getTransaction().begin();
时,会导致异常:
java.lang.IllegalStateException: Not allowed to create transaction on shared EntityManager - use Spring transactions or EJB CMT instead
为什么不允许我在这里创建交易?
如果我删除 EntityManager
并向具有数据库操作的方法添加 @Transactional
注释(尽管这不是我想要的),则会导致另一个异常:
TransactionRequiredException Executing an update/delete query
好像完全忽略了注解。这在某种程度上与拥有自己的事务管理的 Kafka 消费者有关吗?
简而言之,我在这里做错了什么以及如何在 @KafkaHandler
方法中管理数据库事务?
感谢任何帮助。 提前致谢。
尝试使用 Springs TransactionTemplate: https://docs.spring.io/spring-framework/docs/3.0.0.M4/reference/html/ch10s06.html
如果您的用例很(那么)简单,Springs 声明式事务管理应该也能让您实现您要求的行为: https://docs.spring.io/spring-framework/docs/3.0.0.M3/reference/html/ch11s05.html