创建一个通过过滤器的连续值流

Create a stream of consecutive values that pass a filter

假设我有一个数字流

---1-----1----1----0----0----1----1----0--->

我想得到一个新的数组流,其中包含像这样的连续 1

---[1,1,1]---[1,1]--->

我虽然使用了扫描函数,但它只发出一个值,我读到了有关 bufferToggle 的信息,但文档只将它与定时可观察对象一起使用。有什么功能可以做到这一点吗?

一种可能的方法是将 scanpairwise 运算符一起使用。

使用成对的方法,您可以比较第 N-1 次发射与第 N 次发射。

console.clear();
var source = Rx.Observable.of(1, 1, 1, 0, 0, 1, 1, 0);

source
  .concat(Rx.Observable.of(0)) // for the case when source completes with 1
  .scan(function(acc, x) {
    // reset accumulator to an empty array when a 0 is encountered
    if (x === 0) {
      return [];
    } else {
      return acc.concat([x]);
    }
  }, [])
  .pairwise()
  // filter when the accumulated value goes from a length greater than 0
  // to 0 then you know you've hit 0
  .filter((pair) => pair[0].length > 0 && pair[1].length === 0)
  // take the first element of the pair since it has the result you're interested in
  .map((pair) => pair[0])
  .subscribe(console.log)
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.0.2/Rx.min.js"></script>

可以使用源本身作为 bufferToggle 的信号 - 它使用 observables 来控制缓冲区的打开和关闭。

但是,您必须注意订阅源的顺序。特别是,用于指示缓冲区打开和关闭的可观察对象必须在 bufferToggle 可观察对象订阅之前被订阅。

这可以使用 publishSubject 来完成,如下所示:

const source = Rx.Observable.of(1, 1, 1, 0, 0, 1, 1, 0);
const published = source.publish();

const signal = new Rx.Subject();
published.subscribe(signal);

const buffered = published.bufferToggle(
  signal
    .startWith(0)
    .pairwise()
    .filter(([prev, last]) => (prev === 0) && (last === 1)),
  () => signal.filter(value => value === 0)
);

buffered.subscribe(value => console.log(value));
published.connect();
.as-console-wrapper { max-height: 100% !important; top: 0; }
<script src="https://unpkg.com/rxjs@5/bundles/Rx.min.js"></script>

Subject 是必需的,因为每次发出值时,都会对用于指示缓冲区关闭的可观察对象进行订阅。

我应该补充一点,我提交这个答案并不是因为我认为它比其他答案更好,而是为了表明可以使用 buffer (and window ) 运算符,信号来自源。必须小心一些,仅此而已。使用 RxJS,通常有不止一种方法来做某事。

此外,创建一个 re-usable, lettable/pipeable operator 来帮助解决这些情况也很容易。