Azure 服务总线 - 将参数传递给消息处理程序
Azure Service bus - pass parameter to message handler
在 Azure 服务总线中,我需要同时侦听来自不同服务总线的多个订阅的消息。
为此,我创建了一个列表,其中包含带有连接字符串、主题、订阅名称和一些其他信息的对象(该列表称为 'jobs')。
对于此列表中的每一项,我将创建一个不同的任务来创建 ServiceBusClient 和处理器。
var jobs = GetAllServiceBusTopics();
Parallel.ForEach(jobs, async job =>
{
var client = new ServiceBusClient(job.Environment.ServiceBusConnectionString);
var options = new ServiceBusProcessorOptions();
var processor = client.CreateProcessor(job.Environment.TopicName, _subscriptionName, new ServiceBusProcessorOptions());
try
{
processor.ProcessMessageAsync += MessageHandler;
//Pass the job object somehow to the "MessageHandler" below.
processor.ProcessErrorAsync += ErrorHandler;
await processor.StartProcessingAsync();
Console.WriteLine("Wait for a minute and then press any key to end the processing");
Console.ReadKey();
Console.WriteLine("\nStopping the receiver...");
await processor.StopProcessingAsync();
Console.WriteLine("Stopped receiving messages");
}
finally
{
await processor.DisposeAsync();
await client.DisposeAsync();
}
});
新消息到达时调用的处理程序:
static async Task MessageHandler(ProcessMessageEventArgs args)
{
//I need the "job" object from my loop above here.
}
我在 Microsoft this website 上学到的这个概念的一般工作原理。
我的第一个问题:
- 这个方法好吗,还是我运行走错方向了?我可以这样做吗?
不过就算这样也行,我还有更重要的任务:
我需要以某种方式将循环中的“作业”对象作为参数传递给消息处理程序。
但我目前不知道如何存档。有什么建议吗?
Is this approach okay, or am I running in the wrong direction? Can I do it like this?
是的,你可以做到。要记住的一件事是,您实例化了多个 ServiceBusClient
实例,每个实例都会导致建立一个新连接,而不是使用相同的连接。我不知道主题(工作)的数量可能有多少,但如果它很大,你最终会遇到连接不足的情况。
I need to pass the "job" object from my loop somehow to the message handler - as a parameter. But I have currently no idea how to archvie this. Any proposals on this?
ServiceBusProcessor
不是这样设计的。除了需要处理的传入消息外,它不会收到任何其他消息。如果您需要工作 ID,那应该是消息的一部分 payload/metadata。如果您需要知道它来自哪个实体,您可以添加订阅过滤器操作以添加带有标识符的自定义 header。另一种方法需要包装 ServiceBusProcessor
以保留作业 ID/subscription 标识符并在事件处理程序中使用它。
在 Azure 服务总线中,我需要同时侦听来自不同服务总线的多个订阅的消息。
为此,我创建了一个列表,其中包含带有连接字符串、主题、订阅名称和一些其他信息的对象(该列表称为 'jobs')。
对于此列表中的每一项,我将创建一个不同的任务来创建 ServiceBusClient 和处理器。
var jobs = GetAllServiceBusTopics();
Parallel.ForEach(jobs, async job =>
{
var client = new ServiceBusClient(job.Environment.ServiceBusConnectionString);
var options = new ServiceBusProcessorOptions();
var processor = client.CreateProcessor(job.Environment.TopicName, _subscriptionName, new ServiceBusProcessorOptions());
try
{
processor.ProcessMessageAsync += MessageHandler;
//Pass the job object somehow to the "MessageHandler" below.
processor.ProcessErrorAsync += ErrorHandler;
await processor.StartProcessingAsync();
Console.WriteLine("Wait for a minute and then press any key to end the processing");
Console.ReadKey();
Console.WriteLine("\nStopping the receiver...");
await processor.StopProcessingAsync();
Console.WriteLine("Stopped receiving messages");
}
finally
{
await processor.DisposeAsync();
await client.DisposeAsync();
}
});
新消息到达时调用的处理程序:
static async Task MessageHandler(ProcessMessageEventArgs args)
{
//I need the "job" object from my loop above here.
}
我在 Microsoft this website 上学到的这个概念的一般工作原理。
我的第一个问题:
- 这个方法好吗,还是我运行走错方向了?我可以这样做吗?
不过就算这样也行,我还有更重要的任务:
我需要以某种方式将循环中的“作业”对象作为参数传递给消息处理程序。
但我目前不知道如何存档。有什么建议吗?
Is this approach okay, or am I running in the wrong direction? Can I do it like this?
是的,你可以做到。要记住的一件事是,您实例化了多个 ServiceBusClient
实例,每个实例都会导致建立一个新连接,而不是使用相同的连接。我不知道主题(工作)的数量可能有多少,但如果它很大,你最终会遇到连接不足的情况。
I need to pass the "job" object from my loop somehow to the message handler - as a parameter. But I have currently no idea how to archvie this. Any proposals on this?
ServiceBusProcessor
不是这样设计的。除了需要处理的传入消息外,它不会收到任何其他消息。如果您需要工作 ID,那应该是消息的一部分 payload/metadata。如果您需要知道它来自哪个实体,您可以添加订阅过滤器操作以添加带有标识符的自定义 header。另一种方法需要包装 ServiceBusProcessor
以保留作业 ID/subscription 标识符并在事件处理程序中使用它。