将 Observables 分组,然后按计数过滤
Group Observables and then filter by count
我在过滤基于 groupBy
可观察对象的可观察对象时遇到问题。
这是一个基于 groupBy()
文档的示例:
var codes = [
{ keyCode: 38}, // up
{ keyCode: 38}, // up
{ keyCode: 40}, // down
{ keyCode: 40}, // down
{ keyCode: 37}, // left
{ keyCode: 39}, // right
{ keyCode: 37}, // left
{ keyCode: 39}, // right
{ keyCode: 66}, // b
{ keyCode: 65} // a
];
var source = Rx.Observable.from(codes)
.groupBy(
function (x) { return x.keyCode; },
function (x) { return x.keyCode; }
)
.flatMap(obs =>
obs.count()
.filter(x => x > 1)
.flatMap(obs.toArray())
)
source.subscribe(console.log)
这个returns:
[]
[]
[]
[]
所以,Observables 被过滤了,但是它们的值消失了。
将可观察对象作为单个对象(例如,数组)而不是异步值序列进行推理是很棘手的。这里发生的是:
你获取 obs
observable 然后你应用 count
到它
count
将等待 observable 完成并输出一个值,即元素的数量,然后完成。
您过滤了该值,但随后您将 toArray
应用于已经完成的 obs
。这就是为什么你得到一个空数组。
所以这里的问题是相同的 obs
变量在不同的时间是不同的东西,所以当你在程序的不同点重用同一个变量时,你无法避免对时间进行推理。
现在,如果你想得到 subscribe
的计数,你应该可以直接写 obs.count().filter(x => x > 1).toArray()
而不是你现在拥有的。 flatMap
除了 observables 之外,还接受 promise 和数组作为选择器函数的 return 值:
比照。 https://github.com/Reactive-Extensions/RxJS/blob/master/doc/api/core/operators/selectmany.md
更新:
考虑到评论,这就是我想出的(待测试):
var source = Rx.Observable
.from(codes)
.scan(function ( acc, code ) {
acc[code] = acc[code] + 1;
acc.latest = code;
return acc;
}, {})
.filter(function ( acc ) {return acc[acc.latest] > 1;})
.flatMap(function ( acc ) {return acc[acc.latest] == 2? Rx.Observable.repeat(acc.latest, 2) : Rx.just(acc.latest);})
如果您可以等待处理所有值,则可以在筛选后停止上述操作并添加 .last()
。这将为您提供最后一个具有所有值计数的累加器。从那里你可以做你想做的事。
我找到了类似于
的解决方案
var filtered = source
.flatMap(obs => {
var accumulated = obs.scan((a,b) => a.concat(b), [])
return obs
.scan((a) => a + 1, 0)
.filter(i => i > 1)
.withLatestFrom(
accumulated,
(a, b) => Rx.Observable.from(b)
)
})
.flatMap(d => d)
.subscribe(d => console.log(d))
我在过滤基于 groupBy
可观察对象的可观察对象时遇到问题。
这是一个基于 groupBy()
文档的示例:
var codes = [
{ keyCode: 38}, // up
{ keyCode: 38}, // up
{ keyCode: 40}, // down
{ keyCode: 40}, // down
{ keyCode: 37}, // left
{ keyCode: 39}, // right
{ keyCode: 37}, // left
{ keyCode: 39}, // right
{ keyCode: 66}, // b
{ keyCode: 65} // a
];
var source = Rx.Observable.from(codes)
.groupBy(
function (x) { return x.keyCode; },
function (x) { return x.keyCode; }
)
.flatMap(obs =>
obs.count()
.filter(x => x > 1)
.flatMap(obs.toArray())
)
source.subscribe(console.log)
这个returns:
[]
[]
[]
[]
所以,Observables 被过滤了,但是它们的值消失了。
将可观察对象作为单个对象(例如,数组)而不是异步值序列进行推理是很棘手的。这里发生的是:
你获取
obs
observable 然后你应用count
到它count
将等待 observable 完成并输出一个值,即元素的数量,然后完成。您过滤了该值,但随后您将
toArray
应用于已经完成的obs
。这就是为什么你得到一个空数组。
所以这里的问题是相同的 obs
变量在不同的时间是不同的东西,所以当你在程序的不同点重用同一个变量时,你无法避免对时间进行推理。
现在,如果你想得到 subscribe
的计数,你应该可以直接写 obs.count().filter(x => x > 1).toArray()
而不是你现在拥有的。 flatMap
除了 observables 之外,还接受 promise 和数组作为选择器函数的 return 值:
比照。 https://github.com/Reactive-Extensions/RxJS/blob/master/doc/api/core/operators/selectmany.md
更新: 考虑到评论,这就是我想出的(待测试):
var source = Rx.Observable
.from(codes)
.scan(function ( acc, code ) {
acc[code] = acc[code] + 1;
acc.latest = code;
return acc;
}, {})
.filter(function ( acc ) {return acc[acc.latest] > 1;})
.flatMap(function ( acc ) {return acc[acc.latest] == 2? Rx.Observable.repeat(acc.latest, 2) : Rx.just(acc.latest);})
如果您可以等待处理所有值,则可以在筛选后停止上述操作并添加 .last()
。这将为您提供最后一个具有所有值计数的累加器。从那里你可以做你想做的事。
我找到了类似于
var filtered = source
.flatMap(obs => {
var accumulated = obs.scan((a,b) => a.concat(b), [])
return obs
.scan((a) => a + 1, 0)
.filter(i => i > 1)
.withLatestFrom(
accumulated,
(a, b) => Rx.Observable.from(b)
)
})
.flatMap(d => d)
.subscribe(d => console.log(d))