Rebus multiple Queues based on content
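Setup: Rebus in an ASP.NET MVC project, using SimpleInjector.
I need to create two handlers that receive messages, each from its own specific queue. Based on what I found in a related answer, I wrote similar code.
In a class library I have a class that implements SimpleInjector's IPackage, and it contains the following code: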
public void RegisterServices( Container container ) {
    container.Register<IHandleMessages<MyMessage>, MyMessageHandler>( Lifestyle.Scoped );
    IContainerAdapter adapter = new SimpleInjectorContainerAdapter( container );
    // first bus, listening on queue A
    Configure.With( adapter )
        .Transport( t => t.UseAzureServiceBus( connectionString, "A_QUEUE_NAME", AzureServiceBusMode.Standard ) )
        .Options( oc => {
            oc.SetNumberOfWorkers( 1 );
            oc.SimpleRetryStrategy( errorQueueAddress: "A_ERROR_QUEUE_NAME", maxDeliveryAttempts: 3 );
        } )
        .Start();
    // second bus, listening on queue B, configured against the same adapter
    Configure.With( adapter )
        .Transport( t => t.UseAzureServiceBus( connectionString, "B_QUEUE_NAME" ) )
        .Options( oc => {
            oc.SetNumberOfWorkers( 1 );
            oc.SimpleRetryStrategy( errorQueueAddress: "B_ERROR_QUEUE_NAME", maxDeliveryAttempts: 3 );
        } )
        .Start();
}
However, when the debugger reaches the second Configure.With( ... ) call, execution stops with this error message:
Type IBus has already been registered. If your intention is to resolve a collection of IBus implementations, use the RegisterCollection overloads. More info: https://simpleinjector.org/coll1. If your intention is to replace the existing registration with this new registration, you can allow overriding the current registration by setting Container.Options.AllowOverridingRegistrations to true. More info: https://simpleinjector.org/ovrrd.
Stack trace:
[InvalidOperationException: Type IBus has already been registered. If your intention is to resolve a collection of IBus implementations, use the RegisterCollection overloads. More info: https://simpleinjector.org/coll1. If your intention is to replace the existing registration with this new registration, you can allow overriding the current registration by setting Container.Options.AllowOverridingRegistrations to true. More info: https://simpleinjector.org/ovrrd.]
SimpleInjector.Internals.NonGenericRegistrationEntry.ThrowWhenTypeAlreadyRegistered(InstanceProducer producer) +102
SimpleInjector.Internals.NonGenericRegistrationEntry.Add(InstanceProducer producer) +59
SimpleInjector.Container.AddInstanceProducer(InstanceProducer producer) +105
SimpleInjector.Container.AddRegistrationInternal(Type serviceType, Registration registration) +69
SimpleInjector.Container.AddRegistration(Type serviceType, Registration registration) +131
SimpleInjector.Container.RegisterSingleton(TService instance) +183
Rebus.SimpleInjector.SimpleInjectorContainerAdapter.SetBus(IBus bus) +55
Rebus.Config.RebusConfigurer.Start() +2356
MyModule.RegisterServices(Container container) +497
SimpleInjector.PackageExtensions.RegisterPackages(Container container, IEnumerable`1 assemblies) +50
Myproject.SimpleInjectorInitializer.InitializeContainer(Container container) +35
Myproject.SimpleInjectorInitializer.Initialize() +68
Myproject.Startup.Configuration(IAppBuilder app) +28
Edit
I then removed the second Configure.With( ... ) block, and now when I call _bus.Send( message ) I get a different error in the consumer process:
Unhandled exception 1 while handling message with ID fef3acca-97f4-4495-b09d-96e6c9f66c4d: SimpleInjector.ActivationException: No registration for type IEnumerable<IHandleMessages<MyMessage>> could be found. There is, however, a registration for IHandleMessages<MyMessage>; Did you mean to call GetInstance<IHandleMessages<MyMessage>>() or depend on IHandleMessages<MyMessage>? Or did you mean to register a collection of types using RegisterCollection?
Stack trace:
2017-04-13 10:21:03,805 [77] WARN Rebus.Retry.ErrorTracking.InMemErrorTracker -
at SimpleInjector.Container.ThrowMissingInstanceProducerException(Type serviceType)
at SimpleInjector.Container.GetInstanceForRootType[TService]()
at SimpleInjector.Container.GetInstance[TService]()
at SimpleInjector.Container.GetAllInstances[TService]()
at Rebus.SimpleInjector.SimpleInjectorContainerAdapter.<GetHandlers>d__3`1.MoveNext()
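The exception states: "Type IBus has already been registered". According to your stack trace, the second time IBus is added is inside SimpleInjectorContainerAdapter. You will have to find out where it is registered the first time. That is easy to do: register a dummy IBus as the very first registration after creating the Container, and look at the stack trace where it blows up.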
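I usually recommend keeping only one IBus per container instance, because the bus can itself be considered "an application", which fits nicely with the fact that an IoC container is an object that can "host" an application for the duration of its lifetime.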
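Rebus does not provide a Conforming Container abstraction, because I agree with Mark Seemann that it is a project doomed to fail. In fact, as the wiki page mentions, Rebus used to provide automatic registration of handlers, but that turned out to be problematic.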
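Instead, Rebus encourages you to provide a "container adapter" (an implementation of IContainerAdapter), whose responsibilities are:
- looking up handlers
- providing a way to register IBus and IMessageContext correctly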
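Container adapters are provided out of the box for Autofac, Castle Windsor, SimpleInjector, etc. However, providing a container adapter is not required: Configure.With(...) is happy to receive only a "handler activator" (an implementation of IHandlerActivator). So if you only want to use your IoC container to look up handlers, and take care of registering IBus yourself, you can do that by implementing IHandlerActivator and looking up the handlers in your container.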
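A minimal sketch of such a handler activator, assuming SimpleInjector and the GetHandlers signature as I understand it from Rebus (check it against the version you have installed). The class name ContainerHandlerActivator is hypothetical. The sketch assumes handlers are registered as a collection and are not scoped; scoped handlers would additionally need a scope to be started around each message, which is omitted here.

```csharp
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Rebus.Activation;
using Rebus.Handlers;
using Rebus.Transport;
using SimpleInjector;

// Hypothetical class: resolves handlers from a SimpleInjector container
// without ever registering IBus in that container.
public class ContainerHandlerActivator : IHandlerActivator
{
    private readonly Container _container;

    public ContainerHandlerActivator(Container container)
    {
        _container = container;
    }

    public Task<IEnumerable<IHandleMessages<TMessage>>> GetHandlers<TMessage>(
        TMessage message, ITransactionContext transactionContext)
    {
        // GetAllInstances only works when the handlers were registered
        // as a collection (RegisterCollection), not with a single Register call.
        IEnumerable<IHandleMessages<TMessage>> handlers =
            _container.GetAllInstances<IHandleMessages<TMessage>>().ToList();

        return Task.FromResult(handlers);
    }
}
```

With this in place, Configure.With(new ContainerHandlerActivator(container)) returns the IBus from Start() without touching the container's registrations, so it is up to you to keep the returned instance and decide where, if anywhere, to register it.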
TL;DR: The Rebus way is to treat an instance of an IoC container as a separate application, so it makes sense to register only one IBus in it.
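If you want to host multiple applications in a single process (or even multiple instances of an application with different message SLAs), you can new up multiple container instances.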
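As a rough illustration of the one-container-per-bus idea, here is a sketch that reuses the calls from the question. BusFactory and StartBus are hypothetical names, and the using directives reflect where I believe these types live, so verify them against your package versions. Handlers are registered with RegisterCollection because the stack trace in the question shows the adapter resolving them through GetAllInstances.

```csharp
using System;
using System.Collections.Generic;
using Rebus.Bus;
using Rebus.Config;
using Rebus.Handlers;
using Rebus.Retry.Simple;
using Rebus.SimpleInjector;
using SimpleInjector;

// Hypothetical helper: every call creates its own Container, so each
// container ends up with exactly one IBus registration.
public static class BusFactory
{
    public static IBus StartBus(
        string connectionString,
        string queueName,
        string errorQueueName,
        IEnumerable<Type> handlerTypes)
    {
        var container = new Container();

        // Register handlers as a collection so the adapter can resolve them.
        container.RegisterCollection(typeof(IHandleMessages<>), handlerTypes);

        IBus bus = Configure.With(new SimpleInjectorContainerAdapter(container))
            .Transport(t => t.UseAzureServiceBus(connectionString, queueName))
            .Options(o =>
            {
                o.SetNumberOfWorkers(1);
                o.SimpleRetryStrategy(errorQueueAddress: errorQueueName, maxDeliveryAttempts: 3);
            })
            .Start();

        // Verify after Start(), because the adapter registers IBus during Start().
        container.Verify();
        return bus;
    }
}

// Usage (OtherMessageHandler is a hypothetical second handler type):
// var busA = BusFactory.StartBus(connectionString, "A_QUEUE_NAME", "A_ERROR_QUEUE_NAME", new[] { typeof(MyMessageHandler) });
// var busB = BusFactory.StartBus(connectionString, "B_QUEUE_NAME", "B_ERROR_QUEUE_NAME", new[] { typeof(OtherMessageHandler) });
```

Each bus then only sees the handlers in its own container, which is what keeps the two queues separate.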
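You referenced my SO answer (and question), so I will share how I implemented it.
As you will see, I use specific interfaces to separate commands from events.
Then, in the consuming part of the queue, I did the registration like this: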
public static void Register()
{
    var assemblies = AppDomain.CurrentDomain.GetAssemblies()
        .Where(i => i.FullName.StartsWith("MySolutionPrefix"));
    var container = new Container();
    // http://simpleinjector.readthedocs.io/en/latest/lifetimes.html#perexecutioncontextscope
    container.Options.DefaultScopedLifestyle = new ExecutionContextScopeLifestyle();
    var serviceType = typeof(IHandleMessages<>).Name;
    // GetHandlers is an extension method of mine that finds specific handlers
    IEnumerable<Type> commandHandlers = assemblies.GetHandlers(serviceType, typeof(ICommand));
    container.Register(typeof(IHandleMessages<>), commandHandlers.Concat(new List<Type> { typeof(HistorizeCommandHanlder) }));
    // NOTE: command handlers only
    container.RegisterCollection(typeof(IHandleMessages<>), commandHandlers);
    var bus = Configure.With(new SimpleInjectorContainerAdapter(container))
        //.... logging, transport (I created my own transport on MongoDB), routing, sagas and so on
        .Options(o =>
        {
            // This is simply my own transport, registered via Register<IOneWayClientTransport>
            o.ConfigureDecouplingDatabase(db, settings.TopicBasedMessageQueueName);
            // This is more complicated, because I want the message to be copied
            // automatically to a separate topic for each handler
            o.EnableHandlerDecoupling(settings.DecouplingHandlersRegistration);
        })
        .Start();
    container.Verify();
    // Necessary, because otherwise published messages are sent to no one
    commandHandlers.GetHandledSubTypes(serviceType, typeof(IVersionedEvent))
        .ToList()
        .ForEach(i => bus.Subscribe(i).Wait());
    // NOTE: event handlers only
    IEnumerable<Type> eventHandlers = assemblies
        .GetHandlers(serviceType, typeof(IVersionedEvent))
        .Except(commandHandlers)
        //.Except(new List<Type> { typeof(TempHandlerLogDecorator) })
        ;
    // One bus, with its own container, per event handler
    foreach (var handler in eventHandlers)
        ConfigureServiceBus(mongoDbConnectionProvider, db, handler.FullName, new[] { handler });
}
private static IBus ConfigureServiceBus(MongoDbConnectionProvider mongoDbConnectionProvider, IMongoDatabase db, string inputQueueName, IEnumerable<Type> handlers)
{
    // A fresh container per bus, so each container holds a single IBus
    var container = new Container();
    container.Options.DefaultScopedLifestyle = new ExecutionContextScopeLifestyle();
    container.RegisterCollection(typeof(IHandleMessages<>), handlers);
    var bus = Configure.With(new SimpleInjectorContainerAdapter(container))
        //.... logging, subscriptions
        // this is a consumer only for inputQueueName
        .Transport(t => t.UseMongoDb(db, settings.TopicBasedMessageQueueName, inputQueueName))
        .Options(o =>
        {
            o.ConfigureDecouplingDatabase(db, settings.TopicBasedMessageQueueName);
            o.EnableHandlerDecoupling(settings.DecouplingHandlersRegistration);
        })
        .Start();
    container.Verify();
    handlers.GetHandledSubTypes(typeof(IHandleMessages<>).Name, typeof(IVersionedEvent))
        .ToList()
        .ForEach(i => bus.Advanced.Topics.Subscribe(i.GetDecoupledTopic(settings.DecouplingHandlersRegistration)).Wait());
    return bus;
}
I am also using Rebus 3.0.1 and SimpleInjector 3.2.3.