RxJava2:如何改进并行数据下载和缓存?

RxJava2: How to improve parallel data downloading and caching?

我正在努力通过 RxJava2。我想知道我的解决方案是否可以接受,或者有什么方法可以改进它。

用例

  1. 用户按下更新数据按钮
  2. 显示一个对话框 - 请稍等
  3. 并行处理多个后端调用
  4. 其中任何一个完成后 - 数据将保存在本地数据库中
  5. 所有请求完成后(后端调用和持久化)对话框应该关闭

当前解

我有几个 Completables 看起来像这样:

Completable organisationUnitCompletable = backendService.getOrganisationUnits()
    .doOnNext(data -> organisationUnitDao.saveInTx(data))
    .ignoreElements()
    .subscribeOn(Schedulers.io());

Completable locationCompletable = backendService.getLocations()
    .doOnNext(data -> locationDao.saveInTx(data))
    .ignoreElements()
    .subscribeOn(Schedulers.io());

Completable prioritiesCompletable = backendService.getPriorities()
    .doOnNext(data -> priorityDao.saveInTx(data))
    .ignoreElements()
    .subscribeOn(Schedulers.io());

我通过添加到列表并使用 merge 运算符将它们打包成一个:

List<Completable> compatibles = new ArrayList<>();
compatibles.add(organisationUnitCompletable);
compatibles.add(locationCompletable);
compatibles.add(prioritiesCompletable);

Completable.merge(compatibles)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(() -> {
     progressDialog.dismiss();
});

可能的改进

好的,这按预期工作了。但有些事情我不太高兴。

我真的必须向每个 Completable 添加 subscribeOn(Schedulers.io()) 吗?没有它就不能并行工作,但也许有更好的方法来做到这一点?

所有可完成项都有这些行。

    .ignoreElements()
    .subscribeOn(Schedulers.io());

有没有办法把它提取到一个方法中?我试过这样的事情:

private <T> Completable prepareCompletable(Function<Void, Observable<List<T>>> source, AbstractDao<T, Long> dao) {

    Completable orderTypeCompletable = source
            .doOnNext(data -> dao.saveInTx(data))
            .ignoreElements()
            .subscribeOn(Schedulers.io());
}

我只是将 Observable 和 DAO 放入其中。当然它不编译。看来它需要的泛型知识比我已经掌握的要多得多。

抱歉,这个问题很长,很难用几句话来解释整个用例。

Do I really have to add the subscribeOn(Schedulers.io()) to each Completable?

是的,但是在Completable.merge()之后就不需要了。

Is there a way to extract it into one method?

public static <T> Function<Flowable<T>, Completable> applyIgnore() {
    return f -> f.ignoreElements().subscribeOn(Schedulers.io());
}

Completable locationCompletable = backendService.getLocations()
.doOnNext(data -> locationDao.saveInTx(data))
.to(applyIgnore());