读取流达到 settimeout 最大值错误

Read stream with settimeout maximum value reached error

我正在尝试读取一些大型 CSV 文件并处理这些数据,因此在处理过程中存在速率限制,因此我想在每个请求之间添加 1 分钟的延迟。我尝试设置超时,但最后,我知道 settimeout 是有限制的,并收到以下错误。我不确定还有什么其他方法可以处理这种情况,CSV 文件有超过 1M 的记录。我在这里做错了什么吗?

错误

Timeout duration was set to 1. (node:41) TimeoutOverflowWarning: 2241362000 does not fit into a 32-bit signed integer.

示例代码:

   const Queue = require('bull');
const domainQueue = new Queue(config.api.crawlerQ, {
  redis: connectRedis(),
});
let ctr = 0;
function processCSV (name, fileName, options)  {
  return new Promise((resolve, reject) => {
    console.log('process csv started', new Date());
    let filePath = config.api.basePath + fileName;
    stream = fs.createReadStream(filePath)
        .on('error', (error) => {
          // handle error
          console.log('error processing csv');
          reject(error);
        })
        .pipe(csv())
        .on('data', async (row) => {
          ctr++
          increment(row, ctr)
        })
        .on('end', () => {
          console.log('stream processCSV end', fileName, new Date());
          resolve(filePath);
        })
  });

}

async function increment(raw, counter) {
  setTimeout(async function(){
    console.log('say i am inside a function', counter, new Date());
    domainQueue.add(data, options); // Add jobs to queue - Here i Need a delay say 1mnt, if i
    // add jobs without delay it will hit ratelimit 
  }, 60000 * counter);

}

function queueWorkerProcess(value) { // Process jobs in queue and save in text file 
  console.log('value', value, new Date());
  return new Promise(resolve => {
    resolve();
  });

}

这是一个大概的想法。您需要跟踪正在处理的正在处理的项目数量,以限制使用的内存量并控制存储结果的任何资源的负载。

当您达到飞行中的数量限制时,您将暂停流。当您回到限制以下时,您将恢复流。您在 .add() 上增加一个计数器并在 completed 消息上减少一个计数器以跟踪事情。这是您暂停或恢复流的地方。

仅供参考,仅在某处插入 setTimeout() 对您没有帮助。为了控制内存使用,一旦处理的项目太多,就必须暂停流中的数据流。然后,当项目回到阈值以下时,您可以恢复流。

下面是大概的样子:

const Queue = require('bull');
const domainQueue = new Queue(config.api.crawlerQ, {
    redis: connectRedis(),
});

// counter that keeps track of how many items in the queue
let queueCntr = 0;

// you tune this constant up or down to manage memory usage or tweak performance
// this is what keeps you from having too many requests going at once
const queueMax = 20;

function processCSV(name, fileName, options) {
    return new Promise((resolve, reject) => {
        let paused = false;

        console.log('process csv started', new Date());
        const filePath = config.api.basePath + fileName;

        const stream = fs.createReadStream(filePath)
            .on('error', (error) => {
                // handle error
                console.log('error processing csv');
                domainQueue.off('completed', completed);
                reject(error);
            }).pipe(csv())
            .on('data', async (row) => {
                increment(row, ctr);
                if (queueCntr)
            })
            .on('end', () => {
                console.log('stream processCSV end', fileName, new Date());
                domainQueue.off('completed', completed);
                resolve(filePath);
            });

        function completed() {
            --queueCntr;
            // see if queue got small enough we now resume the stream
            if (paused && queueCntr < queueMax) {
                stream.resume();
                paused = false;
            }
        }

        domainQueue.on('completed', completed);

        function increment(raw, counter) {
            ++queueCntr;
            domainQueue.add(data, options);
            if (!paused && queueCntr > queueMax) {
                stream.pause();
                paused = true;
            }
        }
    });
}

并且,如果您使用不同的文件多次调用 processCSV(),您应该对它们进行排序,以便在第一个完成之前不要调用第二个,不要调用第三个直到第二个完成,依此类推...您没有显示该代码,因此我们无法就此提出具体建议。