我怎样才能避免 "Callback was already called" 和 async.queue?

How can I avoid "Callback was already called" with async.queue?

我正在尝试使用 async.queue 进行一系列调用,并且每个调用都有自己的回调,我在其中使用 mocha 来测试是否返回了预期结果。

当我使用并发值 1(我的 NUMBER_OF_THREADS 变量)时,这一切都很好。但是,当我使用任何大于 1 的值时,我会收到错误消息,指出 "Error: Callback was already called." 例如,如果我发送 10 条消息,并将 NUMBER_OF_THREADS 设置为 5,则前 5 条消息会顺利发送,但是然后我开始看到围绕消息 6 或 7 的重复回调错误(见下文)。你知道我怎样才能避免这个错误吗?

我的测试文件(定义异步队列的地方):

var myQueue = async.queue(function(options, callback){   
    var counter = options.counter;

    myService.sendMyMessage(options.text, counter, function(result) {
        var myText = result.content.text;
        console.log("Response " + myService.counter + ": " + myText);

        responses.push(myText);
        callback();
    });
}, NUMBER_OF_THREADS);

myQueue.drain = function(){ 
    console.log("sends completed"); 
    for (var i = 0; i < numberOfSends; i++) {
        assert.equal(myExpectedResponse,responses[i],"text doesn't match");
    }
    done();
};  

for (var j = 1; j <= numberOfSends; j++) {
    var options = {
        counter: j,
        text: "Hello_" + j
    };
    myQueue.push(options);
}

我的服务文件(发生发送和响应的地方):

myService.callback = function() {};

myService.sendMyMessage = function(message, counter, myCallback) {
    console.log("Sending message " + counter + ": " + message);
    var myContent = JSON.stringify(myModel.makeMessage(message));

    myModel.post(content)
        .then(function(res) {
            myService.callback = myCallback;
        });
};

myService.otherService = function(done) {
    app = express();
    app.use(express.bodyParser());

app.post('/myRoute/events', function(req, res, next) {
    var response = {
        "myId": "1234567890",
        "myVersion": 1
    };

    res.set('Content-Type', 'application/json;charset=UTF-8');
    res.send(JSON.stringify(response));
    if (myService.callback)
    {
        myService.counter ++;

        myService.callback(req.body);
        //myService.callback = null;
    }
    else
    {
        console.log('the callback is NULL');
    }
});

我在控制台中的结果:

Sending message 1: Hello_1  
Sending message 2: Hello_2
Sending message 3: Hello_3
Sending message 4: Hello_4
Sending message 5: Hello_5
Response 1: myResponse
Sending message 6: Hello_6
Response 2: myResponse
Sending message 7: Hello_7
Response 3: myResponse
Sending message 8: Hello_8
Response 4: myResponse
Sending message 9: Hello_9
Response 5: myResponse
Sending message 10: Hello_10
Response 6: myResponse
Response 7: myResponse
Error: Callback was already called.
    at myFile.js:12:34

如果我取消注释 myService.callback = null 行,我最后一批的第一次发送会导致 myService.callback 过早为空。例如,如果我发送 10 个 NUMBER_OF_THREADS=5 的请求,则请求 1 到 5 会很好用。但是,一旦我发送请求 1 到 10,请求 #10 就会过早地使 myService.callback 无效。回复示例:

Sending message 1: Hello_1  
Sending message 2: Hello_2
Sending message 3: Hello_3
Sending message 4: Hello_4
Sending message 5: Hello_5
Response 1: myResponse
Sending message 6: Hello_6
Response 2: myResponse
Sending message 7: Hello_7
Response 3: myResponse
Sending message 8: Hello_8
Response 4: myResponse
Sending message 9: Hello_9
Response 5: myResponse
Sending message 10: Hello_10
Response 6: myResponse
the callback is NULL
the callback is NULL
the callback is NULL
the callback is NULL

我现在已经解决了这个问题。

在我的测试文件中,我现在只是调用 sendMyMessage;我不再期待来自 sendMyMessage 的回调。在 drain 中,我等待响应总数达到,然后循环遍历这些响应。

var myQueue = async.queue(function(options, callback){   
    var counter = options.counter;

    myService.sendMyMessage(options.text, counter);
    callback();
}, NUMBER_OF_THREADS);

myQueue.drain = function(){ 
    var myInterval = setInterval(function() {

    if (myService.responseCounter == myNumberOfMessages) {

      clearInterval(myInterval);
      for (var i = 0; i < myNumberOfMessages; i++) {
        assert.equal(myExpectedResponse,myService.responses[i],"error");
      }
      done();
    }
  }, 5000);

};  

for (var j = 1; j <= myNumberOfMessages; j++) {
  var options = {
    counter: j,
    text: "Hello"
  };

  myQueue.push(options);
}

然后,在我的服务文件中,我现在使用 array 回调;我不再依赖于设置或取消单个回调。每次调用 myModel.post 都会在此回调数组中定义一个新元素。

myService.sendLineMessage = function(message, counter) {

myModel.post(content, sign, config, request, log, run)
    .then(function(res) {
        myService.callbacks[counter] = function(result) {
            var myText = result.content.text;
            myService.responses.push(resultText);
        };
    });
};

myService.otherService = function(done) {
    app = express();
    app.use(express.bodyParser());

    app.post('/myRoute/events', function(req, res, next) {

    var response = {
        "myId": "1234567890",
        "myVersion": 1
    };

    res.set('Content-Type', 'application/json;charset=UTF-8');
    res.send(JSON.stringify(response));

    myService.responseCounter++;

    if (myService.callbacks[myService.responseCounter])
    {
        myService.callbacks[myService.responseCounter](req.body);
        myService.callbacks[myService.responseCounter] = null;
    }
    else
    {
        console.log('the callback is NULL');
    }
});