如何使用一名工作人员从多个队列中读取 Azure 服务总线消息

How to read Azure Service Bus messages from Multiple Queues with one worker

我有三个队列和一个工作人员,我想监控这三个队列(或只有其中两个)

一个队列是qPirate 一个队列是 qShips 一个队列是 qPassengers

我们的想法是,工作人员要么查看全部 3 个,要么查看其中 2 个,要么查看其中一个,并根据消息内容做不同的事情。

虽然关键是说消息失败是因为 ship1 离线,qships 中的所有队列都将刷新,正在查看该消息的工作人员和其他队列将稍微挂起,因为他们将尝试处理该队列的消息,同时只查看其他队列一点点,而正在查看其他 2 个队列并跳过 qships 的其他工作人员将继续处理消息,而不会出现滞留或延迟。

    public static void GotMessage([ServiceBusTrigger("%LookAtAllQueuesintheservicebus%")] BrokeredMessage message)
    {
        var handler = new MessageHandler();

        var manager = new MessageManager(
            handler,
            "PirateShips"
            );

        manager.ProcessMessageViaHandler(message);
    }

在网上四处看看,我猜这不可能,但似乎是这样?提前致谢!

Edit1: 我也会添加作业主机以尝试澄清一些事情

        JobHostConfiguration config = new JobHostConfiguration()
        {
            DashboardConnectionString = "DefaultEndpointsProtocol=https;AccountName=PiratesAreUs;AccountKey=Yarr",
            StorageConnectionString = "DefaultEndpointsProtocol=https;AccountName=PiratesAreUs;AccountKey=Yarr",
            NameResolver = new QueueNameResolver()
        };

        ServiceBusConfiguration serviceBusConfig = new ServiceBusConfiguration()
        {
            ConnectionString = "Endpoint=AllPirateQueuesLocatedHere;SharedAccessKeyName=PiratesAreUs;SharedAccessKey=Yarr"               
        };

        serviceBusConfig.MessageOptions.AutoComplete = false;
        serviceBusConfig.MessageOptions.AutoRenewTimeout = TimeSpan.FromMinutes(1);
        serviceBusConfig.MessageOptions.MaxConcurrentCalls = 1;            

        config.UseServiceBus(serviceBusConfig);


        JobHost host = new JobHost(config);

        host.RunAndBlock();

QueueNameResolverClass 也很简单

    public class QueueNameResolver : INameResolver
    {
        public string Resolve(string name)
        {
            return name;
        }
    }

无论如何,我似乎没有办法让 NameResolver 成为多个队列,虽然我可以说我希望 jobhost 查看某个 ServiceBus,但我根本不知道如何告诉它查看ServiceBus 中的队列。

换句话说,我希望这个工作线程有多个 servicebustriggers,这样如果一条消息被发送到 qpirate1 和 qships1,它们都位于服务总线 AllPirateQueuesHere 中,工作线程可以在 qpirate1 中获取消息,处理它,然后拿起qships1中的消息并处理它。

找出答案...这是可能的,而且比我想象的要简单 我不确定为什么我没有把这些点联系起来,但我仍然很好奇为什么没有更多关于这个的文档。显然,它只是为每个队列创建一个函数,您希望工作人员查看多个队列。因此,如果你有三个队列,你会想要类似下面的东西(你可以不同地处理每条消息)。

  public static void GotMessage1([ServiceBusTrigger("%qPirate1%")] BrokeredMessage message)
{
    var handler = new MessageHandler();

    var manager = new MessageManager(
        handler,
        "Pirates"
        );

    manager.ProcessMessageViaHandler(message);
}

  public static void GotMessage2([ServiceBusTrigger("%qShip1%")] BrokeredMessage message)
{
    var handler = new MessageHandler();

    var manager = new MessageManager(
        handler,
        "Ships"
        );

    manager.ProcessMessageViaHandler(message);
}

  public static void GotBooty([ServiceBusTrigger("%qBooty%")] BrokeredMessage message)
{
    var handler = new MessageHandler();

    var manager = new MessageManager(
        handler,
        "Booty"
        );

    manager.ProcessMessageViaHandler(message);
}