当队列长度 < 10 时,Azure 服务总线中的出队请求经常返回 null

Dequeue request in Azure Service Bus when queue length < 10 frequently returns null

我一直在 NodeJS 中试验 Azure 服务总线队列。我已经根据文档中的代码示例编写了 sender.js 和 listener.js。向队列中填充消息工作正常。在队列中的消息数降到 10 条之前,从队列中取出并删除消息也工作正常。但到了这个时候,出队请求大约每 5 次中有 4 次返回空消息。如果我继续循环发送出队请求,最终还是能取出并删除最后 10 条消息。但是,这似乎非常低效。还有其他人遇到过这个问题吗?

listener.js

var azure = require('azure');
var async = require("async");

// Service Bus connection string: read from the CONNECTION_STRING environment variable,
// falling back to a placeholder endpoint when that variable is unset or empty.
var connectionString = process.env.CONNECTION_STRING || "Endpoint=sb://endpoint"; // dev

// NOTE(review): this prints the raw environment value (not the fallback). Connection
// strings usually embed credentials - confirm this log line is wanted outside development.
console.log(process.env.CONNECTION_STRING);

// Service Bus client shared by the handlers and helpers below.
var serviceBusService = azure.createServiceBusService(connectionString);
// var serviceBusService = azure.createServiceBusService();

exports.createQueue = function (req,res) {

    var body = req.body;

    serviceBusService.createQueueIfNotExists(body.queueName, function(error){
        console.log(error);
        if(!error){
            // Queue exists
            return res.send(200);
        } else {
            return res.send(500, error);   
        }
    });
};

exports.sendMessageToQueue = function (req, res) {
    var body = req.body;

    var message = {
        body: 'Test message',
        customProperties: {
            testproperty: 'TestValue'
        }};

    serviceBusService.sendQueueMessage(body.queueName, message, function(error){
        if(!error){
            // message sent
            return res.send(200);
        } else {
            return res.send(500, error);   
        }
    });
}

exports.receiveMessageFromQueue = function (req, res) {
    var body = req.body;

    serviceBusService.receiveQueueMessage(body.queueName, function(error, receivedMessage){
        if(!error){
            console.log(receivedMessage);

            // Message received and deleted
            return res.send(200,receivedMessage);
        }  else {
            return res.send(500, error);   
        }
    });
}

/**
 * Receives one message from `queueName` and hands it to `callback`.
 *
 * When the service reports that the queue is empty, the callback is postponed by
 * `delayTimeIfQueueIsEmpty` milliseconds so that callers polling in a loop back off
 * instead of re-requesting immediately. Every other outcome invokes the callback at once.
 *
 * @param {string} queueName - Queue to receive from.
 * @param {number} delayTimeIfQueueIsEmpty - Delay in milliseconds applied when the queue is empty.
 * @param {function(*)} callback - Called with whatever message value the SDK callback supplied.
 */
function _receiveMessageFromQueue(queueName,delayTimeIfQueueIsEmpty,callback) {
    var EMPTY_QUEUE_MESSAGE = 'No messages to receive';

    serviceBusService.receiveQueueMessage(queueName, function(error, receivedMessage){
        console.log(error, receivedMessage);

        // The empty-queue condition may arrive as a bare string or as an Error-like
        // object carrying the same text in `.message`; both must trigger the back-off,
        // otherwise a polling caller would retry immediately against an empty queue.
        var queueIsEmpty = error == EMPTY_QUEUE_MESSAGE ||
            Boolean(error && error.message === EMPTY_QUEUE_MESSAGE);

        if (queueIsEmpty) {
            // Hand control back only after the configured delay.
            setTimeout(function() {
                callback(receivedMessage);
            }, delayTimeIfQueueIsEmpty);
            return;
        }

        // Message received (or a different error): callback immediately.
        callback(receivedMessage);
    });
}

/**
 * Sends `message` to `queueName`, logs the error argument of the SDK callback and then
 * invokes `callback` with no arguments. The callback runs whether or not an error was reported.
 */
function _sendQueueMessage(queueName, message, callback) {
    var onSendComplete = function (sendError) {
        console.log(sendError);
        callback();
    };

    serviceBusService.sendQueueMessage(queueName, message, onSendComplete);
}

