从 EventBus @ConsumeEvent 调用时,Quarkus Panache Repository 调用静默失败
Quarkus Panache Repository call silently fails when called from an EventBus @ConsumeEvent
Quarkus 1.8.3.Final
直接调用访问 PanacheRepository 的方法按预期工作,但是当通过 EventBus 调用相同的方法时,调用到达方法并执行每一行,直到到达任何存储库调用,然后静默 fails/exits 没有任何迹象表明发生了什么。
根据日志,直接调用在 Quarkus 主线程中执行,事件总线调用在 vert.x-eventloop-thread-2 中执行。
还尝试了以下步骤的组合,结果相同:
- 将 EventBus 的消费者端包装到 Mutiny Uni。
- 让消费者 return 大学。
- 使消费者显式阻塞=false(默认)。
- 正在尝试 io.vertx.core.eventbus.EventBus 和 io.vertx.mutiny.core.eventbus.EventBus 实现。
- 将消费者置于相同和不同的服务中。
- 在被调用的方法上使用@Transactional 注释。
@ApplicationScoped
public class TestEntityRepository implements PanacheRepository<TestEntity> { }
@Slf4j
@Startup
@Transactional
@ApplicationScoped
public class TestService {
@Inject
TestEntityRepository repository;
@Inject
EventBus eventBus;
public void startUp(@Observes StartupEvent event) {
// Call via the event bus prints the log in listEntities() and silently stops when it reaches the repository call.
eventBus.sendAndForget("test_topic", "eventbus call");
// This prints the log in listEntities() and then lists all the entities in the repository.
listEntities("direct call");
}
@ConsumeEvent("test_topic")
public void listEntities(String testMessage) {
log.info("Printing all entities via: " + testMessage);
repository.findAll().stream().map(TestEntity::toString).forEach(log::info);
}
这是日志的 EventBus 部分的摘录:
2020-10-19 21:24:39,612 INFO [org.acm.com.TestService] (vert.x-eventloop-thread-1) Printing all entities via: eventbus call
2020-10-19 21:24:39,617 TRACE [com.arj.ats.jta] (vert.x-eventloop-thread-1) TransactionImple.setRollbackOnly
2020-10-19 21:24:39,617 TRACE [com.arj.ats.arjuna] (vert.x-eventloop-thread-1) BasicAction::preventCommit( BasicAction: 0:ffff7f000101:a721:5f8de7f7:0 status: ActionStatus.RUNNING)
2020-10-19 21:24:39,617 TRACE [com.arj.ats.jta] (vert.x-eventloop-thread-1) TransactionImple.getStatus: javax.transaction.Status.STATUS_MARKED_ROLLBACK
2020-10-19 21:24:39,618 TRACE [com.arj.ats.jta] (vert.x-eventloop-thread-1) BaseTransaction.rollback
2020-10-19 21:24:39,618 TRACE [com.arj.ats.jta] (vert.x-eventloop-thread-1) TransactionImple.rollbackAndDisassociate
2020-10-19 21:24:39,618 TRACE [com.arj.ats.arjuna] (vert.x-eventloop-thread-1) BasicAction::Abort() for action-id 0:ffff7f000101:a721:5f8de7f7:0
2020-10-19 21:24:39,618 TRACE [com.arj.ats.arjuna] (vert.x-eventloop-thread-1) BasicAction::removeChildThread () action 0:ffff7f000101:a721:5f8de7f7:0 removing 1
2020-10-19 21:24:39,618 TRACE [com.arj.ats.arjuna] (vert.x-eventloop-thread-1) BasicAction::removeChildThread () action 0:ffff7f000101:a721:5f8de7f7:0 removing 1 result = true
2020-10-19 21:24:39,618 TRACE [com.arj.ats.arjuna] (vert.x-eventloop-thread-1) TransactionReaper::remove ( BasicAction: 0:ffff7f000101:a721:5f8de7f7:0 status: ActionStatus.ABORTED )
事实证明,EventBus 吞噬了有关阻塞调用的异常,并且像 Uni 一样返回反应性响应,甚至显式地将数据库调用包装到 Uni 中仍将使用相同的线程,而不是工作线程不出所料。
解决方案是在事件侦听器上使用 blocking=true。所以它没有设置预期的行为(正如我所想的那样),而是为阻塞调用准备事件循环..
工作代码是:
@Slf4j
@Startup
@ApplicationScoped
public class TestService {
@Inject
TestEntityRepository repository;
@Inject
EventBus eventBus;
public void startUp(@Observes StartupEvent event) {
eventBus.sendAndForget("test_topic", "eventbus call");
}
@ConsumeEvent(value = "test_topic", blocking = true)
@Transactional
public Uni<Void> listEntities(String testMessage) {
log.info("Printing all entities via: " + testMessage);
try {
repository.findAll().forEach(log::info);
return Uni.createFrom().voidItem();
} catch (Exception e) {
return Uni.createFrom().failure(e);
}
}
}
Quarkus 1.8.3.Final
直接调用访问 PanacheRepository 的方法按预期工作,但是当通过 EventBus 调用相同的方法时,调用到达方法并执行每一行,直到到达任何存储库调用,然后静默 fails/exits 没有任何迹象表明发生了什么。
根据日志,直接调用在 Quarkus 主线程中执行,事件总线调用在 vert.x-eventloop-thread-2 中执行。
还尝试了以下步骤的组合,结果相同:
- 将 EventBus 的消费者端包装到 Mutiny Uni。
- 让消费者 return 大学。
- 使消费者显式阻塞=false(默认)。
- 正在尝试 io.vertx.core.eventbus.EventBus 和 io.vertx.mutiny.core.eventbus.EventBus 实现。
- 将消费者置于相同和不同的服务中。
- 在被调用的方法上使用@Transactional 注释。
@ApplicationScoped
public class TestEntityRepository implements PanacheRepository<TestEntity> { }
@Slf4j
@Startup
@Transactional
@ApplicationScoped
public class TestService {
@Inject
TestEntityRepository repository;
@Inject
EventBus eventBus;
public void startUp(@Observes StartupEvent event) {
// Call via the event bus prints the log in listEntities() and silently stops when it reaches the repository call.
eventBus.sendAndForget("test_topic", "eventbus call");
// This prints the log in listEntities() and then lists all the entities in the repository.
listEntities("direct call");
}
@ConsumeEvent("test_topic")
public void listEntities(String testMessage) {
log.info("Printing all entities via: " + testMessage);
repository.findAll().stream().map(TestEntity::toString).forEach(log::info);
}
这是日志的 EventBus 部分的摘录:
2020-10-19 21:24:39,612 INFO [org.acm.com.TestService] (vert.x-eventloop-thread-1) Printing all entities via: eventbus call
2020-10-19 21:24:39,617 TRACE [com.arj.ats.jta] (vert.x-eventloop-thread-1) TransactionImple.setRollbackOnly
2020-10-19 21:24:39,617 TRACE [com.arj.ats.arjuna] (vert.x-eventloop-thread-1) BasicAction::preventCommit( BasicAction: 0:ffff7f000101:a721:5f8de7f7:0 status: ActionStatus.RUNNING)
2020-10-19 21:24:39,617 TRACE [com.arj.ats.jta] (vert.x-eventloop-thread-1) TransactionImple.getStatus: javax.transaction.Status.STATUS_MARKED_ROLLBACK
2020-10-19 21:24:39,618 TRACE [com.arj.ats.jta] (vert.x-eventloop-thread-1) BaseTransaction.rollback
2020-10-19 21:24:39,618 TRACE [com.arj.ats.jta] (vert.x-eventloop-thread-1) TransactionImple.rollbackAndDisassociate
2020-10-19 21:24:39,618 TRACE [com.arj.ats.arjuna] (vert.x-eventloop-thread-1) BasicAction::Abort() for action-id 0:ffff7f000101:a721:5f8de7f7:0
2020-10-19 21:24:39,618 TRACE [com.arj.ats.arjuna] (vert.x-eventloop-thread-1) BasicAction::removeChildThread () action 0:ffff7f000101:a721:5f8de7f7:0 removing 1
2020-10-19 21:24:39,618 TRACE [com.arj.ats.arjuna] (vert.x-eventloop-thread-1) BasicAction::removeChildThread () action 0:ffff7f000101:a721:5f8de7f7:0 removing 1 result = true
2020-10-19 21:24:39,618 TRACE [com.arj.ats.arjuna] (vert.x-eventloop-thread-1) TransactionReaper::remove ( BasicAction: 0:ffff7f000101:a721:5f8de7f7:0 status: ActionStatus.ABORTED )
事实证明,EventBus 吞噬了有关阻塞调用的异常,并且像 Uni 一样返回反应性响应,甚至显式地将数据库调用包装到 Uni 中仍将使用相同的线程,而不是工作线程不出所料。
解决方案是在事件侦听器上使用 blocking=true。所以它没有设置预期的行为(正如我所想的那样),而是为阻塞调用准备事件循环..
工作代码是:
@Slf4j
@Startup
@ApplicationScoped
public class TestService {
@Inject
TestEntityRepository repository;
@Inject
EventBus eventBus;
public void startUp(@Observes StartupEvent event) {
eventBus.sendAndForget("test_topic", "eventbus call");
}
@ConsumeEvent(value = "test_topic", blocking = true)
@Transactional
public Uni<Void> listEntities(String testMessage) {
log.info("Printing all entities via: " + testMessage);
try {
repository.findAll().forEach(log::info);
return Uni.createFrom().voidItem();
} catch (Exception e) {
return Uni.createFrom().failure(e);
}
}
}