如何配置 Rebus 以根据处理程序的类型设置主题

How to configure Rebus to have topics based on handlers' type

我正在使用 Rebus,我想在“避免多次处理事件”段落中介绍 CQRS Journey 中描述的内容,但我无法弄清楚.

我将 Rebus 配置为使用 SQL 服务器进行 传输 并使用 MongoDB 进行 订阅 传奇Routing 配置为 TypeBased 并将所有命令处理程序的类型映射到 Transport 中配置的队列。

 var bus = Configure.With(new SimpleInjectorContainerAdapter(container))
            .Logging(l => l.Trace())
            .Transport(t =>
            {
                t.UseSqlServer(connectionstring, "TestMessages", "messageQueueName");
            })
            .Routing(r => r.TypeBased()
                            .MapAssemblyOf<Assembly1.Commands.DoSomething>("messageQueueName")
                            .MapAssemblyOf<Assembly2.Commands.DoSomethingElse>("messageQueueName")
                             )
            .Sagas(s => s.StoreInMongoDb(db, (sagaType) =>
            {
                return sagaType.Name;
            }))
            .Subscriptions(s => s.StoreInMongoDb(db, "Subscriptions"))
            .Options(o =>
            {
                o.SetNumberOfWorkers(1);
                o.SetMaxParallelism(1);
                o.EnableSagaAuditing().StoreInMongoDb(db, "Snapshots");
            })
            .Start();

现在我应该以这样的方式配置 Rebus,当命令 Publish 一个事件时,它会被复制到与现有订阅者类型一样多的单独主题(虚拟或物理队列)中.

类似于:

bus.Subscribe<Assembly1.EventHandler1>("Assembly1.EventHandler1Queue").Wait();
bus.Subscribe<Assembly1.EventHandler2>("Assembly1.EventHandler2Queue").Wait();
bus.Subscribe<Assembly2.EventHandler1>("Assembly2.EventHandler1Queue").Wait();

感谢您的帮助。

有几件事似乎与您的问题混淆。

但我想您的基本问题是如何确保每条消息仅由每个订阅者处理一次。

答案很简单:每个订阅者都有一个单独的端点 - 这意味着每个订阅者都有自己的输入队列,从中处理消息,失败的消息将返回到该队列。

然后您可以根据需要在每个订阅者中拥有任意数量的处理程序。将为每个传入消息执行所有兼容的处理程序。

使用 Rebus,每次调用 Configure.With(...).(...).Start() 都会给你一个单独的端点 - 所以在你的情况下,我建议你将订阅者端点创建包装在一个方法中,然后你可以像这样调用它:

var event1Subscriber = CreateSubscriber("subscriber_event1");
event1Subscriber.Subscribe<Event1>().Wait();

var event2Subscriber = CreateSubscriber("subscriber_event2");
event2Subscriber.Subscribe<Event2>().Wait();

var event3Subscriber = CreateSubscriber("subscriber_event3");    
event3Subscriber.Subscribe<Event3>().Wait();

// ...

其中 CreateSubscriber 将是这样的:

public IBus CreateSubscriber(string queueName)
{
    return Configure.With(GetContainerAdapter())
        .Transport(t => t.UseMsmq(queueName))
        .Start();        
}