使用 Azure 服务总线在 Rebus 中处理 Publish/Subscribe

Handle Publish/Subscribe in Rebus with Azure Service Bus

我正在研究 Rebus 并将其与 Azure 服务总线一起使用。将它与 regalure Queues 一起使用很容易,但是当我想改用 Topic 时,我无法让它工作。

这里有没有已经完成设置并与 Topic/Subscription 一起使用的人?这是我目前所拥有的。

        static void Main(string[] args)
    {
        _bus1 = InitializeBus(System.Environment.MachineName);
        _bus2 = InitializeBus(System.Environment.MachineName + "_2");
        _bus3 = InitializeBus();

        Run();
        Console.WriteLine("Press Enter to exit!");
        Console.ReadLine();
    }

    private static void Run()
    {
        try
        {
            _bus1.Handle<string>((b, c, m) => { Console.WriteLine(m); return null; });
            _bus2.Handle<string>((b, c, m) => { Console.WriteLine(m); return null; });
            _bus1.Bus.Subscribe<string>();
            _bus2.Bus.Subscribe<string>();
            _bus3.Bus.Publish("Publish test message");
        }
        catch (Exception ex)
        {
            throw;
        }
    }

    private static BuiltinHandlerActivator InitializeBus(string queueName = null)
    {
        var activator = new BuiltinHandlerActivator();

        if(string.IsNullOrEmpty(queueName))
            Configure.With(activator)
                .Transport(t => t.UseAzureServiceBusAsOneWayClient(connectionString))
                .Options(o => { o.SetNumberOfWorkers(10); o.SetMaxParallelism(10); })
                .Start();
        else
            Configure.With(activator)
                .Transport(t => t.UseAzureServiceBus(connectionString, queueName).EnablePartitioning().DoNotCreateQueues())
                .Options(o => { o.SetNumberOfWorkers(10); o.SetMaxParallelism(10); })
                .Start();

        return activator;
    }

首先我创建了所有的总线。我正在使用 DontCreateQueues(),因为我不希望在我的根目录中重复创建队列,而只是在作为订阅的主题下创建。 然后我设置了总线并且发布工作正常,创建了一个主题并在该主题下创建了 2 个订阅,并且每个订阅中都有 1 条消息。但是消息永远不会被收集。

如果我删除配置中的 DontCreateQueues() 方法,代码可以工作,但随后会在根目录中创建 2 个队列和主题,这是 2 个订阅,但我不能那样做。

最好的问候 马格努斯

Rebus 通过为您订阅的每个主题创建订阅来使用主题,然后配置订阅以将收到的消息转发到总线的输入队列。

如果总线没有具有预期名称的输入队列(Rebus 创建的或您手动创建的),事情将无法进行。

存在 DontCreateQueues() 的原因是允许专家用户配置他们的队列设置,超出 Rebus 能够(和愿意)做的事情。不过,这需要非常详细地了解 Rebus 如何期望您的队列实体进行布局,所以我建议几乎所有人都不要手动创建任何东西,然后让 Rebus 简单地进行设置。