Reactor compose 与 flatMap

Reactor compose vs flatMap

我继续玩 Reactor,现在我看到 compose 运算符的行为与 flatMap 完全一样,我想知道是否有任何我不明白的区别。

    @Test
public void compose() throws InterruptedException {
    Scheduler mainThread = Schedulers.single();
    Flux.just(("old element"))
            .compose(element ->
                    Flux.just("new element in new thread")
                            .subscribeOn(mainThread)
                            .doOnNext(value -> System.out.println("Thread:" + Thread.currentThread().getName())))
            .doOnNext(value -> System.out.println("Thread:" + Thread.currentThread().getName()))
            .subscribe(System.out::println);
    Thread.sleep(1000);
}

@Test
public void flatMapVsCompose() throws InterruptedException {
    Scheduler mainThread = Schedulers.single();
    Flux.just(("old element"))
            .flatMap(element ->
                    Flux.just("new element in new thread")
                            .subscribeOn(mainThread)
                            .doOnNext(value -> System.out.println("Thread:" + Thread.currentThread().getName())))
            .doOnNext(value -> System.out.println("Thread:" + Thread.currentThread().getName()))
            .subscribe(System.out::println);
    Thread.sleep(1000);
}

这两个示例的行为与return相同的结果。

此致。

An excellent explanation by Dan Lew:

不同之处在于 compose() 是更高级别的抽象:它对整个流进行操作,而不是单独发出的项目。更具体地说:

  • compose() 是从流中获取原始 Observable<T> 的唯一方法。因此,影响整个流的运算符(如 subscribeOn()observeOn())需要使用 compose().

    相比之下,如果您将 subscribeOn()/observeOn() 放在 flatMap() 中,它只会影响您在 flatMap() 中创建的 Observable 而不会影响流的其余部分。

  • compose() 在您创建 Observable 流时立即执行,就好像您已经内联编写了运算符一样。 flatMap() 在其 onNext() 被调用时执行,每次被调用。换句话说,flatMap() 转换每个项目,而 compose() 转换整个流。

  • flatMap() 必然效率较低,因为每次调用 onNext() 时都必须创建一个新的 Observablecompose() 按原样对流进行操作。如果要用可重用代码替换某些运算符,请使用 compose()flatMap() 有很多用途,但这不是其中之一。

@Andrew 的解释很好。只是想添加一个示例以便更好地理解。

Flux.just("1", "2")
        .compose( stringFlux -> {
            System.out.println("In compose"); // It takes whe whole Flux as input
           return stringFlux.collectList();
        }).subscribe(System.out::println);


Flux.just("1", "2").flatMap(s -> { //Input to the anonymous function is individual items in stream
            System.out.println("In flatMap");
            return Flux.just(Integer.parseInt(s));
        }).subscribe(System.out::println);

这会产生输出

In compose
[1, 2]
In flatMap
1
In flatMap
2

这表明 compose 适用于整个流,但 flatMap 适用于流中的单个项目