takeWhile 每次都订阅新的 Single

takeWhile with subscribing to new Single each time

我有一个数据源fun getData(page : Int) : Single<List<Data>>

我想创建一种行为,每次我都可以使用 不同的参数订阅该来源 直到 conditionCheck() returns true.

我想象的是这样的:

getData(page)
  .doOnNext { page++ }
  .doOnNext { /* manipulate data */ }
  .takeWhile { conditionCheck() }
  .subscribe({
    print "Completed"
  })

这是一种不同的思考方式(伪代码式):

initialPage = Observable.just(1);
futurePages = PublishSubject.create();

allPossiblePages = Observable.concat(initialPage, futurePages);

allPossibleData = allPossiblePages
  .map(page -> getData(page))
  .do(page -> futurePages.onNext(/* manipulate page data */));

dataYouWant = allPossibleData
  .filter(data -> conditionCheck(data))
  .takeFirst()
  .subscribe( /* data should be here */);

也许这不是最好的方法,如果是这样的话,有人会说出来的。我不是 .do(page -> futurePages.onNext ..) 这行的忠实粉丝,但像这样的东西可能有用。

我在这里整理了一个使用 rxjs 运行的示例(最容易 prototype/demo):https://plnkr.co/edit/uXE53ygo8REzpbMJdvsD