使用 Node.js 从 Azure WebJob 轮询 Azure 服务总线队列
Polling an Azure Service Bus Queue from an Azure WebJob using Node.js
尝试使用 Node.js 中编写的 WebJob 轮询 Azure 服务总线队列。我创建了 2 个 WebJobs。第一个是按需发送 10 条唯一的消息到队列。第二个作业是连续的,轮询队列中的消息。
遇到以下问题:
轮询很慢。接收 10 条消息平均需要 10 分钟左右。请参阅下面的示例日志详细信息。这个速度基本用不上。所有延迟都来自 receiveQueueMessage
的响应。响应时间从 0 秒到 ~120 秒不等,平均为 60 秒。
正在以随机顺序接收消息。不是先进先出。
有时消息会收到两次,即使它们是在 ReceiveAndDelete 模式下读取的(我试过没有读取模式参数,应该默认为 ReceiveAndDelete,{isReceiveAndDelete:true}
和 {isPeekLock:false}
结果相同)。
当队列为空时,它应该保持接收请求打开一天,但它总是 returns 在 230 秒后没有消息错误。根据文档,最长为 24 天,所以我不知道 230 秒是从哪里来的:
The maximum timeout for a blocking receive operation in Service Bus
queues is 24 days. However, REST-based timeouts have a maximum value
of 55 seconds.
基本上没有像宣传的那样有效。我做错了什么?
发送消息测试作业:
var uuid = require('node-uuid');
var azure = require('azure');
var serviceBus = azure.createServiceBusService(process.env.busSearchConnectionString);
var messagesToSend = 10;
sendMessage(0);
function sendMessage(count)
{
var message = {
body: 'test message',
customProperties: {
message_number: count,
sent_date: new Date
},
brokerProperties: {
MessageId: uuid.v4() //ensure that service bus doesn't think this is a duplicate message
}
};
serviceBus.sendQueueMessage(process.env.busSearchQueueName, message, function(err) {
if (!err) {
console.log('sent test message number ' + count.toString());
} else {
console.error('error sending message: ' + err);
}
});
//wait 5 seconds to ensure messages are received by service bus in correct order
if (count < messagesToSend) {
setTimeout(function(newCount) {
//send next message
sendMessage(newCount);
}, 5000, count+1);
}
}
接收消息连续作业:
console.log('listener job started');
var azure = require('azure');
var serviceBus = azure.createServiceBusService(process.env.busSearchConnectionString);
listenForMessages(serviceBus);
function listenForMessages(serviceBus)
{
var start = process.hrtime();
var timeOut = 60*60*24; //long poll for 1 day
serviceBus.receiveQueueMessage(process.env.busSearchQueueName, {timeoutIntervalInS: timeOut, isReceiveAndDelete: true}, function(err, message) {
var end = process.hrtime(start);
console.log('received a response in %ds seconds', end[0]);
if (err) {
console.log('error requesting message: ' + err);
listenForMessages(serviceBus);
} else {
if (message !== null && typeof message === 'object' && 'customProperties' in message && 'message_number' in message.customProperties) {
console.log('received test message number ' + message.customProperties.message_number.toString());
listenForMessages(serviceBus);
} else {
console.log('invalid message received');
listenForMessages(serviceBus);
}
}
});
}
示例日志输出:
[05/06/2015 21:50:14 > 8c2504: SYS INFO] Status changed to Running
[05/06/2015 21:50:14 > 8c2504: INFO] listener job started
[05/06/2015 21:51:23 > 8c2504: INFO] received a response in 1s seconds
[05/06/2015 21:51:23 > 8c2504: INFO] received test message number 0
[05/06/2015 21:51:25 > 8c2504: INFO] received a response in 2s seconds
[05/06/2015 21:51:26 > 8c2504: INFO] received test message number 4
[05/06/2015 21:51:27 > 8c2504: INFO] received a response in 1s seconds
[05/06/2015 21:51:27 > 8c2504: INFO] received test message number 7
[05/06/2015 21:51:28 > 8c2504: INFO] received a response in 0s seconds
[05/06/2015 21:51:29 > 8c2504: INFO] received test message number 9
[05/06/2015 21:51:49 > 8c2504: INFO] received a response in 20s seconds
[05/06/2015 21:51:49 > 8c2504: INFO] received test message number 1
[05/06/2015 21:53:35 > 8c2504: INFO] received a response in 106s seconds
[05/06/2015 21:53:35 > 8c2504: INFO] received test message number 1
[05/06/2015 21:54:26 > 8c2504: INFO] received a response in 50s seconds
[05/06/2015 21:54:26 > 8c2504: INFO] received test message number 5
[05/06/2015 21:54:35 > 8c2504: INFO] received a response in 9s seconds
[05/06/2015 21:54:35 > 8c2504: INFO] received test message number 9
[05/06/2015 21:55:28 > 8c2504: INFO] received a response in 53s seconds
[05/06/2015 21:55:28 > 8c2504: INFO] received test message number 2
[05/06/2015 21:57:26 > 8c2504: INFO] received a response in 118s seconds
[05/06/2015 21:57:26 > 8c2504: INFO] received test message number 6
[05/06/2015 21:58:28 > 8c2504: INFO] received a response in 61s seconds
[05/06/2015 21:58:28 > 8c2504: INFO] received test message number 8
[05/06/2015 22:00:35 > 8c2504: INFO] received a response in 126s seconds
[05/06/2015 22:00:35 > 8c2504: INFO] received test message number 3
[05/06/2015 22:04:25 > 8c2504: INFO] received a response in 230s seconds
[05/06/2015 22:04:25 > 8c2504: INFO] error requesting message: No messages to receive
[05/06/2015 22:08:16 > 8c2504: INFO] received a response in 230s seconds
[05/06/2015 22:04:25 > 8c2504: INFO] error requesting message: No messages to receive
问题是我使用的队列已分区(在 Azure 门户中创建队列时的默认选项)。一旦我创建了一个未分区的新队列,一切都按预期工作而没有延迟(除了长时间轮询尝试中奇怪的 230 秒超时)。所以基本上 node.js 库不适用于分区队列。完全没有。浪费了很多天的时间来解决这个问题。将留在这里供其他人使用。
关闭服务总线队列的分区标志对我也有效。
使用分区队列,一些消息的延迟超过 30 分钟。
一个简单的 DotNet 网络客户端可以毫无延迟地下载所有消息。然而,一旦 nodejs 应该下载消息,只有第一条消息会毫无问题地下载,之后出现延迟。使用 nodejs 更改 http 代理选项 keepalive 和 socket timeout 并没有改善这种情况。
停止 nodejs 后,我不得不等待几分钟,DotNet 客户端才真正开始正常工作。这可重现数次。我还发现简单的DotNet webclient程序在连续启动和停止几次后也出现了类似的问题。
无论如何,您的 post 向我展示了解决方案:关闭分区标志 :)
尝试使用 amqp 从 azure 服务总线分区队列 中读取消息,这将适用于分区 topic/queue,您甚至不必投票很多。
const AMQPClient = require('amqp10').Client;
const Policy = require('amqp10').Policy;
const protocol = 'amqps';
const keyName = 'RootManageSharedAccessKey';
const sasKey = 'your_key_goes_here';
const serviceBusHost = 'namespace.servicebus.windows.net';
const uri = `${protocol}://${encodeURIComponent(keyName)}:${encodeURIComponent(sasKey)}@${serviceBusHost}`;
const queueName = 'partitionedQueueName';
const client = new AMQPClient(Policy.ServiceBusQueue);
client.connect(uri)
.then(() => Promise.all([client.createReceiver(queueName)]))
.spread((receiver) => {
console.log('--------------------------------------------------------------------------');
receiver.on('errorReceived', (err) => {
// check for errors
console.log(err);
});
receiver.on('message', (message) => {
console.log('Received message');
console.log(message);
console.log('----------------------------------------------------------------------------');
});
})
.error((e) => {
console.warn('connection error: ', e);
});
尝试使用 Node.js 中编写的 WebJob 轮询 Azure 服务总线队列。我创建了 2 个 WebJobs。第一个是按需发送 10 条唯一的消息到队列。第二个作业是连续的,轮询队列中的消息。
遇到以下问题:
轮询很慢。接收 10 条消息平均需要 10 分钟左右。请参阅下面的示例日志详细信息。这个速度基本用不上。所有延迟都来自
receiveQueueMessage
的响应。响应时间从 0 秒到 ~120 秒不等,平均为 60 秒。正在以随机顺序接收消息。不是先进先出。
有时消息会收到两次,即使它们是在 ReceiveAndDelete 模式下读取的(我试过没有读取模式参数,应该默认为 ReceiveAndDelete,
{isReceiveAndDelete:true}
和{isPeekLock:false}
结果相同)。当队列为空时,它应该保持接收请求打开一天,但它总是 returns 在 230 秒后没有消息错误。根据文档,最长为 24 天,所以我不知道 230 秒是从哪里来的:
The maximum timeout for a blocking receive operation in Service Bus queues is 24 days. However, REST-based timeouts have a maximum value of 55 seconds.
基本上没有像宣传的那样有效。我做错了什么?
发送消息测试作业:
var uuid = require('node-uuid');
var azure = require('azure');
var serviceBus = azure.createServiceBusService(process.env.busSearchConnectionString);
var messagesToSend = 10;
sendMessage(0);
function sendMessage(count)
{
var message = {
body: 'test message',
customProperties: {
message_number: count,
sent_date: new Date
},
brokerProperties: {
MessageId: uuid.v4() //ensure that service bus doesn't think this is a duplicate message
}
};
serviceBus.sendQueueMessage(process.env.busSearchQueueName, message, function(err) {
if (!err) {
console.log('sent test message number ' + count.toString());
} else {
console.error('error sending message: ' + err);
}
});
//wait 5 seconds to ensure messages are received by service bus in correct order
if (count < messagesToSend) {
setTimeout(function(newCount) {
//send next message
sendMessage(newCount);
}, 5000, count+1);
}
}
接收消息连续作业:
console.log('listener job started');
var azure = require('azure');
var serviceBus = azure.createServiceBusService(process.env.busSearchConnectionString);
listenForMessages(serviceBus);
function listenForMessages(serviceBus)
{
var start = process.hrtime();
var timeOut = 60*60*24; //long poll for 1 day
serviceBus.receiveQueueMessage(process.env.busSearchQueueName, {timeoutIntervalInS: timeOut, isReceiveAndDelete: true}, function(err, message) {
var end = process.hrtime(start);
console.log('received a response in %ds seconds', end[0]);
if (err) {
console.log('error requesting message: ' + err);
listenForMessages(serviceBus);
} else {
if (message !== null && typeof message === 'object' && 'customProperties' in message && 'message_number' in message.customProperties) {
console.log('received test message number ' + message.customProperties.message_number.toString());
listenForMessages(serviceBus);
} else {
console.log('invalid message received');
listenForMessages(serviceBus);
}
}
});
}
示例日志输出:
[05/06/2015 21:50:14 > 8c2504: SYS INFO] Status changed to Running
[05/06/2015 21:50:14 > 8c2504: INFO] listener job started
[05/06/2015 21:51:23 > 8c2504: INFO] received a response in 1s seconds
[05/06/2015 21:51:23 > 8c2504: INFO] received test message number 0
[05/06/2015 21:51:25 > 8c2504: INFO] received a response in 2s seconds
[05/06/2015 21:51:26 > 8c2504: INFO] received test message number 4
[05/06/2015 21:51:27 > 8c2504: INFO] received a response in 1s seconds
[05/06/2015 21:51:27 > 8c2504: INFO] received test message number 7
[05/06/2015 21:51:28 > 8c2504: INFO] received a response in 0s seconds
[05/06/2015 21:51:29 > 8c2504: INFO] received test message number 9
[05/06/2015 21:51:49 > 8c2504: INFO] received a response in 20s seconds
[05/06/2015 21:51:49 > 8c2504: INFO] received test message number 1
[05/06/2015 21:53:35 > 8c2504: INFO] received a response in 106s seconds
[05/06/2015 21:53:35 > 8c2504: INFO] received test message number 1
[05/06/2015 21:54:26 > 8c2504: INFO] received a response in 50s seconds
[05/06/2015 21:54:26 > 8c2504: INFO] received test message number 5
[05/06/2015 21:54:35 > 8c2504: INFO] received a response in 9s seconds
[05/06/2015 21:54:35 > 8c2504: INFO] received test message number 9
[05/06/2015 21:55:28 > 8c2504: INFO] received a response in 53s seconds
[05/06/2015 21:55:28 > 8c2504: INFO] received test message number 2
[05/06/2015 21:57:26 > 8c2504: INFO] received a response in 118s seconds
[05/06/2015 21:57:26 > 8c2504: INFO] received test message number 6
[05/06/2015 21:58:28 > 8c2504: INFO] received a response in 61s seconds
[05/06/2015 21:58:28 > 8c2504: INFO] received test message number 8
[05/06/2015 22:00:35 > 8c2504: INFO] received a response in 126s seconds
[05/06/2015 22:00:35 > 8c2504: INFO] received test message number 3
[05/06/2015 22:04:25 > 8c2504: INFO] received a response in 230s seconds
[05/06/2015 22:04:25 > 8c2504: INFO] error requesting message: No messages to receive
[05/06/2015 22:08:16 > 8c2504: INFO] received a response in 230s seconds
[05/06/2015 22:04:25 > 8c2504: INFO] error requesting message: No messages to receive
问题是我使用的队列已分区(在 Azure 门户中创建队列时的默认选项)。一旦我创建了一个未分区的新队列,一切都按预期工作而没有延迟(除了长时间轮询尝试中奇怪的 230 秒超时)。所以基本上 node.js 库不适用于分区队列。完全没有。浪费了很多天的时间来解决这个问题。将留在这里供其他人使用。
关闭服务总线队列的分区标志对我也有效。
使用分区队列,一些消息的延迟超过 30 分钟。 一个简单的 DotNet 网络客户端可以毫无延迟地下载所有消息。然而,一旦 nodejs 应该下载消息,只有第一条消息会毫无问题地下载,之后出现延迟。使用 nodejs 更改 http 代理选项 keepalive 和 socket timeout 并没有改善这种情况。
停止 nodejs 后,我不得不等待几分钟,DotNet 客户端才真正开始正常工作。这可重现数次。我还发现简单的DotNet webclient程序在连续启动和停止几次后也出现了类似的问题。
无论如何,您的 post 向我展示了解决方案:关闭分区标志 :)
尝试使用 amqp 从 azure 服务总线分区队列 中读取消息,这将适用于分区 topic/queue,您甚至不必投票很多。
const AMQPClient = require('amqp10').Client;
const Policy = require('amqp10').Policy;
const protocol = 'amqps';
const keyName = 'RootManageSharedAccessKey';
const sasKey = 'your_key_goes_here';
const serviceBusHost = 'namespace.servicebus.windows.net';
const uri = `${protocol}://${encodeURIComponent(keyName)}:${encodeURIComponent(sasKey)}@${serviceBusHost}`;
const queueName = 'partitionedQueueName';
const client = new AMQPClient(Policy.ServiceBusQueue);
client.connect(uri)
.then(() => Promise.all([client.createReceiver(queueName)]))
.spread((receiver) => {
console.log('--------------------------------------------------------------------------');
receiver.on('errorReceived', (err) => {
// check for errors
console.log(err);
});
receiver.on('message', (message) => {
console.log('Received message');
console.log(message);
console.log('----------------------------------------------------------------------------');
});
})
.error((e) => {
console.warn('connection error: ', e);
});