/**
 * Starts polling `queueName` through an `async.queue` created with `concurrency` as its
 * second argument. Each task receives one message, logs its body when a message is
 * present, and pushes a follow-up task, so polling continues for as long as the
 * process runs.
 *
 * @param {number} concurrency - Number of initial tasks; also passed to `async.queue`.
 * @param {number} delayTimeIfQueueIsEmpty - Forwarded to `_receiveMessageFromQueue`.
 * @param {string} queueName - Queue to poll.
 */
function listenMessageQueue(concurrency,delayTimeIfQueueIsEmpty,queueName) {

    // Single place that builds a task so `id` and `url` always agree. The previous
    // inline `"..." + task.id + 1` concatenated as a string (id 0 -> ".../01").
    var makeTask = function(id) {
        return { id: id, queueName: queueName, url: "http://localhost/get-person/" + id };
    };

    var taskHandler = function(task, done) {
        _receiveMessageFromQueue(task.queueName, delayTimeIfQueueIsEmpty, function(message) {
            if (message) {
                console.log('hello ' + message.body);
            }

            // Enqueue the next poll before completing this task.
            myQueue.push(makeTask(task.id + 1));

            done();
        });
    };

    var myQueue = async.queue(taskHandler, concurrency);

    myQueue.drain = function() {
        console.log("All the work has been done.");
    };

    // Seed one task per worker.
    for (var i = 0; i < concurrency; i++) {
        myQueue.push(makeTask(i));
    }

}

// Listener settings. Declared with `var` so they stay module-scoped instead of becoming
// implicit globals (assigning to an undeclared name throws in strict mode / ES modules).
var delayTimeIfQueueIsEmpty = 30000; // 30s
var concurrency = 2;
var queueName = "jobs";
// listen and dequeue message from azure message bus
listenMessageQueue(concurrency,delayTimeIfQueueIsEmpty,queueName);

sender.js

var azure = require('azure');
var async = require("async");

// Service Bus connection string: read from the CONNECTION_STRING environment variable,
// falling back to a placeholder endpoint when that variable is unset or empty.
var connectionString = process.env.CONNECTION_STRING || "Endpoint=sb://endpoint";

// NOTE(review): this prints the raw environment value (not the fallback). Connection
// strings usually embed credentials - confirm this log line is wanted outside development.
console.log(process.env.CONNECTION_STRING);

// Service Bus client shared by the handlers and helpers below.
var serviceBusService = azure.createServiceBusService(connectionString);

exports.createQueue = function (req,res) {

    var body = req.body;

    serviceBusService.createQueueIfNotExists(body.queueName, function(error){
        console.log(error);
        if(!error){
            // Queue exists
            return res.send(200);
        } else {
            return res.send(500, error);   
        }
    });
};

exports.sendMessageToQueue = function (req, res) {
    var body = req.body;

    var message = {
        body: 'Test message',
        customProperties: {
            testproperty: 'TestValue'
        }};

    serviceBusService.sendQueueMessage(body.queueName, message, function(error){
        if(!error){
            // message sent
            return res.send(200);
        } else {
            return res.send(500, error);   
        }
    });
}

exports.receiveMessageFromQueue = function (req, res) {
    var body = req.body;

    serviceBusService.receiveQueueMessage(body.queueName, function(error, receivedMessage){
        if(!error){
            console.log(receivedMessage);

            // Message received and deleted
            return res.send(200,receivedMessage);
        }  else {
            return res.send(500, error);   
        }
    });
}

/**
 * Receives one message from `queueName` and hands it to `callback`.
 *
 * When the service reports that the queue is empty, the callback is postponed by
 * `delayTimeIfQueueIsEmpty` milliseconds so that callers polling in a loop back off
 * instead of re-requesting immediately. Every other outcome invokes the callback at once.
 *
 * @param {string} queueName - Queue to receive from.
 * @param {number} delayTimeIfQueueIsEmpty - Delay in milliseconds applied when the queue is empty.
 * @param {function(*)} callback - Called with whatever message value the SDK callback supplied.
 */
function _receiveMessageFromQueue(queueName,delayTimeIfQueueIsEmpty,callback) {
    var EMPTY_QUEUE_MESSAGE = 'No messages to receive';

    serviceBusService.receiveQueueMessage(queueName, function(error, receivedMessage){
        console.log(error, receivedMessage);

        // The empty-queue condition may arrive as a bare string or as an Error-like
        // object carrying the same text in `.message`; both must trigger the back-off,
        // otherwise a polling caller would retry immediately against an empty queue.
        var queueIsEmpty = error == EMPTY_QUEUE_MESSAGE ||
            Boolean(error && error.message === EMPTY_QUEUE_MESSAGE);

        if (queueIsEmpty) {
            // Hand control back only after the configured delay.
            setTimeout(function() {
                callback(receivedMessage);
            }, delayTimeIfQueueIsEmpty);
            return;
        }

        // Message received (or a different error): callback immediately.
        callback(receivedMessage);
    });
}

