RxJava 并行线程等待后续线程

RxJava parallel threads wait for sequental thread

我有这样一个流程,我需要在多个线程中准备文件并在一个线程中写入数据库

    Flowable.fromIterable(files)
            .parallel()
            .runOn(Schedulers.io())
            .flatMap(prepareFiles())
            .doOnNext(writeToDatabase())
            .sequential()
            .takeUntil(file -> {
                return stopService.get();
            })
            .blockingSubscribe();

当我使用这样的流程时,它工作得很好。但是我需要在一个线程中写入数据库(数据库细节的原因)

所以我只更改了代码中的两行(doOnNextsequential),它变成了这样:

    Flowable.fromIterable(files)
            .parallel()
            .runOn(Schedulers.io())
            .flatMap(prepareFiles())
            .sequential()
            .doOnNext(writeToDatabase())
            .takeUntil(file -> {
                return stopService.get();
            })
            .blockingSubscribe();

但是如果你查看 JVM,你会发现,总体速度变慢了很多

似乎所有线程只等待一个线程写入数据库

第一个代码的JVM

这就是带有第二个代码的 JVM

你可以看到,似乎所有线程都在等待一个

那么,如何在一个线程中写入,并设置其他线程不先等待呢?

您看到的是背压对 sequential 的影响。在它之后使用另一个调度程序应用 observeOn,以免阻塞并行线程。

https://github.com/ReactiveX/RxJava/issues/7394