在队列的两次消息读取之间创建延迟?
Create delay between two message reads of a Queue?
我正在使用 Azure 队列执行批量导入。
我正在使用 WebJobs 在后台执行该过程。
队列出队非常频繁。如何在两次消息读取之间创建延迟?
这就是我向队列中添加消息的方式:
/// <summary>
/// Copies the fields of the upload command into a <c>ProcessFileUploadMessage</c>,
/// serializes it to JSON and sends it to the import queue.
/// </summary>
/// <param name="message">Upload command whose fields are copied into the queued payload.</param>
/// <returns>
/// Always <c>true</c> once the send has completed; there is no error handling here,
/// so a failed send propagates as an exception to the caller.
/// </returns>
public async Task<bool> Handle(CreateFileUploadCommand message)
{
    var queueClient = _queueService.GetQueueClient(Constants.Queues.ImportQueue);
    var payload = JsonConvert.SerializeObject(new ProcessFileUploadMessage
    {
        TenantId = message.TenantId,
        FileExtension = message.FileExtension,
        FileName = message.Name,
        DeviceId = message.DeviceId,
        SessionId = message.SessionId,
        UserId = message.UserId,
        OutletId = message.OutletId,
        CorrelationId = message.CorrelationId,
    });

    // BrokeredMessage is IDisposable: release it as soon as the send has completed
    // instead of leaving it to the finalizer.
    using (var brokeredMessage = new BrokeredMessage(payload) { ContentType = "application/json" })
    {
        await queueClient.SendAsync(brokeredMessage);
    }

    return true;
}
下面是 WebJobs 函数:
/// <summary>
/// WebJobs function container: forwards each import-queue message to the import
/// HTTP endpoint and settles the message according to the HTTP outcome.
/// </summary>
public class Functions
{
    // One client shared by all invocations. Creating and disposing an HttpClient per
    // message can exhaust sockets when the queue is drained quickly.
    private static readonly HttpClient SharedClient = new HttpClient();

    private readonly IValueProvider _valueProvider;

    public Functions(IValueProvider valueProvider)
    {
        _valueProvider = valueProvider;
    }

    /// <summary>
    /// Posts the message body as JSON to the import endpoint. The message is completed
    /// when the response has a success status code and abandoned otherwise.
    /// </summary>
    /// <param name="message">Message received from the import queue; its body is read as a string.</param>
    /// <param name="logger">Log writer supplied by the host; not used by this function.</param>
    public async Task ProcessQueueMessage([ServiceBusTrigger(Constants.Constants.Queues.ImportQueue)] BrokeredMessage message,
        TextWriter logger)
    {
        var queueMessage = message.GetBody<string>();

        // The shared client cannot carry a per-instance BaseAddress, so the absolute
        // request URI is built here from the configured base address.
        var baseUri = new Uri(_valueProvider.Get("ServiceBaseUri"));
        var requestUri = new Uri(baseUri, RestfulUrls.ImportMenu.ProcessUrl);

        using (var stringContent = new StringContent(queueMessage, Encoding.UTF8, "application/json"))
        using (var result = await SharedClient.PostAsync(requestUri, stringContent))
        {
            // NOTE(review): the function settles the message itself. If the host is also
            // configured to complete/abandon messages, this may settle it twice - confirm.
            if (result.IsSuccessStatusCode)
            {
                await message.CompleteAsync();
            }
            else
            {
                await message.AbandonAsync();
            }
        }
    }
}
据我所知,Azure WebJobs SDK 默认在单个实例上启用并发处理(默认并发数为 16)。
如果你运行 WebJob,它会一次读取 16 条队列消息(使用 PeekLock 模式:函数成功完成时对消息调用 Complete,否则调用 Abandon),并同时发起 16 个并发调用来执行触发函数,所以你会感觉队列出队非常频繁。
如果要在单个实例上禁用并发处理,我建议将 ServiceBusConfiguration 的 MessageOptions.MaxConcurrentCalls 设置为 1。
更多详情,您可以参考以下代码。
在 Program.cs 中:
var hostConfig = new JobHostConfiguration();

// Limit the Service Bus message options to a single concurrent call.
var serviceBusSettings = new ServiceBusConfiguration();
serviceBusSettings.MessageOptions.MaxConcurrentCalls = 1;
hostConfig.UseServiceBus(serviceBusSettings);

