如何构建一个在工作线程中调用阻塞操作并立即回复的异步 rest 端点 (Quarkus)

How to build a async rest endpoint that calls blocking action in worker thread and replies instantly (Quarkus)

我查看了文档和 Whosebug,但没有找到完全合适的方法。 例如。这个 post 似乎非常接近: 但是,我不想在我的服务中有那么多不必要的样板代码,充其量,根本不更改服务代码。

我通常只想调用一个使用实体管理器的服务方法,因此是一个阻塞操作,但是,我想 return 立即向调用者发送一个字符串,例如“查询开始”之类的。我不需要回调对象,它只是一种即刻即弃的方法。

我试过这样的东西

@NonBlocking
@POST
@Produces(MediaType.TEXT_PLAIN)
@Path("/query")
public Uni<String> triggerQuery() {
    return Uni.createFrom()
    .item("query started")
    .call(() -> service.startLongRunningQuery());
}

但它不工作 -> 错误消息return发送给呼叫者:

You have attempted to perform a blocking operation on a IO thread. This is not allowed, as blocking the IO thread will cause major performance issues with your application. If you want to perform blocking EntityManager operations make sure you are doing it from a worker thread.",

我实际上期望 quarkus 会注意相应地分配任务,也就是说,对 io 线程的 rest 调用和对工作线程的阻塞实体管理器操作。 所以我一定是用错了。

更新:

还尝试了我在 https://github.com/quarkusio/quarkus/issues/11535 中将方法主体更改为

中提出的解决方法
return Uni.createFrom()
        .item("query started")
        .emitOn(Infrastructure.getDefaultWorkerPool())
        .invoke(()-> service.startLongRunningQuery());

现在我没有收到错误,但是没有调用 service.startLongRunningQuery(),因此没有日志,也没有查询实际发送到数据库。

与 () 相同:

return Uni.createFrom()
            .item(() ->service.startLongRunningQuery()) 
            .runSubscriptionOn(Infrastructure.getDefaultWorkerPool())

与 () 相同:

ExecutorService executor = Executors.newFixedThreadPool(10, r -> new Thread(r, "CUSTOM_THREAD"));

return Uni.createFrom()
                .item(() -> service.startLongRunningQuery())
                .runSubscriptionOn(executor);

知道为什么根本不调用 service.startLongRunningQuery() 以及如何实现即发即弃行为吗?假设通过 IO 线程处理休息调用,通过工作线程处理服务调用?

为此使用 EventBus https://quarkus.io/guides/reactive-event-bus

发送后忘记是正确的方法。

这取决于您是想立即return(在您的startLongRunningQuery 操作有效执行之前),还是想等到操作完成。

如果是第一种情况,使用类似:

@Inject EventBus bus;

@NonBlocking
@POST
@Produces(MediaType.TEXT_PLAIN)
@Path("/query")
public void triggerQuery() {
    bus.send("some-address", "my payload");
}

@Blocking // Will be called on a worker thread
@ConsumeEvent("some-address")
public void executeQuery(String payload) {
    service.startLongRunningQuery();
}

在第二种情况下,您需要在工作线程上执行查询。

@POST
@Produces(MediaType.TEXT_PLAIN)
@Path("/query")
public Uni<String> triggerQuery() {
   return Uni.createFrom(() -> service.startLongRunningQuery())
      .runSubscriptionOn(Infrastructure.getDefaultWorkerPool());
}

请注意,您需要 RESTEasy Reactive 才能工作(而不是经典的 RESTEasy)。如果您使用经典的 RESTEasy,您将需要 quarkus-resteasy-mutiny 扩展(但我​​建议使用 RESTEasy Reactive,它会更有效率)。