如何配置 Rebus 以根据处理程序的类型设置主题
How to configure Rebus to have topics based on handlers' type
我正在使用 Rebus,我想在“避免多次处理事件”段落中介绍 CQRS Journey 中描述的内容,但我无法弄清楚.
我将 Rebus 配置为使用 SQL 服务器进行 传输 并使用 MongoDB 进行 订阅 和 传奇。 Routing 配置为 TypeBased 并将所有命令处理程序的类型映射到 Transport 中配置的队列。
var bus = Configure.With(new SimpleInjectorContainerAdapter(container))
.Logging(l => l.Trace())
.Transport(t =>
{
t.UseSqlServer(connectionstring, "TestMessages", "messageQueueName");
})
.Routing(r => r.TypeBased()
.MapAssemblyOf<Assembly1.Commands.DoSomething>("messageQueueName")
.MapAssemblyOf<Assembly2.Commands.DoSomethingElse>("messageQueueName")
)
.Sagas(s => s.StoreInMongoDb(db, (sagaType) =>
{
return sagaType.Name;
}))
.Subscriptions(s => s.StoreInMongoDb(db, "Subscriptions"))
.Options(o =>
{
o.SetNumberOfWorkers(1);
o.SetMaxParallelism(1);
o.EnableSagaAuditing().StoreInMongoDb(db, "Snapshots");
})
.Start();
现在我应该以这样的方式配置 Rebus,当命令 Publish 一个事件时,它会被复制到与现有订阅者类型一样多的单独主题(虚拟或物理队列)中.
类似于:
bus.Subscribe<Assembly1.EventHandler1>("Assembly1.EventHandler1Queue").Wait();
bus.Subscribe<Assembly1.EventHandler2>("Assembly1.EventHandler2Queue").Wait();
bus.Subscribe<Assembly2.EventHandler1>("Assembly2.EventHandler1Queue").Wait();
感谢您的帮助。
有几件事似乎与您的问题混淆。
但我想您的基本问题是如何确保每条消息仅由每个订阅者处理一次。
答案很简单:每个订阅者都有一个单独的端点 - 这意味着每个订阅者都有自己的输入队列,从中处理消息,失败的消息将返回到该队列。
然后您可以根据需要在每个订阅者中拥有任意数量的处理程序。将为每个传入消息执行所有兼容的处理程序。
使用 Rebus,每次调用 Configure.With(...).(...).Start()
都会给你一个单独的端点 - 所以在你的情况下,我建议你将订阅者端点创建包装在一个方法中,然后你可以像这样调用它:
var event1Subscriber = CreateSubscriber("subscriber_event1");
event1Subscriber.Subscribe<Event1>().Wait();
var event2Subscriber = CreateSubscriber("subscriber_event2");
event2Subscriber.Subscribe<Event2>().Wait();
var event3Subscriber = CreateSubscriber("subscriber_event3");
event3Subscriber.Subscribe<Event3>().Wait();
// ...
其中 CreateSubscriber
将是这样的:
public IBus CreateSubscriber(string queueName)
{
return Configure.With(GetContainerAdapter())
.Transport(t => t.UseMsmq(queueName))
.Start();
}
我正在使用 Rebus,我想在“避免多次处理事件”段落中介绍 CQRS Journey 中描述的内容,但我无法弄清楚.
我将 Rebus 配置为使用 SQL 服务器进行 传输 并使用 MongoDB 进行 订阅 和 传奇。 Routing 配置为 TypeBased 并将所有命令处理程序的类型映射到 Transport 中配置的队列。
var bus = Configure.With(new SimpleInjectorContainerAdapter(container))
.Logging(l => l.Trace())
.Transport(t =>
{
t.UseSqlServer(connectionstring, "TestMessages", "messageQueueName");
})
.Routing(r => r.TypeBased()
.MapAssemblyOf<Assembly1.Commands.DoSomething>("messageQueueName")
.MapAssemblyOf<Assembly2.Commands.DoSomethingElse>("messageQueueName")
)
.Sagas(s => s.StoreInMongoDb(db, (sagaType) =>
{
return sagaType.Name;
}))
.Subscriptions(s => s.StoreInMongoDb(db, "Subscriptions"))
.Options(o =>
{
o.SetNumberOfWorkers(1);
o.SetMaxParallelism(1);
o.EnableSagaAuditing().StoreInMongoDb(db, "Snapshots");
})
.Start();
现在我应该以这样的方式配置 Rebus,当命令 Publish 一个事件时,它会被复制到与现有订阅者类型一样多的单独主题(虚拟或物理队列)中.
类似于:
bus.Subscribe<Assembly1.EventHandler1>("Assembly1.EventHandler1Queue").Wait();
bus.Subscribe<Assembly1.EventHandler2>("Assembly1.EventHandler2Queue").Wait();
bus.Subscribe<Assembly2.EventHandler1>("Assembly2.EventHandler1Queue").Wait();
感谢您的帮助。
有几件事似乎与您的问题混淆。
但我想您的基本问题是如何确保每条消息仅由每个订阅者处理一次。
答案很简单:每个订阅者都有一个单独的端点 - 这意味着每个订阅者都有自己的输入队列,从中处理消息,失败的消息将返回到该队列。
然后您可以根据需要在每个订阅者中拥有任意数量的处理程序。将为每个传入消息执行所有兼容的处理程序。
使用 Rebus,每次调用 Configure.With(...).(...).Start()
都会给你一个单独的端点 - 所以在你的情况下,我建议你将订阅者端点创建包装在一个方法中,然后你可以像这样调用它:
var event1Subscriber = CreateSubscriber("subscriber_event1");
event1Subscriber.Subscribe<Event1>().Wait();
var event2Subscriber = CreateSubscriber("subscriber_event2");
event2Subscriber.Subscribe<Event2>().Wait();
var event3Subscriber = CreateSubscriber("subscriber_event3");
event3Subscriber.Subscribe<Event3>().Wait();
// ...
其中 CreateSubscriber
将是这样的:
public IBus CreateSubscriber(string queueName)
{
return Configure.With(GetContainerAdapter())
.Transport(t => t.UseMsmq(queueName))
.Start();
}