rxjs 的条件发射延迟

Conditional emission delays with rxjs

从图片到代码?

如何从 Data 和 Gates 获取 Out observable?

const gate$ = Rx.Observable.interval(2000)
                           .map(_ => Math.random() >= 0.5)
                           .filter(_ => _)


const data$ = Rx.Observable.interval(500)
                            .map(_ => "data"+ _)
                            .buffer(gate$)
                            .flatMap(_ => Rx.Observable.from(_))

data$.subscribe(_ => console.log(_))                          

门流产生随机的真值和假值(例如 n/w 是向上或向下)。我们只从这个流中发出真值

根据这个流的真实值,我们缓冲我们的数据流。

参见 fiddle - fiddle。不要忘记打开浏览器控制台:)

据我了解,当 gates$ 发出 true 时,您需要 data$,并且缓冲 data$ 否则,当 gates$ 再次发出 true 时结束,所以诸如此类:

out$ = gates$.switchMap(x => x? data$ : data$.buffer(gates$))

假设:data$gates$ 是热流(参见此处的含义 )。

这没有经过测试,但请尝试一下,让我们知道它是否确实有效(或者用您所说的代码证明它:-)。逻辑看起来没问题,我只是不确定可重入gates$。希望来自 buffer 的内部 gates$ 订阅在外部订阅之前触发。如果没有发生这种情况,您将看到与网络停机时间相对应的数据发射暂停。

好吧,如果这不起作用,那么 scan 的标准解决方案就可以了。您寻求的行为可以表示为一个(微型)状态机,具有两个状态:passthroughbuffering。您可以使用 scan.

实现所有此类状态机

这里是 scan 解决方案:https://jsfiddle.net/1znvwyzc/

const gates$ = Rx.Observable.interval(2000)
                            .map(_ => Math.random() >= 0.5)
                            .map(x => ({gates: x}))
                            .share()

const data$ = Rx.Observable.interval(500)
                           .map(_ => "data"+ _)
                           .map(x => ({data: x}))                           
                           .share()

const out$ = Rx.Observable.merge(gates$, data$).scan((acc, val) => {
  if (acc.controlState === 'passthrough'){
    if (Object.keys(val).includes('data')) {
      return {
        controlState : 'passthrough',
        bufferedData : [],
        out : val.data
      }
    }
    if (Object.keys(val).includes('gates')) {
      if (val.gates) {
        // gates passing from true to true -> no changes to perform
        return {
        controlState : 'passthrough',
        bufferedData : [],
        out : null
        }
      } else {
        // gates passing from true to false, switch control state
        return {
        controlState : 'buffered',
        bufferedData : [],
        out : null        
        }
      }      
    }
  }
  if (acc.controlState === 'buffered'){
    if (Object.keys(val).includes('data')) {
      return {
        controlState : 'buffered',
        bufferedData : (acc.bufferedData.push(val.data), acc.bufferedData),
        out : null              
      }
    }
    if (Object.keys(val).includes('gates')) {
      if (val.gates) {
        // gates from false to true -> switch control state and pass the buffered data
        return {
          controlState : 'passthrough',
          bufferedData : [],
          out : acc.bufferedData              
        }
      } else {
        // gates from false to false -> nothing to do
        return {
          controlState : 'buffered',
          bufferedData : acc.bufferedData,
          out : null                    
        }
      }
    }
  }
}, {controlState : 'passthrough', bufferedData : [], out:null})
.filter(x => x.out)
.flatMap(x => Array.isArray(x.out) ? Rx.Observable.from(x.out) : Rx.Observable.of(x.out))

out$.subscribe(_ => console.log(_))   

您可以在此处看到完全相同的技术:

受对此 post 的贡献的启发,以下似乎产生了所需的行为:

const ticks$ = gates$.filter(b => b)
const crosses$ = gates$.filter(b => !b)
const tickedData$ = data$.windowToggle(ticks$, _ => crosses$.take(1)).switch()
const crossedDataBuffers$ = data$.bufferToggle(crosses$, _ => ticks$.take(1))
const crossedData$ = Rx.Observable.from(crossedDataBuffers$)
const out$ = tickedData$.merge(crossedData$)

它可能会变得更简单,在https://jsfiddle.net/KristjanLaane/6kbgnp41/

玩一玩

另一种有条件地延迟数据 $ 的方法是使用 delayWhen() 这样的:

const gate$ = new BehaviorSubject<boolean>(false);
const triggerF = _ => gate$.pipe(filter(v => v));
const out$ = data$
  .pipe(delayWhen(triggerF))              
  .subscribe( (v) => console.log(v));

// then trigger gate$, for instance:
setTimeout(() => gate$.next(true), 5000);
setTimeout(() => gate$.next(false), 10000);