async.queue 个并发任务

async.queue concurrent tasks

我正在使用 async.queue 来确保服务中的某些文件副本最多 n 同时发生,但观察文件副本有时我看到的比队列允许的要多得多。有没有人看到我在下面的实现中可能遗漏了什么?

createQueue(limit: number) {
    let self = this;
    return async.queue(function(cmdObj, callback) {
      console.log("Beginning copy");
      let cmd = cmdObj.cmd;
      let args = cmdObj.args;
      let request = cmdObj.req;
      request.state = State.IN_PROGRESS;
      self.reportStatus(request.destination);
      const proc = spawn(cmd, args); //uses an rsync command upstream
      proc.on("close", code => {
        if (code !== 0) {
          request.state = State.ERRORED;
          self.reportStatus(request.destination); // these just report to the caller
          statusMap.delete(request.destination);
        } else {
          fs.rename(request.destination + ".part", request.destination);
          request.state = State.COMPLETED;
          self.reportStatus(request.destination); // same here
          statusMap.delete(request.destination);
        }
        callback();
      });
      proc.on("error", err => {
        console.error("COPY ERR: " + err);
      });
    }, limit); // limit here, for example, may be two, but I see four copies concurrently
  }

编辑: 我现在相信这是系统其余部分的副作用......在副本开始后队列被清除并重新初始化......所以当新项目添加到重新初始化的队列时,它们会立即启动,因为系统没有想法是否已将某些内容移交给用户空间并且当前为 运行。

所以,这是用户错误...PEBCAK!发布解决方案更像是一个警示故事:

上面的队列按设计工作,但我有一个端点供调用服务器根据需要清除队列;问题是我正在使用 kill() 并重新初始化队列,丢失了所有正在进行的作业及其回调的所有轨道。一旦一个新项目进入新鲜队列,它就会认为什么都没有发生并产生一个新的复制进程。我通过使用 remove 来清除队列而不是重新初始化来解决。