在 Azure Webjob 中使用 CustomQueueProcessorFactory 消息 returns 进行排队,即使在调用 DeleteMessageAsync 之后
In Azure Webjob When using CustomQueueProcessorFactory messages returns to queue even after DeleteMessageAsync is called
我已经尝试实现我自己的 QueueProcessorFactory,它工作正常,除了一件事我无法理解。在我尝试一条消息 5 次(默认)后,它运行 CopyMessageToPoisonQueueAsync 然后 DeleteMessageAsync。
到目前为止一切顺利,但 10 分钟后消息再次出现在队列中,出队计数为 5,它也在毒物队列中,然后是相同的过程,CopyMessageToPoisonQueueAsync, DeleteMessageAsync, posionqueue 中的一个额外项目与已经复制的项目完全相同,10 分钟后相同的过程,但出队计数为 6。我应该更改 ExpirationTime 并设置删除时到现在,还是我错过了其他东西?
这是我的代码:
class Program
{
static void Main()
{
var config = new JobHostConfiguration();
config.Queues.MaxPollingInterval = TimeSpan.FromSeconds(3);
config.Queues.QueueProcessorFactory = new CustomQueueProcessorFactory();
var host = new JobHost(config);
host.RunAndBlock();
}
}
public class CustomQueueProcessorFactory : IQueueProcessorFactory
{
public List<CustomQueueProcessor> CustomQueueProcessors = new List<CustomQueueProcessor>();
public QueueProcessor Create(QueueProcessorFactoryContext context)
{
CustomQueueProcessor processor = new CustomQueueProcessor(context);
CustomQueueProcessors.Add(processor);
return processor;
}
public class CustomQueueProcessor : QueueProcessor
{
public CustomQueueProcessor(QueueProcessorFactoryContext context)
: base(context)
{
}
public override Task<bool> BeginProcessingMessageAsync(CloudQueueMessage message, CancellationToken cancellationToken)
{
return base.BeginProcessingMessageAsync(message, cancellationToken);
}
public override Task CompleteProcessingMessageAsync(CloudQueueMessage message, FunctionResult result, CancellationToken cancellationToken)
{
return base.CompleteProcessingMessageAsync(message, result, cancellationToken);
}
protected override Task CopyMessageToPoisonQueueAsync(CloudQueueMessage message, CloudQueue poisonQueue, CancellationToken cancellationToken)
{
return base.CopyMessageToPoisonQueueAsync(message, poisonQueue, cancellationToken);
}
protected override Task DeleteMessageAsync(CloudQueueMessage message, CancellationToken cancellationToken)
{
return base.DeleteMessageAsync(message, cancellationToken);
}
protected override async Task ReleaseMessageAsync(CloudQueueMessage message, FunctionResult result, TimeSpan visibilityTimeout, CancellationToken cancellationToken)
{
visibilityTimeout = TimeSpan.FromSeconds(2);
await base.ReleaseMessageAsync(message, result, visibilityTimeout, cancellationToken);
}
}
}
如果我添加一些 ConsoleWritelines,我会得到这个输出:
开头因重复而省略
.
.
.
BeginProcessingMessageAsync 消息:d3c88182-ff39-4f81-8c29-b4ce0b2062ad 出列计数:5 日期:2017-06-26 13:33:42
正在执行 'Functions.ProcessQueueMessage'(原因='在 '01testqueue' 上检测到新队列消息。',Id=17405a55-6d28-48b2-a874-718c0b741f61)
测试队列处理器工厂
执行函数时出现异常:Functions.ProcessQueueMessage
Microsoft.Azure.WebJobs.Host.FunctionInvocationException: 执行函数时出现异常: Functions.ProcessQueueMessage ---> System.Exception: Derp!
在 WebJobTest1.Functions.ProcessQueueMessage(字符串消息,TextWriter 日志)
...消息省略....
CompleteProcessingMessageAsync 消息:d3c88182-ff39-4f81-8c29-b4ce0b2062ad 出队计数:5
CopyMessageToPoisonQueueAsync 消息:d3c88182-ff39-4f81-8c29-b4ce0b2062ad 出队计数:5
消息已达到 MaxDequeueCount 5。将消息移至队列“01testqueue-poison”。
CopyMessageToPoisonQueueAsync 消息:da643007-954a-4296-9e9a-54ebb0aec6c5 出队计数:5
十分钟等待时间:
BeginProcessingMessageAsync 消息:d3c88182-ff39-4f81-8c29-b4ce0b2062ad 出队计数:6 日期:2017-06-26 13:43:46
正在执行 'Functions.ProcessQueueMessage'(原因='在 '01testqueue' 上检测到新队列消息。',Id=c22fe457-cb70-4cc8-a8a4-550cd44a8345)
测试队列处理器工厂
执行函数时出现异常:Functions.ProcessQueueMessage
CompleteProcessingMessageAsync 消息:d3c88182-ff39-4f81-8c29-b4ce0b2062ad 出队计数:6
CopyMessageToPoisonQueueAsync 消息:d3c88182-ff39-4f81-8c29-b4ce0b2062ad 出队计数:6
消息已达到 MaxDequeueCount 5。将消息移至队列“01testqueue-poison”。
CopyMessageToPoisonQueueAsync 消息:c43fd9fb-c2d7-4745-91ae-33cdc407ede6 出队计数:6
根据您的描述,我认为这是由于将 Storage SDK 8.x 与 WebJobs SDK 一起使用时的已知问题所致。以下是类似的问题:
Poison messages stay undeleted-but-invisible with the latest WindowsAzure.Storage 8.1.1
WebJobs V1.1.2 fails to remove poison messages from queue with V8.0.1 of Storage
根据我的测试,这个问题暂时还没有解决。您可以降级存储 SDK 或更改 CopyMessageToPoisonQueueAsync
如下:
protected override Task CopyMessageToPoisonQueueAsync(CloudQueueMessage message, CloudQueue poisonQueue, CancellationToken cancellationToken)
{
var newMessage = new CloudQueueMessage(message.Id, message.PopReceipt);
newMessage.SetMessageContent(message.AsBytes);
return base.CopyMessageToPoisonQueueAsync(newMessage, poisonQueue, cancellationToken);
}
我已经尝试实现我自己的 QueueProcessorFactory,它工作正常,除了一件事我无法理解。在我尝试一条消息 5 次(默认)后,它运行 CopyMessageToPoisonQueueAsync 然后 DeleteMessageAsync。
到目前为止一切顺利,但 10 分钟后消息再次出现在队列中,出队计数为 5,它也在毒物队列中,然后是相同的过程,CopyMessageToPoisonQueueAsync, DeleteMessageAsync, posionqueue 中的一个额外项目与已经复制的项目完全相同,10 分钟后相同的过程,但出队计数为 6。我应该更改 ExpirationTime 并设置删除时到现在,还是我错过了其他东西?
这是我的代码:
class Program
{
static void Main()
{
var config = new JobHostConfiguration();
config.Queues.MaxPollingInterval = TimeSpan.FromSeconds(3);
config.Queues.QueueProcessorFactory = new CustomQueueProcessorFactory();
var host = new JobHost(config);
host.RunAndBlock();
}
}
public class CustomQueueProcessorFactory : IQueueProcessorFactory
{
public List<CustomQueueProcessor> CustomQueueProcessors = new List<CustomQueueProcessor>();
public QueueProcessor Create(QueueProcessorFactoryContext context)
{
CustomQueueProcessor processor = new CustomQueueProcessor(context);
CustomQueueProcessors.Add(processor);
return processor;
}
public class CustomQueueProcessor : QueueProcessor
{
public CustomQueueProcessor(QueueProcessorFactoryContext context)
: base(context)
{
}
public override Task<bool> BeginProcessingMessageAsync(CloudQueueMessage message, CancellationToken cancellationToken)
{
return base.BeginProcessingMessageAsync(message, cancellationToken);
}
public override Task CompleteProcessingMessageAsync(CloudQueueMessage message, FunctionResult result, CancellationToken cancellationToken)
{
return base.CompleteProcessingMessageAsync(message, result, cancellationToken);
}
protected override Task CopyMessageToPoisonQueueAsync(CloudQueueMessage message, CloudQueue poisonQueue, CancellationToken cancellationToken)
{
return base.CopyMessageToPoisonQueueAsync(message, poisonQueue, cancellationToken);
}
protected override Task DeleteMessageAsync(CloudQueueMessage message, CancellationToken cancellationToken)
{
return base.DeleteMessageAsync(message, cancellationToken);
}
protected override async Task ReleaseMessageAsync(CloudQueueMessage message, FunctionResult result, TimeSpan visibilityTimeout, CancellationToken cancellationToken)
{
visibilityTimeout = TimeSpan.FromSeconds(2);
await base.ReleaseMessageAsync(message, result, visibilityTimeout, cancellationToken);
}
}
}
如果我添加一些 ConsoleWritelines,我会得到这个输出:
开头因重复而省略 . . .
BeginProcessingMessageAsync 消息:d3c88182-ff39-4f81-8c29-b4ce0b2062ad 出列计数:5 日期:2017-06-26 13:33:42 正在执行 'Functions.ProcessQueueMessage'(原因='在 '01testqueue' 上检测到新队列消息。',Id=17405a55-6d28-48b2-a874-718c0b741f61) 测试队列处理器工厂 执行函数时出现异常:Functions.ProcessQueueMessage Microsoft.Azure.WebJobs.Host.FunctionInvocationException: 执行函数时出现异常: Functions.ProcessQueueMessage ---> System.Exception: Derp! 在 WebJobTest1.Functions.ProcessQueueMessage(字符串消息,TextWriter 日志)
...消息省略....
CompleteProcessingMessageAsync 消息:d3c88182-ff39-4f81-8c29-b4ce0b2062ad 出队计数:5 CopyMessageToPoisonQueueAsync 消息:d3c88182-ff39-4f81-8c29-b4ce0b2062ad 出队计数:5 消息已达到 MaxDequeueCount 5。将消息移至队列“01testqueue-poison”。 CopyMessageToPoisonQueueAsync 消息:da643007-954a-4296-9e9a-54ebb0aec6c5 出队计数:5 十分钟等待时间: BeginProcessingMessageAsync 消息:d3c88182-ff39-4f81-8c29-b4ce0b2062ad 出队计数:6 日期:2017-06-26 13:43:46 正在执行 'Functions.ProcessQueueMessage'(原因='在 '01testqueue' 上检测到新队列消息。',Id=c22fe457-cb70-4cc8-a8a4-550cd44a8345) 测试队列处理器工厂 执行函数时出现异常:Functions.ProcessQueueMessage CompleteProcessingMessageAsync 消息:d3c88182-ff39-4f81-8c29-b4ce0b2062ad 出队计数:6 CopyMessageToPoisonQueueAsync 消息:d3c88182-ff39-4f81-8c29-b4ce0b2062ad 出队计数:6 消息已达到 MaxDequeueCount 5。将消息移至队列“01testqueue-poison”。 CopyMessageToPoisonQueueAsync 消息:c43fd9fb-c2d7-4745-91ae-33cdc407ede6 出队计数:6
根据您的描述,我认为这是由于将 Storage SDK 8.x 与 WebJobs SDK 一起使用时的已知问题所致。以下是类似的问题:
Poison messages stay undeleted-but-invisible with the latest WindowsAzure.Storage 8.1.1
WebJobs V1.1.2 fails to remove poison messages from queue with V8.0.1 of Storage
根据我的测试,这个问题暂时还没有解决。您可以降级存储 SDK 或更改 CopyMessageToPoisonQueueAsync
如下:
protected override Task CopyMessageToPoisonQueueAsync(CloudQueueMessage message, CloudQueue poisonQueue, CancellationToken cancellationToken)
{
var newMessage = new CloudQueueMessage(message.Id, message.PopReceipt);
newMessage.SetMessageContent(message.AsBytes);
return base.CopyMessageToPoisonQueueAsync(newMessage, poisonQueue, cancellationToken);
}