RxJava 2:重新订阅(或取消并再次订阅)链中的第二个无限可观察对象

RxJava 2: Resubscribe (or Cancel and subscribe again) to second infinite observable in chain

我有两个链接成一个的无限可观察对象(getLastNDaysItemsInfinite 和 listenToServerUpdates)。 第一个发出第二个应该收听的项目。

            repository
            .getLastNDaysItemsInfinite(4)
            .flatMap(items ->
                    Observable
                            .fromIterable(items)
                            .map(Item::getId)
                            .toList()
                            .flatMapObservable(ids ->
                                    repository
                                            .listenToServerUpdates(ids)
                                            .onErrorResumeNext(throwable -> {
                                                Log.w(TAG, "Error occurred: ", throwable);
                                                return Observable.empty();
                                            }))
            );

listenToServerUpdates observable 打开套接字连接并在我们取消订阅时关闭它。因此,我需要实现以下行为:当 getLastNDaysItemsInfinite(4) 发出新的项目列表时,listenToServerUpdates 将关闭其套接字连接并打开新的连接。但现在它只是为来自 getLastNDaysItemsInfinite 的每个新项目包并行创建新的 listenToServerUpdates observable 和结果新套接字连接。

当 getLastNDaysItemsInfinite 发出新项目时,我如何重新订阅或取消并再次订阅 运行 listenToServerUpdates observable?

提前致谢!

您可以使用 switchMap 而不是 flatMap

像这样

 repository
            .getLastNDaysItemsInfinite(4)
            .switchMap(items ->
                Observable
                        .fromIterable(items)....

因此,每次新列表来自 getLastNDaysItemsInfinite 之前创建的 Observables 都会取消订阅。 Here 是文档