如何急切合并两个Flux?
How to eagerly merge two Flux?
Flux<Long> flux1 = Flux
.<Long>create(fluxSink -> {
for (long i = 0; i < 20; i++) {
fluxSink.next(i);
}
})
.filter(aLong -> aLong % 2 == 0)
.doOnNext(aLong -> System.out.println("flux 1 : " + aLong));
Flux<Long> flux2 = Flux
.<Long>create(fluxSink -> {
for (long i = 0; i < 20; i++) {
fluxSink.next(i);
}
})
.filter(aLong -> aLong % 2 == 1)
.doOnNext(aLong -> System.out.println("flux 2 : " + aLong));
Flux.merge(flux1, flux2)
.doOnNext(System.out::println)
.then()
.block();
创建两个Flux<Long>
像上面的代码
flux1 创建偶数流 (0,2,4,6,8 ...)
flux2 创建奇数流 (1,3,5,7,9 ...)
我预计合并这 2 个 flux1 和 flux2 时会像
0,1,2,3,4 ...
或0,2,1,3,4..
取决于计算能力
但总是花费 flux1 和 flux2 (flux1 start)0,2,4,6,8, ... 16,18,(flux1 end)(flux2 start)1,3,5,7 ... 17,19
如何订阅多个流量热切事件?
两个流 运行 在同一线程上。当您订阅时 flux1
开始推送数据直到完成。只有这样线程才有空 flux2
才能继续。 merge
运算符按照它们到达的顺序发出值。它不会在第一个和第二个流之间切换。
如果您想要流运行并发,您需要运行它们在不同的线程上,例如通过使用 publishOn
运算符。
Flux<Long> flux1 = Flux
.<Long>create(fluxSink -> {
for (long i = 0; i < 20; i++) {
fluxSink.next(i);
}
})
.publishOn(Schedulers.newSingle("thread-x")
.filter(aLong -> aLong % 2 == 0)
.doOnNext(aLong -> System.out.println("flux 1 : " + aLong));
Flux<Long> flux1 = Flux
.<Long>create(fluxSink -> {
for (long i = 0; i < 20; i++) {
fluxSink.next(i);
}
})
.filter(aLong -> aLong % 2 == 0)
.doOnNext(aLong -> System.out.println("flux 1 : " + aLong));
Flux<Long> flux2 = Flux
.<Long>create(fluxSink -> {
for (long i = 0; i < 20; i++) {
fluxSink.next(i);
}
})
.filter(aLong -> aLong % 2 == 1)
.doOnNext(aLong -> System.out.println("flux 2 : " + aLong));
Flux.merge(flux1, flux2)
.doOnNext(System.out::println)
.then()
.block();
创建两个Flux<Long>
像上面的代码
flux1 创建偶数流 (0,2,4,6,8 ...) flux2 创建奇数流 (1,3,5,7,9 ...)
我预计合并这 2 个 flux1 和 flux2 时会像
0,1,2,3,4 ...
或0,2,1,3,4..
取决于计算能力
但总是花费 flux1 和 flux2 (flux1 start)0,2,4,6,8, ... 16,18,(flux1 end)(flux2 start)1,3,5,7 ... 17,19
如何订阅多个流量热切事件?
两个流 运行 在同一线程上。当您订阅时 flux1
开始推送数据直到完成。只有这样线程才有空 flux2
才能继续。 merge
运算符按照它们到达的顺序发出值。它不会在第一个和第二个流之间切换。
如果您想要流运行并发,您需要运行它们在不同的线程上,例如通过使用 publishOn
运算符。
Flux<Long> flux1 = Flux
.<Long>create(fluxSink -> {
for (long i = 0; i < 20; i++) {
fluxSink.next(i);
}
})
.publishOn(Schedulers.newSingle("thread-x")
.filter(aLong -> aLong % 2 == 0)
.doOnNext(aLong -> System.out.println("flux 1 : " + aLong));