Azure 毒队列:nodejs - 如何创建出队阈值
Azure poison queue: nodejs - how to create a dequeue threshold
我创建了一个存储队列,并向其中插入了一条消息。我一直用 Node.js 编写的 WebJob 读取这条消息,可以看到出队计数(dequeue count)在不断增加。然而,这条消息始终没有被移入毒队列(poison queue)。
这是由 Azure 自动处理的,还是必须在我的 Node.js WebJob 中自己实现?
在 C# 中创建队列
// Resolve the storage account from the "StorageConnectionString" entry of the app configuration.
var connectionString = ConfigurationManager.ConnectionStrings["StorageConnectionString"].ConnectionString;
CloudStorageAccount account = CloudStorageAccount.Parse(connectionString);
// Get a reference to the "successemailqueue" queue and create it if it does not exist yet.
CloudQueue emailQueue = account.CreateCloudQueueClient().GetQueueReference("successemailqueue");
emailQueue.CreateIfNotExists();
// Serialize the payload with camelCase property names and add it to the queue as one message.
var serializerSettings = new JsonSerializerSettings { ContractResolver = new CamelCasePropertyNamesContractResolver() };
string payload = JsonConvert.SerializeObject(data, serializerSettings);
emailQueue.AddMessage(new CloudQueueMessage(payload));
从队列中读取消息:
// Retry filter attached to the queue service created below.
var retryOperations = new azure.ExponentialRetryPolicyFilter();
// Queue service for the configured storage account, with the retry filter applied.
var queueService = azure
    .createQueueService(config.storageName, config.storageKey)
    .withFilter(retryOperations);
// Options passed to getMessages: one message per call, visibility timeout of 2 * 60.
var singleMessageDefaults = {
    numofmessages: 1,
    visibilitytimeout: 2 * 60
};
/**
 * Requests messages from the configured queue using singleMessageDefaults.
 * The completion handler is built by getSingleMessageComplete from the deferred,
 * and the deferred's promise is handed back to the caller.
 *
 * @returns {Object} the promise of the deferred created for this request.
 */
var getSingleMessage = function() {
    var pending = Q.defer();
    var onMessages = getSingleMessageComplete(pending);
    queueService.getMessages(config.queueName, singleMessageDefaults, onMessages);
    return pending.promise;
};
处理消息:
/**
 * Handles one dequeued message: while its dequeue count is below 5 the e-mail is
 * sent (with queue.deleteMessage passed as the callback); otherwise nothing is done
 * yet - moving the message to a poison queue is still to be implemented.
 *
 * @param {Object} [message] - dequeued message; falsy values are ignored.
 */
var processMessage = function(message){
    if (!message) {
        return;
    }
    // Keep the "<" comparison: a missing dequeuecount must not trigger a send.
    var underThreshold = message.dequeuecount < 5;
    if (underThreshold) {
        sendEmail(message, queue.deleteMessage);
    }
    // otherwise: move to poison queue
};
谢谢
我已成功重现您的问题。当从队列中读取消息而不删除它时,dequeuecount 属性会不断增加。
所以你的问题的解决方案是在读取消息后及时从队列存储中删除消息。请参考https://azure.microsoft.com/en-us/documentation/articles/storage-nodejs-how-to-use-queues/#how-to-dequeue-the-next-message.
这是我修改你的代码后的示例代码:
var azure = require('azure');
// Placeholder settings: replace the three values with the real account name, key and queue name.
var config = {
    storageName: "<storage_name>",
    storageKey: "<storage_key>",
    queueName: "<queue_name>"
};
// Retry filter attached to the queue service created below.
var retryOperations = new azure.ExponentialRetryPolicyFilter();
var queueService = azure
    .createQueueService(config.storageName, config.storageKey)
    .withFilter(retryOperations);