// Create the host from the configuration and run it.
new JobHost(hostConfig).RunAndBlock();
如果您想在两次消息读取之间创建延迟,我建议创建一个自定义的 MessagingProvider,并将其赋给 ServiceBusConfiguration.MessagingProvider。
它创建的 MessageProcessor 包含 CompleteProcessingMessageAsync 方法,该方法在作业函数调用结束后完成对指定消息的处理。
我建议您在 CompleteProcessingMessageAsync 中加入延迟(例如 Thread.Sleep,或在异步方法中更合适的 await Task.Delay)来实现延迟读取。
更多细节,您可以参考下面的代码示例。
CustomMessagingProvider.cs:
注意:我重写了 CompleteProcessingMessageAsync 方法的代码。
/// <summary>
/// Messaging provider that plugs a custom <see cref="MessageProcessor"/> into the
/// Service Bus configuration so that a pause is inserted after each message the
/// processor completes.
/// </summary>
public class CustomMessagingProvider : MessagingProvider
{
    private readonly ServiceBusConfiguration _config;

    /// <param name="config">Service Bus configuration; its MessageOptions are handed to the custom processor.</param>
    public CustomMessagingProvider(ServiceBusConfiguration config)
        : base(config)
    {
        _config = config;
    }

    public override NamespaceManager CreateNamespaceManager(string connectionStringName = null)
    {
        // you could return your own NamespaceManager here, which would be used
        // globally
        return base.CreateNamespaceManager(connectionStringName);
    }

    public override MessagingFactory CreateMessagingFactory(string entityPath, string connectionStringName = null)
    {
        // you could return a customized (or new) MessagingFactory here per entity
        return base.CreateMessagingFactory(entityPath, connectionStringName);
    }

    public override MessageProcessor CreateMessageProcessor(string entityPath)
    {
        // demonstrates how to plug in a custom MessageProcessor
        // you could use the global MessageOptions, or use different
        // options per entity
        return new CustomMessageProcessor(_config.MessageOptions);
    }

    /// <summary>
    /// Message processor that settles the message after the job function has run and
    /// then waits before returning, which delays the next read.
    /// </summary>
    private class CustomMessageProcessor : MessageProcessor
    {
        // Pause inserted after a message has been completed.
        private static readonly TimeSpan DelayAfterCompletion = TimeSpan.FromSeconds(5);

        public CustomMessageProcessor(OnMessageOptions messageOptions)
            : base(messageOptions)
        {
        }

        public override Task<bool> BeginProcessingMessageAsync(BrokeredMessage message, CancellationToken cancellationToken)
        {
            // intercept messages before the job function is invoked
            return base.BeginProcessingMessageAsync(message, cancellationToken);
        }

        /// <summary>
        /// Abandons the message when the function failed. When it succeeded and
        /// AutoComplete is off, completes the message and then waits
        /// <see cref="DelayAfterCompletion"/> before returning.
        /// </summary>
        /// <param name="message">The message that was handed to the job function.</param>
        /// <param name="result">Outcome of the job function invocation.</param>
        /// <param name="cancellationToken">
        /// Checked before the message is settled and observed during the delay, so a
        /// cancellation ends the wait early with an <see cref="OperationCanceledException"/>.
        /// </param>
        public override async Task CompleteProcessingMessageAsync(BrokeredMessage message, FunctionResult result, CancellationToken cancellationToken)
        {
            if (!result.Succeeded)
            {
                cancellationToken.ThrowIfCancellationRequested();
                await message.AbandonAsync();
                return;
            }

            if (MessageOptions.AutoComplete)
            {
                // Nothing to settle here when AutoComplete is on, and no delay is applied.
                return;
            }

            // AutoComplete is true by default, but if set to false
            // we need to complete the message
            cancellationToken.ThrowIfCancellationRequested();
            await message.CompleteAsync();

            Console.WriteLine("Begin sleep");
            // Asynchronous wait: unlike Thread.Sleep it does not block a thread
            // for the duration of the pause.
            await Task.Delay(DelayAfterCompletion, cancellationToken);
            Console.WriteLine("Sleep 5 seconds");
        }
    }
}
Program.cs 中的 Main 方法:
/// <summary>
/// WebJob entry point: builds the host configuration, registers Service Bus with
/// the custom messaging provider, and runs the host.
/// </summary>
static void Main()
{
    var hostConfig = new JobHostConfiguration();
    if (hostConfig.IsDevelopment)
    {
        hostConfig.UseDevelopmentSettings();
    }

    // AutoComplete is off and only one call runs at a time.
    var messageOptions = new OnMessageOptions
    {
        AutoComplete = false,
        MaxConcurrentCalls = 1
    };
    var serviceBus = new ServiceBusConfiguration { MessageOptions = messageOptions };
    serviceBus.MessagingProvider = new CustomMessagingProvider(serviceBus);
    hostConfig.UseServiceBus(serviceBus);

    // The following code ensures that the WebJob will be running continuously
    new JobHost(hostConfig).RunAndBlock();
}
结果:
我正在使用 Azure 队列执行批量导入。
我正在使用 WebJobs 在后台执行该过程。
队列出队非常频繁。如何在两次消息读取之间创建延迟?
这就是我向队列中添加消息的方式:
/// <summary>
/// Copies the fields of the upload command into a <c>ProcessFileUploadMessage</c>,
/// serializes it to JSON and sends it to the import queue.
/// </summary>
/// <param name="message">Upload command whose fields are copied into the queued payload.</param>
/// <returns>
/// Always <c>true</c> once the send has completed; there is no error handling here,
/// so a failed send propagates as an exception to the caller.
/// </returns>
public async Task<bool> Handle(CreateFileUploadCommand message)
{
    var queueClient = _queueService.GetQueueClient(Constants.Queues.ImportQueue);
    var payload = JsonConvert.SerializeObject(new ProcessFileUploadMessage
    {
        TenantId = message.TenantId,
        FileExtension = message.FileExtension,
        FileName = message.Name,
        DeviceId = message.DeviceId,
        SessionId = message.SessionId,
        UserId = message.UserId,
        OutletId = message.OutletId,
        CorrelationId = message.CorrelationId,
    });

    // BrokeredMessage is IDisposable: release it as soon as the send has completed
    // instead of leaving it to the finalizer.
    using (var brokeredMessage = new BrokeredMessage(payload) { ContentType = "application/json" })
    {
        await queueClient.SendAsync(brokeredMessage);
    }

    return true;
}
下面是 WebJobs 函数:
/// <summary>
/// WebJobs function container: forwards each import-queue message to the import
/// HTTP endpoint and settles the message according to the HTTP outcome.
/// </summary>
public class Functions
{
    // One client shared by all invocations. Creating and disposing an HttpClient per
    // message can exhaust sockets when the queue is drained quickly.
    private static readonly HttpClient SharedClient = new HttpClient();

