Java Stream 与 Flux fromIterable

Java Stream vs Flux fromIterable

我有一个用户名列表,想在不阻塞主线程的情况下从远程服务获取用户详细信息。我正在使用 Spring 的反应式客户端 WebClient。对于响应,我得到 Mono 然后订阅它并打印结果。

private Mono<User> getUser(String username) {
    return webClient
            .get()
            .uri(uri + "/users/" + username)
            .retrieve()
            .bodyToMono(User.class)
            .doOnError(e -> 
                logger.error("Error on retrieveing a user details {}", username));
}

我通过两种方式完成了任务:

使用Java stream

usernameList.stream()
          .map(this::getUser)
          .forEach(mono ->
                mono.subscribe(System.out::println));

使用Flux.fromIterable:

Flux.fromIterable(usernameList)
        .map(this::getUser)
        .subscribe(mono ->
                mono.subscribe(System.out::println));

看来主线程在这两个方面都没有被阻塞。 在这种情况下 Java StreamFlux.fromIterable 有什么区别?如果两者都在做同样的事情,建议使用哪一个?

两种变体之间没有太大差异。 Flux.fromIterable 变体可能会为您提供更多选项和对 concurrency/retries 等的控制 - 但在这种情况下并非如此,因为在这里调用 subscribe 达不到目的。

您的问题缺少有关您正在构建的应用程序类型以及进行这些调用的背景信息。如果您正在构建 Web 应用程序并且在请求处理期间调用它,或者是批处理应用程序 - 意见可能会有所不同。

一般来说,我认为应用程序应该远离调用 subscribe,因为它会断开该管道的处理与应用程序其余部分的连接:如果发生异常,您可能无法报告它,因为用于发送该错误消息的资源此时可能已经消失。或者应用程序可能正在关闭,您无法让它等待该任务完成。

如果您正在构建一个应用程序,它想要开始一些工作并且其结果对当前操作没有用(即,在当前操作的生命周期内该工作是否完成并不重要),那么 subscribe 可能是一个选项。

在那种情况下,我会尝试将所有操作组合在一个 Mono<Void> 操作中,然后触发该工作:

Mono<Void> logUsers = Flux.fromIterable(userNameList)
    .map(name -> getUser(name))
    .doOnNext(user -> System.out.println(user)) // assuming this is non I/O work
    .then();
logUsers.subscribe(...);

如果您担心在 Web 应用程序中使用服务器线程,那么情况就大不相同了 - 您可能希望获得该操作的结果以将某些内容写入 HTTP 响应。通过调用订阅,这两个任务现在都已断开连接,并且在工作完成时 HTTP 响应可能早已消失(写入响应时会出现错误)。

在这种情况下,您应该将操作与 Reactor 运算符链接起来。