如何创建一个 RxJS 缓冲区来对 NodeJS 中的元素进行分组但不依赖于永远 运行 间隔?
How to create a RxJS buffer that groups elements in NodeJS but that does not rely on forever running interval?
我在 NodeJS 中使用 Rx.Observable.fromEvent
从应用程序捕获事件。
这些使用请求 (https://www.npmjs.com/package/request) 发送到另一台服务器。
为了避免高网络负载,我需要在发送请求之间的给定超时时间缓冲这些事件。
问题
使用bufferWithTime(200)
将保留节点进程运行,我不知道应用程序何时完成关闭流。
有没有办法使用Rx缓冲区说:
- 当元素 1 被按下时设置一个计时器
- 当元素 2 和 3 在计时器到期之前到达时将它们推送到数组 [1, 2, 3](缓冲区)
- 当计时器到期时,将 [1, 2, 3] 数组发送到管道。
- 如果元素 4 在计时器到期后出现,则设置一个新计时器并重新开始。
如果没有元素被压入,则没有计时器启动,这将使进程退出。
我最初的方法是:
Rx.Observable
.fromEvent(eventEmitter, 'log')
.bufferWithTime(200) // this is the issue
.map(addEventsToRequestOption)
.map(request)
.flatMap(Promise.resolve)
.subscribe(log('Response received'))
您可能需要拆分流并使用第二部分触发第一部分。
var source = Rx.Observable.fromEvent(eventEmitter, 'log');
var closer = source.flatMapFirst(Rx.Observable.timer(2000));
source
.buffer(closer)
.map(addEventsToRequestOption)
.flatMap(function(x) { Promise.resolve(request(x)); })
//I assume this log method returns a function?
.subscribe(log('Response received'));
source.flatMapFirst(Rx.Observable.timer(2000))
是这里的重要一行。它创建一个 Observable,该 Observable 生成一个定时器,该定时器将在 2000 毫秒后触发。当第一个事件出现时,它将启动计时器。 flatMapFirst
将忽略后续事件,只要计时器为 运行。当计时器最终发出时,它将触发缓冲区发出其当前缓冲区并重新开始。
参见 docs 关于 buffer
的边界 Observable
建议的实现,使用 delay
运算符:
function emits(who){
return function (x) { console.log([who, "emits"].join(" ") + " " + x + " click(s)");};
}
var source = Rx.Observable.fromEvent(document.body, 'click');
console.log("running");
var delayedSource$ = source.delay(1200);
var buffered$ = source
.buffer(function () { return delayedSource$;}).map(function(clickBuffer){return clickBuffer.length;})
buffered$.subscribe(emits("buffer"));
jsbin 在这里:
http://jsbin.com/wilurivehu/edit?html,js,console,output
我在 NodeJS 中使用 Rx.Observable.fromEvent
从应用程序捕获事件。
这些使用请求 (https://www.npmjs.com/package/request) 发送到另一台服务器。
为了避免高网络负载,我需要在发送请求之间的给定超时时间缓冲这些事件。
问题
使用bufferWithTime(200)
将保留节点进程运行,我不知道应用程序何时完成关闭流。
有没有办法使用Rx缓冲区说:
- 当元素 1 被按下时设置一个计时器
- 当元素 2 和 3 在计时器到期之前到达时将它们推送到数组 [1, 2, 3](缓冲区)
- 当计时器到期时,将 [1, 2, 3] 数组发送到管道。
- 如果元素 4 在计时器到期后出现,则设置一个新计时器并重新开始。
如果没有元素被压入,则没有计时器启动,这将使进程退出。
我最初的方法是:
Rx.Observable
.fromEvent(eventEmitter, 'log')
.bufferWithTime(200) // this is the issue
.map(addEventsToRequestOption)
.map(request)
.flatMap(Promise.resolve)
.subscribe(log('Response received'))
您可能需要拆分流并使用第二部分触发第一部分。
var source = Rx.Observable.fromEvent(eventEmitter, 'log');
var closer = source.flatMapFirst(Rx.Observable.timer(2000));
source
.buffer(closer)
.map(addEventsToRequestOption)
.flatMap(function(x) { Promise.resolve(request(x)); })
//I assume this log method returns a function?
.subscribe(log('Response received'));
source.flatMapFirst(Rx.Observable.timer(2000))
是这里的重要一行。它创建一个 Observable,该 Observable 生成一个定时器,该定时器将在 2000 毫秒后触发。当第一个事件出现时,它将启动计时器。 flatMapFirst
将忽略后续事件,只要计时器为 运行。当计时器最终发出时,它将触发缓冲区发出其当前缓冲区并重新开始。
参见 docs 关于 buffer
的边界 Observable
建议的实现,使用 delay
运算符:
function emits(who){
return function (x) { console.log([who, "emits"].join(" ") + " " + x + " click(s)");};
}
var source = Rx.Observable.fromEvent(document.body, 'click');
console.log("running");
var delayedSource$ = source.delay(1200);
var buffered$ = source
.buffer(function () { return delayedSource$;}).map(function(clickBuffer){return clickBuffer.length;})
buffered$.subscribe(emits("buffer"));
jsbin 在这里: http://jsbin.com/wilurivehu/edit?html,js,console,output