RxJava 合并保留顺序没有意义
RxJava merge preserve order without sense
我阅读了合并和连接之间的区别。
Concat保留顺序,不合并。
但在我的例子中,合并总是同时保留顺序。有什么问题吗?为什么我的合并等待以前的任务?
我想获得随机结果,但在这种情况下,我总是得到 A1 A2 A3... B1 B2 B3...
fun doSomething() {
compositeDisposable.add(
Observable.merge(getNumber1(), getNumber2())
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(
{ item -> println(item) },
{ error -> println(error.message) }
))
}
private fun getNumber1(): Observable<String> {
return Observable.create { emitter ->
for (i in 1..10) {
emitter.onNext("ObservableA: " + i)
}
emitter.onComplete()
}
}
private fun getNumber2(): Observable<String> {
return Observable.create { emitter ->
for (i in 1..10) {
emitter.onNext("ObservableB: " + i)
}
emitter.onComplete()
}
}
Why my merge waits for previous tasks ?
这是因为它们都在同一个线程上执行。为了使两个任务并行工作,subscribeOn
应该在每个 Observables 被合并之前被调用:
Observable.merge(
getNumber1().subscribeOn(Schedulers.io()),
getNumber2().subscribeOn(Schedulers.io())
).observeOn(AndroidSchedulers.mainThread())
...
您还可以在 emitter.onNext
之后添加 Thread.sleep(100)
以进行更好的测试
为了理解 merge 和 concat 之间的区别,您可以参考下面的示例,其中每 10 毫秒发生一次发射。
val observableFirst = Observable.interval(10, TimeUnit.MILLISECONDS).takeWhile { it < 10 }.map { "A$it" }
val observableSecond = Observable.interval(10, TimeUnit.MILLISECONDS).takeWhile { it < 10 }.map { "B$it" }
val result = arrayListOf<String>()
Observable.merge(observableFirst, observableSecond)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.doOnComplete { println(result) }
.subscribe(
{ item -> result.add(item) },
{ error -> println(error.message) }
)
输出(每次可能不同):[A0, B0, A1, B1, A2, B2, A3, B3, A4, B4, A5, B5, A6, B6, B7, A7, B8, A8, B9, A9]
在您的示例中,for 循环会导致非常快的发射间隔。因此,您没有发现其顺序有任何差异。
在我的示例中,如您所见,发射顺序未得到维护。此外,您可以使用 concat 而不是 merge 来检查这一点。
输出(没有区别):[A0, A1, A2, A3, A4, A5, A6, A7, A8, A9, B0, B1, B2, B3, B4, B5, B6, B7, B8, B9]
我阅读了合并和连接之间的区别。 Concat保留顺序,不合并。
但在我的例子中,合并总是同时保留顺序。有什么问题吗?为什么我的合并等待以前的任务? 我想获得随机结果,但在这种情况下,我总是得到 A1 A2 A3... B1 B2 B3...
fun doSomething() {
compositeDisposable.add(
Observable.merge(getNumber1(), getNumber2())
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(
{ item -> println(item) },
{ error -> println(error.message) }
))
}
private fun getNumber1(): Observable<String> {
return Observable.create { emitter ->
for (i in 1..10) {
emitter.onNext("ObservableA: " + i)
}
emitter.onComplete()
}
}
private fun getNumber2(): Observable<String> {
return Observable.create { emitter ->
for (i in 1..10) {
emitter.onNext("ObservableB: " + i)
}
emitter.onComplete()
}
}
Why my merge waits for previous tasks ?
这是因为它们都在同一个线程上执行。为了使两个任务并行工作,subscribeOn
应该在每个 Observables 被合并之前被调用:
Observable.merge(
getNumber1().subscribeOn(Schedulers.io()),
getNumber2().subscribeOn(Schedulers.io())
).observeOn(AndroidSchedulers.mainThread())
...
您还可以在 emitter.onNext
之后添加 Thread.sleep(100)
以进行更好的测试
为了理解 merge 和 concat 之间的区别,您可以参考下面的示例,其中每 10 毫秒发生一次发射。
val observableFirst = Observable.interval(10, TimeUnit.MILLISECONDS).takeWhile { it < 10 }.map { "A$it" }
val observableSecond = Observable.interval(10, TimeUnit.MILLISECONDS).takeWhile { it < 10 }.map { "B$it" }
val result = arrayListOf<String>()
Observable.merge(observableFirst, observableSecond)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.doOnComplete { println(result) }
.subscribe(
{ item -> result.add(item) },
{ error -> println(error.message) }
)
输出(每次可能不同):[A0, B0, A1, B1, A2, B2, A3, B3, A4, B4, A5, B5, A6, B6, B7, A7, B8, A8, B9, A9]
在您的示例中,for 循环会导致非常快的发射间隔。因此,您没有发现其顺序有任何差异。
在我的示例中,如您所见,发射顺序未得到维护。此外,您可以使用 concat 而不是 merge 来检查这一点。
输出(没有区别):[A0, A1, A2, A3, A4, A5, A6, A7, A8, A9, B0, B1, B2, B3, B4, B5, B6, B7, B8, B9]