ReactiveX:Group and Buffer only last item in each group

ReactiveX: Group and Buffer only last item in each group

如何对一个 Observable 进行分组,并从每个 GroupedObservable 中仅在内存中保留最后发出的项目? 这样每个组的行为就像 BehaviorSubject 一样。

像这样:

{user: 1, msg: "Anyone here?"}
{user: 2, msg: "Hi"}
{user: 2, msg: "How are you?"}
{user: 1, msg: "Hello"}
{user: 1, msg: "Good"}

所以在内存中我们只有每个 user:

的最后一项
{user: 2, msg: "How are you?"}
{user: 1, msg: "Good"}

当订阅者订阅时,这两个项目会立即发布(每个都有自己的发射)。就像我们每个 user.

都有 BehaviorSubject

永远不会触发 onCompleted(),因为人们可能会永远聊天。

我事先不知道可以有哪些 user 值。

您可以编写减少分组的可观察对象的最新发射项的减少函数,将其传递给 scan 可观察对象,并使用 shareReplay 为新订阅者召回最后发射的值。应该是这样的:

var fn_scan = function ( aMessages, message ) {
  // aMessages is the latest array of messages
  // this function will update aMessages to reflect the arrival of the new message
  var aUsers = aMessages.map(function ( x ) {return x.user;});
  var index = aUsers.indexOf(message.user);
  if (index > -1) {
    // remove previous message from that user...
    aMessages.splice(index, 1);
  }
  // ...and push the latest message
  aMessages.push(message);
  return aMessages;
};
var groupedLatestMessages$ = messages$
    .scan(fn_scan, [])
    .shareReplay(1);

所以你订阅的任何时候得到的是一个数组,它的大小在任何时候都是发送消息的用户数,其内容是按发送时间排序的用户发送的消息。

只要有订阅,最新的数组就会立即传递给订阅者。虽然那是一个数组,但我想不出一种方法来一个一个地传递值,同时满足您的规范。希望这足以满足您的用例。

更新:这里是 jsbin http://jsfiddle.net/zs7ydw6b/2

我假设您的聊天记录 observable 很热门。因此,#groupBy 发出的 groupObservables 也会很热,不会自行在内存中保留任何内容。

要获得您想要的行为(丢弃订阅前的最后一个值以外的所有内容并从那里继续),您可以使用 ReplaySubject(1)。

如有错误请指正

jsbin

var groups = chatlog
      .groupBy(message => message.user)
      .map(groupObservable => {
        var subject = new Rx.ReplaySubject(1);
        groupObservable.subscribe(value => subject.onNext(value));
        return subject;
      });