ReactiveX:Group and Buffer only last item in each group
ReactiveX: Group and Buffer only last item in each group
如何对一个 Observable 进行分组,并从每个 GroupedObservable 中仅在内存中保留最后发出的项目?
这样每个组的行为就像 BehaviorSubject 一样。
像这样:
{user: 1, msg: "Anyone here?"}
{user: 2, msg: "Hi"}
{user: 2, msg: "How are you?"}
{user: 1, msg: "Hello"}
{user: 1, msg: "Good"}
所以在内存中我们只有每个 user
:
的最后一项
{user: 2, msg: "How are you?"}
{user: 1, msg: "Good"}
当订阅者订阅时,这两个项目会立即发布(每个都有自己的发射)。就像我们每个 user
.
都有 BehaviorSubject
永远不会触发 onCompleted(),因为人们可能会永远聊天。
我事先不知道可以有哪些 user
值。
您可以编写减少分组的可观察对象的最新发射项的减少函数,将其传递给 scan
可观察对象,并使用 shareReplay
为新订阅者召回最后发射的值。应该是这样的:
var fn_scan = function ( aMessages, message ) {
// aMessages is the latest array of messages
// this function will update aMessages to reflect the arrival of the new message
var aUsers = aMessages.map(function ( x ) {return x.user;});
var index = aUsers.indexOf(message.user);
if (index > -1) {
// remove previous message from that user...
aMessages.splice(index, 1);
}
// ...and push the latest message
aMessages.push(message);
return aMessages;
};
var groupedLatestMessages$ = messages$
.scan(fn_scan, [])
.shareReplay(1);
所以你订阅的任何时候得到的是一个数组,它的大小在任何时候都是发送消息的用户数,其内容是按发送时间排序的用户发送的消息。
只要有订阅,最新的数组就会立即传递给订阅者。虽然那是一个数组,但我想不出一种方法来一个一个地传递值,同时满足您的规范。希望这足以满足您的用例。
更新:这里是 jsbin http://jsfiddle.net/zs7ydw6b/2
我假设您的聊天记录 observable 很热门。因此,#groupBy 发出的 groupObservables 也会很热,不会自行在内存中保留任何内容。
要获得您想要的行为(丢弃订阅前的最后一个值以外的所有内容并从那里继续),您可以使用 ReplaySubject(1)。
如有错误请指正
var groups = chatlog
.groupBy(message => message.user)
.map(groupObservable => {
var subject = new Rx.ReplaySubject(1);
groupObservable.subscribe(value => subject.onNext(value));
return subject;
});
如何对一个 Observable 进行分组,并从每个 GroupedObservable 中仅在内存中保留最后发出的项目? 这样每个组的行为就像 BehaviorSubject 一样。
像这样:
{user: 1, msg: "Anyone here?"}
{user: 2, msg: "Hi"}
{user: 2, msg: "How are you?"}
{user: 1, msg: "Hello"}
{user: 1, msg: "Good"}
所以在内存中我们只有每个 user
:
{user: 2, msg: "How are you?"}
{user: 1, msg: "Good"}
当订阅者订阅时,这两个项目会立即发布(每个都有自己的发射)。就像我们每个 user
.
永远不会触发 onCompleted(),因为人们可能会永远聊天。
我事先不知道可以有哪些 user
值。
您可以编写减少分组的可观察对象的最新发射项的减少函数,将其传递给 scan
可观察对象,并使用 shareReplay
为新订阅者召回最后发射的值。应该是这样的:
var fn_scan = function ( aMessages, message ) {
// aMessages is the latest array of messages
// this function will update aMessages to reflect the arrival of the new message
var aUsers = aMessages.map(function ( x ) {return x.user;});
var index = aUsers.indexOf(message.user);
if (index > -1) {
// remove previous message from that user...
aMessages.splice(index, 1);
}
// ...and push the latest message
aMessages.push(message);
return aMessages;
};
var groupedLatestMessages$ = messages$
.scan(fn_scan, [])
.shareReplay(1);
所以你订阅的任何时候得到的是一个数组,它的大小在任何时候都是发送消息的用户数,其内容是按发送时间排序的用户发送的消息。
只要有订阅,最新的数组就会立即传递给订阅者。虽然那是一个数组,但我想不出一种方法来一个一个地传递值,同时满足您的规范。希望这足以满足您的用例。
更新:这里是 jsbin http://jsfiddle.net/zs7ydw6b/2
我假设您的聊天记录 observable 很热门。因此,#groupBy 发出的 groupObservables 也会很热,不会自行在内存中保留任何内容。
要获得您想要的行为(丢弃订阅前的最后一个值以外的所有内容并从那里继续),您可以使用 ReplaySubject(1)。
如有错误请指正
var groups = chatlog
.groupBy(message => message.user)
.map(groupObservable => {
var subject = new Rx.ReplaySubject(1);
groupObservable.subscribe(value => subject.onNext(value));
return subject;
});