如何使用@KafkaHandler 的 EntityManager/Hibernate 交易

How to use EntityManager/Hibernate Transaction with @KafkaHandler

假设我有一个 @KafkaListener class,里面有一个 @KafkaHandler 方法,用于处理任何收到的消息并执行一些数据库操作。

我想在 class 中精细控制如何以及何时提交(或回滚)数据库更改(即手动管理数据库事务)。无论DB事务结果如何,都可以提交消费的消息偏移量。

这是我所拥有的简化版本:

@Service
@RequiredArgsConstructor
@KafkaListener(
    topics = "${kafka.topic.foo}",
    groupId = "${spring.kafka.consumer.group-id-foo}",
    containerFactory = "kafkaListenerContainerFactoryFoo")
public class FooMessageConsumer {

  // ...
  private final EntityManager entityManager;

  @KafkaHandler
  public void handleMessage(FooMessage msg) {
    // ...
    handleDBOperations(msg);
    // ...
  }

  void handleDBOperations(msg) {
    try {
      entityManager.getTransaction().begin();

      // ...

      entityManager.getTransaction().commit();
    } catch (Exception e) {
      log.error(e.getLocalizedMessage(), e);
      entityManager.getTransaction().rollback();
    }
  }

}

收到消息并调用 entityManager.getTransaction().begin(); 时,会导致异常:

java.lang.IllegalStateException: Not allowed to create transaction on shared EntityManager - use Spring transactions or EJB CMT instead

为什么不允许我在这里创建交易?

如果我删除 EntityManager 并向具有数据库操作的方法添加 @Transactional 注释(尽管这不是我想要的),则会导致另一个异常:

TransactionRequiredException Executing an update/delete query

好像完全忽略了注解。这在某种程度上与拥有自己的事务管理的 Kafka 消费者有关吗?

简而言之,我在这里做错了什么以及如何在 @KafkaHandler 方法中管理数据库事务?

感谢任何帮助。 提前致谢。

尝试使用 Springs TransactionTemplate: https://docs.spring.io/spring-framework/docs/3.0.0.M4/reference/html/ch10s06.html

如果您的用例很(那么)简单,Springs 声明式事务管理应该也能让您实现您要求的行为: https://docs.spring.io/spring-framework/docs/3.0.0.M3/reference/html/ch11s05.html