将 Observables 分组,然后按计数过滤

Group Observables and then filter by count

我在过滤基于 groupBy 可观察对象的可观察对象时遇到问题。

这是一个基于 groupBy() 文档的示例:

var codes = [
  { keyCode: 38}, // up
  { keyCode: 38}, // up
  { keyCode: 40}, // down
  { keyCode: 40}, // down
  { keyCode: 37}, // left
  { keyCode: 39}, // right
  { keyCode: 37}, // left
  { keyCode: 39}, // right
  { keyCode: 66}, // b
  { keyCode: 65}  // a
];

var source = Rx.Observable.from(codes)
  .groupBy(
    function (x) { return x.keyCode; },
    function (x) { return x.keyCode; }
  )
  .flatMap(obs =>
    obs.count()
      .filter(x => x > 1)
      .flatMap(obs.toArray())
  )

source.subscribe(console.log)

这个returns:

[]
[]
[]
[]

所以,Observables 被过滤了,但是它们的值消失了。

将可观察对象作为单个对象(例如,数组)而不是异步值序列进行推理是很棘手的。这里发生的是:

  • 你获取 obs observable 然后你应用 count 到它

    count 将等待 observable 完成并输出一个值,即元素的数量,然后完成。

  • 您过滤了该值,但随后您将 toArray 应用于已经完成的 obs。这就是为什么你得到一个空数组。

所以这里的问题是相同的 obs 变量在不同的时间是不同的东西,所以当你在程序的不同点重用同一个变量时,你无法避免对时间进行推理。

现在,如果你想得到 subscribe 的计数,你应该可以直接写 obs.count().filter(x => x > 1).toArray() 而不是你现在拥有的。 flatMap 除了 observables 之外,还接受 promise 和数组作为选择器函数的 return 值:

比照。 https://github.com/Reactive-Extensions/RxJS/blob/master/doc/api/core/operators/selectmany.md

更新: 考虑到评论,这就是我想出的(待测试):

var source = Rx.Observable
    .from(codes)
    .scan(function ( acc, code ) {
            acc[code] = acc[code] + 1;
            acc.latest = code;
            return acc;
          }, {})
    .filter(function ( acc ) {return acc[acc.latest] > 1;})
    .flatMap(function ( acc ) {return acc[acc.latest] == 2? Rx.Observable.repeat(acc.latest, 2) : Rx.just(acc.latest);})

如果您可以等待处理所有值,则可以在筛选后停止上述操作并添加 .last()。这将为您提供最后一个具有所有值计数的累加器。从那里你可以做你想做的事。

我找到了类似于

的解决方案
  var filtered = source
    .flatMap(obs => {
      var accumulated = obs.scan((a,b) => a.concat(b), [])
      return obs
        .scan((a) => a + 1, 0)
        .filter(i => i > 1)
        .withLatestFrom(
          accumulated, 
          (a, b) => Rx.Observable.from(b)
        )
    })
    .flatMap(d => d)
    .subscribe(d => console.log(d))