执行多次下载并等待全部完成
Execute multiple downloads and wait for all to complete
我目前正在开发一项 API 服务,该服务允许 1 个或多个用户从 S3 存储桶下载 1 个或多个项目并将内容 return 提供给用户。虽然下载没问题,但下载多个文件所花费的时间几乎是 100-150 毫秒 * 文件数。
我已经尝试了几种方法来加快速度 - parallelStream() 而不是 stream()(考虑到同时下载的数量,它处于 serious risk of running out of threads),以及 CompleteableFutures,以及甚至创建一个 ExecutorService,进行下载然后关闭池。通常我只想要一些并发任务,例如5 同时,每个请求尝试减少活动线程数。
我尝试集成 Spring @Cacheable 以将下载的文件存储到 Redis(文件是只读的)- 虽然这肯定会减少响应时间(与 100-150 毫秒相比,检索文件需要几毫秒) ,只有在先前检索到文件后,才有好处。
处理等待多个异步任务完成然后获取结果的最佳方法是什么,同时考虑到我不希望(或认为我不能)有数百个线程打开 http 连接并下载所有马上?
您担心占用并行流中默认使用的公共 fork/join 池是正确的,因为我相信它用于流之外的其他事情,例如排序操作 api.您可以为 Stream 创建您自己的 fork/join 池,而不是使用 I/O-bound 并行流使公共 fork/join 池饱和。请参阅 this question 以了解如何创建具有所需大小的临时 ForkJoinPool 和 运行 其中的并行流。
您还可以创建一个具有固定大小线程池的 ExecutorService,该线程池也将独立于公共 fork/join 池,并且会限制请求,仅使用池中的线程。它还允许您指定要专用的线程数:
ExecutorService executor = Executors.newFixedThreadPool(MAX_THREADS_FOR_DOWNLOADS);
try {
List<CompletableFuture<Path>> downloadTasks = s3Paths
.stream()
.map(s3Path -> completableFuture.supplyAsync(() -> mys3Downloader.downloadAndGetPath(s3Path), executor))
.collect(Collectors.toList());
// at this point, all requests are enqueued, and threads will be assigned as they become available
executor.shutdown(); // stops accepting requests, does not interrupt threads,
// items in queue will still get threads when available
// wait for all downloads to complete
CompletableFuture.allOf(downloadTasks.toArray(new CompletableFuture[downloadTasks.size()])).join();
// at this point, all downloads are finished,
// so it's safe to shut down executor completely
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
} finally {
executor.shutdownNow(); // important to call this when you're done with the executor.
}
在@Hank D 的带领下,您可以封装执行程序服务的创建,以确保您确实在使用上述执行程序后调用 ExecutorService::shutdownNow:
private static <VALUE> VALUE execute(
final int nThreads,
final Function<ExecutorService, VALUE> function
) {
ExecutorService executorService = Executors.newFixedThreadPool(nThreads);
try {
return function.apply(executorService);
} catch (final InterruptedException | ExecutionException exception) {
exception.printStackTrace();
} finally {
executorService .shutdownNow(); // important to call this when you're done with the executor service.
}
}
public static void main(final String... arguments) {
// define variables
final List<CompletableFuture<Path>> downloadTasks = execute(
MAX_THREADS_FOR_DOWNLOADS,
executor -> s3Paths
.stream()
.map(s3Path -> completableFuture.supplyAsync(
() -> mys3Downloader.downloadAndGetPath(s3Path),
executor
))
.collect(Collectors.toList())
);
// use downloadTasks
}
我目前正在开发一项 API 服务,该服务允许 1 个或多个用户从 S3 存储桶下载 1 个或多个项目并将内容 return 提供给用户。虽然下载没问题,但下载多个文件所花费的时间几乎是 100-150 毫秒 * 文件数。
我已经尝试了几种方法来加快速度 - parallelStream() 而不是 stream()(考虑到同时下载的数量,它处于 serious risk of running out of threads),以及 CompleteableFutures,以及甚至创建一个 ExecutorService,进行下载然后关闭池。通常我只想要一些并发任务,例如5 同时,每个请求尝试减少活动线程数。
我尝试集成 Spring @Cacheable 以将下载的文件存储到 Redis(文件是只读的)- 虽然这肯定会减少响应时间(与 100-150 毫秒相比,检索文件需要几毫秒) ,只有在先前检索到文件后,才有好处。
处理等待多个异步任务完成然后获取结果的最佳方法是什么,同时考虑到我不希望(或认为我不能)有数百个线程打开 http 连接并下载所有马上?
您担心占用并行流中默认使用的公共 fork/join 池是正确的,因为我相信它用于流之外的其他事情,例如排序操作 api.您可以为 Stream 创建您自己的 fork/join 池,而不是使用 I/O-bound 并行流使公共 fork/join 池饱和。请参阅 this question 以了解如何创建具有所需大小的临时 ForkJoinPool 和 运行 其中的并行流。
您还可以创建一个具有固定大小线程池的 ExecutorService,该线程池也将独立于公共 fork/join 池,并且会限制请求,仅使用池中的线程。它还允许您指定要专用的线程数:
ExecutorService executor = Executors.newFixedThreadPool(MAX_THREADS_FOR_DOWNLOADS);
try {
List<CompletableFuture<Path>> downloadTasks = s3Paths
.stream()
.map(s3Path -> completableFuture.supplyAsync(() -> mys3Downloader.downloadAndGetPath(s3Path), executor))
.collect(Collectors.toList());
// at this point, all requests are enqueued, and threads will be assigned as they become available
executor.shutdown(); // stops accepting requests, does not interrupt threads,
// items in queue will still get threads when available
// wait for all downloads to complete
CompletableFuture.allOf(downloadTasks.toArray(new CompletableFuture[downloadTasks.size()])).join();
// at this point, all downloads are finished,
// so it's safe to shut down executor completely
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
} finally {
executor.shutdownNow(); // important to call this when you're done with the executor.
}
在@Hank D 的带领下,您可以封装执行程序服务的创建,以确保您确实在使用上述执行程序后调用 ExecutorService::shutdownNow:
private static <VALUE> VALUE execute(
final int nThreads,
final Function<ExecutorService, VALUE> function
) {
ExecutorService executorService = Executors.newFixedThreadPool(nThreads);
try {
return function.apply(executorService);
} catch (final InterruptedException | ExecutionException exception) {
exception.printStackTrace();
} finally {
executorService .shutdownNow(); // important to call this when you're done with the executor service.
}
}
public static void main(final String... arguments) {
// define variables
final List<CompletableFuture<Path>> downloadTasks = execute(
MAX_THREADS_FOR_DOWNLOADS,
executor -> s3Paths
.stream()
.map(s3Path -> completableFuture.supplyAsync(
() -> mys3Downloader.downloadAndGetPath(s3Path),
executor
))
.collect(Collectors.toList())
);
// use downloadTasks
}