RxJS 减少一个 ReplaySubject
RxJS reduce a ReplaySubject
我正在使用 ReactiveX/RxJS 版本。
假设我有一个 Rx.ReplaySubject,它每 2 秒发出一个对象,其中包含一个 id 和一个带有值的数组。我想减少这个值数组并得到它们的总和。
问题是 ReplaySubject 是一个热可观察对象,它永远不会完成,至少我不希望它完成,因为我想要每 2 秒计算一次该对象值的总和。但是为了使用 reduce 运算符,应该完成 observable。那么,我该如何进行呢?
E.G 无效代码:
var subject = new Rx.ReplaySubject();
subject.
map(x => x.transactions).
// Reduce never concludes because ReplaySubject instance is not completed
reduce((v1, v2) => v1+v2, 0).
subscribe(function (value) {
console.log(value)
});
setInterval(injectData, 2000);
function injectData () {
subject.next({id: Date.now(), transactions: [
{value: Math.round(Math.random() * 5000)},
{value: Math.round(Math.random() * 5000)},
{value: Math.round(Math.random() * 5000)},
{value: Math.round(Math.random() * 5000)},
{value: Math.round(Math.random() * 5000)}
]});
}
考虑使用 Observable.prototype.scan()
(RxJS documentation)
scan()
基本上聚合了一个 observable 并发出每个连续的值,不像 reduce()
只在完成时发出结果。 (参见 scan and reduce 的 Rx 解释)
使用 OP 代码的示例(这里是 fiddle):
var subject = new Rx.ReplaySubject();
subject
// note: use "selectMany" to flatten observable of observables
.selectMany(x => Rx.Observable.fromArray(x.transactions))
// note: use "scan" to aggregate values
.scan((agg, val) => agg+val.value, 0)
.subscribe(function (value) {
console.log(value)
});
setInterval(injectData, 2000);
function injectData () {
subject.onNext({id: Date.now(), transactions: [
{value: Math.round(Math.random() * 5000)},
{value: Math.round(Math.random() * 5000)},
{value: Math.round(Math.random() * 5000)},
{value: Math.round(Math.random() * 5000)},
{value: Math.round(Math.random() * 5000)}
]});
}
另一个例子:
由于selectMany()
,上面的代码发出了每个事务的聚合。如果你只希望它每 2 秒发出一次,这是使用 reduce()
的好时机(这是另一个 fiddle):
subject
// note: use "selectMany" to flatten observable of observables
// note: using "reduce" inside here so that we only emit the aggregate
.selectMany(x =>
Rx.Observable
.fromArray(x.transactions)
.reduce((agg, val) => agg + val.value, 0)
)
// note: use "scan" to aggregate values
.scan((agg, val) => agg+val, 0)
.subscribe(function (value) {
console.log(value)
});
补充说明:
Rx 主题可以完成;您只需在准备就绪后致电 onCompleted()
。如果您使主题完整,您仍然可以使用 reduce()
。将此 fiddle 与上面的进行比较。
我正在使用 ReactiveX/RxJS 版本。
假设我有一个 Rx.ReplaySubject,它每 2 秒发出一个对象,其中包含一个 id 和一个带有值的数组。我想减少这个值数组并得到它们的总和。
问题是 ReplaySubject 是一个热可观察对象,它永远不会完成,至少我不希望它完成,因为我想要每 2 秒计算一次该对象值的总和。但是为了使用 reduce 运算符,应该完成 observable。那么,我该如何进行呢?
E.G 无效代码:
var subject = new Rx.ReplaySubject();
subject.
map(x => x.transactions).
// Reduce never concludes because ReplaySubject instance is not completed
reduce((v1, v2) => v1+v2, 0).
subscribe(function (value) {
console.log(value)
});
setInterval(injectData, 2000);
function injectData () {
subject.next({id: Date.now(), transactions: [
{value: Math.round(Math.random() * 5000)},
{value: Math.round(Math.random() * 5000)},
{value: Math.round(Math.random() * 5000)},
{value: Math.round(Math.random() * 5000)},
{value: Math.round(Math.random() * 5000)}
]});
}
考虑使用 Observable.prototype.scan()
(RxJS documentation)
scan()
基本上聚合了一个 observable 并发出每个连续的值,不像 reduce()
只在完成时发出结果。 (参见 scan and reduce 的 Rx 解释)
使用 OP 代码的示例(这里是 fiddle):
var subject = new Rx.ReplaySubject();
subject
// note: use "selectMany" to flatten observable of observables
.selectMany(x => Rx.Observable.fromArray(x.transactions))
// note: use "scan" to aggregate values
.scan((agg, val) => agg+val.value, 0)
.subscribe(function (value) {
console.log(value)
});
setInterval(injectData, 2000);
function injectData () {
subject.onNext({id: Date.now(), transactions: [
{value: Math.round(Math.random() * 5000)},
{value: Math.round(Math.random() * 5000)},
{value: Math.round(Math.random() * 5000)},
{value: Math.round(Math.random() * 5000)},
{value: Math.round(Math.random() * 5000)}
]});
}
另一个例子:
由于selectMany()
,上面的代码发出了每个事务的聚合。如果你只希望它每 2 秒发出一次,这是使用 reduce()
的好时机(这是另一个 fiddle):
subject
// note: use "selectMany" to flatten observable of observables
// note: using "reduce" inside here so that we only emit the aggregate
.selectMany(x =>
Rx.Observable
.fromArray(x.transactions)
.reduce((agg, val) => agg + val.value, 0)
)
// note: use "scan" to aggregate values
.scan((agg, val) => agg+val, 0)
.subscribe(function (value) {
console.log(value)
});
补充说明:
Rx 主题可以完成;您只需在准备就绪后致电 onCompleted()
。如果您使主题完整,您仍然可以使用 reduce()
。将此 fiddle 与上面的进行比较。