Reactor compose 与 flatMap
Reactor compose vs flatMap
我继续玩 Reactor,现在我看到 compose
运算符的行为与 flatMap
完全一样,我想知道是否有任何我不明白的区别。
@Test
public void compose() throws InterruptedException {
Scheduler mainThread = Schedulers.single();
Flux.just(("old element"))
.compose(element ->
Flux.just("new element in new thread")
.subscribeOn(mainThread)
.doOnNext(value -> System.out.println("Thread:" + Thread.currentThread().getName())))
.doOnNext(value -> System.out.println("Thread:" + Thread.currentThread().getName()))
.subscribe(System.out::println);
Thread.sleep(1000);
}
@Test
public void flatMapVsCompose() throws InterruptedException {
Scheduler mainThread = Schedulers.single();
Flux.just(("old element"))
.flatMap(element ->
Flux.just("new element in new thread")
.subscribeOn(mainThread)
.doOnNext(value -> System.out.println("Thread:" + Thread.currentThread().getName())))
.doOnNext(value -> System.out.println("Thread:" + Thread.currentThread().getName()))
.subscribe(System.out::println);
Thread.sleep(1000);
}
这两个示例的行为与return相同的结果。
此致。
An excellent explanation by Dan Lew:
不同之处在于 compose()
是更高级别的抽象:它对整个流进行操作,而不是单独发出的项目。更具体地说:
compose()
是从流中获取原始 Observable<T>
的唯一方法。因此,影响整个流的运算符(如 subscribeOn()
和 observeOn()
)需要使用 compose()
.
相比之下,如果您将 subscribeOn()
/observeOn()
放在 flatMap()
中,它只会影响您在 flatMap()
中创建的 Observable
而不会影响流的其余部分。
compose()
在您创建 Observable
流时立即执行,就好像您已经内联编写了运算符一样。 flatMap()
在其 onNext()
被调用时执行,每次被调用。换句话说,flatMap()
转换每个项目,而 compose()
转换整个流。
flatMap()
必然效率较低,因为每次调用 onNext()
时都必须创建一个新的 Observable
。 compose()
按原样对流进行操作。如果要用可重用代码替换某些运算符,请使用 compose()
。 flatMap()
有很多用途,但这不是其中之一。
@Andrew 的解释很好。只是想添加一个示例以便更好地理解。
Flux.just("1", "2")
.compose( stringFlux -> {
System.out.println("In compose"); // It takes whe whole Flux as input
return stringFlux.collectList();
}).subscribe(System.out::println);
Flux.just("1", "2").flatMap(s -> { //Input to the anonymous function is individual items in stream
System.out.println("In flatMap");
return Flux.just(Integer.parseInt(s));
}).subscribe(System.out::println);
这会产生输出
In compose
[1, 2]
In flatMap
1
In flatMap
2
这表明 compose
适用于整个流,但 flatMap
适用于流中的单个项目
我继续玩 Reactor,现在我看到 compose
运算符的行为与 flatMap
完全一样,我想知道是否有任何我不明白的区别。
@Test
public void compose() throws InterruptedException {
Scheduler mainThread = Schedulers.single();
Flux.just(("old element"))
.compose(element ->
Flux.just("new element in new thread")
.subscribeOn(mainThread)
.doOnNext(value -> System.out.println("Thread:" + Thread.currentThread().getName())))
.doOnNext(value -> System.out.println("Thread:" + Thread.currentThread().getName()))
.subscribe(System.out::println);
Thread.sleep(1000);
}
@Test
public void flatMapVsCompose() throws InterruptedException {
Scheduler mainThread = Schedulers.single();
Flux.just(("old element"))
.flatMap(element ->
Flux.just("new element in new thread")
.subscribeOn(mainThread)
.doOnNext(value -> System.out.println("Thread:" + Thread.currentThread().getName())))
.doOnNext(value -> System.out.println("Thread:" + Thread.currentThread().getName()))
.subscribe(System.out::println);
Thread.sleep(1000);
}
这两个示例的行为与return相同的结果。
此致。
An excellent explanation by Dan Lew:
不同之处在于 compose()
是更高级别的抽象:它对整个流进行操作,而不是单独发出的项目。更具体地说:
compose()
是从流中获取原始Observable<T>
的唯一方法。因此,影响整个流的运算符(如subscribeOn()
和observeOn()
)需要使用compose()
.相比之下,如果您将
subscribeOn()
/observeOn()
放在flatMap()
中,它只会影响您在flatMap()
中创建的Observable
而不会影响流的其余部分。compose()
在您创建Observable
流时立即执行,就好像您已经内联编写了运算符一样。flatMap()
在其onNext()
被调用时执行,每次被调用。换句话说,flatMap()
转换每个项目,而compose()
转换整个流。flatMap()
必然效率较低,因为每次调用onNext()
时都必须创建一个新的Observable
。compose()
按原样对流进行操作。如果要用可重用代码替换某些运算符,请使用compose()
。flatMap()
有很多用途,但这不是其中之一。
@Andrew 的解释很好。只是想添加一个示例以便更好地理解。
Flux.just("1", "2")
.compose( stringFlux -> {
System.out.println("In compose"); // It takes whe whole Flux as input
return stringFlux.collectList();
}).subscribe(System.out::println);
Flux.just("1", "2").flatMap(s -> { //Input to the anonymous function is individual items in stream
System.out.println("In flatMap");
return Flux.just(Integer.parseInt(s));
}).subscribe(System.out::println);
这会产生输出
In compose
[1, 2]
In flatMap
1
In flatMap
2
这表明 compose
适用于整个流,但 flatMap
适用于流中的单个项目