RxJS - 如何在没有多次回调的情况下积累数据并对其进行操作

RxJS - how to accumulate data and act on it, without multiple callbacks

所以这段代码运行了,但它在 .flatmap 中触发了回调 5 次:

        var i = 0;
        const values = {};
        return this.obsClient.take(5)
            .flatMap(v => {
                const time = Date.now();
                values[i] = {time: time, count: v.clientCount};
                console.log('values => ', values);
                i++;
                return Rx.Observable.timer(100)
            });

"values => x" 将被记录 5 次。

我想也许如果我执行以下操作,它会在触发 flatMap 回调之前累积所有 5 条数据:

      var i = 0;
      const values = {};
      return this.obsClient.take(5).takeLast(5)
        .flatMap(v => {
            const time = Date.now();
            values[i] = {time: time, count: v.clientCount};
            console.log('values => ', values);
            i++;
            return Rx.Observable.timer(100)
        });

但它仍然记录 "values => x" 5 次。 如何累积数据,并将所有数据传递给回调,避免触发回调 5 次?

您可能正在寻找 bufferCount:

return this.obsClient.bufferCount(5).flatMap(buffer => {
  console.log(`buffer => ${buffer}`);
  return Rx.Observable.timer(100);
});

Buffers the source Observable values until the size hits the maximum bufferSize given.

如果您一次只想要序列中的最后五个项目:

  return this.obsClient
              .takeLast(5)
              .toArray()
              .flatMap(arr => { /*arr will be an array of the last five items in the sequence */ })