我怎样才能避免 "Callback was already called" 和 async.queue?
How can I avoid "Callback was already called" with async.queue?
我正在尝试使用 async.queue 进行一系列调用,并且每个调用都有自己的回调,我在其中使用 mocha 来测试是否返回了预期结果。
当我使用并发值 1(我的 NUMBER_OF_THREADS 变量)时,这一切都很好。但是,当我使用任何大于 1 的值时,我会收到错误消息,指出 "Error: Callback was already called." 例如,如果我发送 10 条消息,并将 NUMBER_OF_THREADS 设置为 5,则前 5 条消息会顺利发送,但是然后我开始看到围绕消息 6 或 7 的重复回调错误(见下文)。你知道我怎样才能避免这个错误吗?
我的测试文件(定义异步队列的地方):
var myQueue = async.queue(function(options, callback){
var counter = options.counter;
myService.sendMyMessage(options.text, counter, function(result) {
var myText = result.content.text;
console.log("Response " + myService.counter + ": " + myText);
responses.push(myText);
callback();
});
}, NUMBER_OF_THREADS);
myQueue.drain = function(){
console.log("sends completed");
for (var i = 0; i < numberOfSends; i++) {
assert.equal(myExpectedResponse,responses[i],"text doesn't match");
}
done();
};
for (var j = 1; j <= numberOfSends; j++) {
var options = {
counter: j,
text: "Hello_" + j
};
myQueue.push(options);
}
我的服务文件(发生发送和响应的地方):
myService.callback = function() {};
myService.sendMyMessage = function(message, counter, myCallback) {
console.log("Sending message " + counter + ": " + message);
var myContent = JSON.stringify(myModel.makeMessage(message));
myModel.post(content)
.then(function(res) {
myService.callback = myCallback;
});
};
myService.otherService = function(done) {
app = express();
app.use(express.bodyParser());
app.post('/myRoute/events', function(req, res, next) {
var response = {
"myId": "1234567890",
"myVersion": 1
};
res.set('Content-Type', 'application/json;charset=UTF-8');
res.send(JSON.stringify(response));
if (myService.callback)
{
myService.counter ++;
myService.callback(req.body);
//myService.callback = null;
}
else
{
console.log('the callback is NULL');
}
});
我在控制台中的结果:
Sending message 1: Hello_1
Sending message 2: Hello_2
Sending message 3: Hello_3
Sending message 4: Hello_4
Sending message 5: Hello_5
Response 1: myResponse
Sending message 6: Hello_6
Response 2: myResponse
Sending message 7: Hello_7
Response 3: myResponse
Sending message 8: Hello_8
Response 4: myResponse
Sending message 9: Hello_9
Response 5: myResponse
Sending message 10: Hello_10
Response 6: myResponse
Response 7: myResponse
Error: Callback was already called.
at myFile.js:12:34
如果我取消注释 myService.callback = null 行,我最后一批的第一次发送会导致 myService.callback 过早为空。例如,如果我发送 10 个 NUMBER_OF_THREADS=5 的请求,则请求 1 到 5 会很好用。但是,一旦我发送请求 1 到 10,请求 #10 就会过早地使 myService.callback 无效。回复示例:
Sending message 1: Hello_1
Sending message 2: Hello_2
Sending message 3: Hello_3
Sending message 4: Hello_4
Sending message 5: Hello_5
Response 1: myResponse
Sending message 6: Hello_6
Response 2: myResponse
Sending message 7: Hello_7
Response 3: myResponse
Sending message 8: Hello_8
Response 4: myResponse
Sending message 9: Hello_9
Response 5: myResponse
Sending message 10: Hello_10
Response 6: myResponse
the callback is NULL
the callback is NULL
the callback is NULL
the callback is NULL
我现在已经解决了这个问题。
在我的测试文件中,我现在只是调用 sendMyMessage;我不再期待来自 sendMyMessage 的回调。在 drain 中,我等待响应总数达到,然后循环遍历这些响应。
var myQueue = async.queue(function(options, callback){
var counter = options.counter;
myService.sendMyMessage(options.text, counter);
callback();
}, NUMBER_OF_THREADS);
myQueue.drain = function(){
var myInterval = setInterval(function() {
if (myService.responseCounter == myNumberOfMessages) {
clearInterval(myInterval);
for (var i = 0; i < myNumberOfMessages; i++) {
assert.equal(myExpectedResponse,myService.responses[i],"error");
}
done();
}
}, 5000);
};
for (var j = 1; j <= myNumberOfMessages; j++) {
var options = {
counter: j,
text: "Hello"
};
myQueue.push(options);
}
然后,在我的服务文件中,我现在使用 array 回调;我不再依赖于设置或取消单个回调。每次调用 myModel.post 都会在此回调数组中定义一个新元素。
myService.sendLineMessage = function(message, counter) {
myModel.post(content, sign, config, request, log, run)
.then(function(res) {
myService.callbacks[counter] = function(result) {
var myText = result.content.text;
myService.responses.push(resultText);
};
});
};
myService.otherService = function(done) {
app = express();
app.use(express.bodyParser());
app.post('/myRoute/events', function(req, res, next) {
var response = {
"myId": "1234567890",
"myVersion": 1
};
res.set('Content-Type', 'application/json;charset=UTF-8');
res.send(JSON.stringify(response));
myService.responseCounter++;
if (myService.callbacks[myService.responseCounter])
{
myService.callbacks[myService.responseCounter](req.body);
myService.callbacks[myService.responseCounter] = null;
}
else
{
console.log('the callback is NULL');
}
});
我正在尝试使用 async.queue 进行一系列调用,并且每个调用都有自己的回调,我在其中使用 mocha 来测试是否返回了预期结果。
当我使用并发值 1(我的 NUMBER_OF_THREADS 变量)时,这一切都很好。但是,当我使用任何大于 1 的值时,我会收到错误消息,指出 "Error: Callback was already called." 例如,如果我发送 10 条消息,并将 NUMBER_OF_THREADS 设置为 5,则前 5 条消息会顺利发送,但是然后我开始看到围绕消息 6 或 7 的重复回调错误(见下文)。你知道我怎样才能避免这个错误吗?
我的测试文件(定义异步队列的地方):
var myQueue = async.queue(function(options, callback){
var counter = options.counter;
myService.sendMyMessage(options.text, counter, function(result) {
var myText = result.content.text;
console.log("Response " + myService.counter + ": " + myText);
responses.push(myText);
callback();
});
}, NUMBER_OF_THREADS);
myQueue.drain = function(){
console.log("sends completed");
for (var i = 0; i < numberOfSends; i++) {
assert.equal(myExpectedResponse,responses[i],"text doesn't match");
}
done();
};
for (var j = 1; j <= numberOfSends; j++) {
var options = {
counter: j,
text: "Hello_" + j
};
myQueue.push(options);
}
我的服务文件(发生发送和响应的地方):
myService.callback = function() {};
myService.sendMyMessage = function(message, counter, myCallback) {
console.log("Sending message " + counter + ": " + message);
var myContent = JSON.stringify(myModel.makeMessage(message));
myModel.post(content)
.then(function(res) {
myService.callback = myCallback;
});
};
myService.otherService = function(done) {
app = express();
app.use(express.bodyParser());
app.post('/myRoute/events', function(req, res, next) {
var response = {
"myId": "1234567890",
"myVersion": 1
};
res.set('Content-Type', 'application/json;charset=UTF-8');
res.send(JSON.stringify(response));
if (myService.callback)
{
myService.counter ++;
myService.callback(req.body);
//myService.callback = null;
}
else
{
console.log('the callback is NULL');
}
});
我在控制台中的结果:
Sending message 1: Hello_1
Sending message 2: Hello_2
Sending message 3: Hello_3
Sending message 4: Hello_4
Sending message 5: Hello_5
Response 1: myResponse
Sending message 6: Hello_6
Response 2: myResponse
Sending message 7: Hello_7
Response 3: myResponse
Sending message 8: Hello_8
Response 4: myResponse
Sending message 9: Hello_9
Response 5: myResponse
Sending message 10: Hello_10
Response 6: myResponse
Response 7: myResponse
Error: Callback was already called.
at myFile.js:12:34
如果我取消注释 myService.callback = null 行,我最后一批的第一次发送会导致 myService.callback 过早为空。例如,如果我发送 10 个 NUMBER_OF_THREADS=5 的请求,则请求 1 到 5 会很好用。但是,一旦我发送请求 1 到 10,请求 #10 就会过早地使 myService.callback 无效。回复示例:
Sending message 1: Hello_1
Sending message 2: Hello_2
Sending message 3: Hello_3
Sending message 4: Hello_4
Sending message 5: Hello_5
Response 1: myResponse
Sending message 6: Hello_6
Response 2: myResponse
Sending message 7: Hello_7
Response 3: myResponse
Sending message 8: Hello_8
Response 4: myResponse
Sending message 9: Hello_9
Response 5: myResponse
Sending message 10: Hello_10
Response 6: myResponse
the callback is NULL
the callback is NULL
the callback is NULL
the callback is NULL
我现在已经解决了这个问题。
在我的测试文件中,我现在只是调用 sendMyMessage;我不再期待来自 sendMyMessage 的回调。在 drain 中,我等待响应总数达到,然后循环遍历这些响应。
var myQueue = async.queue(function(options, callback){
var counter = options.counter;
myService.sendMyMessage(options.text, counter);
callback();
}, NUMBER_OF_THREADS);
myQueue.drain = function(){
var myInterval = setInterval(function() {
if (myService.responseCounter == myNumberOfMessages) {
clearInterval(myInterval);
for (var i = 0; i < myNumberOfMessages; i++) {
assert.equal(myExpectedResponse,myService.responses[i],"error");
}
done();
}
}, 5000);
};
for (var j = 1; j <= myNumberOfMessages; j++) {
var options = {
counter: j,
text: "Hello"
};
myQueue.push(options);
}
然后,在我的服务文件中,我现在使用 array 回调;我不再依赖于设置或取消单个回调。每次调用 myModel.post 都会在此回调数组中定义一个新元素。
myService.sendLineMessage = function(message, counter) {
myModel.post(content, sign, config, request, log, run)
.then(function(res) {
myService.callbacks[counter] = function(result) {
var myText = result.content.text;
myService.responses.push(resultText);
};
});
};
myService.otherService = function(done) {
app = express();
app.use(express.bodyParser());
app.post('/myRoute/events', function(req, res, next) {
var response = {
"myId": "1234567890",
"myVersion": 1
};
res.set('Content-Type', 'application/json;charset=UTF-8');
res.send(JSON.stringify(response));
myService.responseCounter++;
if (myService.callbacks[myService.responseCounter])
{
myService.callbacks[myService.responseCounter](req.body);
myService.callbacks[myService.responseCounter] = null;
}
else
{
console.log('the callback is NULL');
}
});