    private readonly IValueProvider _valueProvider;

    public Functions(IValueProvider valueProvider)
    {
        _valueProvider = valueProvider;
    }

    /// <summary>
    /// Posts the message body as JSON to the import endpoint. The message is completed
    /// when the response has a success status code and abandoned otherwise.
    /// </summary>
    /// <param name="message">Message received from the import queue; its body is read as a string.</param>
    /// <param name="logger">Log writer supplied by the host; not used by this function.</param>
    public async Task ProcessQueueMessage([ServiceBusTrigger(Constants.Constants.Queues.ImportQueue)] BrokeredMessage message,
        TextWriter logger)
    {
        var queueMessage = message.GetBody<string>();

        // The shared client cannot carry a per-instance BaseAddress, so the absolute
        // request URI is built here from the configured base address.
        var baseUri = new Uri(_valueProvider.Get("ServiceBaseUri"));
        var requestUri = new Uri(baseUri, RestfulUrls.ImportMenu.ProcessUrl);

        using (var stringContent = new StringContent(queueMessage, Encoding.UTF8, "application/json"))
        using (var result = await SharedClient.PostAsync(requestUri, stringContent))
        {
            // NOTE(review): the function settles the message itself. If the host is also
            // configured to complete/abandon messages, this may settle it twice - confirm.
            if (result.IsSuccessStatusCode)
            {
                await message.CompleteAsync();
            }
            else
            {
                await message.AbandonAsync();
            }
        }
    }
}
据我所知,Azure WebJobs SDK 默认在单个实例上启用并发处理(默认并发数为 16)。
如果你运行 WebJob,它会一次读取 16 条队列消息(使用 PeekLock 模式:函数成功完成时对消息调用 Complete,否则调用 Abandon),并同时发起 16 个并发调用来执行触发函数,所以你会感觉队列出队非常频繁。
如果要在单个实例上禁用并发处理,我建议将 ServiceBusConfiguration 的 MessageOptions.MaxConcurrentCalls 设置为 1。
更多详情,您可以参考以下代码。
在 Program.cs 中:
var hostConfig = new JobHostConfiguration();

// Limit the Service Bus message options to a single concurrent call.
var serviceBusSettings = new ServiceBusConfiguration();
serviceBusSettings.MessageOptions.MaxConcurrentCalls = 1;
hostConfig.UseServiceBus(serviceBusSettings);

