async.queue 个并发任务
async.queue concurrent tasks
我正在使用 async.queue
来确保服务中的某些文件副本最多 n
同时发生,但观察文件副本有时我看到的比队列允许的要多得多。有没有人看到我在下面的实现中可能遗漏了什么?
createQueue(limit: number) {
let self = this;
return async.queue(function(cmdObj, callback) {
console.log("Beginning copy");
let cmd = cmdObj.cmd;
let args = cmdObj.args;
let request = cmdObj.req;
request.state = State.IN_PROGRESS;
self.reportStatus(request.destination);
const proc = spawn(cmd, args); //uses an rsync command upstream
proc.on("close", code => {
if (code !== 0) {
request.state = State.ERRORED;
self.reportStatus(request.destination); // these just report to the caller
statusMap.delete(request.destination);
} else {
fs.rename(request.destination + ".part", request.destination);
request.state = State.COMPLETED;
self.reportStatus(request.destination); // same here
statusMap.delete(request.destination);
}
callback();
});
proc.on("error", err => {
console.error("COPY ERR: " + err);
});
}, limit); // limit here, for example, may be two, but I see four copies concurrently
}
编辑:
我现在相信这是系统其余部分的副作用......在副本开始后队列被清除并重新初始化......所以当新项目添加到重新初始化的队列时,它们会立即启动,因为系统没有想法是否已将某些内容移交给用户空间并且当前为 运行。
所以,这是用户错误...PEBCAK!发布解决方案更像是一个警示故事:
上面的队列按设计工作,但我有一个端点供调用服务器根据需要清除队列;问题是我正在使用 kill()
并重新初始化队列,丢失了所有正在进行的作业及其回调的所有轨道。一旦一个新项目进入新鲜队列,它就会认为什么都没有发生并产生一个新的复制进程。我通过使用 remove
来清除队列而不是重新初始化来解决。
我正在使用 async.queue
来确保服务中的某些文件副本最多 n
同时发生,但观察文件副本有时我看到的比队列允许的要多得多。有没有人看到我在下面的实现中可能遗漏了什么?
createQueue(limit: number) {
let self = this;
return async.queue(function(cmdObj, callback) {
console.log("Beginning copy");
let cmd = cmdObj.cmd;
let args = cmdObj.args;
let request = cmdObj.req;
request.state = State.IN_PROGRESS;
self.reportStatus(request.destination);
const proc = spawn(cmd, args); //uses an rsync command upstream
proc.on("close", code => {
if (code !== 0) {
request.state = State.ERRORED;
self.reportStatus(request.destination); // these just report to the caller
statusMap.delete(request.destination);
} else {
fs.rename(request.destination + ".part", request.destination);
request.state = State.COMPLETED;
self.reportStatus(request.destination); // same here
statusMap.delete(request.destination);
}
callback();
});
proc.on("error", err => {
console.error("COPY ERR: " + err);
});
}, limit); // limit here, for example, may be two, but I see four copies concurrently
}
编辑: 我现在相信这是系统其余部分的副作用......在副本开始后队列被清除并重新初始化......所以当新项目添加到重新初始化的队列时,它们会立即启动,因为系统没有想法是否已将某些内容移交给用户空间并且当前为 运行。
所以,这是用户错误...PEBCAK!发布解决方案更像是一个警示故事:
上面的队列按设计工作,但我有一个端点供调用服务器根据需要清除队列;问题是我正在使用 kill()
并重新初始化队列,丢失了所有正在进行的作业及其回调的所有轨道。一旦一个新项目进入新鲜队列,它就会认为什么都没有发生并产生一个新的复制进程。我通过使用 remove
来清除队列而不是重新初始化来解决。