如何让 .scan() 只聚合未完成序列中最近发出的数组
How to have .scan() only aggregate the most recent emitted array in an uncompleted sequence
我需要转换和聚合可观察数组中的元素。该序列是开放的,可能永远不会完成(基于网络)。
我目前正在使用以下代码:
const numbers = [1,2,3];
const numbers$ = new Rx.Subject();
const output = numbers$
.flatMap(n => n)
.map(n => n*n)
.scan((acc, x) => acc.concat([x]), [])
.subscribe(n => { console.log(n); });
numbers$.next(numbers);
setTimeout(() => {
numbers$.next([5,6,7])
}, 1000);
目前发射了多个数组,目前最后发射的值为[1, 4, 9, 25, 36, 49]
。但是,我只希望在同一输入数组 中 的那些值进行平方。
即我需要输出 observable 正好发出两个数组:[1,4,9]
和 [25, 36, 49]
.
如何操作?
这应该是您要找的:
const output = numbers$
.map((a) => a.map(n => n * n))
.subscribe(n => { console.log(n); });
编辑:如果你不想使用Array.map
,你可以使用RxJS。
您可以将 Array.map
替换为对值进行平方并将它们缩减回数组的可观察对象。这可以扩展为包括 distinct
或其他 RxJS 运算符(根据您的评论):
const output = numbers$
.mergeMap(
(a) => Rx.Observable.from(a)
.map((n) => n * n)
.reduce((acc, n) => { acc.push(n); return acc; }, [])
)
.subscribe(n => { console.log(n); });
您发布的解决方案:
const output = numbers$
.flatMap(n => n)
.map(n => n*n)
.buffer(numbers$.delay(1))
.subscribe(n => { console.log(n); });
是基于时间的,数组没有被合并的唯一原因是第一次 next
调用和第二次调用之间的间隔超过一毫秒。
看起来这就是您需要的:jsbin
const numbers = [1,2,3];
const numbers$ = new Rx.Subject();
const output = numbers$
.map(numbers => numbers.map(n => n * n))
.scan((acc, x) => acc.concat([x]), [])
.subscribe(n => { console.log(n); });
numbers$.next(numbers);
setTimeout(() => {
numbers$.next([5,6,7])
}, 1000);
我需要转换和聚合可观察数组中的元素。该序列是开放的,可能永远不会完成(基于网络)。
我目前正在使用以下代码:
const numbers = [1,2,3];
const numbers$ = new Rx.Subject();
const output = numbers$
.flatMap(n => n)
.map(n => n*n)
.scan((acc, x) => acc.concat([x]), [])
.subscribe(n => { console.log(n); });
numbers$.next(numbers);
setTimeout(() => {
numbers$.next([5,6,7])
}, 1000);
目前发射了多个数组,目前最后发射的值为[1, 4, 9, 25, 36, 49]
。但是,我只希望在同一输入数组 中 的那些值进行平方。
即我需要输出 observable 正好发出两个数组:[1,4,9]
和 [25, 36, 49]
.
如何操作?
这应该是您要找的:
const output = numbers$
.map((a) => a.map(n => n * n))
.subscribe(n => { console.log(n); });
编辑:如果你不想使用Array.map
,你可以使用RxJS。
您可以将 Array.map
替换为对值进行平方并将它们缩减回数组的可观察对象。这可以扩展为包括 distinct
或其他 RxJS 运算符(根据您的评论):
const output = numbers$
.mergeMap(
(a) => Rx.Observable.from(a)
.map((n) => n * n)
.reduce((acc, n) => { acc.push(n); return acc; }, [])
)
.subscribe(n => { console.log(n); });
您发布的解决方案:
const output = numbers$
.flatMap(n => n)
.map(n => n*n)
.buffer(numbers$.delay(1))
.subscribe(n => { console.log(n); });
是基于时间的,数组没有被合并的唯一原因是第一次 next
调用和第二次调用之间的间隔超过一毫秒。
看起来这就是您需要的:jsbin
const numbers = [1,2,3];
const numbers$ = new Rx.Subject();
const output = numbers$
.map(numbers => numbers.map(n => n * n))
.scan((acc, x) => acc.concat([x]), [])
.subscribe(n => { console.log(n); });
numbers$.next(numbers);
setTimeout(() => {
numbers$.next([5,6,7])
}, 1000);