如何在 Quarkus-Kafka 消费者方法中执行 JPA 实体管理器操作
How to execute JPA Entity manager operations inside Quarkus-Kafka consumer method
亲爱的,我想根据 Kafka 主题收到的消息更新我的域实体。我正在使用 Quarkus latest 和 Smallrye reactive messaging with Kafka。 Pub-sub 模型对我来说工作正常,但在消费者方法中我无法使用 entityManager 或 HibernatePanache 更新我的实体。
每当我尝试在消费者消息中使用 entityManager 代码时,都会抛出异常并静默处理。这是我的消费者代码:
@Transactional
@Incoming("new-payment")
public CompletionStage<Void> updateTotalBuyers(String paymentEvent) {
return CompletableFuture.runAsync(() -> {
PaymentEvent event = jsonb.fromJson(paymentEvent, PaymentEvent.class);
TypedQuery<Book> query = em.createQuery("SELECT b FROM Book b where b.isbn=:isbn", Book.class);
query.setParameter("isbn", event.getIsbn());
Book book = query.getSingleResult();
book.setTotalBuyers(book.getTotalBuyers() + 1);
em.merge(book);
});
}
如果有人有解决我的问题的有效代码片段,那就太好了。另外,如何打印静默异常以进行进一步调试?
更新:
我用 try/catch 块包围了代码,并抛出了以下异常:
javax.enterprise.context.ContextNotActiveException: interface javax.enterprise.context.RequestScoped
at io.quarkus.hibernate.orm.runtime.RequestScopedEntityManagerHolder_ClientProxy.arc$delegate(RequestScopedEntityManagerHolder_ClientProxy.zig:83)
at io.quarkus.hibernate.orm.runtime.RequestScopedEntityManagerHolder_ClientProxy.getOrCreateEntityManager(RequestScopedEntityManagerHolder_ClientProxy.zig:191)
at io.quarkus.hibernate.orm.runtime.entitymanager.TransactionScopedEntityManager.getEntityManager(TransactionScopedEntityManager.java:78)
at io.quarkus.hibernate.orm.runtime.entitymanager.TransactionScopedEntityManager.createQuery(TransactionScopedEntityManager.java:317)
at io.quarkus.hibernate.orm.runtime.entitymanager.ForwardingEntityManager.createQuery(ForwardingEntityManager.java:142)
at io.quarkus.hibernate.orm.panache.runtime.JpaOperations.find(JpaOperations.java:208)
at io.quarkus.hibernate.orm.panache.runtime.JpaOperations.find(JpaOperations.java:200)
at org.ibra.ebs.book.model.Book.find(Book.java)
at org.ibra.ebs.book.service.BookService.lambda$updateTotalBuyers[=13=](BookService.java:106)
at java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1626)
at java.util.concurrent.CompletableFuture$AsyncRun.exec(CompletableFuture.java:1618)
at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
我在 class 和方法上都添加了注释 @ActivateRequestContext
,但没有成功。
更新:我尝试使用
提升上下文传播
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-smallrye-reactive-streams-operators</artifactId>
</dependency>
一些上下文传播 classes 也抛出了同样的异常(这意味着它正在被激活)。
此致。
在 updateTotalBuyers
方法的 class 和 @Transactional
注释上添加 @ApplicationScoped
和 @ActivateRequestContext
这两个注释。
干杯!
终于成功了 :),是的,答案是围绕上下文传播,这就是我所做的:
@Transactional
@Incoming("new-payment")
public CompletionStage<?> updateTotalBuyers(Message<String> paymentEvent) {
//TODO: move to CDI producer
ManagedExecutor executor = ManagedExecutor.builder()
.maxAsync(5)
.propagated(ThreadContext.CDI,
ThreadContext.TRANSACTION)
.build();
//TODO: move to CDI producer
ThreadContext threadContext = ThreadContext.builder()
.propagated(ThreadContext.CDI,
ThreadContext.TRANSACTION)
.build();
return executor.runAsync(threadContext.contextualRunnable(() -> {
try {
log.info("Into update total buyers");
PaymentEvent event = jsonb.fromJson(paymentEvent.getPayload(), PaymentEvent.class);
Book book = Book.find("isbn", event.getIsbn()).singleResult();
book.totalBuyers++;
book.persist();
log.info("Total books {}", book.totalBuyers);
} catch(Exception e) {
log.error("Something wrong happened !!!", e);
} finally {
paymentEvent.ack();
}
}));
}
我会努力让它更标准。
我在使用 Quarkus + 单线程执行器收集 AWS SQS 消息时遇到了类似的问题。
- 初始-错误方法:
private final ExecutorService scheduler = Executors.newSingleThreadExecutor();
scheduler.submit(/*MyRunnable*/)
- 解决方案:注入由quarkus提供的ManagedExecutor:
@Inject
ManagedExecutor managedExecutor;
managedExecutor.submit(/*MyRunnable*/);
在您的特定情况下,您可以 将以下代码 替换为注入 ManagedExecutor.
ManagedExecutor executor = ManagedExecutor.builder()
.maxAsync(5)
.propagated(ThreadContext.CDI,
ThreadContext.TRANSACTION)
.build();
//TODO: move to CDI producer
ThreadContext threadContext = ThreadContext.builder()
.propagated(ThreadContext.CDI,
ThreadContext.TRANSACTION)
.build();
亲爱的,我想根据 Kafka 主题收到的消息更新我的域实体。我正在使用 Quarkus latest 和 Smallrye reactive messaging with Kafka。 Pub-sub 模型对我来说工作正常,但在消费者方法中我无法使用 entityManager 或 HibernatePanache 更新我的实体。
每当我尝试在消费者消息中使用 entityManager 代码时,都会抛出异常并静默处理。这是我的消费者代码:
@Transactional
@Incoming("new-payment")
public CompletionStage<Void> updateTotalBuyers(String paymentEvent) {
return CompletableFuture.runAsync(() -> {
PaymentEvent event = jsonb.fromJson(paymentEvent, PaymentEvent.class);
TypedQuery<Book> query = em.createQuery("SELECT b FROM Book b where b.isbn=:isbn", Book.class);
query.setParameter("isbn", event.getIsbn());
Book book = query.getSingleResult();
book.setTotalBuyers(book.getTotalBuyers() + 1);
em.merge(book);
});
}
如果有人有解决我的问题的有效代码片段,那就太好了。另外,如何打印静默异常以进行进一步调试?
更新: 我用 try/catch 块包围了代码,并抛出了以下异常:
javax.enterprise.context.ContextNotActiveException: interface javax.enterprise.context.RequestScoped at io.quarkus.hibernate.orm.runtime.RequestScopedEntityManagerHolder_ClientProxy.arc$delegate(RequestScopedEntityManagerHolder_ClientProxy.zig:83) at io.quarkus.hibernate.orm.runtime.RequestScopedEntityManagerHolder_ClientProxy.getOrCreateEntityManager(RequestScopedEntityManagerHolder_ClientProxy.zig:191) at io.quarkus.hibernate.orm.runtime.entitymanager.TransactionScopedEntityManager.getEntityManager(TransactionScopedEntityManager.java:78) at io.quarkus.hibernate.orm.runtime.entitymanager.TransactionScopedEntityManager.createQuery(TransactionScopedEntityManager.java:317) at io.quarkus.hibernate.orm.runtime.entitymanager.ForwardingEntityManager.createQuery(ForwardingEntityManager.java:142) at io.quarkus.hibernate.orm.panache.runtime.JpaOperations.find(JpaOperations.java:208) at io.quarkus.hibernate.orm.panache.runtime.JpaOperations.find(JpaOperations.java:200) at org.ibra.ebs.book.model.Book.find(Book.java) at org.ibra.ebs.book.service.BookService.lambda$updateTotalBuyers[=13=](BookService.java:106) at java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1626) at java.util.concurrent.CompletableFuture$AsyncRun.exec(CompletableFuture.java:1618) at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056) at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692) at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
我在 class 和方法上都添加了注释 @ActivateRequestContext
,但没有成功。
更新:我尝试使用
提升上下文传播<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-smallrye-reactive-streams-operators</artifactId>
</dependency>
一些上下文传播 classes 也抛出了同样的异常(这意味着它正在被激活)。
此致。
在 updateTotalBuyers
方法的 class 和 @Transactional
注释上添加 @ApplicationScoped
和 @ActivateRequestContext
这两个注释。
干杯!
终于成功了 :),是的,答案是围绕上下文传播,这就是我所做的:
@Transactional
@Incoming("new-payment")
public CompletionStage<?> updateTotalBuyers(Message<String> paymentEvent) {
//TODO: move to CDI producer
ManagedExecutor executor = ManagedExecutor.builder()
.maxAsync(5)
.propagated(ThreadContext.CDI,
ThreadContext.TRANSACTION)
.build();
//TODO: move to CDI producer
ThreadContext threadContext = ThreadContext.builder()
.propagated(ThreadContext.CDI,
ThreadContext.TRANSACTION)
.build();
return executor.runAsync(threadContext.contextualRunnable(() -> {
try {
log.info("Into update total buyers");
PaymentEvent event = jsonb.fromJson(paymentEvent.getPayload(), PaymentEvent.class);
Book book = Book.find("isbn", event.getIsbn()).singleResult();
book.totalBuyers++;
book.persist();
log.info("Total books {}", book.totalBuyers);
} catch(Exception e) {
log.error("Something wrong happened !!!", e);
} finally {
paymentEvent.ack();
}
}));
}
我会努力让它更标准。
我在使用 Quarkus + 单线程执行器收集 AWS SQS 消息时遇到了类似的问题。
- 初始-错误方法:
private final ExecutorService scheduler = Executors.newSingleThreadExecutor();
scheduler.submit(/*MyRunnable*/)
- 解决方案:注入由quarkus提供的ManagedExecutor:
@Inject
ManagedExecutor managedExecutor;
managedExecutor.submit(/*MyRunnable*/);
在您的特定情况下,您可以 将以下代码 替换为注入 ManagedExecutor.
ManagedExecutor executor = ManagedExecutor.builder()
.maxAsync(5)
.propagated(ThreadContext.CDI,
ThreadContext.TRANSACTION)
.build();
//TODO: move to CDI producer
ThreadContext threadContext = ThreadContext.builder()
.propagated(ThreadContext.CDI,
ThreadContext.TRANSACTION)
.build();