使用 Node.js 从 Azure WebJob 轮询 Azure 服务总线队列

Polling an Azure Service Bus Queue from an Azure WebJob using Node.js

尝试使用 Node.js 中编写的 WebJob 轮询 Azure 服务总线队列。我创建了 2 个 WebJobs。第一个是按需发送 10 条唯一的消息到队列。第二个作业是连续的,轮询队列中的消息。

遇到以下问题:

  1. 轮询很慢。接收 10 条消息平均需要 10 分钟左右。请参阅下面的示例日志详细信息。这个速度基本用不上。所有延迟都来自 receiveQueueMessage 的响应。响应时间从 0 秒到 ~120 秒不等,平均为 60 秒。

  2. 正在以随机顺序接收消息。不是先进先出。

  3. 有时消息会收到两次,即使它们是在 ReceiveAndDelete 模式下读取的(我试过没有读取模式参数,应该默认为 ReceiveAndDelete,{isReceiveAndDelete:true}{isPeekLock:false} 结果相同)。

  4. 当队列为空时,它应该保持接收请求打开一天,但它总是 returns 在 230 秒后没有消息错误。根据文档,最长为 24 天,所以我不知道 230 秒是从哪里来的:

The maximum timeout for a blocking receive operation in Service Bus queues is 24 days. However, REST-based timeouts have a maximum value of 55 seconds.

基本上没有像宣传的那样有效。我做错了什么?

发送消息测试作业:

var uuid = require('node-uuid');
var azure = require('azure');
var serviceBus = azure.createServiceBusService(process.env.busSearchConnectionString);
var messagesToSend = 10;

sendMessage(0);

function sendMessage(count)
{
    var message = {
        body: 'test message',
        customProperties: {
            message_number: count,
            sent_date: new Date
        },
        brokerProperties: {
            MessageId: uuid.v4() //ensure that service bus doesn't think this is a duplicate message
        }
    };

    serviceBus.sendQueueMessage(process.env.busSearchQueueName, message, function(err) {

        if (!err) {
            console.log('sent test message number ' + count.toString());
        } else {
            console.error('error sending message: ' + err);
        }

    });

    //wait 5 seconds to ensure messages are received by service bus in correct order
    if (count < messagesToSend) {
        setTimeout(function(newCount) {
            //send next message
            sendMessage(newCount);
        }, 5000, count+1);
    }
}    

接收消息连续作业:

console.log('listener job started');
var azure = require('azure');
var serviceBus = azure.createServiceBusService(process.env.busSearchConnectionString);
listenForMessages(serviceBus);

function listenForMessages(serviceBus)
{
    var start = process.hrtime();
    var timeOut = 60*60*24; //long poll for 1 day
    serviceBus.receiveQueueMessage(process.env.busSearchQueueName, {timeoutIntervalInS: timeOut, isReceiveAndDelete: true}, function(err, message) {

        var end = process.hrtime(start);
        console.log('received a response in %ds seconds', end[0]);

        if (err) {

            console.log('error requesting message: ' + err);
            listenForMessages(serviceBus);

        } else {

            if (message !== null && typeof message === 'object' && 'customProperties' in message && 'message_number' in message.customProperties) {

                console.log('received test message number ' + message.customProperties.message_number.toString());
                listenForMessages(serviceBus);

            } else {

                console.log('invalid message received');
                listenForMessages(serviceBus);

            }

        }

    });
}

示例日志输出:

