Uni<T> 在 Quarkus 中如何工作

How does Uni<T> works in Quarkus

在查看他们的官方教程后,我试图了解 Uni 在 Quarkus 框架中的行为 getting started with async guide。 在服务方法中我做了如下改动

package org.acme.getting.started.async;

import javax.enterprise.context.ApplicationScoped;

import io.smallrye.mutiny.Uni;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;


@ApplicationScoped
public class GreetingService {

    ExecutorService executor = Executors.newFixedThreadPool(10, r -> {
        return new Thread(r, "CUSTOM_TASK_EXECUTION_THREAD");
    });


    public Uni<String> greeting(String name) {
        System.out.println("greeting Executing on Thread "+Thread.currentThread().getName());
        return Uni.createFrom().item(ioSimulation(name))
                .emitOn(executor);//Infrastructure.getDefaultExecutor()
    }

    public String ioSimulation(String param){
        System.out.println("ioSimulation Executing on Thread "+Thread.currentThread().getName());
        try {
            Thread.sleep(8000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "hello "+param;

    }

}

然后我测试了 /greeting/{name} 资源,执行根本不是异步的,实际上它在同一个线程中以同步方式执行所有相关方法。

那有什么区别呢

@GET
    @Produces(MediaType.TEXT_PLAIN)
    @Path("/greeting/{name}")
    public Uni<String> greeting(@PathParam String name) {
    }

@GET
    @Produces(MediaType.TEXT_PLAIN)
    @Path("/greeting/{name}")
    public String greeting(@PathParam String name) {
    }

它是如何异步的?请帮助我理解它。

问题出在Uni.createFrom().item(ioSimulation(name))。在创建 Uni 之前同步调用 ioSimulation

您至少应该使用 Supplier 变体:Uni.createFrom().item(() -> ioSimulation(name))。这应该有所帮助。

除了@Ladicek 的回答,我相信你想使用 runSubscriptionOn 而不是 emitOn。请参阅 https://smallrye.io/smallrye-mutiny/guides/emit-subscription 作为参考。

以上答案均正确。我对服务 class 进行了一些更改,并且 ioSimulation(String param) 方法已卸载到自定义线程池。下面给出最终的解决方案

public class GreetingService {

    ThreadFactory threadFactory = new NameableThreadFactory("CUSTOM_TASK_EXECUTION_THREAD");
    ExecutorService executor = Executors.newFixedThreadPool(10, threadFactory);

    public Uni<String> greeting(String name) {
        System.out.println("greeting Executing on Thread "+Thread.currentThread().getName());
        return Uni.createFrom().item(()->ioSimulation(name))
                .runSubscriptionOn(executor);//Infrastructure.getDefaultExecutor()
    }


    public String ioSimulation(String param){
        System.out.println("ioSimulation Executing on Thread "+Thread.currentThread().getName());
        try {
            Thread.sleep(8000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "hello "+param;

    }
}

--更新--

如果您在 item 方法中传递 supplier

emitOn(Executor executor) 也会异步工作,但方法略有不同,我认为在试验 Uni 时,this 是我的回购协议,以防有人感兴趣。