// Options passed to getMessages: one message per call, visibility timeout of 2 * 60.
var singleMessageDefaults = {
    numofmessages: 1,
    visibilitytimeout: 2 * 60
};
/**
 * Fetches at most one message from the configured queue and deletes it right away.
 *
 * queueService.getMessages reports its result through a callback, so the message
 * cannot be handed back with a plain `return`: the previous `return message` ran
 * before the callback had assigned it. The outcome is therefore delivered through
 * an optional Node-style callback.
 *
 * @param {function(Error=, Object=)} [callback] - called with (error) when fetching
 *   or deleting fails, with (null, undefined) when no message was returned, and with
 *   (null, message) after the message has been deleted. When omitted, errors are
 *   written to console.error instead of being dropped silently.
 */
var getSingleMessage = function(callback) {
    var done = typeof callback === 'function' ? callback : function(error) {
        if (error) {
            console.error(error);
        }
    };
    queueService.getMessages(config.queueName, singleMessageDefaults, function(error, result, response) {
        if (error) {
            done(error);
            return;
        }
        // An empty (or missing) result means there is nothing to process.
        var message = result && result[0];
        if (message == null) {
            done(null, undefined);
            return;
        }
        queueService.deleteMessage(config.queueName, message.messageid, message.popreceipt, function(deleteError, response) {
            if (deleteError) {
                done(deleteError);
                return;
            }
            console.log('message deleted from queue');
            done(null, message);
        });
    });
};
此致。
既然您已经使用了这个示例(this example),您可以在 queue.js 文件中添加一个向队列插入新消息的方法,并用它把消息移入毒队列,大致可以这样实现:
插入新消息(queue.js):
/**
 * Creates a message in the given queue and exposes the outcome as a promise.
 *
 * @param {string} queueName - queue passed to queueService.createMessage.
 * @param {*} message - message content passed to queueService.createMessage.
 * @returns {Object} promise resolved with the `result` reported by createMessage,
 *   or rejected with `{ error, result, response }` when createMessage reports an error.
 */
function addMessage(queueName, message){
    var pending = Q.defer();
    var onCreated = function (error, result, response) {
        if (!error) {
            pending.resolve(result);
            return;
        }
        pending.reject({ error: error, result: result, response: response });
    };
    queueService.createMessage(queueName, message, onCreated);
    return pending.promise;
}
移入毒队列(run.js):
/**
 * Routes one dequeued message: while its dequeue count is below 5 the e-mail is
 * sent (with queue.deleteMessage passed as the callback); otherwise the message is
 * handed to moveToPoison.
 *
 * The unused deferred of the earlier version was removed, and the promise from
 * moveToPoison is now returned instead of being dropped, so callers can observe
 * a failed move.
 *
 * @param {Object} [message] - dequeued message; falsy values are ignored.
 * @returns {Object|undefined} whatever moveToPoison returns when the message is
 *   moved to the poison queue; undefined in every other case.
 */
var processMessage = function(message){
    if (!message) {
        return undefined;
    }
    if (message.dequeuecount < 5) {
        sendEmail(message, queue.deleteMessage);
        return undefined;
    }
    return moveToPoison(message);
};
/**
 * Moves a message to the "<queueName>-poison" queue of the same storage account.
 *
 * Changes compared with the earlier version:
 *  - `poisonQueue.addMessage(.messagetext)` was a syntax error; the text is read
 *    from `message.messagetext`.
 *  - The message is copied to the poison queue BEFORE the original is deleted, so
 *    a failed copy no longer loses the message.
 *  - The promise chain is returned directly instead of being re-wrapped in a deferred.
 *
 * NOTE(review): addMessage is called as (queueName, message) to match the
 * addMessage(queueName, message) helper shown for queue.js - confirm that the
 * object returned by require("./queue") exposes it with that signature.
 *
 * @param {Object} message - dequeued message; its `messagetext` is copied.
 * @returns {Object} promise that resolves with no value once the copy has been
 *   added and the original deleted, and rejects with the first error encountered.
 */
