RxJS bufferWithCount() 不会暂停超时

RxJS bufferWithCount() not pausing for timeout

我正在尝试控制慢速订阅者的流入。在 NodeJS 中尝试了以下

var xmlNodeStream = Rx.Observable.from([1,2,3,4,5,6,7,8,9,10,11]);

var commJson = xmlNodeStream.bufferWithCount(2).publish();

var FastSubscriber = commJson.subscribe(
      function (x) { console.log('----------\nFastSub: onNext: %s', x); },
      function (e) { console.log('FastSub: onError: %s', e); },
      function () { console.log('FastSub: onCompleted'); });

var slowSubscriber = commJson.subscribe(function (x) {
    setTimeout(function () { console.log("============\nSlowsub called: ", x); }, 5000);
});

commJson.connect();

当我 run the above code 时,我希望慢速订阅者每次 在收到下一个数据批次之前暂停 5 秒。

但这并没有发生。在最初的 5 秒延迟后,所有数据被 淹没 slowSubscriber,每批 2.

什么是控制流入的正确方法,以便慢订阅者可以慢慢来(最好是快订阅者可以等待慢订阅者完成)?

它没有暂停,因为 setTimeout 不会阻止执行它只是安排工作在以后完成,即 2 秒后,然后更多数据进来,它被安排为 2 秒 +从现在开始的一些小三角洲。结果是快订阅者和慢订阅者会同时完成,但是慢订阅者的结果要等到 2 秒后才能可视化。

如果您的实际用例中的慢速订阅者确实是非阻塞的,那么您有两种选择来控制事件流,要么您需要从消息源控制流,无论在哪里是。或者您需要使用一种背压运算符,例如 controlled()

var xmlNodeStream = Rx.Observable.from([1,2,3,4,5,6,7,8,9,10,11]);

var controller = xmlNodeStream.bufferWithCount(2).controlled();
var commJson = controller.publish().refCount();

var FastSubscriber = commJson.subscribe(
      function (x) { console.log('----------\nFastSub: onNext: %s', x); },
      function (e) { console.log('FastSub: onError: %s', e); },
      function () { console.log('FastSub: onCompleted'); });

var slowSubscriber = commJson.subscribe(function (x) {
    setTimeout(function () { 
                console.log("============\nSlowsub called: ", x); 
                controller.request(1);
               }, 5000);
});

commJson.request(1);