如何使用 WebJobs 处理存储队列

How to handle the Storage Queue using the WebJobs

我刚开始使用 Azure 作为我的移动开发和 Web 开发。 我使用 NodeJs 作为我的框架来处理 azure 后端。我在 Azure 中使用移动服务和 Web 应用程序。

情况是这样的,我正在使用 Azure 中的存储队列,我正在使用我的 webapps 中的 webjob 来处理存储队列。队列中的消息将通过通知中心发送给每个特定用户。 (推送通知)

因此,队列的大小将达到 50,000 条或更多队列消息。这些消息都是用来给用户一条条推送消息的。但是,我尝试通过安排 2 分钟的间隔来使用 WebJob 处理队列。我知道 webjob 不会 运行 两个实例,而当前的时间表是 运行ning。 最初,我想连续使用 运行 的 webjob,但是一旦脚本 运行 完成,它将进入等待重新启动状态。我对 webjob 连续 运行 的假设是,它将 运行 在脚本的无限循环下一遍又一遍。直到它发现异常或错误。我的假设出错了,一​​旦它成功完成整个脚本,它将自行重新启动。我知道重启可以调整到小于 60 秒,但我不确定这是否有帮助,因为我也可以进行很多 aysnc 操作。

对于我的脚本,它将 运行 循环中的 50,000 条或更多用户消息。然后,它将通过 Azure nodejs 包发送推送消息,然后在 return 时,它将删除消息,使其不再出现在队列中。因此,对于动作中的每个循环,都会有一些异步操作。

然而,一切正常,但 webjob 最多只执行 5 分钟,然后它将在下一个计划中再次 运行。这意味着,无论操作如何,它最多只会 运行 5 分钟。我尝试使用队列中的 1,000 条消息,一切正常,但是当消息达到 5,000 条及以上时,时间就不够用了。因此,一些异步操作没有完成,导致消息没有被删除。

有没有办法延长 5 分钟的执行时间或其他更好的方法来处理存储队列。我查看了 Webjobs SDK,但它仅限于 C# 和 Visual Studio。我正在使用我无法使用的 Mac OSX 和 Javascript。

请指教,因为我浪费了很多时间来找出使用 webjobs 处理存储队列的最佳方式,但现在看来,当消息变大时以及处理异步操作时,它似乎无法达到目的总共只有 5 分钟的执行时间。我目前没有任何 VM,我只在 azure 中使用 PAAS。

WebJobs 的各种配置设置在 this wiki page 上进行了说明。在您的情况下,您应该增加 WEBJOBS_IDLE_TIMEOUT 值,该值是触发作业在一段时间内未产生任何输出时超时的时间(以秒为单位)。 WEBJOBS_IDLE_TIMEOUT 设置需要在门户应用程序设置中配置,而不是通过 app.config 文件。

根据您的描述:

All these messages are used to push out the message to the user one by one

it will run 50,000 or more users messages in the loop

所以你的要求是将队列中的每条消息发送给用户,现在你一次获取队列中的所有消息,即使消息大小将超过 50,000,并循环消息以进行进一步操作?

如有任何误会,请随时告诉我。

在我看来,cloud 可以立即获取队列中的顶部消息,并将其发送给您的用户,这样可以显着减少处理时间,并且可以在连续的 webjob 中设置。您可以参考How To: Peek at the Next Message了解如何在不将其从队列中移除的情况下查看队列前面的消息

更新

我发现你提到我在你的整个项目架构中 Node.js 中也有一个 Web 应用程序。 所以我考虑是否可以利用 Web 应用程序中的连续 webjob 来获取一条消息并一次性发送到通知中心。 这是我的测试代码片段:

var azureStorage = require('azure-storage'),
    azure = require('azure'),
    accountName = '<accountName>',
    accountKey = '<accountKey>';
var queueSvc = azureStorage.createQueueService(accountName, accountKey);
var notificationHubService = azure.createNotificationHubService('<notificationhub-name>', '<connectionstring>');
queueSvc.getMessages('myqueue', {numOfMessages:1}, function(error, result, response) {
    if (!error) {
        // Message text is in messages[0].messagetext
        var message = result[0];
        console.log(message.messagetext);
        var payload = {
            data: {
                msg: message.messagetext
            }
        };
        notificationHubService.gcm.send(null, payload, function(error) {
            if (!error) {
                //notification sent
                console.log('notification sent');
                queueSvc.deleteMessage('myqueue', message.messageid,message.popreceipt,function(error, response) {
                    if (!error) {
                        console.log(response);
                        // Message deleted
                    } else {
                        console.log(error);
                    }
                });
            }
        });
    }
});

详情参考How to use Notification Hubs from Node.js And https://github.com/Azure/azure-storage-node/blob/master/lib/services/queue/queueservice.js#L727

更新2

想到Service-bus demo on GitHub,我修改了上面的代码,大大提高了效率。

这里是代码片段,供您参考:

var queueName = 'myqueue';

function checkForMessages(queueSvc, queueName, callback) {
    queueSvc.getMessages(queueName, function(err, message) {

        if (err) {
            if (err === 'No messages to receive') {
                console.log('No messages');
            } else {
                console.log(err);
                // callback(err);
            }
        } else {
            callback(null, message[0]);
            console.log(message);
        }
    });
}

function processMessage(queueSvc, err, lockedMsg) {
    if (err) {
        console.log('Error on Rx: ', err);
    } else {
        console.log('Rx: ', lockedMsg);
        var payload = {
            data: {
                msg: lockedMsg.messagetext
            }
        };
        notificationHubService.gcm.send(null, payload, function(error) {
            if (!error) {
                //notification sent

                console.log('notification sent');
                console.log(lockedMsg)
                console.log(lockedMsg.popreceipt)
                queueSvc.deleteMessage(queueName, lockedMsg.messageid, lockedMsg.popreceipt, function(err2) {
                    if (err2) {
                        console.log('Failed to delete message: ', err2);
                    } else {
                        console.log('Deleted message.');
                    }
                })
            }
        });

    }
}

var t = setInterval(checkForMessages.bind(null, queueSvc, queueName, processMessage.bind(null, queueSvc)), 100);

我在 setInterval 中将循环时间设置为 100 毫秒,现在在我的测试中它每分钟可以处理近 600 条消息。