如何创建一个 RxJS 缓冲区来对 NodeJS 中的元素进行分组但不依赖于永远 运行 间隔?

How to create a RxJS buffer that groups elements in NodeJS but that does not rely on forever running interval?

我在 NodeJS 中使用 Rx.Observable.fromEvent 从应用程序捕获事件。 这些使用请求 (https://www.npmjs.com/package/request) 发送到另一台服务器。 为了避免高网络负载,我需要在发送请求之间的给定超时时间缓冲这些事件。

问题

使用bufferWithTime(200)将保留节点进程运行,我不知道应用程序何时完成关闭流。

有没有办法使用Rx缓冲区说:

  1. 当元素 1 被按下时设置一个计时器
  2. 当元素 2 和 3 在计时器到期之前到达时将它们推送到数组 [1, 2, 3](缓冲区)
  3. 当计时器到期时,将 [1, 2, 3] 数组发送到管道。
  4. 如果元素 4 在计时器到期后出现,则设置一个新计时器并重新开始。

如果没有元素被压入,则没有计时器启动,这将使进程退出。

我最初的方法是:

Rx.Observable
     .fromEvent(eventEmitter, 'log')
     .bufferWithTime(200) // this is the issue
     .map(addEventsToRequestOption)
     .map(request)
     .flatMap(Promise.resolve)
     .subscribe(log('Response received'))

您可能需要拆分流并使用第二部分触发第一部分。

var source = Rx.Observable.fromEvent(eventEmitter, 'log');
var closer = source.flatMapFirst(Rx.Observable.timer(2000));

source
     .buffer(closer)
     .map(addEventsToRequestOption)
     .flatMap(function(x) { Promise.resolve(request(x)); })
     //I assume this log method returns a function?
     .subscribe(log('Response received'));

source.flatMapFirst(Rx.Observable.timer(2000)) 是这里的重要一行。它创建一个 Observable,该 Observable 生成一个定时器,该定时器将在 2000 毫秒后触发。当第一个事件出现时,它将启动计时器。 flatMapFirst 将忽略后续事件,只要计时器为 运行。当计时器最终发出时,它将触发缓冲区发出其当前缓冲区并重新开始。

参见 docs 关于 buffer 的边界 Observable

建议的实现,使用 delay 运算符:

function emits(who){
  return function (x) { console.log([who, "emits"].join(" ") + " " + x + " click(s)");};
}

var source = Rx.Observable.fromEvent(document.body, 'click');
console.log("running");

var delayedSource$ = source.delay(1200);

var buffered$ = source
     .buffer(function () { return  delayedSource$;}).map(function(clickBuffer){return clickBuffer.length;})

buffered$.subscribe(emits("buffer"));

jsbin 在这里: http://jsbin.com/wilurivehu/edit?html,js,console,output