[05/06/2015 21:50:14 > 8c2504: SYS INFO] Status changed to Running
[05/06/2015 21:50:14 > 8c2504: INFO] listener job started
[05/06/2015 21:51:23 > 8c2504: INFO] received a response in 1s seconds
[05/06/2015 21:51:23 > 8c2504: INFO] received test message number 0
[05/06/2015 21:51:25 > 8c2504: INFO] received a response in 2s seconds
[05/06/2015 21:51:26 > 8c2504: INFO] received test message number 4
[05/06/2015 21:51:27 > 8c2504: INFO] received a response in 1s seconds
[05/06/2015 21:51:27 > 8c2504: INFO] received test message number 7
[05/06/2015 21:51:28 > 8c2504: INFO] received a response in 0s seconds
[05/06/2015 21:51:29 > 8c2504: INFO] received test message number 9
[05/06/2015 21:51:49 > 8c2504: INFO] received a response in 20s seconds
[05/06/2015 21:51:49 > 8c2504: INFO] received test message number 1
[05/06/2015 21:53:35 > 8c2504: INFO] received a response in 106s seconds
[05/06/2015 21:53:35 > 8c2504: INFO] received test message number 1
[05/06/2015 21:54:26 > 8c2504: INFO] received a response in 50s seconds
[05/06/2015 21:54:26 > 8c2504: INFO] received test message number 5
[05/06/2015 21:54:35 > 8c2504: INFO] received a response in 9s seconds
[05/06/2015 21:54:35 > 8c2504: INFO] received test message number 9
[05/06/2015 21:55:28 > 8c2504: INFO] received a response in 53s seconds
[05/06/2015 21:55:28 > 8c2504: INFO] received test message number 2
[05/06/2015 21:57:26 > 8c2504: INFO] received a response in 118s seconds
[05/06/2015 21:57:26 > 8c2504: INFO] received test message number 6
[05/06/2015 21:58:28 > 8c2504: INFO] received a response in 61s seconds
[05/06/2015 21:58:28 > 8c2504: INFO] received test message number 8
[05/06/2015 22:00:35 > 8c2504: INFO] received a response in 126s seconds
[05/06/2015 22:00:35 > 8c2504: INFO] received test message number 3
[05/06/2015 22:04:25 > 8c2504: INFO] received a response in 230s seconds
[05/06/2015 22:04:25 > 8c2504: INFO] error requesting message: No messages to receive
[05/06/2015 22:08:16 > 8c2504: INFO] received a response in 230s seconds    
[05/06/2015 22:04:25 > 8c2504: INFO] error requesting message: No messages to receive

问题是我使用的队列已分区(在 Azure 门户中创建队列时的默认选项)。一旦我创建了一个未分区的新队列,一切都按预期工作而没有延迟(除了长时间轮询尝试中奇怪的 230 秒超时)。所以基本上 node.js 库不适用于分区队列。完全没有。浪费了很多天的时间来解决这个问题。将留在这里供其他人使用。

关闭服务总线队列的分区标志对我也有效。

使用分区队列,一些消息的延迟超过 30 分钟。 一个简单的 DotNet 网络客户端可以毫无延迟地下载所有消息。然而,一旦 nodejs 应该下载消息,只有第一条消息会毫无问题地下载,之后出现延迟。使用 nodejs 更改 http 代理选项 keepalive 和 socket timeout 并没有改善这种情况。

停止 nodejs 后,我不得不等待几分钟,DotNet 客户端才真正开始正常工作。这可重现数次。我还发现简单的DotNet webclient程序在连续启动和停止几次后也出现了类似的问题。

无论如何,您的 post 向我展示了解决方案:关闭分区标志 :)

尝试使用 amqp 从 azure 服务总线分区队列 中读取消息,这将适用于分区 topic/queue,您甚至不必投票很多。

const AMQPClient = require('amqp10').Client;
const Policy = require('amqp10').Policy;

const protocol = 'amqps';
const keyName = 'RootManageSharedAccessKey';
const sasKey = 'your_key_goes_here';
const serviceBusHost = 'namespace.servicebus.windows.net';
const uri = `${protocol}://${encodeURIComponent(keyName)}:${encodeURIComponent(sasKey)}@${serviceBusHost}`;
const queueName = 'partitionedQueueName';
const client = new AMQPClient(Policy.ServiceBusQueue);
client.connect(uri)
.then(() => Promise.all([client.createReceiver(queueName)]))
.spread((receiver) => {
    console.log('--------------------------------------------------------------------------');
    receiver.on('errorReceived', (err) => {
        // check for errors
        console.log(err);
    });
    receiver.on('message', (message) => {
        console.log('Received message');
        console.log(message);
        console.log('----------------------------------------------------------------------------');
    });
})
.error((e) => {
    console.warn('connection error: ', e);
});

https://www.npmjs.com/package/amqp10