Create a delay between two message reads of a queue?

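I am using an Azure queue to perform a bulk import, and a WebJob to run the process in the background. Messages are dequeued very frequently. How can I create a delay between two message reads?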

This is how I add a message to the queue:

public async Task<bool> Handle(CreateFileUploadCommand message)
{
    var queueClient = _queueService.GetQueueClient(Constants.Queues.ImportQueue);

    var brokeredMessage = new BrokeredMessage(JsonConvert.SerializeObject(new ProcessFileUploadMessage
    {
        TenantId = message.TenantId,
        FileExtension = message.FileExtension,
        FileName = message.Name,
        DeviceId = message.DeviceId,
        SessionId = message.SessionId,
        UserId = message.UserId,
        OutletId = message.OutletId,
        CorrelationId = message.CorrelationId,

    }))
    {
        ContentType = "application/json",
    };

    await queueClient.SendAsync(brokeredMessage);

    return true;
}

Below is the WebJobs function:

public class Functions
{
    private readonly IValueProvider _valueProvider;
    public Functions(IValueProvider valueProvider)
    {
        _valueProvider = valueProvider;
    }

    public async Task ProcessQueueMessage([ServiceBusTrigger(Constants.Constants.Queues.ImportQueue)] BrokeredMessage message,
    TextWriter logger)
    {

        var queueMessage = message.GetBody<string>();

        using (var client = new HttpClient())
        {
            client.BaseAddress = new Uri(_valueProvider.Get("ServiceBaseUri"));

            var stringContent = new StringContent(queueMessage, Encoding.UTF8, "application/json");

            var result = await client.PostAsync(RestfulUrls.ImportMenu.ProcessUrl, stringContent);

            if (result.IsSuccessStatusCode)
            {
                await message.CompleteAsync();
            }
            else
            {
                await message.AbandonAsync();
            }
        }
    }
}

As far as I know, the Azure WebJobs SDK enables concurrent processing on a single instance (the default is 16).

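When you run your WebJob, it reads up to 16 queue messages at once (in PeekLock mode, calling Complete on a message when the function finishes successfully and Abandon otherwise) and runs the triggered function for all of them concurrently. That is why the queue appears to be dequeued so frequently.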

If you want to disable concurrent processing on a single instance, I suggest setting MessageOptions.MaxConcurrentCalls on the ServiceBusConfiguration to 1.

For more details, see the following code:

In Program.cs:

JobHostConfiguration config = new JobHostConfiguration();
ServiceBusConfiguration serviceBusConfig = new ServiceBusConfiguration();
serviceBusConfig.MessageOptions.MaxConcurrentCalls = 1;
config.UseServiceBus(serviceBusConfig);

JobHost host = new JobHost(config);
host.RunAndBlock();

If you want to create a delay between two message reads, I suggest creating a custom ServiceBusConfiguration.MessagingProvider.

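The MessageProcessor it creates has a CompleteProcessingMessageAsync method, which finishes processing of the given message after the job function has been invoked.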

I suggest adding a Thread.Sleep call inside CompleteProcessingMessageAsync to delay the next read.

For more details, see the code sample below:

CustomMessagingProvider.cs:

Note: I overrode the CompleteProcessingMessageAsync method.

    public class CustomMessagingProvider : MessagingProvider
    {
        private readonly ServiceBusConfiguration _config;

        public CustomMessagingProvider(ServiceBusConfiguration config)
            : base(config)
        {
            _config = config;
        }

        public override NamespaceManager CreateNamespaceManager(string connectionStringName = null)
        {
            // you could return your own NamespaceManager here, which would be used
            // globally
            return base.CreateNamespaceManager(connectionStringName);
        }

        public override MessagingFactory CreateMessagingFactory(string entityPath, string connectionStringName = null)
        {
            // you could return a customized (or new) MessagingFactory here per entity
            return base.CreateMessagingFactory(entityPath, connectionStringName);
        }

        public override MessageProcessor CreateMessageProcessor(string entityPath)
        {
            // demonstrates how to plug in a custom MessageProcessor
            // you could use the global MessageOptions, or use different
            // options per entity
            return new CustomMessageProcessor(_config.MessageOptions);
        }

        private class CustomMessageProcessor : MessageProcessor
        {
            public CustomMessageProcessor(OnMessageOptions messageOptions)
                : base(messageOptions)
            {
            }

            public override Task<bool> BeginProcessingMessageAsync(BrokeredMessage message, CancellationToken cancellationToken)
            {
                // intercept messages before the job function is invoked
                return base.BeginProcessingMessageAsync(message, cancellationToken);
            }

            public override async Task CompleteProcessingMessageAsync(BrokeredMessage message, FunctionResult result, CancellationToken cancellationToken)
            {
                if (result.Succeeded)
                {
                    if (!MessageOptions.AutoComplete)
                    {
                        // AutoComplete is true by default, but if set to false
                        // we need to complete the message
                        cancellationToken.ThrowIfCancellationRequested();


                        await message.CompleteAsync();

                        Console.WriteLine("Begin sleep");
                        // Block for 5 seconds so the next message is not read immediately
                        Thread.Sleep(5000);
                        Console.WriteLine("Sleep 5 seconds");

                    }
                }
                else
                {
                    cancellationToken.ThrowIfCancellationRequested();
                    await message.AbandonAsync();
                }
            }
        }
    }
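
As a possible variation on the processor above, here is a minimal sketch that awaits Task.Delay instead of blocking the thread with Thread.Sleep. The class name DelayingMessageProcessor and its delay constructor parameter are hypothetical, not part of the SDK. It assumes the same WebJobs SDK 2.x types used above and leaves completing or abandoning the message to the base class. Unlike the version above, it also pauses after a failed invocation.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.WebJobs.Host.Executors;
using Microsoft.Azure.WebJobs.ServiceBus;
using Microsoft.ServiceBus.Messaging;

// Hypothetical processor: same idea as CustomMessageProcessor, but non-blocking.
public class DelayingMessageProcessor : MessageProcessor
{
    private readonly TimeSpan _delay;

    public DelayingMessageProcessor(OnMessageOptions messageOptions, TimeSpan delay)
        : base(messageOptions)
    {
        _delay = delay;
    }

    public override async Task CompleteProcessingMessageAsync(
        BrokeredMessage message, FunctionResult result, CancellationToken cancellationToken)
    {
        // Let the base class complete or abandon the message as it normally would.
        await base.CompleteProcessingMessageAsync(message, result, cancellationToken);

        // Pause without blocking a thread. Assumption: with MaxConcurrentCalls = 1,
        // the next message is not dispatched until this method returns.
        await Task.Delay(_delay, cancellationToken);
    }
}
```

To try it, CreateMessageProcessor in the custom provider could return new DelayingMessageProcessor(_config.MessageOptions, TimeSpan.FromSeconds(5)) instead of the nested processor.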

The Main method in Program.cs:

        static void Main()
        {
            var config = new JobHostConfiguration();

            if (config.IsDevelopment)
            {
                config.UseDevelopmentSettings();
            }

            var sbConfig = new ServiceBusConfiguration
            {
                MessageOptions = new OnMessageOptions
                {
                    AutoComplete = false,
                    MaxConcurrentCalls = 1
                }
            };
            sbConfig.MessagingProvider = new CustomMessagingProvider(sbConfig);
            config.UseServiceBus(sbConfig);
            var host = new JobHost(config);

            // The following code ensures that the WebJob will be running continuously
            host.RunAndBlock();
        }

Result: