如何使用 rxjs 存储扫描的累积结果
How to store accumulated result of a scan with rxjs
我有两个合并后的可观察对象,合并后进行了扫描。第一个是简单范围,另一个是主题。每当 Subject 发出带有 onNext
的新值时,我都会在扫描中连接该值和 return 新数组作为累加器。如果我处理我的订阅,然后再次订阅,它会重播范围内的值,但我丢失了主题中的值。在下面的代码中,我希望我的第二个订阅的最终值为 [1, 2, 3, 4, 5]
最好的方法是什么?现在我有另一个主题,我在其中存储最终值并订阅它,但它 感觉 错误。
这是一个演示正在发生的事情的简单版本:
var Rx = require('rx');
var source = Rx.Observable.range(1, 3);
var adder = new Rx.Subject();
var merged = source.merge(adder)
.scan([], function(accum, x) {
return accum.concat(x);
});
var subscription1 = merged.subscribe(function(x) {console.log(x)});
adder.onNext(4);
adder.onNext(5);
subscription1.dispose();
console.log('After Disposal');
var subscription2 = merged.subscribe(function(x) {console.log(x)});
这输出:
[ 1 ]
[ 1, 2 ]
[ 1, 2, 3 ]
[ 1, 2, 3, 4 ]
[ 1, 2, 3, 4, 5 ]
After Disposal
[ 1 ]
[ 1, 2 ]
[ 1, 2, 3 ]
根据您的描述,您似乎在寻找 BehaviorSubject
或 ReplaySubject
(或它们相应的运算符 publishValue()
和 replay()
),而不是 scan
.
您可以使用以下方式在每次重新访问页面时连接您的状态,(既然您提到了它,我将使用待办事项应用示例):
var todos = buttonClicked
.map(function(e) { return newTodoBox.text(); })
//This will preserve the state of the todo list across subscriptions
.replay();
var todoSubscription;
pageActivated.subscribe(function() {
todoSubscription = new Rx.CompositeDisposable(
//Add the items to the list
todos.subscribe(addItem),
//This enables the replay to actually start
//receiving values
todos.connect());
/*Do other activation work*/
});
pageDeactivated.subscribe(function() {
todoSubscription && todoSubscription.dispose();
/*Do other clean up work*/
});
这是一个工作代码示例,我认为它反映了您正在寻找的内容,请注意,在这种情况下,我不需要扫描,因为链上的 replay()
会完全补充列表。您会注意到,如果您添加一些待办事项,然后断开连接并再次连接,这些项目将重新出现。
var buttonClicked = Rx.Observable.fromEvent($('#add'), 'click');
var disconnect = Rx.Observable.fromEvent($('#disconnect'), 'click');
var connect = Rx.Observable.fromEvent($('#connect'), 'click');
var newTodoBox = $('#todos')[0];
var mylist = $('#mylist');
function addItem(e) {
mylist.append('<li>' + e + '</li>');
}
var todos = buttonClicked
.map(function(e) {
return newTodoBox.value;
})
.replay();
var todoSubscription;
disconnect.subscribe(function() {
mylist.empty();
todoSubscription && todoSubscription.dispose();
});
connect.startWith(true).subscribe(function() {
todoSubscription = new Rx.CompositeDisposable(
todos.subscribe(addItem),
todos.connect());
});
<head>
<script src="https://ajax.googleapis.com/ajax/libs/jquery/2.1.1/jquery.min.js"></script>
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/2.5.2/rx.all.js"></script>
</head>
<body>
<input id="todos" />
<button id="add">Add Todo</button>
<div>
<ul id='mylist'></ul>
</div>
<div>
<button id='disconnect'>Disconnect</button>
<button id='connect'>Connect</button>
</div>
</body>
Subject 是一个热门的 Observable,这就是为什么第二个订阅不会看到来自 Subject 的事件。 Observable 的范围是冷的,所以每个 "execution instance" 完全由每个订阅拥有。另一方面,Subject 的 "execution instance" 是单例且独立的,因此第二个订阅看不到事件。
有几种方法可以解决这个问题。
- 使用 ReplaySubject。您必须指定缓冲区大小。如果您对该缓冲区没有很好的限制,使用无限缓冲区可能会导致内存问题。
- 避免使用主题。换句话说,避免使用热 Observable,将其替换为冷 Observable,根据我一开始给出的问题描述,您的订阅不会有问题,它们会看到相同的事件。通常一个 Subject 可以被一个 Observable 代替,但这取决于你的整体架构。可能需要重写很多。而在最坏的情况下,比如 Observables 的循环依赖,你无法避免使用 Subject。
- 重新安排代码,使订阅在 Subject 开始发射之前开始,因此所有订阅都有机会看到 "live" 来自热 Observable 的发射。
但是,如果我对这个问题的解释是正确的,您只需要 merged
发出的最后一个事件,那么您可以使用备选方案 (1) 的变体,您只重播最后一个事件。这将是将 .shareReplay(1)
添加到 merged
的问题,这将使它成为一个热重播的 Observable。
我有两个合并后的可观察对象,合并后进行了扫描。第一个是简单范围,另一个是主题。每当 Subject 发出带有 onNext
的新值时,我都会在扫描中连接该值和 return 新数组作为累加器。如果我处理我的订阅,然后再次订阅,它会重播范围内的值,但我丢失了主题中的值。在下面的代码中,我希望我的第二个订阅的最终值为 [1, 2, 3, 4, 5]
最好的方法是什么?现在我有另一个主题,我在其中存储最终值并订阅它,但它 感觉 错误。
这是一个演示正在发生的事情的简单版本:
var Rx = require('rx');
var source = Rx.Observable.range(1, 3);
var adder = new Rx.Subject();
var merged = source.merge(adder)
.scan([], function(accum, x) {
return accum.concat(x);
});
var subscription1 = merged.subscribe(function(x) {console.log(x)});
adder.onNext(4);
adder.onNext(5);
subscription1.dispose();
console.log('After Disposal');
var subscription2 = merged.subscribe(function(x) {console.log(x)});
这输出:
[ 1 ]
[ 1, 2 ]
[ 1, 2, 3 ]
[ 1, 2, 3, 4 ]
[ 1, 2, 3, 4, 5 ]
After Disposal
[ 1 ]
[ 1, 2 ]
[ 1, 2, 3 ]
根据您的描述,您似乎在寻找 BehaviorSubject
或 ReplaySubject
(或它们相应的运算符 publishValue()
和 replay()
),而不是 scan
.
您可以使用以下方式在每次重新访问页面时连接您的状态,(既然您提到了它,我将使用待办事项应用示例):
var todos = buttonClicked
.map(function(e) { return newTodoBox.text(); })
//This will preserve the state of the todo list across subscriptions
.replay();
var todoSubscription;
pageActivated.subscribe(function() {
todoSubscription = new Rx.CompositeDisposable(
//Add the items to the list
todos.subscribe(addItem),
//This enables the replay to actually start
//receiving values
todos.connect());
/*Do other activation work*/
});
pageDeactivated.subscribe(function() {
todoSubscription && todoSubscription.dispose();
/*Do other clean up work*/
});
这是一个工作代码示例,我认为它反映了您正在寻找的内容,请注意,在这种情况下,我不需要扫描,因为链上的 replay()
会完全补充列表。您会注意到,如果您添加一些待办事项,然后断开连接并再次连接,这些项目将重新出现。
var buttonClicked = Rx.Observable.fromEvent($('#add'), 'click');
var disconnect = Rx.Observable.fromEvent($('#disconnect'), 'click');
var connect = Rx.Observable.fromEvent($('#connect'), 'click');
var newTodoBox = $('#todos')[0];
var mylist = $('#mylist');
function addItem(e) {
mylist.append('<li>' + e + '</li>');
}
var todos = buttonClicked
.map(function(e) {
return newTodoBox.value;
})
.replay();
var todoSubscription;
disconnect.subscribe(function() {
mylist.empty();
todoSubscription && todoSubscription.dispose();
});
connect.startWith(true).subscribe(function() {
todoSubscription = new Rx.CompositeDisposable(
todos.subscribe(addItem),
todos.connect());
});
<head>
<script src="https://ajax.googleapis.com/ajax/libs/jquery/2.1.1/jquery.min.js"></script>
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/2.5.2/rx.all.js"></script>
</head>
<body>
<input id="todos" />
<button id="add">Add Todo</button>
<div>
<ul id='mylist'></ul>
</div>
<div>
<button id='disconnect'>Disconnect</button>
<button id='connect'>Connect</button>
</div>
</body>
Subject 是一个热门的 Observable,这就是为什么第二个订阅不会看到来自 Subject 的事件。 Observable 的范围是冷的,所以每个 "execution instance" 完全由每个订阅拥有。另一方面,Subject 的 "execution instance" 是单例且独立的,因此第二个订阅看不到事件。
有几种方法可以解决这个问题。
- 使用 ReplaySubject。您必须指定缓冲区大小。如果您对该缓冲区没有很好的限制,使用无限缓冲区可能会导致内存问题。
- 避免使用主题。换句话说,避免使用热 Observable,将其替换为冷 Observable,根据我一开始给出的问题描述,您的订阅不会有问题,它们会看到相同的事件。通常一个 Subject 可以被一个 Observable 代替,但这取决于你的整体架构。可能需要重写很多。而在最坏的情况下,比如 Observables 的循环依赖,你无法避免使用 Subject。
- 重新安排代码,使订阅在 Subject 开始发射之前开始,因此所有订阅都有机会看到 "live" 来自热 Observable 的发射。
但是,如果我对这个问题的解释是正确的,您只需要 merged
发出的最后一个事件,那么您可以使用备选方案 (1) 的变体,您只重播最后一个事件。这将是将 .shareReplay(1)
添加到 merged
的问题,这将使它成为一个热重播的 Observable。