Azure WebJobs:具有不同批量大小的队列触发器
Azure WebJobs: Queue triggers with different batch sizes
我在 azure 上有一个同时处理来自多个队列的消息的 WebJob:
public async static Task ProcessQueueMessage1([QueueTrigger("queue1")] string message)
{
switch (message.Substring(message.Length - 3, 3))
{
case "tze":
await Parser.Process1(message);
break;
default:
break;
}
}
public async static Task ProcessQueueMessage2([QueueTrigger("queue2")] string message)
{
switch (message.Substring(message.Length - 3, 3))
{
case "tzr":
await Parser.Process2(message);
break;
default:
break;
}
}
并且在 MAIN
static void Main()
{
JobHostConfiguration config = new JobHostConfiguration();
config.Queues.BatchSize = 3;
config.Queues.MaxDequeueCount = 1;
var host = new JobHost(config);
host.RunAndBlock();
}
此处:message.Substring(message.Length - 3, 3)
只是检查文件的扩展名。
我的问题是,我将如何继续使 queue1 的批处理大小与 queue2 不同,我可以使用不同的配置创建第二个 jobhost 并具有 host.RunAndBlock()
和 host2.RunAndBlock()
吗?我将如何指定 jobhost 应该做什么队列?
我也尝试过 Groupqueuetriggers,但不幸的是它们采用字符串,在我的情况下我实际上无法将列表传递到队列。 :(
为了解决这个问题,您需要提供 IQueueProcessorFactory 的自定义实现。您只需要一个 JobHost。
有一个关于如何执行此操作的示例here.
static void Main()
{
//Configure JobHost
var config = new JobHostConfiguration();
config.Queues.BatchSize = 32;
config.Queues.MaxDequeueCount = 6;
// Create a custom configuration
// If you're using DI you should get this from the kernel
config.Queues.QueueProcessorFactory = new CustomQueueProcessorFactory();
//Pass configuration to JobJost
var host = new JobHost(config);
// The following code ensures that the WebJob will be running continuously
host.RunAndBlock();
}
并且在 CustomQueueProcessorFactory 中,您可以根据队列名称插入自定义配置。
public class CustomQueueProcessorFactory : IQueueProcessorFactory
{
public QueueProcessor Create(QueueProcessorFactoryContext context)
{
if (context == null)
{
throw new ArgumentNullException(nameof(context));
}
if (context.Queue.Name == "queuename1")
{
context.MaxDequeueCount = 10;
}
else if (context.Queue.Name == "queuename2")
{
context.MaxDequeueCount = 10;
context.BatchSize = 1;
}
return new QueueProcessor(context);
}
}
我在 azure 上有一个同时处理来自多个队列的消息的 WebJob:
public async static Task ProcessQueueMessage1([QueueTrigger("queue1")] string message)
{
switch (message.Substring(message.Length - 3, 3))
{
case "tze":
await Parser.Process1(message);
break;
default:
break;
}
}
public async static Task ProcessQueueMessage2([QueueTrigger("queue2")] string message)
{
switch (message.Substring(message.Length - 3, 3))
{
case "tzr":
await Parser.Process2(message);
break;
default:
break;
}
}
并且在 MAIN
static void Main()
{
JobHostConfiguration config = new JobHostConfiguration();
config.Queues.BatchSize = 3;
config.Queues.MaxDequeueCount = 1;
var host = new JobHost(config);
host.RunAndBlock();
}
此处:message.Substring(message.Length - 3, 3)
只是检查文件的扩展名。
我的问题是,我将如何继续使 queue1 的批处理大小与 queue2 不同,我可以使用不同的配置创建第二个 jobhost 并具有 host.RunAndBlock()
和 host2.RunAndBlock()
吗?我将如何指定 jobhost 应该做什么队列?
我也尝试过 Groupqueuetriggers,但不幸的是它们采用字符串,在我的情况下我实际上无法将列表传递到队列。 :(
为了解决这个问题,您需要提供 IQueueProcessorFactory 的自定义实现。您只需要一个 JobHost。
有一个关于如何执行此操作的示例here.
static void Main()
{
//Configure JobHost
var config = new JobHostConfiguration();
config.Queues.BatchSize = 32;
config.Queues.MaxDequeueCount = 6;
// Create a custom configuration
// If you're using DI you should get this from the kernel
config.Queues.QueueProcessorFactory = new CustomQueueProcessorFactory();
//Pass configuration to JobJost
var host = new JobHost(config);
// The following code ensures that the WebJob will be running continuously
host.RunAndBlock();
}
并且在 CustomQueueProcessorFactory 中,您可以根据队列名称插入自定义配置。
public class CustomQueueProcessorFactory : IQueueProcessorFactory
{
public QueueProcessor Create(QueueProcessorFactoryContext context)
{
if (context == null)
{
throw new ArgumentNullException(nameof(context));
}
if (context.Queue.Name == "queuename1")
{
context.MaxDequeueCount = 10;
}
else if (context.Queue.Name == "queuename2")
{
context.MaxDequeueCount = 10;
context.BatchSize = 1;
}
return new QueueProcessor(context);
}
}