Panache under Quarkus times out on a database update

I have the following API in my Quarkus application:

  @PATCH
  @Path("/{id}")
  @Transactional
  public Uni<Response> updateCustomer(@RestPath Long id, MyPatchObject myBody) {
    return myService.updateCustomer(id, myBody);
  }

In my service:

  public Uni<Response> updateCustomer(final long id, final MyPatchObject myBody) {
    return myRepo.findById(id)
        // some validation here
        .replaceWith(myRepo.update(id, myBody)
                           .chain(() -> myRepo.flush())
                           // reload to get the latest changes
                           .replaceWith(myRepo.findById(id)))
        // some other handling that persists to another repository
        .replaceWith(Response.noContent().build());
  }

And my repository:

  public Uni<Integer> update(Long id, MyPatchObject patch) {
    // Some handling which results in these constants:
    String query = "black_listed = :blacklisted where id = :id";
    Parameters params = ... // contains: [blacklisted-true, id-1]
    return this.update(query, params);
  }

However, when I call this API from my test, I get this timeout exception:

2021-10-07 12:40:28,911 ERROR [org.jbo.res.rea.com.cor.AbstractResteasyReactiveContext] (executor-thread-0) Request failed: java.lang.RuntimeException: java.util.concurrent.TimeoutException
    at io.quarkus.hibernate.reactive.panache.common.runtime.AbstractJpaOperations.executeInVertxEventLoop(AbstractJpaOperations.java:52)
    at io.smallrye.mutiny.operators.uni.UniRunSubscribeOn.subscribe(UniRunSubscribeOn.java:25)
    at io.smallrye.mutiny.operators.AbstractUni.subscribe(AbstractUni.java:36)
    at io.smallrye.mutiny.operators.uni.UniOnItemTransformToUni.subscribe(UniOnItemTransformToUni.java:25)
    at io.smallrye.mutiny.operators.AbstractUni.subscribe(AbstractUni.java:36)
    at io.smallrye.mutiny.operators.uni.UniOnItemTransformToUni.subscribe(UniOnItemTransformToUni.java:25)
    at io.smallrye.mutiny.operators.AbstractUni.subscribe(AbstractUni.java:36)
    at io.smallrye.mutiny.operators.uni.UniOnItemTransformToUni.subscribe(UniOnItemTransformToUni.java:25)
    at io.smallrye.mutiny.operators.AbstractUni.subscribe(AbstractUni.java:36)
    at io.smallrye.mutiny.operators.uni.UniOnItemTransformToUni$UniOnItemTransformToUniProcessor.performInnerSubscription(UniOnItemTransformToUni.java:81)
    at io.smallrye.mutiny.operators.uni.UniOnItemTransformToUni$UniOnItemTransformToUniProcessor.onItem(UniOnItemTransformToUni.java:57)
    at io.smallrye.mutiny.operators.uni.UniOnItemTransformToUni$UniOnItemTransformToUniProcessor.onItem(UniOnItemTransformToUni.java:60)
    at io.smallrye.mutiny.operators.uni.builders.DefaultUniEmitter.complete(DefaultUniEmitter.java:36)
    at io.smallrye.mutiny.groups.UniOnNull.lambda$failWith(UniOnNull.java:43)
    at io.smallrye.context.impl.wrappers.SlowContextualBiConsumer.accept(SlowContextualBiConsumer.java:21)
    at io.smallrye.mutiny.groups.UniOnItem.lambda$transformToUni(UniOnItem.java:169)
    at io.smallrye.context.impl.wrappers.SlowContextualConsumer.accept(SlowContextualConsumer.java:21)
    at io.smallrye.mutiny.operators.uni.builders.UniCreateWithEmitter.subscribe(UniCreateWithEmitter.java:22)
    at io.smallrye.mutiny.operators.AbstractUni.subscribe(AbstractUni.java:36)
    at io.smallrye.mutiny.operators.uni.UniOnItemTransformToUni$UniOnItemTransformToUniProcessor.performInnerSubscription(UniOnItemTransformToUni.java:81)
    at io.smallrye.mutiny.operators.uni.UniOnItemTransformToUni$UniOnItemTransformToUniProcessor.onItem(UniOnItemTransformToUni.java:57)
    at io.smallrye.mutiny.operators.uni.UniOnItemTransformToUni$UniOnItemTransformToUniProcessor.onItem(UniOnItemTransformToUni.java:60)
    at io.smallrye.mutiny.operators.uni.UniOperatorProcessor.onItem(UniOperatorProcessor.java:36)
    at io.smallrye.mutiny.operators.uni.builders.UniCreateFromCompletionStage$CompletionStageUniSubscription.forwardResult(UniCreateFromCompletionStage.java:63)
    at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
    at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
    at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
    at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2143)
    at io.vertx.core.Future.lambda$toCompletionStage(Future.java:360)
    at io.vertx.core.impl.future.FutureImpl.onSuccess(FutureImpl.java:141)
    at io.vertx.core.impl.future.FutureBase.emitSuccess(FutureBase.java:60)
    at io.vertx.core.impl.future.FutureImpl.tryComplete(FutureImpl.java:211)
    at io.vertx.core.impl.future.PromiseImpl.tryComplete(PromiseImpl.java:23)
    at io.vertx.sqlclient.impl.QueryResultBuilder.tryComplete(QueryResultBuilder.java:102)
    at io.vertx.sqlclient.impl.QueryResultBuilder.tryComplete(QueryResultBuilder.java:35)
    at io.vertx.core.Promise.complete(Promise.java:66)
    at io.vertx.core.Promise.handle(Promise.java:51)
    at io.vertx.core.Promise.handle(Promise.java:29)
    at io.vertx.core.impl.future.FutureImpl.onSuccess(FutureImpl.java:141)
    at io.vertx.core.impl.future.FutureBase.lambda$emitSuccess$0(FutureBase.java:54)
    at io.vertx.core.impl.EventLoopContext.execute(EventLoopContext.java:81)
    at io.vertx.core.impl.DuplicatedContext.execute(DuplicatedContext.java:173)
    at io.vertx.core.impl.future.FutureBase.emitSuccess(FutureBase.java:51)
    at io.vertx.core.impl.future.FutureImpl.tryComplete(FutureImpl.java:211)
    at io.vertx.core.impl.future.PromiseImpl.tryComplete(PromiseImpl.java:23)
    at io.vertx.core.impl.future.PromiseImpl.onSuccess(PromiseImpl.java:49)
    at io.vertx.core.impl.future.PromiseImpl.handle(PromiseImpl.java:41)
    at io.vertx.core.impl.future.PromiseImpl.handle(PromiseImpl.java:23)
    at io.vertx.sqlclient.impl.command.CommandResponse.fire(CommandResponse.java:46)
    at io.vertx.sqlclient.impl.SocketConnectionBase.handleMessage(SocketConnectionBase.java:287)
    at io.vertx.pgclient.impl.PgSocketConnection.handleMessage(PgSocketConnection.java:96)
    at io.vertx.sqlclient.impl.SocketConnectionBase.lambda$init$0(SocketConnectionBase.java:99)
    at io.vertx.core.net.impl.NetSocketImpl.lambda$new(NetSocketImpl.java:97)
    at io.vertx.core.streams.impl.InboundBuffer.handleEvent(InboundBuffer.java:240)
    at io.vertx.core.streams.impl.InboundBuffer.write(InboundBuffer.java:130)
    at io.vertx.core.net.impl.NetSocketImpl.lambda$handleMessage(NetSocketImpl.java:390)
    at io.vertx.core.impl.EventLoopContext.emit(EventLoopContext.java:50)
    at io.vertx.core.impl.ContextImpl.emit(ContextImpl.java:274)
    at io.vertx.core.impl.EventLoopContext.emit(EventLoopContext.java:22)
    at io.vertx.core.net.impl.NetSocketImpl.handleMessage(NetSocketImpl.java:389)
    at io.vertx.core.net.impl.ConnectionBase.read(ConnectionBase.java:155)
    at io.vertx.core.net.impl.VertxHandler.channelRead(VertxHandler.java:154)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
    at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:436)
    at io.vertx.pgclient.impl.codec.PgEncoder.lambda$write$0(PgEncoder.java:87)
    at io.vertx.pgclient.impl.codec.PgCommandCodec.handleReadyForQuery(PgCommandCodec.java:139)
    at io.vertx.pgclient.impl.codec.PgDecoder.decodeReadyForQuery(PgDecoder.java:237)
    at io.vertx.pgclient.impl.codec.PgDecoder.channelRead(PgDecoder.java:96)
    at io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:251)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
    at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:719)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:655)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:581)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
    at io.netty.util.concurrent.SingleThreadEventExecutor.run(SingleThreadEventExecutor.java:986)
    at io.netty.util.internal.ThreadExecutorMap.run(ThreadExecutorMap.java:74)
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.base/java.lang.Thread.run(Thread.java:831)
Caused by: java.util.concurrent.TimeoutException
    at java.base/java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1956)
    at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2091)
    at io.quarkus.hibernate.reactive.panache.common.runtime.AbstractJpaOperations.executeInVertxEventLoop(AbstractJpaOperations.java:50)
    ... 86 more

I did find some code in io.quarkus.hibernate.reactive.panache.common.runtime.AbstractJpaOperations:

    // FIXME: make it configurable?
    static final long TIMEOUT_MS = 5000;

This seems related, but I am not sure whether it is the root cause of the problem.

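My update query updates a single field, filtered by the entry's ID, and the table contains only one record, so this is certainly not a database optimization problem. What could be causing the error?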

I can't be sure this is what causes your problem, but you should use Uni#replaceWith and Uni#invoke for synchronous code, and Uni#call and Uni#chain for asynchronous code, like this:

public Uni<Customer> updateCustomer(final long id, final MyPatchObject myBody) {
    return myRepo.findById(id)
        .invoke(customer -> myValidationMethod(customer))
        .call(() -> myRepo.update(id, myBody)) // .call propagates the incoming Uni
        .call(() -> myRepo.flush())
        .chain(() -> myRepo.findById(id)); // .chain propagates the returned Uni
}

Just to nitpick: don't return a Response from your service :)

@PATCH
@Path("/{id}")
@Transactional
public Uni<Response> updateCustomer(@RestPath Long id, MyPatchObject myBody) {
    return myService.updateCustomer(id, myBody)
        .replaceWith(Response.noContent().build());
}

Admittedly, Mutiny's terminology is quite confusing...

I ran into this problem again and had to solve it once more.

The issue was that I called panacheRepository.persistAndFlush, which produced a Uni, and then passed that Uni into .chain(myUni).

This triggered the timeout in AbstractJpaOperations.

Instead, I needed to make sure that persistAndFlush is called within the lambda passed to .chain, rather than preparing its Uni beforehand.
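To illustrate the difference between a prepared argument and a lambda, here is a minimal plain-JDK sketch. It does not use Mutiny or Panache: `createOperation`, `thenEager` and `thenDeferred` are hypothetical stand-ins for a repository call, an operator that takes a ready-made value (like `replaceWith(uni)`), and an operator that takes a lambda (like `chain(() -> ...)`). It only shows standard Java argument evaluation and assumes, as described above, that the repository call does some work as soon as it is invoked.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Plain-JDK stand-in; no Mutiny or Panache types are involved.
class EagerVsDeferred {

    // Records the order in which things happen.
    static final List<String> events = new ArrayList<>();

    // Hypothetical stand-in for a repository call such as persistAndFlush(...):
    // merely calling it already has a side effect.
    static String createOperation(String name) {
        events.add("created " + name);
        return name;
    }

    // Takes a ready-made value: the argument was evaluated before this method runs.
    static String thenEager(String previousStep, String operation) {
        events.add("ran " + previousStep);
        return operation;
    }

    // Takes a lambda: the operation is only created once the previous step has run.
    static String thenDeferred(String previousStep, Supplier<String> operation) {
        events.add("ran " + previousStep);
        return operation.get();
    }

    static List<String> run() {
        events.clear();
        thenEager("findById", createOperation("eager update"));
        thenDeferred("findById", () -> createOperation("deferred update"));
        return new ArrayList<>(events);
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

In the eager case the operation is created before the preceding step runs; in the deferred case it is created afterwards, which is the ordering that the lambda form of .chain gives you.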

This really feels like unintended behavior, since creating a Uni should not execute anything that can trigger a timeout.
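One possible reading of the stack trace (an assumption on my part, not something confirmed in this thread) is that the timed `CompletableFuture.get` in `executeInVertxEventLoop` was reached on the event-loop thread itself, so the work it was waiting for could never be scheduled. The sketch below reproduces that general pattern with a single-threaded `ExecutorService` standing in for the event loop. The class and method names are hypothetical, and the timeout is shortened from the 5000 ms quoted in the question.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Plain-JDK stand-in for "block on the loop thread while waiting for the loop".
class SelfBlockingLoop {

    // Shortened for the demo; the constant quoted in the question is 5000 ms.
    static final long TIMEOUT_MS = 500;

    // Queues a task on the loop, then blocks until it completes or the timeout expires.
    // Returns true if the wait timed out.
    static boolean blockingWaitTimesOut(ExecutorService loop) throws Exception {
        CompletableFuture<Void> done = new CompletableFuture<>();
        loop.execute(() -> done.complete(null));
        try {
            done.get(TIMEOUT_MS, TimeUnit.MILLISECONDS);
            return false;
        } catch (TimeoutException e) {
            return true; // the queued task never got a chance to run in time
        }
    }

    // Returns { timedOutFromOtherThread, timedOutFromLoopThread }.
    static boolean[] run() {
        ExecutorService loop = Executors.newSingleThreadExecutor(); // one "event loop" thread
        try {
            // Waiting from the calling thread: the loop thread is free to run the task.
            boolean fromOtherThread = blockingWaitTimesOut(loop);
            // Waiting from the loop thread: the only thread that could run the task is blocked.
            Callable<Boolean> onLoop = () -> blockingWaitTimesOut(loop);
            boolean fromLoopThread = loop.submit(onLoop).get();
            return new boolean[] { fromOtherThread, fromLoopThread };
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            loop.shutdownNow();
        }
    }

    public static void main(String[] args) {
        boolean[] result = run();
        System.out.println("timed out from other thread: " + result[0]);
        System.out.println("timed out from loop thread:  " + result[1]);
    }
}
```

If this reading applies, it would also be consistent with the fix above: deferring the repository call into a lambda changes when, and possibly on which thread, that blocking wait happens.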