RxJava:带有 zip() 的 concatMap() 被卡住

RxJava: concatMap() with zip() gets stuck

我有一个虚拟网络数据源:

    fun networkDataSource(): Single<List<Int>> {
        return Single.just((0 until 100).toList())
                .delay(150, TimeUnit.MILLISECONDS)
    }

这是一个无穷无尽的可观察对象。它的主要用途是它的计算应该是'protected',这样它只计算一次它的单一值(这里取值为1)

    val endless = Observable
            .just(1)
            .observeOn(Schedulers.io())
            .delay(500, TimeUnit.MILLISECONDS)
            // Counts as heavy operation, do not calculate this here once again
            .doOnNext { println("=> E: Calculated once") }
            .cache()
            //.doOnNext { println("=> E: From cache") }
            .repeat()

主流只是简单地发出值:

    val mainStream = Observable.range(0, 6)
            .doOnNext { println("=> M: Main stream $it") }

任务:

将 3 个 observable 压缩在一起,并优化网络使用,这样它就不会被不必要地调用。 (一旦满足数据数量(在本例中为整数)。

方法:

    mainStream
            .concatMap {index ->
                Observables.zip(
                        Observable.just(index),
                        endless,
                        networkDataSource()
                                .toObservable()
                                .doOnNext { println("#> N: Network data fetch $index") }
                )
            }
            .doOnNext { println("=> After concatmap: ${it.first}") }
            .take(4)
            .doOnNext { println("=> After take: ${it.first}") }
            .subscribe(
                    { println("=> Last onnext") },
                    { it.printStackTrace() },
                    { synchronized(check) { check.notifyAll() } }
            )

正在完成锁定的线程 - 仅用于测试:

synchronized(check) {
    check.wait()
}
println("Ending")

这是输出:

=> M: Main stream 0
=> M: Main stream 1
=> M: Main stream 2
=> M: Main stream 3
=> M: Main stream 4
=> M: Main stream 5
#> N: Network data fetch 0
=> E: Calculated once
=> After concatmap: 0
=> After take: 0
=> Last onnext
#> N: Network data fetch 1
=> After concatmap: 1
=> After take: 1
=> Last onnext

这是输出,第二次拍摄后卡住了。 (不会在一分钟内继续)。我的问题是,为什么会这样?

作为旁注,如果我取消注释来自 endless observable 的行:

.doOnNext { println("=> E: From cache") }

它将用该行淹没控制台。为什么 endless 被调用这么多次而不是每次迭代?

flatMap() 不是这里的解决方案,因为它没有考虑 take(4) 并继续完成所有网络调用。

那么我怎样才能让 concatMap() 工作呢?

(我还添加了 RxJS 标签,因为这是一个反应性问题,绝对与 Kotlin 无关。如果 RxJava 库中存在这些函数,也欢迎使用 JS 解决方案。)

编辑:

我查看了代码,2 个输出可能是因为 prefetch 参数:

@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final <R> Observable<R> concatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper) {
    return concatMap(mapper, 2);
}

但我还是不明白它是如何工作的。我只读到 concatMap()flatmap(),但它会等待每个结果。

来自评论:

整个设置可能会 运行 在第一个项目之后的同一个线程上,并且 endless 中的 repeat 永远不会放弃线程,从而阻止任何其他操作员继续进行。重复 cache 对我来说毫无意义,因为您只会使用其中的一项。