如何有条件地发出两个可观察流?

How do I conditionally emit two observables stream?

我想知道是否有任何方法可以将一个流转换为另一个流。我已经有两个流 number$ 和 color$,它们输出数字和颜色,例如 [1, 1, 1, ...] 和 ['red', 'red', 'red', 。 ..].

我想要得到类似 [1, 1, 1, 'red', 'red', 'red', 'red', 'red', 'red', 1, 1, 1, ...], 此输出流将随机跳转 number$ 到 color$ 或相反。

我是 RxJS 的新手,没有找到合适的方法来解决我的问题。所以我假设了一个方法jumpWhen(condition, streamJumpTo),一旦条件为真,处理流将被弃用,它将被转换为一个新的流并开始工作。

let number$, color$

number$ = Rx.Observable.interval(300)
  .jumpWhen(Math.random() > 0.9, color$)
  .mapTo(1)

color$ = Rx.Observable.interval(500)
  .jumpWhen(Math.random() < 0.3, number$)
  .mapTo('red')

number$.subscribe(console.log)

问题是,如何使用 Rx.js v5 模拟此过程?注意:$number 和 $color 可能附加了完全不同的运算符(不是简单的 mapTo),所以我认为我们不应该尝试将这两个流合并在一起。

关于此流的说明:

起初,流输出每 300 毫秒 1,有 10% 的机会 (Math.random() > 0.9) 将 number$ 切换为 $color。如果发生切换,stream 停止输出 1,开始每 500ms 输出 red。整体输出可能看起来像:

       1 - - 1 - - 'red' - - - - 'red' - - - - 'red' - - - - 1 - - 1 - ...
cond   f     f       t             f             f           t     f
time  300   600     900           1400         1900        2400   2700
rand  0.5   0.2     0.95          0.99          0.4        0.08   0.1

您可以使用 Observable.if().

Observable.if(condition, thenSource, [elseSource]) 接受 3 个参数:

  1. 第一个参数接受一个 returns 布尔值的函数,
  2. 第二个参数是一个 observable 如果条件为真
  3. 最后一个是 else 源数组,如果条件为假,则将被发射。

因为你有两个 observables numbers$colors$,你可以编写一个条件函数并决定要发射哪些 observables。

Observable.if(
    () => Math.random() > 0.5, //function that determines your condition
    number$, //this observable will be emitted if the above function returns true 
    color$ //this will be emitted if false
    )
.subscribe(x => console.log(x));

工作 JSBin.

编辑:

从评论中,我意识到我错误地理解了您的要求。对于您的情况,您可以使用 .expand() 运算符重复 returns 您选择的 Observable,并相应地延迟:

let test = Observable
.of(1)
.expand((prevData) => {
  let rand = Math.random();
  let number$ = Observable.of(1).delay(300);
  let color$ = Observable.of('red').delay(500);

  if (prevData === 1) {
    return rand > 0.9 ? color$ : number$;
  }
  else {
    return rand < 0.3 ? number$ : color$;
  }

})
.subscribe(x => console.log(x))

编辑工作JSBin