我如何创建一个不会在每次恢复时重置的 RXJS 可暂停数据流?
how do i create a RXJS pausable data flow that doesnt reset on each resume?
我已经尝试了 RXJS pausable 的文档示例,当它暂停时,它会在恢复时重置。我如何修改下面的示例,让我的流从我暂停的地方恢复而不是重置?
var pauser = new Rx.Subject();
var source = Rx.Observable
.interval(1000)
.timeInterval()
.pausable(pauser);
var subscription = source.subscribe(
function (x) {
$("#result").append('Next: ' + JSON.stringify(x) + '<br>');
},
function (err) {
$("#result").append('Error: ' + err);
},
function () {
$("#result").append('Completed');
});
pauser.onNext(true);
var paused = false;
$("#result").click(function() {
$(this).append("mouse clicked");
paused = (paused === false) ? true : false;
pauser.onNext(paused);
});
这给了我以下输出:
Next: {"value":0,"interval":1002}
Next: {"value":1,"interval":1000}
Next: {"value":2,"interval":999}
mouse clicked
mouse clicked
Next: {"value":0,"interval":1001}
Next: {"value":1,"interval":999}
Next: {"value":2,"interval":1000}
如pausable
文档中所述,pausable
将用于热源。
使源成为热点的一种方法是使用 share
。但是,这不会像与 pausable
一起使用一样工作,因为 share
将在没有订阅者时断开其源,这将在您暂停时发生。
所以这里有两种方法可以完成这项工作。一种是使用 share
并保留一个虚拟订阅者,这样 share
永远不会断开与其来源的连接,因为总会有至少一个订阅者。第二种方法是使用 publish
,并且在完成所有接线后使用 connect
observable。
Example 1 虚拟订阅者:
var pauser = new Rx.Subject();
function noop(){}
var source = Rx.Observable
.interval(1000)
.timeInterval()
.share();
var pausableSource = source.pausable(pauser);
var subscription = pausableSource.subscribe(
function (x) {
$("#ta_result").append('Next: ' + JSON.stringify(x) + '<br>');
},
function (err) {
$("#ta_result").append('Error: ' + err);
},
function () {
$("#ta_result").append('Completed');
});
source.subscribe(noop);
pauser.onNext(false);
var paused = false;
$("#result").click(function() {
$("#ta_change").append("mouse clicked\n");
paused = !paused;
pauser.onNext(paused);
});
Example 2 与 connect
:
var pauser = new Rx.Subject();
var source = Rx.Observable
.interval(1000)
.timeInterval()
.publish();
var pausableSource = source.pausable(pauser);
// source.subscribe(function(){});
var subscription = pausableSource.subscribe(
function (x) {
$("#ta_result").append('Next: ' + JSON.stringify(x) + '<br>');
},
function (err) {
$("#ta_result").append('Error: ' + err);
},
function () {
$("#ta_result").append('Completed');
});
source.connect();
pauser.onNext(false);
var paused = false;
$("#result").click(function() {
$("#ta_change").append("mouse clicked\n");
paused = !paused;
pauser.onNext(paused);
});
我已经尝试了 RXJS pausable 的文档示例,当它暂停时,它会在恢复时重置。我如何修改下面的示例,让我的流从我暂停的地方恢复而不是重置?
var pauser = new Rx.Subject();
var source = Rx.Observable
.interval(1000)
.timeInterval()
.pausable(pauser);
var subscription = source.subscribe(
function (x) {
$("#result").append('Next: ' + JSON.stringify(x) + '<br>');
},
function (err) {
$("#result").append('Error: ' + err);
},
function () {
$("#result").append('Completed');
});
pauser.onNext(true);
var paused = false;
$("#result").click(function() {
$(this).append("mouse clicked");
paused = (paused === false) ? true : false;
pauser.onNext(paused);
});
这给了我以下输出:
Next: {"value":0,"interval":1002}
Next: {"value":1,"interval":1000}
Next: {"value":2,"interval":999}
mouse clicked
mouse clicked
Next: {"value":0,"interval":1001}
Next: {"value":1,"interval":999}
Next: {"value":2,"interval":1000}
如pausable
文档中所述,pausable
将用于热源。
使源成为热点的一种方法是使用 share
。但是,这不会像与 pausable
一起使用一样工作,因为 share
将在没有订阅者时断开其源,这将在您暂停时发生。
所以这里有两种方法可以完成这项工作。一种是使用 share
并保留一个虚拟订阅者,这样 share
永远不会断开与其来源的连接,因为总会有至少一个订阅者。第二种方法是使用 publish
,并且在完成所有接线后使用 connect
observable。
Example 1 虚拟订阅者:
var pauser = new Rx.Subject();
function noop(){}
var source = Rx.Observable
.interval(1000)
.timeInterval()
.share();
var pausableSource = source.pausable(pauser);
var subscription = pausableSource.subscribe(
function (x) {
$("#ta_result").append('Next: ' + JSON.stringify(x) + '<br>');
},
function (err) {
$("#ta_result").append('Error: ' + err);
},
function () {
$("#ta_result").append('Completed');
});
source.subscribe(noop);
pauser.onNext(false);
var paused = false;
$("#result").click(function() {
$("#ta_change").append("mouse clicked\n");
paused = !paused;
pauser.onNext(paused);
});
Example 2 与 connect
:
var pauser = new Rx.Subject();
var source = Rx.Observable
.interval(1000)
.timeInterval()
.publish();
var pausableSource = source.pausable(pauser);
// source.subscribe(function(){});
var subscription = pausableSource.subscribe(
function (x) {
$("#ta_result").append('Next: ' + JSON.stringify(x) + '<br>');
},
function (err) {
$("#ta_result").append('Error: ' + err);
},
function () {
$("#ta_result").append('Completed');
});
source.connect();
pauser.onNext(false);
var paused = false;
$("#result").click(function() {
$("#ta_change").append("mouse clicked\n");
paused = !paused;
pauser.onNext(paused);
});