在 Azure Webjob 中使用 CustomQueueProcessorFactory 消息 returns 进行排队,即使在调用 DeleteMessageAsync 之后

In Azure Webjob When using CustomQueueProcessorFactory messages returns to queue even after DeleteMessageAsync is called

我已经尝试实现我自己的 QueueProcessorFactory,它工作正常,除了一件事我无法理解。在我尝试一条消息 5 次(默认)后,它运行 CopyMessageToPoisonQueueAsync 然后 DeleteMessageAsync

到目前为止一切顺利,但 10 分钟后消息再次出现在队列中,出队计数为 5,它也在毒物队列中,然后是相同的过程,CopyMessageToPoisonQueueAsync, DeleteMessageAsync, posionqueue 中的一个额外项目与已经复制的项目完全相同,10 分钟后相同的过程,但出队计数为 6。我应该更改 ExpirationTime 并设置删除时到现在,还是我错过了其他东西?

这是我的代码:

class Program
{
    static void Main()
    {

        var config = new JobHostConfiguration();
        config.Queues.MaxPollingInterval = TimeSpan.FromSeconds(3);
        config.Queues.QueueProcessorFactory = new CustomQueueProcessorFactory();

        var host = new JobHost(config);
        host.RunAndBlock();
    }
}

public class CustomQueueProcessorFactory : IQueueProcessorFactory
{
    public List<CustomQueueProcessor> CustomQueueProcessors = new List<CustomQueueProcessor>();
    public QueueProcessor Create(QueueProcessorFactoryContext context)
    {
        CustomQueueProcessor processor = new CustomQueueProcessor(context);
        CustomQueueProcessors.Add(processor);
        return processor;
    }

    public class CustomQueueProcessor : QueueProcessor
    {
        public CustomQueueProcessor(QueueProcessorFactoryContext context)
            : base(context)
        {
        }
        public override Task<bool> BeginProcessingMessageAsync(CloudQueueMessage message, CancellationToken cancellationToken)
        {
            return base.BeginProcessingMessageAsync(message, cancellationToken);
        }
        public override Task CompleteProcessingMessageAsync(CloudQueueMessage message, FunctionResult result, CancellationToken cancellationToken)
        {
            return base.CompleteProcessingMessageAsync(message, result, cancellationToken);
        }

        protected override Task CopyMessageToPoisonQueueAsync(CloudQueueMessage message, CloudQueue poisonQueue, CancellationToken cancellationToken)
        {
            return base.CopyMessageToPoisonQueueAsync(message, poisonQueue, cancellationToken);
        }

        protected override Task DeleteMessageAsync(CloudQueueMessage message, CancellationToken cancellationToken)
        {
            return base.DeleteMessageAsync(message, cancellationToken);
        }
        protected override async Task ReleaseMessageAsync(CloudQueueMessage message, FunctionResult result, TimeSpan visibilityTimeout, CancellationToken cancellationToken)
        {
            visibilityTimeout = TimeSpan.FromSeconds(2);
            await base.ReleaseMessageAsync(message, result, visibilityTimeout, cancellationToken);
        }
    }
}

如果我添加一些 ConsoleWritelines,我会得到这个输出:

开头因重复而省略 . . .

BeginProcessingMessageAsync 消息:d3c88182-ff39-4f81-8c29-b4ce0b2062ad 出列计数:5 日期:2017-06-26 13:33:42 正在执行 'Functions.ProcessQueueMessage'(原因='在 '01testqueue' 上检测到新队列消息。',Id=17405a55-6d28-48b2-a874-718c0b741f61) 测试队列处理器工厂 执行函数时出现异常:Functions.ProcessQueueMessage Microsoft.Azure.WebJobs.Host.FunctionInvocationException: 执行函数时出现异常: Functions.ProcessQueueMessage ---> System.Exception: Derp! 在 WebJobTest1.Functions.ProcessQueueMessage(字符串消息,TextWriter 日志)

...消息省略....

CompleteProcessingMessageAsync 消息:d3c88182-ff39-4f81-8c29-b4ce0b2062ad 出队计数:5 CopyMessageToPoisonQueueAsync 消息:d3c88182-ff39-4f81-8c29-b4ce0b2062ad 出队计数:5 消息已达到 MaxDequeueCount 5。将消息移至队列“01testqueue-poison”。 CopyMessageToPoisonQueueAsync 消息:da643007-954a-4296-9e9a-54ebb0aec6c5 出队计数:5 十分钟等待时间: BeginProcessingMessageAsync 消息:d3c88182-ff39-4f81-8c29-b4ce0b2062ad 出队计数:6 日期:2017-06-26 13:43:46 正在执行 'Functions.ProcessQueueMessage'(原因='在 '01testqueue' 上检测到新队列消息。',Id=c22fe457-cb70-4cc8-a8a4-550cd44a8345) 测试队列处理器工厂 执行函数时出现异常:Functions.ProcessQueueMessage CompleteProcessingMessageAsync 消息:d3c88182-ff39-4f81-8c29-b4ce0b2062ad 出队计数:6 CopyMessageToPoisonQueueAsync 消息:d3c88182-ff39-4f81-8c29-b4ce0b2062ad 出队计数:6 消息已达到 MaxDequeueCount 5。将消息移至队列“01testqueue-poison”。 CopyMessageToPoisonQueueAsync 消息:c43fd9fb-c2d7-4745-91ae-33cdc407ede6 出队计数:6

根据您的描述,我认为这是由于将 Storage SDK 8.x 与 WebJobs SDK 一起使用时的已知问题所致。以下是类似的问题:

根据我的测试,这个问题暂时还没有解决。您可以降级存储 SDK 或更改 CopyMessageToPoisonQueueAsync 如下:

protected override Task CopyMessageToPoisonQueueAsync(CloudQueueMessage message, CloudQueue poisonQueue, CancellationToken cancellationToken)
{
    var newMessage = new CloudQueueMessage(message.Id, message.PopReceipt);
    newMessage.SetMessageContent(message.AsBytes);
    return base.CopyMessageToPoisonQueueAsync(newMessage, poisonQueue, cancellationToken);
}