RxJava 并行线程等待后续线程
RxJava parallel threads wait for sequental thread
我有这样一个流程,我需要在多个线程中准备文件并在一个线程中写入数据库
Flowable.fromIterable(files)
.parallel()
.runOn(Schedulers.io())
.flatMap(prepareFiles())
.doOnNext(writeToDatabase())
.sequential()
.takeUntil(file -> {
return stopService.get();
})
.blockingSubscribe();
当我使用这样的流程时,它工作得很好。但是我需要在一个线程中写入数据库(数据库细节的原因)
所以我只更改了代码中的两行(doOnNext
和 sequential
),它变成了这样:
Flowable.fromIterable(files)
.parallel()
.runOn(Schedulers.io())
.flatMap(prepareFiles())
.sequential()
.doOnNext(writeToDatabase())
.takeUntil(file -> {
return stopService.get();
})
.blockingSubscribe();
但是如果你查看 JVM,你会发现,总体速度变慢了很多
似乎所有线程只等待一个线程写入数据库
第一个代码的JVM
这就是带有第二个代码的 JVM
你可以看到,似乎所有线程都在等待一个
那么,如何在一个线程中写入,并设置其他线程不先等待呢?
您看到的是背压对 sequential
的影响。在它之后使用另一个调度程序应用 observeOn
,以免阻塞并行线程。
我有这样一个流程,我需要在多个线程中准备文件并在一个线程中写入数据库
Flowable.fromIterable(files)
.parallel()
.runOn(Schedulers.io())
.flatMap(prepareFiles())
.doOnNext(writeToDatabase())
.sequential()
.takeUntil(file -> {
return stopService.get();
})
.blockingSubscribe();
当我使用这样的流程时,它工作得很好。但是我需要在一个线程中写入数据库(数据库细节的原因)
所以我只更改了代码中的两行(doOnNext
和 sequential
),它变成了这样:
Flowable.fromIterable(files)
.parallel()
.runOn(Schedulers.io())
.flatMap(prepareFiles())
.sequential()
.doOnNext(writeToDatabase())
.takeUntil(file -> {
return stopService.get();
})
.blockingSubscribe();
但是如果你查看 JVM,你会发现,总体速度变慢了很多
似乎所有线程只等待一个线程写入数据库
第一个代码的JVM
这就是带有第二个代码的 JVM
你可以看到,似乎所有线程都在等待一个
那么,如何在一个线程中写入,并设置其他线程不先等待呢?
您看到的是背压对 sequential
的影响。在它之后使用另一个调度程序应用 observeOn
,以免阻塞并行线程。