Azure WorkerRole 在 WebJob 等队列上触发

Azure WorkerRole trigger on queue like WebJob

我习惯于在 Azure 上使用 webjob 触发 Azure 队列。它就像一个魅力。

Azure tutorial webjob + queue

static void Main(string[] args)
{
    JobHost host = new JobHost();
    host.RunAndBlock();
}

public static void ProcessQueueMessage([QueueTrigger("logqueue")] string logMessage, TextWriter logger)
{
    logger.WriteLine(logMessage);
}

queueTrigger 的真正优点是在消息触发的过程未完成之前,消息保持不可见(而不是删除)。因此,如果您关闭 webjob(例如,对于 webjob 更新),该消息将在队列中可见(稍等超时后),由更新的 webjob 处理(完美)。

现在我想做同样的事情,但在 worker 角色上。今天我喜欢这样。

while (true)
{
     var cloudMessage = await sourceImportationQueue.GetMessageAsync();
     if (cloudMessage != null)
           sourceImportationQueue.DeleteMessage(cloudMessage);
      // process my job (few hours)
     else
           await Task.Delay(1000 * 5);
}

但是如果我在工作期间停止工人,我就会丢失消息。那么我该怎么做才能触发 webJob?

默认情况下,一旦检索到队列消息,它将在 5 分钟内不可见。在此延迟之后,如果消息尚未从队列中删除,它将再次可见,以便可以再次处理。

在您的代码示例中,您将在从队列中获取消息后立即将其删除。如果您想确保安全,则只能在流程结束时删除该消息。您是否尝试在处理作业结束时移动 sourceImportationQueue.DeleteMessage(cloudMessage);

如果不使用某种永久性存储来跟踪您的工作进度,可能无法解决此问题。正如已经确定的那样,您将在作业开始之前删除消息,因此如果作业由于任何原因(包括停止角色)而失败,消息将丢失。一条消息的最长锁定时间为 5 分钟,这意味着该消息将在您的作业仍在 运行 时再次出现,如果删除操作移至末尾,它将因失去锁定而失败。

如果您的长 运行 作业由多个较小的步骤组成 none,其中超过 5 分钟的时间段,那么您将能够定期调用 RenewLock() 以保留对消息的锁定并阻止它重新出现在队列中。只要锁永不过期,在这种情况下最后的 DeleteMessage 就会成功。不过,这可能不太适合您的情况。

一种可能的解决方案是将作业状态写入 Azure table 并在整个作业处理过程中记录状态。您的工作者角色循环将检查 table 是否有任何尚未完成的工作并继续任何存在的工作,如果 none 找到检查服务总线是否有任何新工作。此解决方案还可以让您有机会从失败的作业到达点开始接手,而不是从头开始重新开始 2 小时的作业。

最后我找到了一个简单的解决方案。在 运行 我几个小时的工作之前,我启动了一个任务 KeepHiddenMessageAsync,用超时更新消息。在超时结束之前完成消息的新更新。如果出现问题,则将达到消息超时并且消息将变为可见。

        private bool jobIsComplete;

        private void Run()
        {
             while (true)
            {
                 jobIsComplete = false;
                 //get the message
                 var cloudMessage = await queue.GetMessageAsync();

                 if (cloudMessage != null)
                        //run the task to keep the message until end of the job and worker role stopping for an update for example 
                       var keepHiddenMessageTask = KeepHiddenMessageAsync(cloudMessage);

                        //
                        // process my job (few hours)
                        //

                      jobIsComplete = true;
                      await keepHiddenMessageTask;
                      await _queue.DeleteMessageAsync(cloudMessage);
                 else
                       await Task.Delay(1000 * 5);
            }
        }

        private async Task KeepHiddenMessageAsync(CloudQueueMessage iCloudQueueMessage)
        {
            while (true)
            {
                //Update message and hidding during 5 new minutes
                await _queue.UpdateMessageAsync(iCloudQueueMessage, TimeSpan.FromMinutes(5), MessageUpdateFields.Visibility);

                //Wait 4 minutes
                for (int i = 0; i < 60 * 4; i++)
                {
                    if (JobIsComplete)
                        return;
                    else
                        await Task.Delay(1000);
                }
            }
        }