RxJava2 - 重复任务的间隔和 运行 有条件的可观察

RxJava2 - Interval on a repetitive task & run an observable on a conditional

运行改造

这是我的界面:

  @GET("solicitation/all")
  Observable<SolicitationResponse> getAll(@Query("X-Authorization") String apiKey);

这是我运行它所在的地方:

    apiService.getAll(getResources().getString(R.string.api_key))
            .subscribeOn(Schedulers.io())
            .flatMapIterable(SolicitationResponse::getData)
            .observeOn(AndroidSchedulers.mainThread())
            .delay(5L, java.util.concurrent.TimeUnit.SECONDS) // THIS DOESN'T WORK LIKE I WANT IT TO..
            .repeat()
            .subscribe(s -> Log.e(TAG, "data: " + s.getName()));

那么,两个问题:

1) 如果我们有互联网连接,我怎样才能添加条件到 运行?

这行不通:

if (NetworkUtils.isConnected()) {
    //observable above here
}

为什么?因为条件代码本身不会 运行 无穷无尽,这意味着它只会检查我们是否有互联网连接一次,因此,如果我们丢失它就会崩溃。

有什么方法可以在 运行使用 getAll 方法之前添加条件吗?

2) 我需要在任务之前或之后添加一个间隔,通过插入 .delay 它会延迟订阅,这不是我想要或需要的.在这种特殊情况下我该如何完成?

这里有一个建议:

Observable.fromCallable(() -> NetworkUtils.isConnected())
            .flatMap(isConnected -> {
                if (isConnected) {
                    return apiService.getAll(getResources().getString(R.string.api_key))
                            .subscribeOn(Schedulers.io())
                            .flatMapIterable(SolicitationResponse::getData);
                } else {
                    return Observable.empty();
                }
            })
            .subscribeOn(Schedulers.io())
            .observeOn(Schedulers.immediate())
            .repeatWhen(observable -> observable.delay(5, TimeUnit.SECONDS))
            .subscribe(s -> Log.e(TAG, "data: " + s.getName()));

1) 最直接的方法是将网络检查添加到流中,并使用 flatMap() 有条件地决定进一步做什么。

2) 可以使用 delaySubscription() 和所需的值来添加延迟,但是第一次也会发生延迟,所以在最后添加延迟的第二种方法在这里似乎更合适,并且可以使用 repeatWhen()

来完成