RXJS bufferedAmount / 累加值 / 减少重置

RXJS bufferedAmount / accumulate value / reduce with reset

我有一个值流

[ 0.5, 0.3, 0.4, 0.6, 1.4, 0.3, 0.6 ]

我想把它变成

[           1         2         1 ]

因此,我们正在累积第一个流的值,直到达到整数(至少 1),然后在累积剩余数量的同时发出整数。

这让我大吃一惊,认为 switchMap 的解决方案指日可待。

这是我的方法:

src$ = src$.pipe(publish());

const wholeNumber$ = src$.pipe(
  scan(
    (acc, crt) => (acc | 0) > 1 ? crt + (+(acc - (acc | 0)).toPrecision(1)) : acc + crt, 0
  ),
  map(v => (v | 0)),
  filter(v => v >= 1),
);

src$.pipe(
  buffer(wholeNumber$)
).subscribe();

publish 将确保源不会被多次订阅。它也是 multicast(new Subject()) 的简写形式,它基本上是一种多播源的方式。为了使其工作,src$ 必须异步发出,以便使用中的 Subject 正确注册其订阅者(wholeNumber$ 和另一个订阅者)。

如果源不异步发出,您可以强制它使用src$.pipe(observeOn(asapScheduler))这样做,这将安排每个通知作为承诺。

让我们仔细看看提供给scan的cb:

(acc, crt) => (acc | 0) > 1 ? crt + (+(acc - (acc | 0)).toPrecision(1)) : acc + crt`

number | 0 等同于 Math.trunc(number).

+(acc - (acc | 0)).toPrecision(1)中:

  • 当您执行 1.2 - 1 时,您将得到:0.199...96toPrecision(1)(1.2 - 1).toPrecision(1) = "0.2"+ 将得到 0.2 作为数字。

谢谢安德烈,

我已经使用了您的代码并将其更改为自定义运算符。


export const bufferAmount = (
  amount: number
): MonoTypeOperatorFunction<number> => (
  source: Observable<number>
): Observable<number> =>
  new Observable<number>((observer) => {
    let bufferSum = 0;
    return source.subscribe({
      next(value) {
        bufferSum += value;
        if (bufferSum < amount) return;
        const nextSum = Math.trunc(bufferSum);
        bufferSum -= nextSum;
        observer.next(nextSum);
      },
      error(error) {
        observer.error(error);
      },
      complete() {
        observer.complete();
      },
    });
  });

ticker.pipe(bufferAmount(1)).subscribe();