执行多次下载并等待全部完成

Execute multiple downloads and wait for all to complete

我目前正在开发一项 API 服务,该服务允许 1 个或多个用户从 S3 存储桶下载 1 个或多个项目并将内容 return 提供给用户。虽然下载没问题,但下载多个文件所花费的时间几乎是 100-150 毫秒 * 文件数。

我已经尝试了几种方法来加快速度 - parallelStream() 而不是 stream()(考虑到同时下载的数量,它处于 serious risk of running out of threads),以及 CompleteableFutures,以及甚至创建一个 ExecutorService,进行下载然后关闭池。通常我只想要一些并发任务,例如5 同时,每个请求尝试减少活动线程数。

我尝试集成 Spring @Cacheable 以将下载的文件存储到 Redis(文件是只读的)- 虽然这肯定会减少响应时间(与 100-150 毫秒相比,检索文件需要几毫秒) ,只有在先前检索到文件后,才有好处。

处理等待多个异步任务完成然后获取结果的最佳方法是什么,同时考虑到我不希望(或认为我不能)有数百个线程打开 http 连接并下载所有马上?

您担心占用并行流中默认使用的公共 fork/join 池是正确的,因为我相信它用于流之外的其他事情,例如排序操作 api.您可以为 Stream 创建您自己的 fork/join 池,而不是使用 I/O-bound 并行流使公共 fork/join 池饱和。请参阅 this question 以了解如何创建具有所需大小的临时 ForkJoinPool 和 运行 其中的并行流。

您还可以创建一个具有固定大小线程池的 ExecutorService,该线程池也将独立于公共 fork/join 池,并且会限制请求,仅使用池中的线程。它还允许您指定要专用的线程数:

ExecutorService executor = Executors.newFixedThreadPool(MAX_THREADS_FOR_DOWNLOADS);
try {
    List<CompletableFuture<Path>> downloadTasks = s3Paths
            .stream()
            .map(s3Path -> completableFuture.supplyAsync(() -> mys3Downloader.downloadAndGetPath(s3Path), executor))
            .collect(Collectors.toList());    

        // at this point, all requests are enqueued, and threads will be assigned as they become available      

        executor.shutdown();    // stops accepting requests, does not interrupt threads, 
                                // items in queue will still get threads when available

        // wait for all downloads to complete
        CompletableFuture.allOf(downloadTasks.toArray(new CompletableFuture[downloadTasks.size()])).join();

        // at this point, all downloads are finished, 
        // so it's safe to shut down executor completely

    } catch (InterruptedException | ExecutionException e) {
        e.printStackTrace();
    } finally {
        executor.shutdownNow(); // important to call this when you're done with the executor.
    }

在@Hank D 的带领下,您可以封装执行程序服务的创建,以确保您确实在使用上述执行程序后调用 ExecutorService::shutdownNow:

private static <VALUE> VALUE execute(
  final int nThreads,
  final Function<ExecutorService, VALUE> function
) {
  ExecutorService executorService = Executors.newFixedThreadPool(nThreads);
  try {
    return function.apply(executorService);
  } catch (final InterruptedException | ExecutionException exception) {
    exception.printStackTrace();
  } finally {
    executorService .shutdownNow(); // important to call this when you're done with the executor service.
  }
}

public static void main(final String... arguments) {
  // define variables
  final List<CompletableFuture<Path>> downloadTasks = execute(
    MAX_THREADS_FOR_DOWNLOADS,
    executor -> s3Paths
      .stream()
      .map(s3Path -> completableFuture.supplyAsync(
        () -> mys3Downloader.downloadAndGetPath(s3Path),
        executor
      ))
      .collect(Collectors.toList())
  );
  // use downloadTasks
}