循环组合可观察量

Round-robin combining observables

我是 RxJava 的新手,我一直在尝试以循环方式组合多个可观察对象。

所以,假设您有三个可观察值:

o1: --0---1------2--
o2: -4--56----------
o3: -------8---9----

以循环方式组合这些将得到类似的结果:

r : --04---815-9-26-

解决这个问题的最佳方法是什么? 由于它看起来像 RxJava、RxScala 等。几乎共享 API,用任何语言回答都应该没问题。 :)

谢谢, 马蒂亚

RxJava 默认没有这样的运算符。最接近的是使用节奏良好的源合并,因为它使用循环法来收集值,但是不能依赖这个 属性。为什么需要这种循环行为?

最好的办法是手动实施此行为。 Here is an example 没有背压支持。

有一种方法实现起来非常简单,并且几乎完全符合您的要求——只需压缩三个源可观察对象,然后从压缩的可观察对象中发出三个值每次都有新的三胞胎到来。

翻译成 RxScala

val o1 = Observable.just(1, 2, 3)
val o2 = Observable.just(10, 20, 30)
val o3 = Observable.just(100, 200, 300)

val roundRobinSource = Observable
    .zip(Observable.just(o1, o2, o3))
    .flatMap(Observable.from[Int])

roundRobinSource.subscribe(println, println)

给你

1
10
100
2
20
200
3
30
300

这正是您想要的。

这种方法的问题在于它会阻塞,直到来自 三个 来源中的每一个的值到达为止,但是如果您对此很满意,我认为这是迄今为止最简单的解决方案。我很好奇,你的用例是什么?

更新,拍摄 #2

这其实是个有趣的问题。这是另一种做法,它将用一个缺点换取另一个缺点。

import rx.lang.scala.{Subject, Observable}

val s1 = Subject[Int]()
val s2 = Subject[Int]()
val s3 = Subject[Int]()

val roundRobinSource3 = s1.publish(po1 ⇒ s2.publish(po2 ⇒ s3.publish(po3 ⇒ {
  def oneRound: Observable[Int] = po1.take(1) ++ po2.take(1) ++ po3.take(1)
  def all: Observable[Int] = oneRound ++ Observable.defer(all)
  all
})))

roundRobinSource3.subscribe(println, println, () ⇒ println("Completed"))

println("s1.onNext(1)")
s1.onNext(1)
println("s2.onNext(10)")
s2.onNext(10)
println("s3.onNext(100)")
s3.onNext(100)
println("s2.onNext(20)")
s2.onNext(20)
println("s1.onNext(2)")
s1.onNext(2)
println("s3.onNext(200)")
s3.onNext(200)
println("s1.onCompleted()")
s1.onCompleted()
println("s2.onCompleted()")
s2.onCompleted()
println("s3.onCompleted()")
s3.onCompleted()
println("Done...")

给你

s1.onNext(1)
1
s2.onNext(10)
10
s3.onNext(100)
100
s2.onNext(20)
s1.onNext(2)
2
20
s3.onNext(200)
200
s1.onCompleted()
s2.onCompleted()
s3.onCompleted()
Done...

它不会阻塞,它会循环,但是...它也不会完成 :( 您可以使用 takeUntilSubjectdoOnComplete 如果你需要的话..

至于机制,它使用了 publish 对我来说有点神秘的行为,可以跟踪已经发出的东西。当他回答我自己的问题时,@lopar 最初指出我

publish 的行为对我来说实际上是一个谜,我在这里发布了一个关于它的问题:https://github.com/ReactiveX/RxJava/issues/2775。有好奇的可以关注一下