在哪里创建 ExecutorServices 以及何时关闭它们

Where to create ExecutorServices and when to close them

我正在使用 Spring 和 Jersey 创建一个 REST 服务,我有一个用例,对于我收到的每个请求,我需要对上游 API 进行多次调用 (N) .

我收到一个请求,它有 n 个项目,对于每个项目,我创建一个线程来调用我的依赖项 (REST) 并处理响应。最后,我将所有响应收集在一起,维护顺序,并将它们 return 作为对客户端的单个响应。

我正在使用 Java 8 的 CompletableFuture,想知道我是否正确使用了 ExecutorService 框架。

@Component // automatic singleton by Spring
class A {
    private ExecutorService executorService = Executors.newCachedThreadPool();

    private RawResponse getRawResponse(Item item) {
        // make REST call
    }


    private Response processResponse(RawResponse rawResponse) {
        // process response
    }

    public List<Response> handleRequest(Request request) {
        List<CompletableFuture> futureResponses = new ArrayList<>();

        for(Item item : request.getItems()) {
            CompletableFuture<Response> futureResponse = CompletableFuture.supplyAsync(() -> getRawResponse(item), executorService)
            .thenApply(rawResponse -> processResponse(rawResponse))
            .handle((response, throwable) {
            if(throwable != null) { // log and return default response
            } else { return response;}});

            futureResponses.add(futureResponse);
        }

        List<Response> result = new ArrayList<>();

        for (CompletableFuture<Resource> futureResponse : futureResponses) {
                    try {
                        result.add(futureResponse.get());
                    } catch (Exception e) {

                        // log error
                    }
        }
        return result;
    }
}

我现在的问题是,我是否应该将 executorService 的创建移到正上方:

List<CompletableFuture> futureResponses = new ArrayList<>();

并在其上方调用关机:

return result;

因为此时,我并没有真正在任何地方调用关闭,因为该应用程序将始终 运行 在它的 docker 容器中。

继续创建和丢弃池是否代价高昂,或者当前的方式是否会泄漏内存?而且我认为将池静态称为私有字段 var 是多余的,因为 class 无论如何都是一个 spring bean(单例)。

任何建议将不胜感激,我还应该使用缓存线程池吗?我不确定如何估算我需要的线程数。

should I move the creation of the executorService right above?

不,你没有,你的 ExecutorService 在你的示例代码中的正确位置。将其视为线程池,您不会希望为 handleRequest 的每个方法调用初始化一个新的线程池并关闭它。当然 ExecutorService 比线程池做的更多,实际上它会在下面管理一个线程池,并为异步任务提供生命周期管理。

I am not really calling shutdown anywhere since the app will always run in it's docker container.

在大多数情况下,您会在应用程序启动时初始化 ExecutorService 并在应用程序关闭时将其关闭。所以你可以把它留在那里,因为它会在应用程序退出时关闭,或者如果你需要正常关闭,你可以添加某种 shutdown hooks

Is it costly to keep creating and discarding the pool.

有点,我们不想经常创建和丢弃 Thread,所以我们有线程池,如果你为每个方法调用 create/discard 池,有什么意义线程池。

or is the current way going to leak memory?

不需要,只要你提交的任务不泄漏内存即可。 ExecutorService本身的实现很好用

And I think calling the pool static as a private field var is redundant since the class is a spring bean anyways (singleton)

是的,你是对的。如果你想做一些定制的初始化过程,你也可以定义 ExecutorService 作为 Spring Bean 并将它注入服务 bean。

should I be using a cachedThreadPool, I am not sure how to approximate the number of threads I need.

这很难说,您需要进行一些测试才能为您的应用程序获得正确的线程数。但大多数 NIO 或 EventDriven 框架的可用内核数默认为线程数的两倍。

当您使用 Spring 时,您可能希望让它处理异步执行。

只需将 @EnableAsync 放入您的 @Configuration 类 之一即可启用方法上的 @Async 注释。

然后您可以将 getRawResponse() 更改为

@Async
private CompletableFuture<RawResponse> getRawResponse(Item item) {
    // make REST call
    return CompletableFuture.completedFuture(rawResponse);
}

(您可能需要将此方法放在单独的服务中以允许正确的代理,具体取决于您的项目中 AOP 的配置方式)

并将循环更改为简单

for(Item item : request.getItems()) {
    CompletableFuture<Response> futureResponse = getRawResponse(item)
        .thenApply(rawResponse -> processResponse(rawResponse))
        .handle((response, throwable) {
            if(throwable != null) { // log and return default response
            } else { return response;}
        });

    futureResponses.add(futureResponse);
}

如您所见,您不再需要关心服务中的执行器。

您还可以通过声明其 Spring bean 来自定义您的执行器,例如:

@SpringBootApplication
@EnableAsync
public class Application extends AsyncConfigurerSupport {

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }

    @Override
    public Executor getAsyncExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(2);
        executor.setMaxPoolSize(2);
        executor.setQueueCapacity(500);
        executor.setThreadNamePrefix("GithubLookup-");
        executor.initialize();
        return executor;
    }
}

您甚至可以通过将其名称作为参数提供给 @Async 注释来配置多个执行器和 select 一个。

另见 Getting Started: Creating Async Methods and The @Async annotation