RxJava:带有 zip() 的 concatMap() 被卡住
RxJava: concatMap() with zip() gets stuck
我有一个虚拟网络数据源:
fun networkDataSource(): Single<List<Int>> {
return Single.just((0 until 100).toList())
.delay(150, TimeUnit.MILLISECONDS)
}
这是一个无穷无尽的可观察对象。它的主要用途是它的计算应该是'protected',这样它只计算一次它的单一值。 (这里取值为1)
val endless = Observable
.just(1)
.observeOn(Schedulers.io())
.delay(500, TimeUnit.MILLISECONDS)
// Counts as heavy operation, do not calculate this here once again
.doOnNext { println("=> E: Calculated once") }
.cache()
//.doOnNext { println("=> E: From cache") }
.repeat()
主流只是简单地发出值:
val mainStream = Observable.range(0, 6)
.doOnNext { println("=> M: Main stream $it") }
任务:
将 3 个 observable 压缩在一起,并优化网络使用,这样它就不会被不必要地调用。 (一旦满足数据数量(在本例中为整数)。
方法:
mainStream
.concatMap {index ->
Observables.zip(
Observable.just(index),
endless,
networkDataSource()
.toObservable()
.doOnNext { println("#> N: Network data fetch $index") }
)
}
.doOnNext { println("=> After concatmap: ${it.first}") }
.take(4)
.doOnNext { println("=> After take: ${it.first}") }
.subscribe(
{ println("=> Last onnext") },
{ it.printStackTrace() },
{ synchronized(check) { check.notifyAll() } }
)
正在完成锁定的线程 - 仅用于测试:
synchronized(check) {
check.wait()
}
println("Ending")
这是输出:
=> M: Main stream 0
=> M: Main stream 1
=> M: Main stream 2
=> M: Main stream 3
=> M: Main stream 4
=> M: Main stream 5
#> N: Network data fetch 0
=> E: Calculated once
=> After concatmap: 0
=> After take: 0
=> Last onnext
#> N: Network data fetch 1
=> After concatmap: 1
=> After take: 1
=> Last onnext
这是输出,第二次拍摄后卡住了。 (不会在一分钟内继续)。我的问题是,为什么会这样?
作为旁注,如果我取消注释来自 endless
observable 的行:
.doOnNext { println("=> E: From cache") }
它将用该行淹没控制台。为什么 endless
被调用这么多次而不是每次迭代?
flatMap()
不是这里的解决方案,因为它没有考虑 take(4)
并继续完成所有网络调用。
那么我怎样才能让 concatMap()
工作呢?
(我还添加了 RxJS 标签,因为这是一个反应性问题,绝对与 Kotlin 无关。如果 RxJava 库中存在这些函数,也欢迎使用 JS 解决方案。)
编辑:
我查看了代码,2 个输出可能是因为 prefetch
参数:
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final <R> Observable<R> concatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper) {
return concatMap(mapper, 2);
}
但我还是不明白它是如何工作的。我只读到 concatMap()
是 flatmap()
,但它会等待每个结果。
来自评论:
整个设置可能会 运行 在第一个项目之后的同一个线程上,并且 endless
中的 repeat
永远不会放弃线程,从而阻止任何其他操作员继续进行。重复 cache
对我来说毫无意义,因为您只会使用其中的一项。
我有一个虚拟网络数据源:
fun networkDataSource(): Single<List<Int>> {
return Single.just((0 until 100).toList())
.delay(150, TimeUnit.MILLISECONDS)
}
这是一个无穷无尽的可观察对象。它的主要用途是它的计算应该是'protected',这样它只计算一次它的单一值。 (这里取值为1)
val endless = Observable
.just(1)
.observeOn(Schedulers.io())
.delay(500, TimeUnit.MILLISECONDS)
// Counts as heavy operation, do not calculate this here once again
.doOnNext { println("=> E: Calculated once") }
.cache()
//.doOnNext { println("=> E: From cache") }
.repeat()
主流只是简单地发出值:
val mainStream = Observable.range(0, 6)
.doOnNext { println("=> M: Main stream $it") }
任务:
将 3 个 observable 压缩在一起,并优化网络使用,这样它就不会被不必要地调用。 (一旦满足数据数量(在本例中为整数)。
方法:
mainStream
.concatMap {index ->
Observables.zip(
Observable.just(index),
endless,
networkDataSource()
.toObservable()
.doOnNext { println("#> N: Network data fetch $index") }
)
}
.doOnNext { println("=> After concatmap: ${it.first}") }
.take(4)
.doOnNext { println("=> After take: ${it.first}") }
.subscribe(
{ println("=> Last onnext") },
{ it.printStackTrace() },
{ synchronized(check) { check.notifyAll() } }
)
正在完成锁定的线程 - 仅用于测试:
synchronized(check) {
check.wait()
}
println("Ending")
这是输出:
=> M: Main stream 0
=> M: Main stream 1
=> M: Main stream 2
=> M: Main stream 3
=> M: Main stream 4
=> M: Main stream 5
#> N: Network data fetch 0
=> E: Calculated once
=> After concatmap: 0
=> After take: 0
=> Last onnext
#> N: Network data fetch 1
=> After concatmap: 1
=> After take: 1
=> Last onnext
这是输出,第二次拍摄后卡住了。 (不会在一分钟内继续)。我的问题是,为什么会这样?
作为旁注,如果我取消注释来自 endless
observable 的行:
.doOnNext { println("=> E: From cache") }
它将用该行淹没控制台。为什么 endless
被调用这么多次而不是每次迭代?
flatMap()
不是这里的解决方案,因为它没有考虑 take(4)
并继续完成所有网络调用。
那么我怎样才能让 concatMap()
工作呢?
(我还添加了 RxJS 标签,因为这是一个反应性问题,绝对与 Kotlin 无关。如果 RxJava 库中存在这些函数,也欢迎使用 JS 解决方案。)
编辑:
我查看了代码,2 个输出可能是因为 prefetch
参数:
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final <R> Observable<R> concatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper) {
return concatMap(mapper, 2);
}
但我还是不明白它是如何工作的。我只读到 concatMap()
是 flatmap()
,但它会等待每个结果。
来自评论:
整个设置可能会 运行 在第一个项目之后的同一个线程上,并且 endless
中的 repeat
永远不会放弃线程,从而阻止任何其他操作员继续进行。重复 cache
对我来说毫无意义,因为您只会使用其中的一项。