RxJS bufferWithCount() 不会暂停超时
RxJS bufferWithCount() not pausing for timeout
我正在尝试控制慢速订阅者的流入。在 NodeJS 中尝试了以下
var xmlNodeStream = Rx.Observable.from([1,2,3,4,5,6,7,8,9,10,11]);
var commJson = xmlNodeStream.bufferWithCount(2).publish();
var FastSubscriber = commJson.subscribe(
function (x) { console.log('----------\nFastSub: onNext: %s', x); },
function (e) { console.log('FastSub: onError: %s', e); },
function () { console.log('FastSub: onCompleted'); });
var slowSubscriber = commJson.subscribe(function (x) {
setTimeout(function () { console.log("============\nSlowsub called: ", x); }, 5000);
});
commJson.connect();
当我 run the above code 时,我希望慢速订阅者每次 在收到下一个数据批次之前暂停 5 秒。
但这并没有发生。在最初的 5 秒延迟后,所有数据被 淹没 到 slowSubscriber
,每批 2.
什么是控制流入的正确方法,以便慢订阅者可以慢慢来(最好是快订阅者可以等待慢订阅者完成)?
它没有暂停,因为 setTimeout
不会阻止执行它只是安排工作在以后完成,即 2 秒后,然后更多数据进来,它被安排为 2 秒 +从现在开始的一些小三角洲。结果是快订阅者和慢订阅者会同时完成,但是慢订阅者的结果要等到 2 秒后才能可视化。
如果您的实际用例中的慢速订阅者确实是非阻塞的,那么您有两种选择来控制事件流,要么您需要从消息源控制流,无论在哪里是。或者您需要使用一种背压运算符,例如 controlled()
var xmlNodeStream = Rx.Observable.from([1,2,3,4,5,6,7,8,9,10,11]);
var controller = xmlNodeStream.bufferWithCount(2).controlled();
var commJson = controller.publish().refCount();
var FastSubscriber = commJson.subscribe(
function (x) { console.log('----------\nFastSub: onNext: %s', x); },
function (e) { console.log('FastSub: onError: %s', e); },
function () { console.log('FastSub: onCompleted'); });
var slowSubscriber = commJson.subscribe(function (x) {
setTimeout(function () {
console.log("============\nSlowsub called: ", x);
controller.request(1);
}, 5000);
});
commJson.request(1);
我正在尝试控制慢速订阅者的流入。在 NodeJS 中尝试了以下
var xmlNodeStream = Rx.Observable.from([1,2,3,4,5,6,7,8,9,10,11]);
var commJson = xmlNodeStream.bufferWithCount(2).publish();
var FastSubscriber = commJson.subscribe(
function (x) { console.log('----------\nFastSub: onNext: %s', x); },
function (e) { console.log('FastSub: onError: %s', e); },
function () { console.log('FastSub: onCompleted'); });
var slowSubscriber = commJson.subscribe(function (x) {
setTimeout(function () { console.log("============\nSlowsub called: ", x); }, 5000);
});
commJson.connect();
当我 run the above code 时,我希望慢速订阅者每次 在收到下一个数据批次之前暂停 5 秒。
但这并没有发生。在最初的 5 秒延迟后,所有数据被 淹没 到 slowSubscriber
,每批 2.
什么是控制流入的正确方法,以便慢订阅者可以慢慢来(最好是快订阅者可以等待慢订阅者完成)?
它没有暂停,因为 setTimeout
不会阻止执行它只是安排工作在以后完成,即 2 秒后,然后更多数据进来,它被安排为 2 秒 +从现在开始的一些小三角洲。结果是快订阅者和慢订阅者会同时完成,但是慢订阅者的结果要等到 2 秒后才能可视化。
如果您的实际用例中的慢速订阅者确实是非阻塞的,那么您有两种选择来控制事件流,要么您需要从消息源控制流,无论在哪里是。或者您需要使用一种背压运算符,例如 controlled()
var xmlNodeStream = Rx.Observable.from([1,2,3,4,5,6,7,8,9,10,11]);
var controller = xmlNodeStream.bufferWithCount(2).controlled();
var commJson = controller.publish().refCount();
var FastSubscriber = commJson.subscribe(
function (x) { console.log('----------\nFastSub: onNext: %s', x); },
function (e) { console.log('FastSub: onError: %s', e); },
function () { console.log('FastSub: onCompleted'); });
var slowSubscriber = commJson.subscribe(function (x) {
setTimeout(function () {
console.log("============\nSlowsub called: ", x);
controller.request(1);
}, 5000);
});
commJson.request(1);