如何使用 rxAndroid 递归调用 api 直到满足某些条件?

How to call api recursively using rxAndroid until some met condition?

我正在使用 rxAndroid 进行改造,试图从服务器获得响应,直到服务器 return 0 列表大小。

getPageAndNext(rows_id),它的接受起始点 id 现在考虑它的 0,在下一个请求中,我将传递第 9 个 id,以便服务器 return 在第 9 个 id 响应后。(10th,第 11、...第 15...第 19)

 Observable<List<Feed>> getPageAndNext(int id) {
        return apiInterface.postGetFeed(membersIds, id)
                .flatMap(feedResponse -> {
                    if (feedResponse.getData().size() > 0) {
                        return Observable.just(feedResponse.getData())
                                .concatWith(getPageAndNext(feedResponse.getData().get(feedResponse.getData().size() - 1).getId()));

                    }
                    return Observer::onComplete;
                });

    }

这里每个请求,我得到 10 个 feed 并在 onNext 方法中将 feed 插入房间数据库

getPageAndNext(0)
                .subscribeOn(Schedulers.computation())
                .subscribe(new DisposableObserver<List<Feed>>() {
                    @Override
                    public void onNext(List<Feed> feeds) {
                        //save to room database
                        memberDatabaseRepository.insertMemberList(feeds);

                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.e("jjjj", "error"+e);
                    }

                    @Override
                    public void onComplete() {
                        Log.d("kkkk","done");
                    }
                });

现在我没有收到服务器的所有响应。我正在搜索过去 24 小时,但没有找到任何解决方案。

注意:此后我可以进行 76 API 次呼叫,即使只有 76 API 次呼叫数据保存在房间数据库中也没有任何响应。但大约有 630 API 个电话

链接:-

最后解决了在rxAndroid中递归调用API。 takeUntil -> 我正在检查我是否得到大小为零然后停止递归调用。

Observable.range(0, Integer.MAX_VALUE)
                // Get each page in order.
                .concatMap(integer -> {
                    return apiInterface.postGetFeed(membersIds, lastFeedId);
                })
                // Take every result up to and including the one where the next page index is null.
                .takeUntil(result -> {
                    if(!result.getData().isEmpty())
                    lastFeedId = result.getData().get(result.getData().size() - 1).getId();
                    return result.getData().isEmpty();
                })
                .map(FeedResponse::getData)
                .subscribeOn(Schedulers.io())
                .subscribe(new DisposableObserver<List<Feed>>() {
                               @Override
                               public void onNext(List<Feed> feeds) {
                                   Log.d("kkkk", "" + feeds);
                                   memberDatabaseRepository.insertMemberList(feeds, lastFeedId);

                               }

                               @Override
                               public void onError(Throwable e) {
                                   Log.d("kkkk", "error " + e.getMessage());

                               }

                               @Override
                               public void onComplete() {
                                   Log.d("kkkk", "onComplete");
                                   memberDatabaseRepository.getAllFeed(FeedFragment.this);
                               }
                           }
                );