Rx 编程,如何在单个可观察对象中将 item 与前一个元素结合起来?
Rx programming, how to combine item with former element in single observable?
如果我们有可观察的:
1 -> 2 -> 3 -> 4 -> 5 -> ...
如何构建新的可观察对象:
(1, 2) -> (3, 4) -> ...
也许这个问题很简短,但我真的不知道如何 achieve.Thank 你
谢谢大家,我找到了一个方法,正在考虑删除var
import java.util.concurrent.TimeUnit
import rx.lang.scala.{Subject, Observable}
import scala.concurrent.duration.Duration
object ObservableEx {
implicit class ObservableImpl[T](src: Observable[T]) {
/**
* 1 -> 2 -> 3 -> 4 ->...
* (1,2) -> (3,4) -> ...
*/
def pair: Observable[(T, T)] = {
val sub = Subject[(T, T)]()
var former: Option[T] = None //help me to kill it
src.subscribe(
x => {
if (former.isEmpty) {
former = Some(x)
}
else {
sub.onNext(former.get, x)
former = None
}
},
e => sub.onError(e),
() => sub.onCompleted()
)
sub
}
}
}
object Test extends App {
import ObservableEx._
val pair = Observable.interval(Duration(1L, TimeUnit.SECONDS)).pair
pair.subscribe(x => println("1 - " + x))
pair.subscribe(x => println("2 - " + x))
Thread.currentThread().join()
}
我一点也不喜欢 var,再次感谢!
终于
我得到了一个简单的方法,希望可以帮助别人。
def pairPure[T](src: Observable[T]): Observable[(T, T)] = {
def pendingPair(former: Option[T], sub: Subject[(T, T)]): Unit = {
val p = Promise[Unit]
val subscription = src.subscribe(
x => {
if (former.isEmpty) {
p.trySuccess(Unit)
pendingPair(Some(x), sub)
}
else {
sub.onNext(former.get, x)
p.trySuccess(Unit)
pendingPair(None, sub)
}
},
e => sub.onError(e),
() => sub.onCompleted()
)
p.future.map{x => subscription.unsubscribe()}
}
val sub = Subject[(T,T)]()
pendingPair(None, sub)
sub
}
其他答案也很有帮助~
尝试 groupBy 运算符。
祝你好运。
您可以使用 tumblingBuffer 和 count = 2
来获得长度为 2 的 Seq
的 Observable
,并且使用 map
,您可以将它们成对:
implicit class ObservableImpl[T](src: Observable[T]) {
def pair: Observable[(T, T)] = {
def seqToPair(seq: Seq[T]): (T, T) = seq match {
case Seq(first, second) => (first, second)
}
src.tumblingBuffer(2).map(seqToPair)
}
}
请注意,如果源 Observable 中的元素数量为奇数,这将失败,因此您必须在 seqToPair
中介绍这种情况。
如果我们有可观察的:
1 -> 2 -> 3 -> 4 -> 5 -> ...
如何构建新的可观察对象:
(1, 2) -> (3, 4) -> ...
也许这个问题很简短,但我真的不知道如何 achieve.Thank 你
谢谢大家,我找到了一个方法,正在考虑删除var
import java.util.concurrent.TimeUnit
import rx.lang.scala.{Subject, Observable}
import scala.concurrent.duration.Duration
object ObservableEx {
implicit class ObservableImpl[T](src: Observable[T]) {
/**
* 1 -> 2 -> 3 -> 4 ->...
* (1,2) -> (3,4) -> ...
*/
def pair: Observable[(T, T)] = {
val sub = Subject[(T, T)]()
var former: Option[T] = None //help me to kill it
src.subscribe(
x => {
if (former.isEmpty) {
former = Some(x)
}
else {
sub.onNext(former.get, x)
former = None
}
},
e => sub.onError(e),
() => sub.onCompleted()
)
sub
}
}
}
object Test extends App {
import ObservableEx._
val pair = Observable.interval(Duration(1L, TimeUnit.SECONDS)).pair
pair.subscribe(x => println("1 - " + x))
pair.subscribe(x => println("2 - " + x))
Thread.currentThread().join()
}
我一点也不喜欢 var,再次感谢!
终于 我得到了一个简单的方法,希望可以帮助别人。
def pairPure[T](src: Observable[T]): Observable[(T, T)] = {
def pendingPair(former: Option[T], sub: Subject[(T, T)]): Unit = {
val p = Promise[Unit]
val subscription = src.subscribe(
x => {
if (former.isEmpty) {
p.trySuccess(Unit)
pendingPair(Some(x), sub)
}
else {
sub.onNext(former.get, x)
p.trySuccess(Unit)
pendingPair(None, sub)
}
},
e => sub.onError(e),
() => sub.onCompleted()
)
p.future.map{x => subscription.unsubscribe()}
}
val sub = Subject[(T,T)]()
pendingPair(None, sub)
sub
}
其他答案也很有帮助~
尝试 groupBy 运算符。 祝你好运。
您可以使用 tumblingBuffer 和 count = 2
来获得长度为 2 的 Seq
的 Observable
,并且使用 map
,您可以将它们成对:
implicit class ObservableImpl[T](src: Observable[T]) {
def pair: Observable[(T, T)] = {
def seqToPair(seq: Seq[T]): (T, T) = seq match {
case Seq(first, second) => (first, second)
}
src.tumblingBuffer(2).map(seqToPair)
}
}
请注意,如果源 Observable 中的元素数量为奇数,这将失败,因此您必须在 seqToPair
中介绍这种情况。