从 EventBus @ConsumeEvent 调用时,Quarkus Panache Repository 调用静默失败

Quarkus Panache Repository call silently fails when called from an EventBus @ConsumeEvent

Quarkus 1.8.3.Final

直接调用访问 PanacheRepository 的方法按预期工作,但是当通过 EventBus 调用相同的方法时,调用到达方法并执行每一行,直到到达任何存储库调用,然后静默 fails/exits 没有任何迹象表明发生了什么。

根据日志,直接调用在 Quarkus 主线程中执行,事件总线调用在 vert.x-eventloop-thread-2 中执行。

还尝试了以下步骤的组合,结果相同:

@ApplicationScoped
public class TestEntityRepository implements PanacheRepository<TestEntity> { }
@Slf4j
@Startup
@Transactional
@ApplicationScoped
public class TestService {

    @Inject
    TestEntityRepository repository;

    @Inject
    EventBus eventBus;

    public void startUp(@Observes StartupEvent event) {
        // Call via the event bus prints the log in listEntities() and silently stops when it reaches the repository call.
        eventBus.sendAndForget("test_topic", "eventbus call");
        // This  prints the log in listEntities() and then lists all the entities in the repository.
        listEntities("direct call");
    }

    @ConsumeEvent("test_topic")
    public void listEntities(String testMessage) {
        log.info("Printing all entities via: " + testMessage);
        repository.findAll().stream().map(TestEntity::toString).forEach(log::info);
    }

这是日志的 EventBus 部分的摘录:

2020-10-19 21:24:39,612 INFO  [org.acm.com.TestService] (vert.x-eventloop-thread-1) Printing all entities via: eventbus call
2020-10-19 21:24:39,617 TRACE [com.arj.ats.jta] (vert.x-eventloop-thread-1) TransactionImple.setRollbackOnly
2020-10-19 21:24:39,617 TRACE [com.arj.ats.arjuna] (vert.x-eventloop-thread-1) BasicAction::preventCommit( BasicAction: 0:ffff7f000101:a721:5f8de7f7:0 status: ActionStatus.RUNNING)
2020-10-19 21:24:39,617 TRACE [com.arj.ats.jta] (vert.x-eventloop-thread-1) TransactionImple.getStatus: javax.transaction.Status.STATUS_MARKED_ROLLBACK
2020-10-19 21:24:39,618 TRACE [com.arj.ats.jta] (vert.x-eventloop-thread-1) BaseTransaction.rollback
2020-10-19 21:24:39,618 TRACE [com.arj.ats.jta] (vert.x-eventloop-thread-1) TransactionImple.rollbackAndDisassociate
2020-10-19 21:24:39,618 TRACE [com.arj.ats.arjuna] (vert.x-eventloop-thread-1) BasicAction::Abort() for action-id 0:ffff7f000101:a721:5f8de7f7:0
2020-10-19 21:24:39,618 TRACE [com.arj.ats.arjuna] (vert.x-eventloop-thread-1) BasicAction::removeChildThread () action 0:ffff7f000101:a721:5f8de7f7:0 removing 1
2020-10-19 21:24:39,618 TRACE [com.arj.ats.arjuna] (vert.x-eventloop-thread-1) BasicAction::removeChildThread () action 0:ffff7f000101:a721:5f8de7f7:0 removing 1 result = true
2020-10-19 21:24:39,618 TRACE [com.arj.ats.arjuna] (vert.x-eventloop-thread-1) TransactionReaper::remove ( BasicAction: 0:ffff7f000101:a721:5f8de7f7:0 status: ActionStatus.ABORTED )

更新bugreport created

事实证明,EventBus 吞噬了有关阻塞调用的异常,并且像 Uni 一样返回反应性响应,甚至显式地将数据库调用包装到 Uni 中仍将使用相同的线程,而不是工作线程不出所料。

解决方案是在事件侦听器上使用 blocking=true。所以它没有设置预期的行为(正如我所想的那样),而是为阻塞调用准备事件循环..

工作代码是:

@Slf4j
@Startup
@ApplicationScoped
public class TestService {

    @Inject
    TestEntityRepository repository;

    @Inject
    EventBus eventBus;

    public void startUp(@Observes StartupEvent event) {
        eventBus.sendAndForget("test_topic", "eventbus call");
    }

    @ConsumeEvent(value = "test_topic", blocking = true)
    @Transactional
    public Uni<Void> listEntities(String testMessage) {
        log.info("Printing all entities via: " + testMessage);
        try {
            repository.findAll().forEach(log::info);
            return Uni.createFrom().voidItem();
        } catch (Exception e) {
            return Uni.createFrom().failure(e);
        }
    }
}