// Create the host from the configuration and run it.
new JobHost(hostConfig).RunAndBlock();
如果您想在两次消息读取之间创建延迟,我建议创建一个自定义的 MessagingProvider,并将其赋给 ServiceBusConfiguration.MessagingProvider。
它创建的 MessageProcessor 包含 CompleteProcessingMessageAsync 方法,该方法在作业函数调用结束后完成对指定消息的处理。
我建议您在 CompleteProcessingMessageAsync 中加入延迟(例如 Thread.Sleep,或在异步方法中更合适的 await Task.Delay)来实现延迟读取。
更多细节,您可以参考下面的代码示例。
CustomMessagingProvider.cs:
注意:我重写了 CompleteProcessingMessageAsync 方法的代码。
/// <summary>
/// Messaging provider that plugs a custom <see cref="MessageProcessor"/> into the
/// Service Bus configuration so that a pause is inserted after each message the
/// processor completes.
/// </summary>
public class CustomMessagingProvider : MessagingProvider
{
    private readonly ServiceBusConfiguration _config;

    /// <param name="config">Service Bus configuration; its MessageOptions are handed to the custom processor.</param>
    public CustomMessagingProvider(ServiceBusConfiguration config)
        : base(config)
    {
        _config = config;
    }

    public override NamespaceManager CreateNamespaceManager(string connectionStringName = null)
    {
        // you could return your own NamespaceManager here, which would be used
        // globally
        return base.CreateNamespaceManager(connectionStringName);
    }

    public override MessagingFactory CreateMessagingFactory(string entityPath, string connectionStringName = null)
    {
        // you could return a customized (or new) MessagingFactory here per entity
        return base.CreateMessagingFactory(entityPath, connectionStringName);
    }

    public override MessageProcessor CreateMessageProcessor(string entityPath)
    {
        // demonstrates how to plug in a custom MessageProcessor
        // you could use the global MessageOptions, or use different
        // options per entity
        return new CustomMessageProcessor(_config.MessageOptions);
    }

    /// <summary>
    /// Message processor that settles the message after the job function has run and
    /// then waits before returning, which delays the next read.
    /// </summary>
    private class CustomMessageProcessor : MessageProcessor
    {
        // Pause inserted after a message has been completed.
        private static readonly TimeSpan DelayAfterCompletion = TimeSpan.FromSeconds(5);

        public CustomMessageProcessor(OnMessageOptions messageOptions)
            : base(messageOptions)
        {
        }

        public override Task<bool> BeginProcessingMessageAsync(BrokeredMessage message, CancellationToken cancellationToken)
        {
            // intercept messages before the job function is invoked
            return base.BeginProcessingMessageAsync(message, cancellationToken);
        }

        /// <summary>
        /// Abandons the message when the function failed. When it succeeded and
        /// AutoComplete is off, completes the message and then waits
        /// <see cref="DelayAfterCompletion"/> before returning.
        /// </summary>
        /// <param name="message">The message that was handed to the job function.</param>
        /// <param name="result">Outcome of the job function invocation.</param>
        /// <param name="cancellationToken">
        /// Checked before the message is settled and observed during the delay, so a
        /// cancellation ends the wait early with an <see cref="OperationCanceledException"/>.
        /// </param>
        public override async Task CompleteProcessingMessageAsync(BrokeredMessage message, FunctionResult result, CancellationToken cancellationToken)
        {
            if (!result.Succeeded)
            {
                cancellationToken.ThrowIfCancellationRequested();
                await message.AbandonAsync();
                return;
            }

            if (MessageOptions.AutoComplete)
            {
                // Nothing to settle here when AutoComplete is on, and no delay is applied.
                return;
            }

            // AutoComplete is true by default, but if set to false
            // we need to complete the message
            cancellationToken.ThrowIfCancellationRequested();
            await message.CompleteAsync();

            Console.WriteLine("Begin sleep");
            // Asynchronous wait: unlike Thread.Sleep it does not block a thread
            // for the duration of the pause.
            await Task.Delay(DelayAfterCompletion, cancellationToken);
            Console.WriteLine("Sleep 5 seconds");
        }
    }
}
Program.cs 中的 Main 方法:
/// <summary>
/// WebJob entry point: builds the host configuration, registers Service Bus with
/// the custom messaging provider, and runs the host.
/// </summary>
static void Main()
{
    var hostConfig = new JobHostConfiguration();
    if (hostConfig.IsDevelopment)
    {
        hostConfig.UseDevelopmentSettings();
    }

    // AutoComplete is off and only one call runs at a time.
    var messageOptions = new OnMessageOptions
    {
        AutoComplete = false,
        MaxConcurrentCalls = 1
    };
    var serviceBus = new ServiceBusConfiguration { MessageOptions = messageOptions };
    serviceBus.MessagingProvider = new CustomMessagingProvider(serviceBus);
    hostConfig.UseServiceBus(serviceBus);

    // The following code ensures that the WebJob will be running continuously
    new JobHost(hostConfig).RunAndBlock();
}
结果: