Quarkus gRPC: blocking operation on Vert.x event loop thread error

I am trying to use gRPC to communicate between microservices with Quarkus 1.12.1.Final. When I try to access my entity, I get the following error on vert.x-eventloop-thread-0:

(vert.x-eventloop-thread-0) Exception while executing runnable io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListenerHalfClosed@8e09d21: java.lang.IllegalStateException: You have attempted to perform a blocking operation on a IO thread. This is not allowed, as blocking the IO thread will cause major performance issues with your application. If you want to perform blocking EntityManager operations make sure you are doing it from a worker thread.

I have read that adding @Blocking to my method should fix this, but that does not seem to work:

@Singleton
public class FillProductService extends SourceGrpc.SourceImplBase {

    @Override
    @Blocking
    public void fillProductsWithSourceInfo(FillProductsRequest request, StreamObserver<FillProductsReply> responseObserver) {
        List<SourceInfo> sourceInfo = request.getProductIdsList().stream().map(productId -> {
            List<SourceEntity> sources = SourceEntity.find("productId = ?1", productId, Sort.descending("upvotes")).list();
            double bestPrice = !sources.isEmpty() ? sources.get(0).price : 0.00;
            return SourceInfo.newBuilder().setSources(sources.size())
                        .setBestPrice(bestPrice).setProductId(productId).build();
        }).collect(Collectors.toList());

        responseObserver.onNext(FillProductsReply.newBuilder().addAllSourceInfo(sourceInfo).build());
        responseObserver.onCompleted();
    }
}

I have also tried using Mutiny, but it seemed to give the same error.

Using Mutiny together with runSubscriptionOn does now give me a response:

@Singleton
public class FillProductService extends MutinySourceGrpc.SourceImplBase {

    @Override
    public Uni<FillProductsReply> fillProductsWithSourceInfo(FillProductsRequest request) {
        return Uni.createFrom().item(() -> {
            List<SourceInfo> sourceInfo = request.getProductIdsList().stream().map(productId -> {
                List<SourceEntity> sources = SourceEntity.find("productId = ?1", productId).list();
                double bestPrice = !sources.isEmpty() ? sources.get(0).price : 0.00;
                return SourceInfo.newBuilder().setSources(sources.size())
                            .setBestPrice(bestPrice).setProductId(productId).build();
            }).collect(Collectors.toList());
            return FillProductsReply.newBuilder().addAllSourceInfo(sourceInfo).build();
        }).runSubscriptionOn(Infrastructure.getDefaultWorkerPool())
        .onFailure().invoke(t -> System.out.println("Oh no! We received a failure: " + t.getMessage())
        );
    }
}
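
For readers unfamiliar with why runSubscriptionOn helps, the idea of moving blocking work off the calling thread can be sketched with the JDK alone. This is only an analogy, not the Quarkus or Mutiny API: the class OffloadSketch, the offload helper, and the "worker-0" thread name are hypothetical names made up for illustration, and the supplier stands in for a blocking call such as the SourceEntity.find(...) lookup.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;

public class OffloadSketch {

    // Hypothetical stand-in for a worker pool: one daemon thread named "worker-0".
    static final ExecutorService WORKER = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r, "worker-0");
        t.setDaemon(true);
        return t;
    });

    // Runs the (possibly blocking) supplier on the worker pool
    // instead of on the thread that calls offload().
    static <T> CompletableFuture<T> offload(Supplier<T> blockingCall) {
        return CompletableFuture.supplyAsync(blockingCall, WORKER);
    }

    public static void main(String[] args) {
        String caller = Thread.currentThread().getName();
        // The supplier reports the thread it actually ran on.
        String ranOn = offload(() -> Thread.currentThread().getName()).join();
        System.out.println("caller=" + caller + ", blocking work ran on=" + ranOn);
    }
}
```

The caller only schedules the work and receives a future, so the blocking call itself never executes on the calling thread. In the Mutiny version above, the supplier passed to Uni.createFrom().item(...) plays the same role and runSubscriptionOn(Infrastructure.getDefaultWorkerPool()) selects the executor.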