读取流达到 settimeout 最大值错误
Read stream with settimeout maximum value reached error
我正在尝试读取一些大型 CSV 文件并处理这些数据,因此在处理过程中存在速率限制,因此我想在每个请求之间添加 1 分钟的延迟。我尝试设置超时,但最后,我知道 settimeout 是有限制的,并收到以下错误。我不确定还有什么其他方法可以处理这种情况,CSV 文件有超过 1M 的记录。我在这里做错了什么吗?
错误
Timeout duration was set to 1. (node:41) TimeoutOverflowWarning:
2241362000 does not fit into a 32-bit signed integer.
示例代码:
const Queue = require('bull');
const domainQueue = new Queue(config.api.crawlerQ, {
redis: connectRedis(),
});
let ctr = 0;
function processCSV (name, fileName, options) {
return new Promise((resolve, reject) => {
console.log('process csv started', new Date());
let filePath = config.api.basePath + fileName;
stream = fs.createReadStream(filePath)
.on('error', (error) => {
// handle error
console.log('error processing csv');
reject(error);
})
.pipe(csv())
.on('data', async (row) => {
ctr++
increment(row, ctr)
})
.on('end', () => {
console.log('stream processCSV end', fileName, new Date());
resolve(filePath);
})
});
}
async function increment(raw, counter) {
setTimeout(async function(){
console.log('say i am inside a function', counter, new Date());
domainQueue.add(data, options); // Add jobs to queue - Here i Need a delay say 1mnt, if i
// add jobs without delay it will hit ratelimit
}, 60000 * counter);
}
function queueWorkerProcess(value) { // Process jobs in queue and save in text file
console.log('value', value, new Date());
return new Promise(resolve => {
resolve();
});
}
这是一个大概的想法。您需要跟踪正在处理的正在处理的项目数量,以限制使用的内存量并控制存储结果的任何资源的负载。
当您达到飞行中的数量限制时,您将暂停流。当您回到限制以下时,您将恢复流。您在 .add()
上增加一个计数器并在 completed
消息上减少一个计数器以跟踪事情。这是您暂停或恢复流的地方。
仅供参考,仅在某处插入 setTimeout()
对您没有帮助。为了控制内存使用,一旦处理的项目太多,就必须暂停流中的数据流。然后,当项目回到阈值以下时,您可以恢复流。
下面是大概的样子:
const Queue = require('bull');
const domainQueue = new Queue(config.api.crawlerQ, {
redis: connectRedis(),
});
// counter that keeps track of how many items in the queue
let queueCntr = 0;
// you tune this constant up or down to manage memory usage or tweak performance
// this is what keeps you from having too many requests going at once
const queueMax = 20;
function processCSV(name, fileName, options) {
return new Promise((resolve, reject) => {
let paused = false;
console.log('process csv started', new Date());
const filePath = config.api.basePath + fileName;
const stream = fs.createReadStream(filePath)
.on('error', (error) => {
// handle error
console.log('error processing csv');
domainQueue.off('completed', completed);
reject(error);
}).pipe(csv())
.on('data', async (row) => {
increment(row, ctr);
if (queueCntr)
})
.on('end', () => {
console.log('stream processCSV end', fileName, new Date());
domainQueue.off('completed', completed);
resolve(filePath);
});
function completed() {
--queueCntr;
// see if queue got small enough we now resume the stream
if (paused && queueCntr < queueMax) {
stream.resume();
paused = false;
}
}
domainQueue.on('completed', completed);
function increment(raw, counter) {
++queueCntr;
domainQueue.add(data, options);
if (!paused && queueCntr > queueMax) {
stream.pause();
paused = true;
}
}
});
}
并且,如果您使用不同的文件多次调用 processCSV()
,您应该对它们进行排序,以便在第一个完成之前不要调用第二个,不要调用第三个直到第二个完成,依此类推...您没有显示该代码,因此我们无法就此提出具体建议。
我正在尝试读取一些大型 CSV 文件并处理这些数据,因此在处理过程中存在速率限制,因此我想在每个请求之间添加 1 分钟的延迟。我尝试设置超时,但最后,我知道 settimeout 是有限制的,并收到以下错误。我不确定还有什么其他方法可以处理这种情况,CSV 文件有超过 1M 的记录。我在这里做错了什么吗?
错误
Timeout duration was set to 1. (node:41) TimeoutOverflowWarning: 2241362000 does not fit into a 32-bit signed integer.
示例代码:
const Queue = require('bull');
const domainQueue = new Queue(config.api.crawlerQ, {
redis: connectRedis(),
});
let ctr = 0;
function processCSV (name, fileName, options) {
return new Promise((resolve, reject) => {
console.log('process csv started', new Date());
let filePath = config.api.basePath + fileName;
stream = fs.createReadStream(filePath)
.on('error', (error) => {
// handle error
console.log('error processing csv');
reject(error);
})
.pipe(csv())
.on('data', async (row) => {
ctr++
increment(row, ctr)
})
.on('end', () => {
console.log('stream processCSV end', fileName, new Date());
resolve(filePath);
})
});
}
async function increment(raw, counter) {
setTimeout(async function(){
console.log('say i am inside a function', counter, new Date());
domainQueue.add(data, options); // Add jobs to queue - Here i Need a delay say 1mnt, if i
// add jobs without delay it will hit ratelimit
}, 60000 * counter);
}
function queueWorkerProcess(value) { // Process jobs in queue and save in text file
console.log('value', value, new Date());
return new Promise(resolve => {
resolve();
});
}
这是一个大概的想法。您需要跟踪正在处理的正在处理的项目数量,以限制使用的内存量并控制存储结果的任何资源的负载。
当您达到飞行中的数量限制时,您将暂停流。当您回到限制以下时,您将恢复流。您在 .add()
上增加一个计数器并在 completed
消息上减少一个计数器以跟踪事情。这是您暂停或恢复流的地方。
仅供参考,仅在某处插入 setTimeout()
对您没有帮助。为了控制内存使用,一旦处理的项目太多,就必须暂停流中的数据流。然后,当项目回到阈值以下时,您可以恢复流。
下面是大概的样子:
const Queue = require('bull');
const domainQueue = new Queue(config.api.crawlerQ, {
redis: connectRedis(),
});
// counter that keeps track of how many items in the queue
let queueCntr = 0;
// you tune this constant up or down to manage memory usage or tweak performance
// this is what keeps you from having too many requests going at once
const queueMax = 20;
function processCSV(name, fileName, options) {
return new Promise((resolve, reject) => {
let paused = false;
console.log('process csv started', new Date());
const filePath = config.api.basePath + fileName;
const stream = fs.createReadStream(filePath)
.on('error', (error) => {
// handle error
console.log('error processing csv');
domainQueue.off('completed', completed);
reject(error);
}).pipe(csv())
.on('data', async (row) => {
increment(row, ctr);
if (queueCntr)
})
.on('end', () => {
console.log('stream processCSV end', fileName, new Date());
domainQueue.off('completed', completed);
resolve(filePath);
});
function completed() {
--queueCntr;
// see if queue got small enough we now resume the stream
if (paused && queueCntr < queueMax) {
stream.resume();
paused = false;
}
}
domainQueue.on('completed', completed);
function increment(raw, counter) {
++queueCntr;
domainQueue.add(data, options);
if (!paused && queueCntr > queueMax) {
stream.pause();
paused = true;
}
}
});
}
并且,如果您使用不同的文件多次调用 processCSV()
,您应该对它们进行排序,以便在第一个完成之前不要调用第二个,不要调用第三个直到第二个完成,依此类推...您没有显示该代码,因此我们无法就此提出具体建议。