/**
 * Sends `message` to `queueName`, logs the error argument of the SDK callback and then
 * invokes `callback` with no arguments. The callback runs whether or not an error was reported.
 */
function _sendQueueMessage(queueName, message, callback) {
    var onSendComplete = function (sendError) {
        console.log(sendError);
        callback();
    };

    serviceBusService.sendQueueMessage(queueName, message, onSendComplete);
}

/**
 * Starts polling `queueName` through an `async.queue` created with `concurrency` as its
 * second argument. Each task receives one message, logs its body when a message is
 * present, and pushes a follow-up task, so polling continues for as long as the
 * process runs.
 *
 * @param {number} concurrency - Number of initial tasks; also passed to `async.queue`.
 * @param {number} delayTimeIfQueueIsEmpty - Forwarded to `_receiveMessageFromQueue`.
 * @param {string} queueName - Queue to poll.
 */
function listenMessageQueue(concurrency,delayTimeIfQueueIsEmpty,queueName) {

    // Single place that builds a task so `id` and `url` always agree. The previous
    // inline `"..." + task.id + 1` concatenated as a string (id 0 -> ".../01").
    var makeTask = function(id) {
        return { id: id, queueName: queueName, url: "http://localhost/get-person/" + id };
    };

    var taskHandler = function(task, done) {
        _receiveMessageFromQueue(task.queueName, delayTimeIfQueueIsEmpty, function(message) {
            if (message) {
                console.log('hello ' + message.body);
            }

            // Enqueue the next poll before completing this task.
            myQueue.push(makeTask(task.id + 1));

            done();
        });
    };

    var myQueue = async.queue(taskHandler, concurrency);

    myQueue.drain = function() {
        console.log("All the work has been done.");
    };

    // Seed one task per worker.
    for (var i = 0; i < concurrency; i++) {
        myQueue.push(makeTask(i));
    }

}

/**
 * Continuously sends numbered test messages to `queueName` through an `async.queue`
 * created with `concurrency` as its second argument. Each task sends one message whose
 * body is the task id and whose `url` custom property is the task url, then pushes a
 * follow-up task, so sending continues for as long as the process runs.
 *
 * @param {number} concurrency - Number of initial tasks; also passed to `async.queue`.
 * @param {string} queueName - Queue to send to.
 */
function pushMessageQueue(concurrency,queueName) {

    // Single place that builds a task so `id` and `url` always agree. The previous
    // inline `"..." + task.id + 1` concatenated as a string (id 0 -> ".../01").
    var makeTask = function(id) {
        return { id: id, queueName: queueName, url: "http://localhost/get-person/" + id };
    };

    var taskHandler = function(task, done) {

        var message = {
            body: String(task.id),
            customProperties: {
                url: task.url
            }
        };

        _sendQueueMessage(task.queueName, message, function() {
            console.log('hello ' + task.id);
            // Enqueue the next send before completing this task.
            myQueue.push(makeTask(task.id + 1));
            done();
        });
    };

    var myQueue = async.queue(taskHandler, concurrency);

    myQueue.drain = function() {
        console.log("All the work has been done.");
    };

    // Seed one task per worker.
    for (var i = 0; i < concurrency; i++) {
        myQueue.push(makeTask(i));
    }

}

// Sender settings. Declared with `var` so they stay module-scoped instead of becoming
// implicit globals (assigning to an undeclared name throws in strict mode / ES modules).
var concurrency = 2;
var queueName = "jobs";
// Push test messages to the queue. NOTE(review): pushMessageQueue enqueues a new task
// after every send and has no visible message limit, so this keeps sending until the
// process is stopped.
pushMessageQueue(concurrency,queueName);

终于联系上了 Azure 支持并找到了答案。ServiceBus 默认启用分区。通过 HTTP 发出请求时(Azure ServiceBus 的 NodeJS SDK 发出的是 HTTP REST 调用),在消息数量较少的情况下,各分区可能持有不同的消息集,因为它们还没有机会同步。解决办法是:新建一个禁用分区的队列,或者增大 keep-alive(保持活动)设置,或者使用允许发出 https 请求的 DotNet SDK。