如何使用 rxjs 存储扫描的累积结果

How to store accumulated result of a scan with rxjs

我有两个合并后的可观察对象,合并后进行了扫描。第一个是简单范围,另一个是主题。每当 Subject 发出带有 onNext 的新值时,我都会在扫描中连接该值和 return 新数组作为累加器。如果我处理我的订阅,然后再次订阅,它会重播范围内的值,但我丢失了主题中的值。在下面的代码中,我希望我的第二个订阅的最终值为 [1, 2, 3, 4, 5]

最好的方法是什么?现在我有另一个主题,我在其中存储最终值并订阅它,但它 感觉 错误。

这是一个演示正在发生的事情的简单版本:

var Rx = require('rx');

var source = Rx.Observable.range(1, 3);

var adder = new Rx.Subject();

var merged = source.merge(adder)
                    .scan([], function(accum, x) {
                        return accum.concat(x);
                    });

var subscription1 = merged.subscribe(function(x) {console.log(x)});
adder.onNext(4);
adder.onNext(5);

subscription1.dispose();

console.log('After Disposal');

var subscription2 = merged.subscribe(function(x) {console.log(x)});

这输出:

[ 1 ]
[ 1, 2 ]
[ 1, 2, 3 ]
[ 1, 2, 3, 4 ]
[ 1, 2, 3, 4, 5 ]
After Disposal
[ 1 ]
[ 1, 2 ]
[ 1, 2, 3 ]

根据您的描述,您似乎在寻找 BehaviorSubjectReplaySubject(或它们相应的运算符 publishValue()replay()),而不是 scan.

您可以使用以下方式在每次重新访问页面时连接您的状态,(既然您提到了它,我将使用待办事项应用示例):

var todos = buttonClicked
  .map(function(e) { return newTodoBox.text(); })
   //This will preserve the state of the todo list across subscriptions
  .replay();

var todoSubscription;

pageActivated.subscribe(function() {
  todoSubscription = new Rx.CompositeDisposable(
                          //Add the items to the list
                          todos.subscribe(addItem),
                          //This enables the replay to actually start
                          //receiving values
                          todos.connect());
  /*Do other activation work*/
});

pageDeactivated.subscribe(function() {
   todoSubscription && todoSubscription.dispose();
   /*Do other clean up work*/
});

这是一个工作代码示例,我认为它反映了您正在寻找的内容,请注意,在这种情况下,我不需要扫描,因为链上的 replay() 会完全补充列表。您会注意到,如果您添加一些待办事项,然后断开连接并再次连接,这些项目将重新出现。

var buttonClicked = Rx.Observable.fromEvent($('#add'), 'click');

var disconnect = Rx.Observable.fromEvent($('#disconnect'), 'click');

var connect = Rx.Observable.fromEvent($('#connect'), 'click');

var newTodoBox = $('#todos')[0];

var mylist = $('#mylist');

function addItem(e) {
  mylist.append('<li>' + e + '</li>');
}

var todos = buttonClicked
  .map(function(e) {
    return newTodoBox.value;
  })
  .replay();

var todoSubscription;

disconnect.subscribe(function() {
  mylist.empty();
  todoSubscription && todoSubscription.dispose();
});

connect.startWith(true).subscribe(function() {
  todoSubscription = new Rx.CompositeDisposable(
    todos.subscribe(addItem),
    todos.connect());
});
<head>
  <script src="https://ajax.googleapis.com/ajax/libs/jquery/2.1.1/jquery.min.js"></script>
  <script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/2.5.2/rx.all.js"></script>
</head>

<body>
  <input id="todos" />
  <button id="add">Add Todo</button>
  <div>
    <ul id='mylist'></ul>
  </div>
  <div>
    <button id='disconnect'>Disconnect</button>
    <button id='connect'>Connect</button>
  </div>
</body>

Subject 是一个热门的 Observable,这就是为什么第二个订阅不会看到来自 Subject 的事件。 Observable 的范围是冷的,所以每个 "execution instance" 完全由每个订阅拥有。另一方面,Subject 的 "execution instance" 是单例且独立的,因此第二个订阅看不到事件。

有几种方法可以解决这个问题。

  1. 使用 ReplaySubject。您必须指定缓冲区大小。如果您对该缓冲区没有很好的限制,使用无限缓冲区可能会导致内存问题。
  2. 避免使用主题。换句话说,避免使用热 Observable,将其替换为冷 Observable,根据我一开始给出的问题描述,您的订阅不会有问题,它们会看到相同的事件。通常一个 Subject 可以被一个 Observable 代替,但这取决于你的整体架构。可能需要重写很多。而在最坏的情况下,比如 Observables 的循环依赖,你无法避免使用 Subject。
  3. 重新安排代码,使订阅在 Subject 开始发射之前开始,因此所有订阅都有机会看到 "live" 来自热 Observable 的发射。

但是,如果我对这个问题的解释是正确的,您只需要 merged 发出的最后一个事件,那么您可以使用备选方案 (1) 的变体,您只重播最后一个事件。这将是将 .shareReplay(1) 添加到 merged 的问题,这将使它成为一个热重播的 Observable。