RxJava 2:重新订阅(或取消并再次订阅)链中的第二个无限可观察对象
RxJava 2: Resubscribe (or Cancel and subscribe again) to second infinite observable in chain
我有两个链接成一个的无限可观察对象(getLastNDaysItemsInfinite 和 listenToServerUpdates)。
第一个发出第二个应该收听的项目。
repository
.getLastNDaysItemsInfinite(4)
.flatMap(items ->
Observable
.fromIterable(items)
.map(Item::getId)
.toList()
.flatMapObservable(ids ->
repository
.listenToServerUpdates(ids)
.onErrorResumeNext(throwable -> {
Log.w(TAG, "Error occurred: ", throwable);
return Observable.empty();
}))
);
listenToServerUpdates observable 打开套接字连接并在我们取消订阅时关闭它。因此,我需要实现以下行为:当 getLastNDaysItemsInfinite(4) 发出新的项目列表时,listenToServerUpdates 将关闭其套接字连接并打开新的连接。但现在它只是为来自 getLastNDaysItemsInfinite 的每个新项目包并行创建新的 listenToServerUpdates observable 和结果新套接字连接。
当 getLastNDaysItemsInfinite 发出新项目时,我如何重新订阅或取消并再次订阅 运行 listenToServerUpdates observable?
提前致谢!
您可以使用 switchMap
而不是 flatMap
。
像这样
repository
.getLastNDaysItemsInfinite(4)
.switchMap(items ->
Observable
.fromIterable(items)....
因此,每次新列表来自 getLastNDaysItemsInfinite
之前创建的 Observables
都会取消订阅。 Here 是文档
我有两个链接成一个的无限可观察对象(getLastNDaysItemsInfinite 和 listenToServerUpdates)。 第一个发出第二个应该收听的项目。
repository
.getLastNDaysItemsInfinite(4)
.flatMap(items ->
Observable
.fromIterable(items)
.map(Item::getId)
.toList()
.flatMapObservable(ids ->
repository
.listenToServerUpdates(ids)
.onErrorResumeNext(throwable -> {
Log.w(TAG, "Error occurred: ", throwable);
return Observable.empty();
}))
);
listenToServerUpdates observable 打开套接字连接并在我们取消订阅时关闭它。因此,我需要实现以下行为:当 getLastNDaysItemsInfinite(4) 发出新的项目列表时,listenToServerUpdates 将关闭其套接字连接并打开新的连接。但现在它只是为来自 getLastNDaysItemsInfinite 的每个新项目包并行创建新的 listenToServerUpdates observable 和结果新套接字连接。
当 getLastNDaysItemsInfinite 发出新项目时,我如何重新订阅或取消并再次订阅 运行 listenToServerUpdates observable?
提前致谢!
您可以使用 switchMap
而不是 flatMap
。
像这样
repository
.getLastNDaysItemsInfinite(4)
.switchMap(items ->
Observable
.fromIterable(items)....
因此,每次新列表来自 getLastNDaysItemsInfinite
之前创建的 Observables
都会取消订阅。 Here 是文档