spring 启动时的数千次休息调用
Thousands of rest calls with spring boot
假设我们有以下实体:Project
和 Release
,这是一对多关系。
在从 SQS 队列消费事件时,发布 ID 作为事件的一部分发送,可能存在我们可能必须在我们的数据库中创建数千个发布的场景,我们必须为每个发布创建对第 3 方服务的休息调用,以获得每个版本的一些信息。
这意味着我们可能不得不进行数千次调用,在某些情况下超过 20k 次调用只是为了检索不同版本的信息并将其存储在数据库中。
显然这是不可扩展的,所以我不太确定在这种情况下该怎么做。
我知道我可能会使用 CompletableFuture,但我不确定如何将它与 spring 一起使用。
我使用的http客户端是WebClient。
有什么想法吗?
您可以通过在方法签名上方添加注释 @Transactional
使方法中的保存查询具有事务性。方法也应该是public,否则这个注解会被忽略。
至于在spring中使用CompletableFuture
;您可以通过在其签名上方添加 @Async
注释并将其 return 作为 CompletableFuture
作为 return 类型来使 http 调用方法异步。您应该 return 一个完整的未来,其中包含来自 http 调用的响应值。您可以使用方法 CompletableFuture.completedFuture(yourValue)
轻松创建完整的未来。一旦异步方法执行完其代码块中的所有内容,Spring 只会 return 完成的未来。要使 @Async
正常工作,您还必须将 @EnableAsync
注释添加到您的配置 classes 之一。最重要的是,@Async
注释方法必须是 public
,并且不能被同一 class 中的方法调用。如果方法是 private
或 是从同一个 class 中调用的 那么 @Async
注释将被忽略,而是该方法将在执行与调用方法相同的线程。
在 @Async
注释方法旁边,您还可以使用 parallelStream
并行执行所有 20K 个 http 调用。例如:
List<Long> releaseIds = new ArrayList<>();
Map<Long,ReleaseInfo> releaseInfo = releaseIds.parallelStream().map(releaseId -> new AbstractMap.SimpleEntry<>(releaseId, webClient.getReleaseInfo(releaseId)).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
最后,您还可以使用 ThreadPoolExecutor
并行执行 http 调用。一个例子:
List<Long> releaseIds = new ArrayList<>();
ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()); //I've made the amount of threads in the pool equal to the amount of available CPU processors on the machine.
//Submit tasks to the executor
List<Future<ReleaseInfo>> releaseInfoFutures = releaseIds.stream().map(releaseId -> executor.submit(() -> webClient.getReleaseInfo(releaseId)).collect(Collectors.toList());
//Wait for all futures to complete and map all non-null values to ReleaseInfo list.
List<ReleaseInfo> releaseInfo = releaseInfoFutures.stream().map(this::getValueAfterFutureCompletion).filter(releaseInfo -> releaseInfo != null).collect(Collectors.toList());
private ReleaseInfo getValueAfterFutureCompletion(Future<ReleaseInfo> future){
ReleaseInfo releaseInfo = null;
try {
releaseInfo = future.get();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
} finally {
return releaseInfo;
}
}
确保在 ThreadPoolExecutor
上调用 shutdownNow()
以避免内存泄漏。
假设我们有以下实体:Project
和 Release
,这是一对多关系。
在从 SQS 队列消费事件时,发布 ID 作为事件的一部分发送,可能存在我们可能必须在我们的数据库中创建数千个发布的场景,我们必须为每个发布创建对第 3 方服务的休息调用,以获得每个版本的一些信息。
这意味着我们可能不得不进行数千次调用,在某些情况下超过 20k 次调用只是为了检索不同版本的信息并将其存储在数据库中。
显然这是不可扩展的,所以我不太确定在这种情况下该怎么做。
我知道我可能会使用 CompletableFuture,但我不确定如何将它与 spring 一起使用。
我使用的http客户端是WebClient。
有什么想法吗?
您可以通过在方法签名上方添加注释 @Transactional
使方法中的保存查询具有事务性。方法也应该是public,否则这个注解会被忽略。
至于在spring中使用CompletableFuture
;您可以通过在其签名上方添加 @Async
注释并将其 return 作为 CompletableFuture
作为 return 类型来使 http 调用方法异步。您应该 return 一个完整的未来,其中包含来自 http 调用的响应值。您可以使用方法 CompletableFuture.completedFuture(yourValue)
轻松创建完整的未来。一旦异步方法执行完其代码块中的所有内容,Spring 只会 return 完成的未来。要使 @Async
正常工作,您还必须将 @EnableAsync
注释添加到您的配置 classes 之一。最重要的是,@Async
注释方法必须是 public
,并且不能被同一 class 中的方法调用。如果方法是 private
或 是从同一个 class 中调用的 那么 @Async
注释将被忽略,而是该方法将在执行与调用方法相同的线程。
在 @Async
注释方法旁边,您还可以使用 parallelStream
并行执行所有 20K 个 http 调用。例如:
List<Long> releaseIds = new ArrayList<>();
Map<Long,ReleaseInfo> releaseInfo = releaseIds.parallelStream().map(releaseId -> new AbstractMap.SimpleEntry<>(releaseId, webClient.getReleaseInfo(releaseId)).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
最后,您还可以使用 ThreadPoolExecutor
并行执行 http 调用。一个例子:
List<Long> releaseIds = new ArrayList<>();
ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()); //I've made the amount of threads in the pool equal to the amount of available CPU processors on the machine.
//Submit tasks to the executor
List<Future<ReleaseInfo>> releaseInfoFutures = releaseIds.stream().map(releaseId -> executor.submit(() -> webClient.getReleaseInfo(releaseId)).collect(Collectors.toList());
//Wait for all futures to complete and map all non-null values to ReleaseInfo list.
List<ReleaseInfo> releaseInfo = releaseInfoFutures.stream().map(this::getValueAfterFutureCompletion).filter(releaseInfo -> releaseInfo != null).collect(Collectors.toList());
private ReleaseInfo getValueAfterFutureCompletion(Future<ReleaseInfo> future){
ReleaseInfo releaseInfo = null;
try {
releaseInfo = future.get();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
} finally {
return releaseInfo;
}
}
确保在 ThreadPoolExecutor
上调用 shutdownNow()
以避免内存泄漏。