循环组合可观察量
Round-robin combining observables
我是 RxJava 的新手,我一直在尝试以循环方式组合多个可观察对象。
所以,假设您有三个可观察值:
o1: --0---1------2--
o2: -4--56----------
o3: -------8---9----
以循环方式组合这些将得到类似的结果:
r : --04---815-9-26-
解决这个问题的最佳方法是什么?
由于它看起来像 RxJava、RxScala 等。几乎共享 API,用任何语言回答都应该没问题。 :)
谢谢,
马蒂亚
RxJava 默认没有这样的运算符。最接近的是使用节奏良好的源合并,因为它使用循环法来收集值,但是不能依赖这个 属性。为什么需要这种循环行为?
最好的办法是手动实施此行为。 Here is an example 没有背压支持。
有一种方法实现起来非常简单,并且几乎完全符合您的要求——只需压缩三个源可观察对象,然后从压缩的可观察对象中发出三个值每次都有新的三胞胎到来。
翻译成 RxScala
val o1 = Observable.just(1, 2, 3)
val o2 = Observable.just(10, 20, 30)
val o3 = Observable.just(100, 200, 300)
val roundRobinSource = Observable
.zip(Observable.just(o1, o2, o3))
.flatMap(Observable.from[Int])
roundRobinSource.subscribe(println, println)
给你
1
10
100
2
20
200
3
30
300
这正是您想要的。
这种方法的问题在于它会阻塞,直到来自 三个 来源中的每一个的值到达为止,但是如果您对此很满意,我认为这是迄今为止最简单的解决方案。我很好奇,你的用例是什么?
更新,拍摄 #2
这其实是个有趣的问题。这是另一种做法,它将用一个缺点换取另一个缺点。
import rx.lang.scala.{Subject, Observable}
val s1 = Subject[Int]()
val s2 = Subject[Int]()
val s3 = Subject[Int]()
val roundRobinSource3 = s1.publish(po1 ⇒ s2.publish(po2 ⇒ s3.publish(po3 ⇒ {
def oneRound: Observable[Int] = po1.take(1) ++ po2.take(1) ++ po3.take(1)
def all: Observable[Int] = oneRound ++ Observable.defer(all)
all
})))
roundRobinSource3.subscribe(println, println, () ⇒ println("Completed"))
println("s1.onNext(1)")
s1.onNext(1)
println("s2.onNext(10)")
s2.onNext(10)
println("s3.onNext(100)")
s3.onNext(100)
println("s2.onNext(20)")
s2.onNext(20)
println("s1.onNext(2)")
s1.onNext(2)
println("s3.onNext(200)")
s3.onNext(200)
println("s1.onCompleted()")
s1.onCompleted()
println("s2.onCompleted()")
s2.onCompleted()
println("s3.onCompleted()")
s3.onCompleted()
println("Done...")
给你
s1.onNext(1)
1
s2.onNext(10)
10
s3.onNext(100)
100
s2.onNext(20)
s1.onNext(2)
2
20
s3.onNext(200)
200
s1.onCompleted()
s2.onCompleted()
s3.onCompleted()
Done...
它不会阻塞,它会循环,但是...它也不会完成 :( 您可以使用 takeUntil
、Subject
和doOnComplete
如果你需要的话..
至于机制,它使用了 publish
对我来说有点神秘的行为,可以跟踪已经发出的东西。当他回答我自己的问题时,@lopar 最初指出我 。
publish
的行为对我来说实际上是一个谜,我在这里发布了一个关于它的问题:https://github.com/ReactiveX/RxJava/issues/2775。有好奇的可以关注一下
我是 RxJava 的新手,我一直在尝试以循环方式组合多个可观察对象。
所以,假设您有三个可观察值:
o1: --0---1------2--
o2: -4--56----------
o3: -------8---9----
以循环方式组合这些将得到类似的结果:
r : --04---815-9-26-
解决这个问题的最佳方法是什么? 由于它看起来像 RxJava、RxScala 等。几乎共享 API,用任何语言回答都应该没问题。 :)
谢谢, 马蒂亚
RxJava 默认没有这样的运算符。最接近的是使用节奏良好的源合并,因为它使用循环法来收集值,但是不能依赖这个 属性。为什么需要这种循环行为?
最好的办法是手动实施此行为。 Here is an example 没有背压支持。
有一种方法实现起来非常简单,并且几乎完全符合您的要求——只需压缩三个源可观察对象,然后从压缩的可观察对象中发出三个值每次都有新的三胞胎到来。
翻译成 RxScala
val o1 = Observable.just(1, 2, 3)
val o2 = Observable.just(10, 20, 30)
val o3 = Observable.just(100, 200, 300)
val roundRobinSource = Observable
.zip(Observable.just(o1, o2, o3))
.flatMap(Observable.from[Int])
roundRobinSource.subscribe(println, println)
给你
1
10
100
2
20
200
3
30
300
这正是您想要的。
这种方法的问题在于它会阻塞,直到来自 三个 来源中的每一个的值到达为止,但是如果您对此很满意,我认为这是迄今为止最简单的解决方案。我很好奇,你的用例是什么?
更新,拍摄 #2
这其实是个有趣的问题。这是另一种做法,它将用一个缺点换取另一个缺点。
import rx.lang.scala.{Subject, Observable}
val s1 = Subject[Int]()
val s2 = Subject[Int]()
val s3 = Subject[Int]()
val roundRobinSource3 = s1.publish(po1 ⇒ s2.publish(po2 ⇒ s3.publish(po3 ⇒ {
def oneRound: Observable[Int] = po1.take(1) ++ po2.take(1) ++ po3.take(1)
def all: Observable[Int] = oneRound ++ Observable.defer(all)
all
})))
roundRobinSource3.subscribe(println, println, () ⇒ println("Completed"))
println("s1.onNext(1)")
s1.onNext(1)
println("s2.onNext(10)")
s2.onNext(10)
println("s3.onNext(100)")
s3.onNext(100)
println("s2.onNext(20)")
s2.onNext(20)
println("s1.onNext(2)")
s1.onNext(2)
println("s3.onNext(200)")
s3.onNext(200)
println("s1.onCompleted()")
s1.onCompleted()
println("s2.onCompleted()")
s2.onCompleted()
println("s3.onCompleted()")
s3.onCompleted()
println("Done...")
给你
s1.onNext(1)
1
s2.onNext(10)
10
s3.onNext(100)
100
s2.onNext(20)
s1.onNext(2)
2
20
s3.onNext(200)
200
s1.onCompleted()
s2.onCompleted()
s3.onCompleted()
Done...
它不会阻塞,它会循环,但是...它也不会完成 :( 您可以使用 takeUntil
、Subject
和doOnComplete
如果你需要的话..
至于机制,它使用了 publish
对我来说有点神秘的行为,可以跟踪已经发出的东西。当他回答我自己的问题时,@lopar 最初指出我
publish
的行为对我来说实际上是一个谜,我在这里发布了一个关于它的问题:https://github.com/ReactiveX/RxJava/issues/2775。有好奇的可以关注一下