function moveToPoison(message){
    var poisonQueueConf = {
        storageName: config.storageName,
        storageKey: config.storageKey,
        queueName: config.queueName + '-poison'
    };
    var poisonQueue = require("./queue")(poisonQueueConf);
    return poisonQueue.addMessage(poisonQueueConf.queueName, message.messagetext)
        .then(function(){
            return queue.deleteMessage(message);
        })
        .then(function(){
            // Resolve with no value, as the deferred-based version did.
        });
}
我创建了一个存储队列,并向其中插入了一条消息。我一直用 Node.js 编写的 WebJob 读取这条消息,可以看到出队计数(dequeue count)在不断增加。然而,这条消息始终没有被移入毒队列(poison queue)。
这是由 Azure 自动处理的,还是必须在我的 Node.js WebJob 中自己实现?
在 C# 中创建队列
// Resolve the storage account from the "StorageConnectionString" entry of the app configuration.
var connectionString = ConfigurationManager.ConnectionStrings["StorageConnectionString"].ConnectionString;
CloudStorageAccount account = CloudStorageAccount.Parse(connectionString);
// Get a reference to the "successemailqueue" queue and create it if it does not exist yet.
CloudQueue emailQueue = account.CreateCloudQueueClient().GetQueueReference("successemailqueue");
emailQueue.CreateIfNotExists();
// Serialize the payload with camelCase property names and add it to the queue as one message.
var serializerSettings = new JsonSerializerSettings { ContractResolver = new CamelCasePropertyNamesContractResolver() };
string payload = JsonConvert.SerializeObject(data, serializerSettings);
emailQueue.AddMessage(new CloudQueueMessage(payload));
从队列中读取消息:
// Retry filter attached to the queue service created below.
var retryOperations = new azure.ExponentialRetryPolicyFilter();
// Queue service for the configured storage account, with the retry filter applied.
var queueService = azure
    .createQueueService(config.storageName, config.storageKey)
    .withFilter(retryOperations);
// Options passed to getMessages: one message per call, visibility timeout of 2 * 60.
var singleMessageDefaults = {
    numofmessages: 1,
    visibilitytimeout: 2 * 60
};
/**
 * Requests messages from the configured queue using singleMessageDefaults.
 * The completion handler is built by getSingleMessageComplete from the deferred,
 * and the deferred's promise is handed back to the caller.
 *
 * @returns {Object} the promise of the deferred created for this request.
 */
var getSingleMessage = function() {
    var pending = Q.defer();
    var onMessages = getSingleMessageComplete(pending);
    queueService.getMessages(config.queueName, singleMessageDefaults, onMessages);
    return pending.promise;
};
处理消息:
/**
 * Handles one dequeued message: while its dequeue count is below 5 the e-mail is
 * sent (with queue.deleteMessage passed as the callback); otherwise nothing is done
 * yet - moving the message to a poison queue is still to be implemented.
 *
 * @param {Object} [message] - dequeued message; falsy values are ignored.
 */
var processMessage = function(message){
    if (!message) {
        return;
    }
    // Keep the "<" comparison: a missing dequeuecount must not trigger a send.
    var underThreshold = message.dequeuecount < 5;
    if (underThreshold) {
        sendEmail(message, queue.deleteMessage);
    }
    // otherwise: move to poison queue
};
谢谢
我已成功重现您的问题。当从队列中读取消息而不删除它时,dequeuecount 属性会不断增加。
所以你的问题的解决方案是在读取消息后及时从队列存储中删除消息。请参考https://azure.microsoft.com/en-us/documentation/articles/storage-nodejs-how-to-use-queues/#how-to-dequeue-the-next-message.
这是我修改你的代码后的示例代码:
var azure = require('azure');
// Placeholder settings: replace the three values with the real account name, key and queue name.
var config = {
    storageName: "<storage_name>",
    storageKey: "<storage_key>",
    queueName: "<queue_name>"
};
// Retry filter attached to the queue service created below.
var retryOperations = new azure.ExponentialRetryPolicyFilter();
var queueService = azure
    .createQueueService(config.storageName, config.storageKey)
    .withFilter(retryOperations);
// Options passed to getMessages: one message per call, visibility timeout of 2 * 60.
var singleMessageDefaults = {
    numofmessages: 1,
    visibilitytimeout: 2 * 60
};
/**
 * Fetches at most one message from the configured queue and deletes it right away.
 *
 * queueService.getMessages reports its result through a callback, so the message
 * cannot be handed back with a plain `return`: the previous `return message` ran
 * before the callback had assigned it. The outcome is therefore delivered through
 * an optional Node-style callback.
 *
 * @param {function(Error=, Object=)} [callback] - called with (error) when fetching
 *   or deleting fails, with (null, undefined) when no message was returned, and with
 *   (null, message) after the message has been deleted. When omitted, errors are
 *   written to console.error instead of being dropped silently.
 */
