spring 启动时的数千次休息调用

Thousands of rest calls with spring boot

假设我们有以下实体:ProjectRelease,这是一对多关系。

在从 SQS 队列消费事件时,发布 ID 作为事件的一部分发送,可能存在我们可能必须在我们的数据库中创建数千个发布的场景,我们必须为每个发布创建对第 3 方服务的休息调用,以获得每个版本的一些信息。

这意味着我们可能不得不进行数千次调用,在某些情况下超过 20k 次调用只是为了检索不同版本的信息并将其存储在数据库中。

显然这是不可扩展的,所以我不太确定在这种情况下该怎么做。

我知道我可能会使用 CompletableFuture,但我不确定如何将它与 spring 一起使用。

我使用的http客户端是WebClient。

有什么想法吗?

您可以通过在方法签名上方添加注释 @Transactional 使方法中的保存查询具有事务性。方法也应该是public,否则这个注解会被忽略。

至于在spring中使用CompletableFuture;您可以通过在其签名上方添加 @Async 注释并将其 return 作为 CompletableFuture 作为 return 类型来使 http 调用方法异步。您应该 return 一个完整的未来,其中包含来自 http 调用的响应值。您可以使用方法 CompletableFuture.completedFuture(yourValue) 轻松创建完整的未来。一旦异步方法执行完其代码块中的所有内容,Spring 只会 return 完成的未来。要使 @Async 正常工作,您还必须将 @EnableAsync 注释添加到您的配置 classes 之一。最重要的是,@Async 注释方法必须是 public,并且不能被同一 class 中的方法调用。如果方法是 private 是从同一个 class 中调用的 那么 @Async 注释将被忽略,而是该方法将在执行与调用方法相同的线程。

@Async 注释方法旁边,您还可以使用 parallelStream 并行执行所有 20K 个 http 调用。例如:

List<Long> releaseIds = new ArrayList<>();
Map<Long,ReleaseInfo> releaseInfo = releaseIds.parallelStream().map(releaseId -> new AbstractMap.SimpleEntry<>(releaseId, webClient.getReleaseInfo(releaseId)).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));

最后,您还可以使用 ThreadPoolExecutor 并行执行 http 调用。一个例子:

List<Long> releaseIds = new ArrayList<>();
ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()); //I've made the amount of threads in the pool equal to the amount of available CPU processors on the machine.

//Submit tasks to the executor
List<Future<ReleaseInfo>> releaseInfoFutures = releaseIds.stream().map(releaseId -> executor.submit(() -> webClient.getReleaseInfo(releaseId)).collect(Collectors.toList());

//Wait for all futures to complete and map all non-null values to ReleaseInfo list.
List<ReleaseInfo> releaseInfo = releaseInfoFutures.stream().map(this::getValueAfterFutureCompletion).filter(releaseInfo -> releaseInfo != null).collect(Collectors.toList());

    private ReleaseInfo getValueAfterFutureCompletion(Future<ReleaseInfo> future){
        ReleaseInfo releaseInfo = null;
        try {
            releaseInfo = future.get();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        } finally {
            return releaseInfo;
        }
    }

确保在 ThreadPoolExecutor 上调用 shutdownNow() 以避免内存泄漏。