如何使用一名工作人员从多个队列中读取 Azure 服务总线消息
How to read Azure Service Bus messages from Multiple Queues with one worker
我有三个队列和一个工作人员,我想监控这三个队列(或只有其中两个)
一个队列是qPirate
一个队列是 qShips
一个队列是 qPassengers
我们的想法是,工作人员要么查看全部 3 个,要么查看其中 2 个,要么查看其中一个,并根据消息内容做不同的事情。
虽然关键是说消息失败是因为 ship1 离线,qships 中的所有队列都将刷新,正在查看该消息的工作人员和其他队列将稍微挂起,因为他们将尝试处理该队列的消息,同时只查看其他队列一点点,而正在查看其他 2 个队列并跳过 qships 的其他工作人员将继续处理消息,而不会出现滞留或延迟。
public static void GotMessage([ServiceBusTrigger("%LookAtAllQueuesintheservicebus%")] BrokeredMessage message)
{
var handler = new MessageHandler();
var manager = new MessageManager(
handler,
"PirateShips"
);
manager.ProcessMessageViaHandler(message);
}
在网上四处看看,我猜这不可能,但似乎是这样?提前致谢!
Edit1: 我也会添加作业主机以尝试澄清一些事情
JobHostConfiguration config = new JobHostConfiguration()
{
DashboardConnectionString = "DefaultEndpointsProtocol=https;AccountName=PiratesAreUs;AccountKey=Yarr",
StorageConnectionString = "DefaultEndpointsProtocol=https;AccountName=PiratesAreUs;AccountKey=Yarr",
NameResolver = new QueueNameResolver()
};
ServiceBusConfiguration serviceBusConfig = new ServiceBusConfiguration()
{
ConnectionString = "Endpoint=AllPirateQueuesLocatedHere;SharedAccessKeyName=PiratesAreUs;SharedAccessKey=Yarr"
};
serviceBusConfig.MessageOptions.AutoComplete = false;
serviceBusConfig.MessageOptions.AutoRenewTimeout = TimeSpan.FromMinutes(1);
serviceBusConfig.MessageOptions.MaxConcurrentCalls = 1;
config.UseServiceBus(serviceBusConfig);
JobHost host = new JobHost(config);
host.RunAndBlock();
QueueNameResolverClass 也很简单
public class QueueNameResolver : INameResolver
{
public string Resolve(string name)
{
return name;
}
}
无论如何,我似乎没有办法让 NameResolver 成为多个队列,虽然我可以说我希望 jobhost 查看某个 ServiceBus,但我根本不知道如何告诉它查看ServiceBus 中的队列。
换句话说,我希望这个工作线程有多个 servicebustriggers,这样如果一条消息被发送到 qpirate1 和 qships1,它们都位于服务总线 AllPirateQueuesHere 中,工作线程可以在 qpirate1 中获取消息,处理它,然后拿起qships1中的消息并处理它。
找出答案...这是可能的,而且比我想象的要简单 我不确定为什么我没有把这些点联系起来,但我仍然很好奇为什么没有更多关于这个的文档。显然,它只是为每个队列创建一个函数,您希望工作人员查看多个队列。因此,如果你有三个队列,你会想要类似下面的东西(你可以不同地处理每条消息)。
public static void GotMessage1([ServiceBusTrigger("%qPirate1%")] BrokeredMessage message)
{
var handler = new MessageHandler();
var manager = new MessageManager(
handler,
"Pirates"
);
manager.ProcessMessageViaHandler(message);
}
public static void GotMessage2([ServiceBusTrigger("%qShip1%")] BrokeredMessage message)
{
var handler = new MessageHandler();
var manager = new MessageManager(
handler,
"Ships"
);
manager.ProcessMessageViaHandler(message);
}
public static void GotBooty([ServiceBusTrigger("%qBooty%")] BrokeredMessage message)
{
var handler = new MessageHandler();
var manager = new MessageManager(
handler,
"Booty"
);
manager.ProcessMessageViaHandler(message);
}
我有三个队列和一个工作人员,我想监控这三个队列(或只有其中两个)
一个队列是qPirate 一个队列是 qShips 一个队列是 qPassengers
我们的想法是,工作人员要么查看全部 3 个,要么查看其中 2 个,要么查看其中一个,并根据消息内容做不同的事情。
虽然关键是说消息失败是因为 ship1 离线,qships 中的所有队列都将刷新,正在查看该消息的工作人员和其他队列将稍微挂起,因为他们将尝试处理该队列的消息,同时只查看其他队列一点点,而正在查看其他 2 个队列并跳过 qships 的其他工作人员将继续处理消息,而不会出现滞留或延迟。
public static void GotMessage([ServiceBusTrigger("%LookAtAllQueuesintheservicebus%")] BrokeredMessage message)
{
var handler = new MessageHandler();
var manager = new MessageManager(
handler,
"PirateShips"
);
manager.ProcessMessageViaHandler(message);
}
在网上四处看看,我猜这不可能,但似乎是这样?提前致谢!
Edit1: 我也会添加作业主机以尝试澄清一些事情
JobHostConfiguration config = new JobHostConfiguration()
{
DashboardConnectionString = "DefaultEndpointsProtocol=https;AccountName=PiratesAreUs;AccountKey=Yarr",
StorageConnectionString = "DefaultEndpointsProtocol=https;AccountName=PiratesAreUs;AccountKey=Yarr",
NameResolver = new QueueNameResolver()
};
ServiceBusConfiguration serviceBusConfig = new ServiceBusConfiguration()
{
ConnectionString = "Endpoint=AllPirateQueuesLocatedHere;SharedAccessKeyName=PiratesAreUs;SharedAccessKey=Yarr"
};
serviceBusConfig.MessageOptions.AutoComplete = false;
serviceBusConfig.MessageOptions.AutoRenewTimeout = TimeSpan.FromMinutes(1);
serviceBusConfig.MessageOptions.MaxConcurrentCalls = 1;
config.UseServiceBus(serviceBusConfig);
JobHost host = new JobHost(config);
host.RunAndBlock();
QueueNameResolverClass 也很简单
public class QueueNameResolver : INameResolver
{
public string Resolve(string name)
{
return name;
}
}
无论如何,我似乎没有办法让 NameResolver 成为多个队列,虽然我可以说我希望 jobhost 查看某个 ServiceBus,但我根本不知道如何告诉它查看ServiceBus 中的队列。
换句话说,我希望这个工作线程有多个 servicebustriggers,这样如果一条消息被发送到 qpirate1 和 qships1,它们都位于服务总线 AllPirateQueuesHere 中,工作线程可以在 qpirate1 中获取消息,处理它,然后拿起qships1中的消息并处理它。
找出答案...这是可能的,而且比我想象的要简单 我不确定为什么我没有把这些点联系起来,但我仍然很好奇为什么没有更多关于这个的文档。显然,它只是为每个队列创建一个函数,您希望工作人员查看多个队列。因此,如果你有三个队列,你会想要类似下面的东西(你可以不同地处理每条消息)。
public static void GotMessage1([ServiceBusTrigger("%qPirate1%")] BrokeredMessage message)
{
var handler = new MessageHandler();
var manager = new MessageManager(
handler,
"Pirates"
);
manager.ProcessMessageViaHandler(message);
}
public static void GotMessage2([ServiceBusTrigger("%qShip1%")] BrokeredMessage message)
{
var handler = new MessageHandler();
var manager = new MessageManager(
handler,
"Ships"
);
manager.ProcessMessageViaHandler(message);
}
public static void GotBooty([ServiceBusTrigger("%qBooty%")] BrokeredMessage message)
{
var handler = new MessageHandler();
var manager = new MessageManager(
handler,
"Booty"
);
manager.ProcessMessageViaHandler(message);
}