Azure WorkerRole 在 WebJob 等队列上触发
Azure WorkerRole trigger on queue like WebJob
我习惯于在 Azure 上使用 webjob 触发 Azure 队列。它就像一个魅力。
static void Main(string[] args)
{
JobHost host = new JobHost();
host.RunAndBlock();
}
public static void ProcessQueueMessage([QueueTrigger("logqueue")] string logMessage, TextWriter logger)
{
logger.WriteLine(logMessage);
}
queueTrigger 的真正优点是在消息触发的过程未完成之前,消息保持不可见(而不是删除)。因此,如果您关闭 webjob(例如,对于 webjob 更新),该消息将在队列中可见(稍等超时后),由更新的 webjob 处理(完美)。
现在我想做同样的事情,但在 worker 角色上。今天我喜欢这样。
while (true)
{
var cloudMessage = await sourceImportationQueue.GetMessageAsync();
if (cloudMessage != null)
sourceImportationQueue.DeleteMessage(cloudMessage);
// process my job (few hours)
else
await Task.Delay(1000 * 5);
}
但是如果我在工作期间停止工人,我就会丢失消息。那么我该怎么做才能触发 webJob?
默认情况下,一旦检索到队列消息,它将在 5 分钟内不可见。在此延迟之后,如果消息尚未从队列中删除,它将再次可见,以便可以再次处理。
在您的代码示例中,您将在从队列中获取消息后立即将其删除。如果您想确保安全,则只能在流程结束时删除该消息。您是否尝试在处理作业结束时移动 sourceImportationQueue.DeleteMessage(cloudMessage);
?
如果不使用某种永久性存储来跟踪您的工作进度,可能无法解决此问题。正如已经确定的那样,您将在作业开始之前删除消息,因此如果作业由于任何原因(包括停止角色)而失败,消息将丢失。一条消息的最长锁定时间为 5 分钟,这意味着该消息将在您的作业仍在 运行 时再次出现,如果删除操作移至末尾,它将因失去锁定而失败。
如果您的长 运行 作业由多个较小的步骤组成 none,其中超过 5 分钟的时间段,那么您将能够定期调用 RenewLock() 以保留对消息的锁定并阻止它重新出现在队列中。只要锁永不过期,在这种情况下最后的 DeleteMessage 就会成功。不过,这可能不太适合您的情况。
一种可能的解决方案是将作业状态写入 Azure table 并在整个作业处理过程中记录状态。您的工作者角色循环将检查 table 是否有任何尚未完成的工作并继续任何存在的工作,如果 none 找到检查服务总线是否有任何新工作。此解决方案还可以让您有机会从失败的作业到达点开始接手,而不是从头开始重新开始 2 小时的作业。
最后我找到了一个简单的解决方案。在 运行 我几个小时的工作之前,我启动了一个任务 KeepHiddenMessageAsync
,用超时更新消息。在超时结束之前完成消息的新更新。如果出现问题,则将达到消息超时并且消息将变为可见。
private bool jobIsComplete;
private void Run()
{
while (true)
{
jobIsComplete = false;
//get the message
var cloudMessage = await queue.GetMessageAsync();
if (cloudMessage != null)
//run the task to keep the message until end of the job and worker role stopping for an update for example
var keepHiddenMessageTask = KeepHiddenMessageAsync(cloudMessage);
//
// process my job (few hours)
//
jobIsComplete = true;
await keepHiddenMessageTask;
await _queue.DeleteMessageAsync(cloudMessage);
else
await Task.Delay(1000 * 5);
}
}
private async Task KeepHiddenMessageAsync(CloudQueueMessage iCloudQueueMessage)
{
while (true)
{
//Update message and hidding during 5 new minutes
await _queue.UpdateMessageAsync(iCloudQueueMessage, TimeSpan.FromMinutes(5), MessageUpdateFields.Visibility);
//Wait 4 minutes
for (int i = 0; i < 60 * 4; i++)
{
if (JobIsComplete)
return;
else
await Task.Delay(1000);
}
}
}
我习惯于在 Azure 上使用 webjob 触发 Azure 队列。它就像一个魅力。
static void Main(string[] args)
{
JobHost host = new JobHost();
host.RunAndBlock();
}
public static void ProcessQueueMessage([QueueTrigger("logqueue")] string logMessage, TextWriter logger)
{
logger.WriteLine(logMessage);
}
queueTrigger 的真正优点是在消息触发的过程未完成之前,消息保持不可见(而不是删除)。因此,如果您关闭 webjob(例如,对于 webjob 更新),该消息将在队列中可见(稍等超时后),由更新的 webjob 处理(完美)。
现在我想做同样的事情,但在 worker 角色上。今天我喜欢这样。
while (true)
{
var cloudMessage = await sourceImportationQueue.GetMessageAsync();
if (cloudMessage != null)
sourceImportationQueue.DeleteMessage(cloudMessage);
// process my job (few hours)
else
await Task.Delay(1000 * 5);
}
但是如果我在工作期间停止工人,我就会丢失消息。那么我该怎么做才能触发 webJob?
默认情况下,一旦检索到队列消息,它将在 5 分钟内不可见。在此延迟之后,如果消息尚未从队列中删除,它将再次可见,以便可以再次处理。
在您的代码示例中,您将在从队列中获取消息后立即将其删除。如果您想确保安全,则只能在流程结束时删除该消息。您是否尝试在处理作业结束时移动 sourceImportationQueue.DeleteMessage(cloudMessage);
?
如果不使用某种永久性存储来跟踪您的工作进度,可能无法解决此问题。正如已经确定的那样,您将在作业开始之前删除消息,因此如果作业由于任何原因(包括停止角色)而失败,消息将丢失。一条消息的最长锁定时间为 5 分钟,这意味着该消息将在您的作业仍在 运行 时再次出现,如果删除操作移至末尾,它将因失去锁定而失败。
如果您的长 运行 作业由多个较小的步骤组成 none,其中超过 5 分钟的时间段,那么您将能够定期调用 RenewLock() 以保留对消息的锁定并阻止它重新出现在队列中。只要锁永不过期,在这种情况下最后的 DeleteMessage 就会成功。不过,这可能不太适合您的情况。
一种可能的解决方案是将作业状态写入 Azure table 并在整个作业处理过程中记录状态。您的工作者角色循环将检查 table 是否有任何尚未完成的工作并继续任何存在的工作,如果 none 找到检查服务总线是否有任何新工作。此解决方案还可以让您有机会从失败的作业到达点开始接手,而不是从头开始重新开始 2 小时的作业。
最后我找到了一个简单的解决方案。在 运行 我几个小时的工作之前,我启动了一个任务 KeepHiddenMessageAsync
,用超时更新消息。在超时结束之前完成消息的新更新。如果出现问题,则将达到消息超时并且消息将变为可见。
private bool jobIsComplete;
private void Run()
{
while (true)
{
jobIsComplete = false;
//get the message
var cloudMessage = await queue.GetMessageAsync();
if (cloudMessage != null)
//run the task to keep the message until end of the job and worker role stopping for an update for example
var keepHiddenMessageTask = KeepHiddenMessageAsync(cloudMessage);
//
// process my job (few hours)
//
jobIsComplete = true;
await keepHiddenMessageTask;
await _queue.DeleteMessageAsync(cloudMessage);
else
await Task.Delay(1000 * 5);
}
}
private async Task KeepHiddenMessageAsync(CloudQueueMessage iCloudQueueMessage)
{
while (true)
{
//Update message and hidding during 5 new minutes
await _queue.UpdateMessageAsync(iCloudQueueMessage, TimeSpan.FromMinutes(5), MessageUpdateFields.Visibility);
//Wait 4 minutes
for (int i = 0; i < 60 * 4; i++)
{
if (JobIsComplete)
return;
else
await Task.Delay(1000);
}
}
}