Node.js Streams - schedule events in time without backpressure

Using Node.js, I'm trying to "replay" a simple CSV file in which one column contains points in time (in floating-point seconds) and another contains numeric values. The idea is to output each value at the time given in the first column, measured from when the script starts.

Here is the code I wrote for this:

const NS_PER_SEC = 1e9;

const {chain}  = require('stream-chain');

const {parser} = require('stream-csv-as-json');
const {streamValues} = require('stream-json/streamers/StreamValues');

const fs   = require('fs');
const zlib = require('zlib');

// CSV file streaming pipeline
const pipeline = chain([
  fs.createReadStream('timed-data.csv'),
  parser({separator: ','}),
  streamValues(),
  data => {
    const value = data.value;
    return {t: value[0], pitch: value[1]}
  }
]);

var startTime = process.hrtime();

// Output a line every second for testing
setInterval(()=>{
  let t = process.hrtime(startTime)[0]
  console.log('----------------' + t + '-----------------------')
}, 1000)


pipeline.on('data', (d) => {
  // get time
  let hrtDiff = process.hrtime(startTime);
  // convert hrtime array to float sec
  let hrtDiffFloat = (hrtDiff[0] + (hrtDiff[1] / NS_PER_SEC));
  // calculate the time difference from now
  let diff = (d.t - hrtDiffFloat) * 1000;
  console.log('Setting timeout to '+ diff + 'ms');
  setTimeout((d)=>console.log('data: '+d.t+', '+d.pitch), diff, d);
});

Right now, Node rushes through the file and thousands of executions get scheduled up front. Only later do the events start firing.

The question is: how can I schedule only one (or a few) executions at a time, so that a new item is pulled from the stream only after the scheduled execution has completed?

(Apart from war crimes such as while (Date.now() < executionTime) ;)

Note that I'm new to Node, so streams may not be the right tool for this task.
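
For illustration, this is roughly the kind of throttling I mean, replacing the 'data' handler above. This is a sketch only: it assumes the chain returned by stream-chain behaves like a regular Readable so that pause()/resume() are available, and it reuses pipeline, startTime and NS_PER_SEC from the code above.

pipeline.on('data', (d) => {
  // stop pulling further rows until this one has been handled
  pipeline.pause();
  let hrtDiff = process.hrtime(startTime);
  let hrtDiffFloat = hrtDiff[0] + (hrtDiff[1] / NS_PER_SEC);
  let diff = (d.t - hrtDiffFloat) * 1000;
  setTimeout(() => {
    console.log('data: ' + d.t + ', ' + d.pitch);
    // only now let the stream deliver the next row
    pipeline.resume();
  }, Math.max(diff, 0) || 0); // || 0 covers the NaN produced by the header row
});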

Sample data (the real data contains hundreds of values per second):

time,val
1.886621315,0
2.757369614,186.920
3.848707482,178.005
4.440816326,0
4.992290249,154.440
5.932698412,0
7.845260770,0
9.027936507,240.235
10.164172335,264.044
11.625487528,198.861
13.526439909,249.802
14.841088435,0
15.243628117,173.235
15.847346938,198.861
17.250612244,223.481
18.521541950,218.313
20.495238095,264.044
21.796371882,348.087
22.134240362,278.755
26.769705215,249.083

Console output for the sample data:

Mikulass-MBP:experiments mikulas$ node strem-csv.js 
Setting timeout to NaNms
Setting timeout to 1882.043799ms
Setting timeout to 2752.6014609999997ms
Setting timeout to 3843.824312ms
Setting timeout to 4435.851658ms
Setting timeout to 4987.248032ms
Setting timeout to 5927.584522ms
Setting timeout to 7840.075599000001ms
Setting timeout to 9022.677742ms
Setting timeout to 10158.839757ms
Setting timeout to 11620.070245ms
Setting timeout to 13520.945921ms
Setting timeout to 14835.519640999999ms
Setting timeout to 15237.984088ms
Setting timeout to 15841.62652ms
Setting timeout to 17244.816256ms
Setting timeout to 18515.670276999997ms
Setting timeout to 20489.291373ms
Setting timeout to 21790.351713ms
Setting timeout to 22128.146255ms
Setting timeout to 26763.537879999996ms
data: time, val
----------------1-----------------------
data: 1.886621315, 0
----------------2-----------------------
data: 2.757369614, 186.920
----------------3-----------------------
data: 3.848707482, 178.005
----------------4-----------------------
data: 4.440816326, 0
data: 4.992290249, 154.440
----------------5-----------------------
data: 5.932698412, 0
----------------6-----------------------
----------------7-----------------------
data: 7.845260770, 0
----------------8-----------------------
data: 9.027936507, 240.235
----------------9-----------------------
----------------10-----------------------
data: 10.164172335, 264.044
----------------11-----------------------
data: 11.625487528, 198.861
----------------12-----------------------
----------------13-----------------------
data: 13.526439909, 249.802
----------------14-----------------------
data: 14.841088435, 0
----------------15-----------------------
data: 15.243628117, 173.235
data: 15.847346938, 198.861
----------------16-----------------------
----------------17-----------------------
data: 17.250612244, 223.481
----------------18-----------------------
data: 18.521541950, 218.313
----------------19-----------------------
----------------20-----------------------
data: 20.495238095, 264.044
----------------21-----------------------
data: 21.796371882, 348.087
----------------22-----------------------
data: 22.134240362, 278.755
----------------23-----------------------
----------------24-----------------------
----------------25-----------------------
----------------26-----------------------
data: 26.769705215, 249.083
----------------27-----------------------
----------------28-----------------------

You can add an async function to the chain:

const timer = ms => new Promise(res => setTimeout(res, ms));

const pipeline = chain([
  fs.createReadStream('timed-data.csv'),
  parser({separator: ','}),
  streamValues(),
  async data => {
    const [time, pitch] = data.value;
    await timer(time * 1000); // "sleep"
    return {time, pitch};
  }
]);

This creates backpressure, which is really the most elegant option; otherwise you would have to load all of the data into a buffer.
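
If the rows should come out at the absolute timestamps from the file (measured from the script start, as in the original code) rather than after sleeping for each row's full time value, the delay can be computed against the elapsed time instead. A minimal, self-contained sketch of that variant, assuming negative delays can simply be clamped to zero:

const NS_PER_SEC = 1e9;

const {chain}  = require('stream-chain');
const {parser} = require('stream-csv-as-json');
const {streamValues} = require('stream-json/streamers/StreamValues');
const fs = require('fs');

const timer = ms => new Promise(res => setTimeout(res, ms));
const startTime = process.hrtime();

const pipeline = chain([
  fs.createReadStream('timed-data.csv'),
  parser({separator: ','}),
  streamValues(),
  async data => {
    const [time, pitch] = data.value;
    // seconds elapsed since the script started
    const [sec, ns] = process.hrtime(startTime);
    const elapsed = sec + ns / NS_PER_SEC;
    // sleep only until this row's scheduled point in time;
    // || 0 also covers the NaN produced by the header row
    await timer(Math.max((time - elapsed) * 1000, 0) || 0);
    return {time, pitch};
  }
]);

pipeline.on('data', d => console.log('data: ' + d.time + ', ' + d.pitch));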