RXJS bufferedAmount / 累加值 / 减少重置
RXJS bufferedAmount / accumulate value / reduce with reset
我有一个值流
[ 0.5, 0.3, 0.4, 0.6, 1.4, 0.3, 0.6 ]
我想把它变成
[ 1 2 1 ]
因此,我们正在累积第一个流的值,直到达到整数(至少 1),然后在累积剩余数量的同时发出整数。
这让我大吃一惊,认为 switchMap 的解决方案指日可待。
这是我的方法:
src$ = src$.pipe(publish());
const wholeNumber$ = src$.pipe(
scan(
(acc, crt) => (acc | 0) > 1 ? crt + (+(acc - (acc | 0)).toPrecision(1)) : acc + crt, 0
),
map(v => (v | 0)),
filter(v => v >= 1),
);
src$.pipe(
buffer(wholeNumber$)
).subscribe();
publish
将确保源不会被多次订阅。它也是 multicast(new Subject())
的简写形式,它基本上是一种多播源的方式。为了使其工作,src$
必须异步发出,以便使用中的 Subject
正确注册其订阅者(wholeNumber$
和另一个订阅者)。
如果源不异步发出,您可以强制它使用src$.pipe(observeOn(asapScheduler))
这样做,这将安排每个通知作为承诺。
让我们仔细看看提供给scan
的cb:
(acc, crt) => (acc | 0) > 1 ? crt + (+(acc - (acc | 0)).toPrecision(1)) : acc + crt`
number | 0
等同于 Math.trunc(number)
.
在+(acc - (acc | 0)).toPrecision(1)
中:
- 当您执行
1.2 - 1
时,您将得到:0.199...96
; toPrecision(1)
:(1.2 - 1).toPrecision(1)
= "0.2"
。 +
将得到 0.2
作为数字。
谢谢安德烈,
我已经使用了您的代码并将其更改为自定义运算符。
export const bufferAmount = (
amount: number
): MonoTypeOperatorFunction<number> => (
source: Observable<number>
): Observable<number> =>
new Observable<number>((observer) => {
let bufferSum = 0;
return source.subscribe({
next(value) {
bufferSum += value;
if (bufferSum < amount) return;
const nextSum = Math.trunc(bufferSum);
bufferSum -= nextSum;
observer.next(nextSum);
},
error(error) {
observer.error(error);
},
complete() {
observer.complete();
},
});
});
ticker.pipe(bufferAmount(1)).subscribe();
我有一个值流
[ 0.5, 0.3, 0.4, 0.6, 1.4, 0.3, 0.6 ]
我想把它变成
[ 1 2 1 ]
因此,我们正在累积第一个流的值,直到达到整数(至少 1),然后在累积剩余数量的同时发出整数。
这让我大吃一惊,认为 switchMap 的解决方案指日可待。
这是我的方法:
src$ = src$.pipe(publish());
const wholeNumber$ = src$.pipe(
scan(
(acc, crt) => (acc | 0) > 1 ? crt + (+(acc - (acc | 0)).toPrecision(1)) : acc + crt, 0
),
map(v => (v | 0)),
filter(v => v >= 1),
);
src$.pipe(
buffer(wholeNumber$)
).subscribe();
publish
将确保源不会被多次订阅。它也是 multicast(new Subject())
的简写形式,它基本上是一种多播源的方式。为了使其工作,src$
必须异步发出,以便使用中的 Subject
正确注册其订阅者(wholeNumber$
和另一个订阅者)。
如果源不异步发出,您可以强制它使用src$.pipe(observeOn(asapScheduler))
这样做,这将安排每个通知作为承诺。
让我们仔细看看提供给scan
的cb:
(acc, crt) => (acc | 0) > 1 ? crt + (+(acc - (acc | 0)).toPrecision(1)) : acc + crt`
number | 0
等同于 Math.trunc(number)
.
在+(acc - (acc | 0)).toPrecision(1)
中:
- 当您执行
1.2 - 1
时,您将得到:0.199...96
;toPrecision(1)
:(1.2 - 1).toPrecision(1)
="0.2"
。+
将得到0.2
作为数字。
谢谢安德烈,
我已经使用了您的代码并将其更改为自定义运算符。
export const bufferAmount = (
amount: number
): MonoTypeOperatorFunction<number> => (
source: Observable<number>
): Observable<number> =>
new Observable<number>((observer) => {
let bufferSum = 0;
return source.subscribe({
next(value) {
bufferSum += value;
if (bufferSum < amount) return;
const nextSum = Math.trunc(bufferSum);
bufferSum -= nextSum;
observer.next(nextSum);
},
error(error) {
observer.error(error);
},
complete() {
observer.complete();
},
});
});
ticker.pipe(bufferAmount(1)).subscribe();