Azure 触发的队列无法删除
Azure triggered queue failing to delete
根据 here and here, numerous SO questions like this one 的文档,我的理解是,当排队的消息失败给定次数(在本例中为 5 次)时,它会自动从当前队列移入有害队列。
不幸的是,我有限的经验发现这只是部分正确,因为当作业确实未能达到最大出队计数时,它会自动添加到毒物队列中,但不会从原始队列中删除,然后重新处理显然无法更改,10 分钟后,将相同的消息添加到毒物队列,创建了重复项,但仍然没有删除它。
当我实现自己的 IQueueProcessorFactory
class 时,在覆盖 DeleteMessageAsync
的同时创建了自定义 QueueProcessor
,我能够确认异常时正在调用该方法被抛出 5 次,该方法无一例外地完成,但队列中的消息仍然存在。我也试过删除正常队列和毒队列。
我使用的代码:
public class Program
{
private const string QUEUE_NAME = "some-queue";
// Please set the following connection strings in app.config for this WebJob to run:
// AzureWebJobsDashboard and AzureWebJobsStorage
static void Main()
{
var config = new JobHostConfiguration();
config.Queues.QueueProcessorFactory = new CustomFactory();
var host = new JobHost(config);
// The following code ensures that the WebJob will be running continuously
host.RunAndBlock();
}
private class CustomFactory : IQueueProcessorFactory
{
public QueueProcessor Create(QueueProcessorFactoryContext context)
{
return new CustomQueueProcessor(context);
}
private class CustomQueueProcessor : QueueProcessor
{
public CustomQueueProcessor(QueueProcessorFactoryContext context) : base(context)
{
}
protected override Task DeleteMessageAsync(CloudQueueMessage message, CancellationToken cancellationToken)
{
return base.DeleteMessageAsync(message, cancellationToken);
}
}
}
public static void QueueTrigger([QueueTrigger(QUEUE_NAME)] CloudQueueMessage message)
{
Console.WriteLine($"Processing message: {message.AsString}");
throw new Exception("test exception");
}
}
除消息保留在原始队列中外,一切都按预期工作。我假设并希望错误就在我头上,或者我只是忽略了一些愚蠢的事情,因为我是队列的新手,但是在花了将近 2 天的时间在互联网上搜索信息之后,我正式不知所措接下来该做什么或尝试什么。
编辑
虽然我们最终选择了服务总线,但值得注意的是,我们想出了一个替代方案,即我们自己从队列触发器中半管理队列。
这需要检查出队计数,如果它高于最大出队(重试)计数,只需 return。这将向调用者发出信号,消息 "successfully" 已处理,然后将其从队列中删除。该方法将导致几乎预期的行为,即邮件将被添加到有害队列,同时在 10 分钟后从正常队列中删除。
它还有一个额外的好处,那就是继续使用未来的软件包版本或更新队列本身,这将解决原始问题,因为 if 永远不会成立。
public class Program
{
private const int MAX_DEQUEUE_COUNT = 5;
static void Main()
{
var config = new JobHostConfiguration();
...
config.Queues.MaxDequeueCount = MAX_DEQUEUE_COUNT;
...
}
public static void QueueTrigger([QueueTrigger("some-queue")] CloudQueueMessage message)
{
if (message.DequeueCount > MAX_DEQUEUE_COUNT)
{
// prevents the message from indefinitely retrying every 10 minutes and ultimately creating duplicates within the poison queue.
return;
}
// do stuff
}
我将根据个人经验和我在 Whosebug 上看到的内容给你一个非参考答案。您不是第一个遇到自动死信和遵守 WebJob QueueTriggerAttributes 的最大出队计数问题的人。我的建议是回避存储队列 + QueueTriggers 的脆弱性,转而使用服务总线队列和服务总线触发器。
作为一种消息传递技术,服务总线队列的功能更加全面,而且成本也相当。我选择使用存储队列而不是服务总线队列的唯一真正原因是,如果您需要存储超过 80GB 的消息,这是分区的服务总线队列限制。
我遇到了同样的行为,这是由 webjobs sdk (v2) 和存储客户端库之间产生的错误引起的 v8.x。
自 2.1.0-beta1-10851 以来,这应该得到修复。缺点是目前还没有稳定发布的2.1.0版本。
根据 here and here, numerous SO questions like this one 的文档,我的理解是,当排队的消息失败给定次数(在本例中为 5 次)时,它会自动从当前队列移入有害队列。
不幸的是,我有限的经验发现这只是部分正确,因为当作业确实未能达到最大出队计数时,它会自动添加到毒物队列中,但不会从原始队列中删除,然后重新处理显然无法更改,10 分钟后,将相同的消息添加到毒物队列,创建了重复项,但仍然没有删除它。
当我实现自己的 IQueueProcessorFactory
class 时,在覆盖 DeleteMessageAsync
的同时创建了自定义 QueueProcessor
,我能够确认异常时正在调用该方法被抛出 5 次,该方法无一例外地完成,但队列中的消息仍然存在。我也试过删除正常队列和毒队列。
我使用的代码:
public class Program
{
private const string QUEUE_NAME = "some-queue";
// Please set the following connection strings in app.config for this WebJob to run:
// AzureWebJobsDashboard and AzureWebJobsStorage
static void Main()
{
var config = new JobHostConfiguration();
config.Queues.QueueProcessorFactory = new CustomFactory();
var host = new JobHost(config);
// The following code ensures that the WebJob will be running continuously
host.RunAndBlock();
}
private class CustomFactory : IQueueProcessorFactory
{
public QueueProcessor Create(QueueProcessorFactoryContext context)
{
return new CustomQueueProcessor(context);
}
private class CustomQueueProcessor : QueueProcessor
{
public CustomQueueProcessor(QueueProcessorFactoryContext context) : base(context)
{
}
protected override Task DeleteMessageAsync(CloudQueueMessage message, CancellationToken cancellationToken)
{
return base.DeleteMessageAsync(message, cancellationToken);
}
}
}
public static void QueueTrigger([QueueTrigger(QUEUE_NAME)] CloudQueueMessage message)
{
Console.WriteLine($"Processing message: {message.AsString}");
throw new Exception("test exception");
}
}
除消息保留在原始队列中外,一切都按预期工作。我假设并希望错误就在我头上,或者我只是忽略了一些愚蠢的事情,因为我是队列的新手,但是在花了将近 2 天的时间在互联网上搜索信息之后,我正式不知所措接下来该做什么或尝试什么。
编辑
虽然我们最终选择了服务总线,但值得注意的是,我们想出了一个替代方案,即我们自己从队列触发器中半管理队列。
这需要检查出队计数,如果它高于最大出队(重试)计数,只需 return。这将向调用者发出信号,消息 "successfully" 已处理,然后将其从队列中删除。该方法将导致几乎预期的行为,即邮件将被添加到有害队列,同时在 10 分钟后从正常队列中删除。
它还有一个额外的好处,那就是继续使用未来的软件包版本或更新队列本身,这将解决原始问题,因为 if 永远不会成立。
public class Program
{
private const int MAX_DEQUEUE_COUNT = 5;
static void Main()
{
var config = new JobHostConfiguration();
...
config.Queues.MaxDequeueCount = MAX_DEQUEUE_COUNT;
...
}
public static void QueueTrigger([QueueTrigger("some-queue")] CloudQueueMessage message)
{
if (message.DequeueCount > MAX_DEQUEUE_COUNT)
{
// prevents the message from indefinitely retrying every 10 minutes and ultimately creating duplicates within the poison queue.
return;
}
// do stuff
}
我将根据个人经验和我在 Whosebug 上看到的内容给你一个非参考答案。您不是第一个遇到自动死信和遵守 WebJob QueueTriggerAttributes 的最大出队计数问题的人。我的建议是回避存储队列 + QueueTriggers 的脆弱性,转而使用服务总线队列和服务总线触发器。
作为一种消息传递技术,服务总线队列的功能更加全面,而且成本也相当。我选择使用存储队列而不是服务总线队列的唯一真正原因是,如果您需要存储超过 80GB 的消息,这是分区的服务总线队列限制。
我遇到了同样的行为,这是由 webjobs sdk (v2) 和存储客户端库之间产生的错误引起的 v8.x。
自 2.1.0-beta1-10851 以来,这应该得到修复。缺点是目前还没有稳定发布的2.1.0版本。