RxJava 合并保留顺序没有意义

RxJava merge preserve order without sense

我阅读了合并和连接之间的区别。 Concat保留顺序,不合并。

但在我的例子中,合并总是同时保留顺序。有什么问题吗?为什么我的合并等待以前的任务? 我想获得随机结果,但在这种情况下,我总是得到 A1 A2 A3... B1 B2 B3...

fun doSomething() {
        compositeDisposable.add(
            Observable.merge(getNumber1(), getNumber2())
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(
                    { item -> println(item) },
                    { error -> println(error.message) }
                ))
    }

    private fun getNumber1(): Observable<String> {
        return Observable.create { emitter ->
            for (i in 1..10) {
                emitter.onNext("ObservableA: " + i)
            }
            emitter.onComplete()
        }
    }

    private fun getNumber2(): Observable<String> {
        return Observable.create { emitter ->
            for (i in 1..10) {
                emitter.onNext("ObservableB: " + i)
            }
            emitter.onComplete()
        }
    }

Why my merge waits for previous tasks ?

这是因为它们都在同一个线程上执行。为了使两个任务并行工作,subscribeOn 应该在每个 Observables 被合并之前被调用:

Observable.merge(
    getNumber1().subscribeOn(Schedulers.io()),
    getNumber2().subscribeOn(Schedulers.io())
).observeOn(AndroidSchedulers.mainThread())
    ...

您还可以在 emitter.onNext 之后添加 Thread.sleep(100) 以进行更好的测试

为了理解 merge 和 concat 之间的区别,您可以参考下面的示例,其中每 10 毫秒发生一次发射。

val observableFirst = Observable.interval(10, TimeUnit.MILLISECONDS).takeWhile { it < 10 }.map { "A$it" }
val observableSecond = Observable.interval(10, TimeUnit.MILLISECONDS).takeWhile { it < 10 }.map { "B$it" }

val result = arrayListOf<String>()
Observable.merge(observableFirst, observableSecond)
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .doOnComplete { println(result) }
    .subscribe(
        { item -> result.add(item) },
        { error -> println(error.message) }
    )

输出(每次可能不同):[A0, B0, A1, B1, A2, B2, A3, B3, A4, B4, A5, B5, A6, B6, B7, A7, B8, A8, B9, A9]

在您的示例中,for 循环会导致非常快的发射间隔。因此,您没有发现其顺序有任何差异。

在我的示例中,如您所见,发射顺序未得到维护。此外,您可以使用 concat 而不是 merge 来检查这一点。

输出(没有区别):[A0, A1, A2, A3, A4, A5, A6, A7, A8, A9, B0, B1, B2, B3, B4, B5, B6, B7, B8, B9]