根据输入将来自 WebClient 并行调用的批处理结果转换为 HashMap 条目,而不会阻塞每个单独的调用

Transform batch results from WebClient parallel calls in to HashMap entries based on input without blocking every individual call

我想transform/add将来自 WebClient 的 Mono 响应转换为以输入作为键的 Map

我正在使用 WebClient 并行执行一批 REST 调用,但我不想 returning 用户列表,而是 return ID 的 HashMap 作为键和用户 return作为值从 REST 调用中编辑。

我不想在添加到 HashMap 之前阻止每个单独的调用来获取值。

有没有一种方法可以将 WebClient 的结果转换为 HashMap 条目而不影响 REST 调用的并行执行?

我尝试 doOnSuccess 回调 Mono 但不确定这是否真的是正确的方法。

当前实施

public List<<User> fetchUsers(List<Integer> ids) {
        List<Mono<User>> userMonos = new ArrayList();
        
        for (int id : ids) {
            userMonos.add(webClient.get()
                .uri("/otheruser/{id}", id)
                .retrieve()
                .bodyToMono(User.class));
        }

       List<User> list = Flux.merge(userMonos).collectList().block();
       return list; 
    }

所以预期的输出是:

HashMap<Integer, User>()

如果我无法恰当地表达预期结果,我深表歉意。如果我需要添加更多细节或使问题更清晰,请随时告诉我。

非常感谢您对此提供的帮助。与此同时,我也在努力寻找解决方案。

您将命令式代码与反应式代码混合在一起。你必须选择一种方式,并坚持下去。

如果您想要实际值而不是 MonoFlux,您可以 MUST 阻止。把它想象成一个 Future,在我们等待值出现之前,那里没有“值”。所以阻塞是 ONLY 方式。

如果我正确理解您的代码,我会执行以下操作。

public HashMap<Integer, User> fetchUsers(List<Integer> ids) {
    final Map<Integer, User> userMap = new HashMap();
    return Flux.fromIterable(ids)
                .flatMap(id -> webClient.get()
                    .uri("/otheruser/{id}", id)
                    .retrieve()
                    .bodyToMono(User.class)
                    .doOnSuccess(user -> {
                        userMap.put(id, user);
                    })
                .thenReturn(userMap)
                .block()
}

那么这段代码有什么作用?

它获取一个 id 列表并将其放入 Flux。由于我们正在使用 flatMap,因此通量将 async 同时启动所有请求。当所有请求都完成后,我们将通过将值添加到哈希图中来产生副作用。由于我们不关心 return 类型,我们使用 then 来静默忽略 return。我们告诉它 return 哈希图。最后我们调用 block 使代码实际上 运行 并等待所有请求等完成并生成最终的 hashmap。

我是在手机上写的,所以我无法检查编译器,但像这样的东西应该能让你入门。如果有人看到任何错误,请随时编辑。

如果可能,最好避免从 doOnSuccess 等副作用运算符修改外部状态。例如,在这种特殊情况下,如果外部 Map 不是线程安全的,它可能会导致并发问题。

作为更好的替代方案,您可以使用 Reactor 运算符收集到 Map

public Map<Integer, User> fetchUsers(List<Integer> ids) {
    return Flux.fromIterable(ids)
               .flatMap(id -> webClient.get()
                                       .uri("/otheruser/{id}", id)
                                       .retrieve()
                                       .bodyToMono(User.class)
                                       .map(user -> Tuples.of(id, user)))
               .collectMap(Tuple2::getT1, Tuple2::getT2)
               .block();
}

您可以创建一个小 class 而不是 Tuple 来提高可读性。或者甚至更好,如果用户知道它的 ID,那么你可以完全省略 Tuple,你可以做类似 .collectMap(User::getId, user -> user).

的事情