消费观察者同时发出的值

Consuming values concurrently emmited by an Observer

我正在学习使用 RxJava 进行反应式编程,并希望在不阻塞单个执行线程的情况下并发使用 emmited 值。

        Observable
            .interval(50, TimeUnit.MILLISECONDS)
            .take(5)
            .subscribe(new Action1<Long>() {
                @Override
                public void call(Long counter) {
                    sleep(1000);
                    System.out.println("Got: " + counter + " thread : "+ Thread.currentThread().getName());
                }
            });

    sleep(10000);

我会得到这个输出

Got: 0 thread : RxComputationThreadPool-1
Got: 1 thread : RxComputationThreadPool-1
Got: 2 thread : RxComputationThreadPool-1
Got: 3 thread : RxComputationThreadPool-1
Got: 4 thread : RxComputationThreadPool-1

如何异步处理每个事件?像这样

Got: 0 thread : RxComputationThreadPool-1
Got: 1 thread : RxComputationThreadPool-2
Got: 2 thread : RxComputationThreadPool-3
Got: 3 thread : RxComputationThreadPool-4
Got: 4 thread : RxComputationThreadPool-5

在 Rx 中,一个 observable 代表并发性1,因此要相互并发处理通知,您必须将每个通知投射到一个 observable 中。

flatMap 是异步顺序组合运算符。它将来自源可观察对象的每个通知投射到可观察对象中,从而允许您同时处理每个输入值。然后它将每个计算的结果合并到一个带有非重叠通知的扁平化可观察序列中。

附录:

selector for flatMap 中,通常有多种方法可以根据目标平台创建并发可观察对象。我不知道 Java,但在 .NET 中,您通常会使用 Observable.Start 来引入并发性或使用异步方法 (async/await) 来利用本机异步,这通常更可取.

1 从技术上讲,cold observable 的个人订阅(观察者)可以在 Rx 中启用并发性,尽管考虑起来通常很方便代替可观察项。有关详细信息,请参阅 this answer