var getSingleMessage = function(callback) {
    var done = typeof callback === 'function' ? callback : function(error) {
        if (error) {
            console.error(error);
        }
    };
    queueService.getMessages(config.queueName, singleMessageDefaults, function(error, result, response) {
        if (error) {
            done(error);
            return;
        }
        // An empty (or missing) result means there is nothing to process.
        var message = result && result[0];
        if (message == null) {
            done(null, undefined);
            return;
        }
        queueService.deleteMessage(config.queueName, message.messageid, message.popreceipt, function(deleteError, response) {
            if (deleteError) {
                done(deleteError);
                return;
            }
            console.log('message deleted from queue');
            done(null, message);
        });
    });
};
此致。
既然您已经使用了这个示例(this example),您可以在 queue.js 文件中添加一个向队列插入新消息的方法,并用它把消息移入毒队列,大致可以这样实现:
插入新消息(queue.js):
/**
 * Creates a message in the given queue and exposes the outcome as a promise.
 *
 * @param {string} queueName - queue passed to queueService.createMessage.
 * @param {*} message - message content passed to queueService.createMessage.
 * @returns {Object} promise resolved with the `result` reported by createMessage,
 *   or rejected with `{ error, result, response }` when createMessage reports an error.
 */
function addMessage(queueName, message){
    var pending = Q.defer();
    var onCreated = function (error, result, response) {
        if (!error) {
            pending.resolve(result);
            return;
        }
        pending.reject({ error: error, result: result, response: response });
    };
    queueService.createMessage(queueName, message, onCreated);
    return pending.promise;
}
移入毒队列(run.js):
/**
 * Routes one dequeued message: while its dequeue count is below 5 the e-mail is
 * sent (with queue.deleteMessage passed as the callback); otherwise the message is
 * handed to moveToPoison.
 *
 * The unused deferred of the earlier version was removed, and the promise from
 * moveToPoison is now returned instead of being dropped, so callers can observe
 * a failed move.
 *
 * @param {Object} [message] - dequeued message; falsy values are ignored.
 * @returns {Object|undefined} whatever moveToPoison returns when the message is
 *   moved to the poison queue; undefined in every other case.
 */
var processMessage = function(message){
    if (!message) {
        return undefined;
    }
    if (message.dequeuecount < 5) {
        sendEmail(message, queue.deleteMessage);
        return undefined;
    }
    return moveToPoison(message);
};
/**
 * Moves a message to the "<queueName>-poison" queue of the same storage account.
 *
 * Changes compared with the earlier version:
 *  - `poisonQueue.addMessage(.messagetext)` was a syntax error; the text is read
 *    from `message.messagetext`.
 *  - The message is copied to the poison queue BEFORE the original is deleted, so
 *    a failed copy no longer loses the message.
 *  - The promise chain is returned directly instead of being re-wrapped in a deferred.
 *
 * NOTE(review): addMessage is called as (queueName, message) to match the
 * addMessage(queueName, message) helper shown for queue.js - confirm that the
 * object returned by require("./queue") exposes it with that signature.
 *
 * @param {Object} message - dequeued message; its `messagetext` is copied.
 * @returns {Object} promise that resolves with no value once the copy has been
 *   added and the original deleted, and rejects with the first error encountered.
 */
function moveToPoison(message){
    var poisonQueueConf = {
        storageName: config.storageName,
        storageKey: config.storageKey,
        queueName: config.queueName + '-poison'
    };
    var poisonQueue = require("./queue")(poisonQueueConf);
    return poisonQueue.addMessage(poisonQueueConf.queueName, message.messagetext)
        .then(function(){
            return queue.deleteMessage(message);
        })
        .then(function(){
            // Resolve with no value, as the deferred-based version did.
        });
}