反应堆中的地图与平面地图

map vs flatMap in reactor

我找到了很多关于 RxJava 的答案,但我想了解它在 Reactor 中的工作原理。

我目前的理解非常模糊,我倾向于认为 map 是同步的而 flatMap 是异步的,但我无法真正理解它。

这是一个例子:

files.flatMap { it ->
    Mono.just(Paths.get(UPLOAD_ROOT, it.filename()).toFile())
        .map {destFile ->
            destFile.createNewFile()
            destFile    
        }               
        .flatMap(it::transferTo)
}.then()  

我有文件(一个Flux<FilePart>),我想将它复制到服务器上的一些UPLOAD_ROOT

这个例子摘自一本书。

我可以将所有 .map 更改为 .flatMap,反之亦然,一切仍然有效。我想知道有什么区别。

  • map 用于同步,non-blocking,一对一转换
  • flatMap 用于异步 (non-blocking) 1-to-N 转换

差异在方法签名中可见:

  • map 需要一个 Function<T, U> 和 return 一个 Flux<U>
  • flatMap 需要一个 Function<T, Publisher<V>> 和 return 一个 Flux<V>

这是主要提示:您 可以 Function<T, Publisher<V>> 传递给 map,但它不知道如何处理 Publishers,这将导致 Flux<Publisher<V>>,一系列惰性发布者。

另一方面,flatMap 期望每个 T 有一个 Publisher<V>。它知道如何处理它:订阅它并在输出序列中传播它的元素。因此,return 类型是 Flux<V>flatMap 会将每个内部 Publisher<V> 展平为 all 的输出序列 Vs.

关于1-N方面:

对于每个 <T> 输入元素,flatMap 将其映射到 Publisher<V>。在某些情况下(例如 HTTP 请求),该发布者只会发出一个项目,在这种情况下我们非常接近异步 map.

但这是退化的情况。一般情况是 Publisher 可以发出多个元素,flatMap 也可以。

举个例子,假设您有一个反应式数据库,并且您从一系列用户 ID 进行平面映射,请求 return 是用户的 Badge 集。您最终得到所有这些用户的所有徽章中的一个 Flux<Badge>

map真的和non-blocking同步吗?

是:它在运算符应用它的方式上是同步的(一个简单的方法调用,然后运算符发出结果)并且 non-blocking 在函数本身不应该阻塞运算符的意义上调用它。换句话说,它不应该引入延迟。那是因为a Flux 整体还是异步的。如果它阻塞 mid-sequence,它将影响其余的 Flux 处理,甚至其他 Flux.

如果您的地图函数有 blocking/introduces 延迟但无法转换为 return 和 Publisher,请考虑 publishOn/subscribeOn 来抵消阻塞工作在一个单独的线程上。

flatMap 方法类似于 map 方法,主要区别在于您提供给它的供应商应该 return 一个 Mono<T>Flux<T>

使用 map 方法会导致 Mono<Mono<T>> 而使用 flatMap 会导致 Mono<T>.

例如,当您必须进行网络调用以检索数据时,它很有用,其中 java API return 是 Mono,然后另一个网络调用是需要第一个的结果。

// Signature of the HttpClient.get method
Mono<JsonObject> get(String url);

// The two urls to call
String firstUserUrl = "my-api/first-user";
String userDetailsUrl = "my-api/users/details/"; // needs the id at the end

// Example with map
Mono<Mono<JsonObject>> result = HttpClient.get(firstUserUrl).
  map(user -> HttpClient.get(userDetailsUrl + user.getId()));
// This results with a Mono<Mono<...>> because HttpClient.get(...)
// returns a Mono

// Same example with flatMap
Mono<JsonObject> bestResult = HttpClient.get(firstUserUrl).
  flatMap(user -> HttpClient.get(userDetailsUrl + user.getId()));
// Now the result has the type we expected

此外,它允许精确处理错误:

public UserApi {
  
  private HttpClient httpClient;
    
  Mono<User> findUser(String username) {
    String queryUrl = "http://my-api-address/users/" + username;
    
    return Mono.fromCallable(() -> httpClient.get(queryUrl)).
      flatMap(response -> {
        if (response.statusCode == 404) return Mono.error(new NotFoundException("User " + username + " not found"));
        else if (response.statusCode == 500) return Mono.error(new InternalServerErrorException());
        else if (response.statusCode != 200) return Mono.error(new Exception("Unknown error calling my-api"));
        return Mono.just(response.data);
      });
  }
                                           
}

map 在 Reactor 内部是如何工作的。

创建 Player class.

@Data
@AllArgsConstructor
public class Player {
        String name;
        String name;
}

现在正在创建 Player class

的一些实例
Flux<Player> players = Flux.just(
        "Zahid Khan",
        "Arif Khan",
        "Obaid Sheikh")
        .map(fullname -> {
            String[] split = fullname.split("\s");
            return new Player(split[0], split[1]);
        });

StepVerifier.create(players)
          .expectNext(new Player("Zahid", "Khan"))
          .expectNext(new Player("Arif", "Khan"))
          .expectNext(new Player("Obaid", "Sheikh"))
          .verifyComplete();

What’s important to understand about the map() is that the mapping is performed synchronously, as each item is published by the source Flux. If you want to perform the mapping asynchronously, you should consider the flatMap() operation.

FlatMap 的内部工作原理。

Flux<Player> players = Flux.just(
      "Zahid Khan", 
      "Arif Khan", 
      "Obaid Sheikh")
      .flatMap(
            fullname -> 
                  Mono.just(fullname).map(p -> {
                        String[] split = p.split("\s");
                        return new Player(split[0], split[1]);
        }).subscribeOn(Scheduler.parallel()));

        List<Player> playerList = Arrays.asList(
                  new Player("Zahid", "Khan"),
                  new Player("Arif", "Khan"), 
                  new Player("Obaid", "Sheikh"));

        StepVerifier.create(players).expectNextMatches(player ->         
                playerList.contains(player))    
                        .expectNextMatches(player ->  
                                playerList.contains(player))
                        .expectNextMatches(player -> 
                                playerList.contains(player))
                        .expectNextMatches(player -> 
                                playerList.contains(player))
                        .verifyComplete();

在Flatmap()内部,对Mono进行map()操作,将String转换为Player。此外,subcribeOn () 表示每个订阅都应在并行线程中进行。 在没有 subscribeOn() 的情况下,flatmap() 充当同步。

地图用于同步、non-blocking、one-to-one 转换 而 flatMap 用于异步 (non-blocking) One-to-Many 转换。