RxJava - 一个生产者,单个订阅中的多个并发消费者

RxJava - One producer, many concurrent consumers in single subscription

我正在尝试了解 RxJava 并发的一些细节,但我不确定我的想法是否正确。我非常了解 SubscribeOn/ObserveOn 的工作原理,但我正在尝试确定池调度程序的一些细节。为此,我正在考虑尽可能简单地实现一个 1-N 生产者-消费者链,其中消费者数量尽可能多 CPUs。

根据文档,Schedulers.computation() 由与内核一样多的线程池支持。但是,根据 Reactive 合同,操作员只能获得顺序调用。

因此,像这样的设置

Observable.range(1, 1000) // Whatever has to be processed
            .observeOn(Schedulers.computation())
            .doOnNext(/* heavy computation */)
            .doOnCompleted(() -> System.out.println("COMPLETED"))
            .forEach(System.out::println);

尽管使用线程池,但只会收到对 doOnNext 的并发调用。睡眠实验和检查 OperatorObserveOn.java 似乎证实了这一点,因为每个 observeOn 调用都会获得一个工人。另外,如果不是这样的话,应该有一个复杂的 OnCompleted 管理必须等待任何挂起的 OnNext 完成,我发现它不存在。

假设我在这里走在正确的轨道上(也就是说,只涉及一个线程,尽管您可以使用 observeOn 跳过其中的几个线程),那么正确的模式是什么?我可以找到相反情况的示例(将多个异步事件生成器同步到一个消费者中),但不是这种典型情况的简单示例。

我猜想 flatMap 参与其中,可能使用限制并发订阅数量的 beta 版本(在 1.x 中)。也许像这样使用 window/flatMap 一样简单?

Observable
.range(1, 1000) // Whatever has to be processed
.window(1) // Emit one observable per item, for example 
.flatMap(/* Processing */, 4) // For 4-concurrent processing
.subscribe()

在这种方法中,我仍然缺少一种以 Rx 通用方式最大化 CPU 的简单方法(即,指定计算调度程序而不是 flatMap 的最大订阅数)。所以,也许...:[=​​18=]

Observable
.range(1, 1000) // Whatever has to be processed
.window(1) // Emit one observable per item, for example 
.flatMap(v -> Observable.just(v)
                        .observeOn(Schedulers.computation())
                        .map(/* heavy parallel computation */))
.subscribe()

最后,在某些使用 flatMap 的示例中,我在 flatMap 之后看到一个 toBlock() 调用,我不确定为什么需要它,因为 flatMap 不应该为下游执行序列化吗? (例如,在此示例中:http://akarnokd.blogspot.com.es/2016/02/flatmap-part-1.html

托马斯·尼尔德 (Thomas Nield) 有一篇关于这种情况的好文章

RxJava - Maximizing Parallelization

我个人在那种情况下所做的,我只是在 flatMap 中订阅 Schedulers.io 并具有最大并发调用参数。

    Observable.range(1, 1000) // Whatever has to be processed
            .flatMap(v -> Observable.fromCallable(() -> { /* io calls */}) 
                    .subscribeOn(Schedulers.io()), Runtime.getRuntime().availableProcessors() + 1)
            .subscribe();

编辑 根据评论中的建议,最好将 Schedulers.computation() 用于 CPU 绑定工作

    Observable.range(1, 1000) // Whatever has to be processed
            .flatMap(v -> Observable.fromCallable(() -> { /* intense calculation */}) 
                    .subscribeOn(Schedulers.computation()))
            .subscribe();