Rebus 基于内容的多个队列

Rebus multiple Queues based on content

设置:asp.net mvc 项目中的 Rebus 使用 SimpleInjector。

我需要创建两个处理程序来接收消息,每个处理程序都来自一个特定的队列。按照我在 上找到的内容,我创建了类似的代码。

在 class 库中,我有一个 class 实现了 SimpleInjector IPackage,它有如下代码:

public void RegisterServices( Container container ) {
    container.Register<IHandleMessages<MyMessage>, MyMessageHandler>( Lifestyle.Scoped );
    IContainerAdapter adapter = new SimpleInjectorContainerAdapter( container );
    Configure.With( adapter )
             .Transport( t => t.UseAzureServiceBus( connectionString, "A_QUEUE_NAME", AzureServiceBusMode.Standard ) )
             .Options( oc => {
                 oc.SetNumberOfWorkers( 1 );
                 oc.SimpleRetryStrategy( errorQueueAddress: "A_ERROR_QUEUE_NAME", maxDeliveryAttempts: 3 );
             } )
             .Start();

    Configure.With(adapter
             .Transport(t => t.UseAzureServiceBus(connectionString, "B_QUEUE_NAME")
             .Options( oc => {
                 oc.SetNumberOfWorkers( 1 );
                 oc.SimpleRetryStrategy( errorQueueAddress: "B_ERROR_QUEUE_NAME", maxDeliveryAttempts: 3 );
             } )
             .Start();
}

然而,当调试器到达第二个 Configure.With( ... ) 调用时,我终止并显示一条错误消息:

Type IBus has already been registered. If your intention is to resolve a collection of IBus implementations, use the RegisterCollection overloads. More info: https://simpleinjector.org/coll1. If your intention is to replace the existing registration with this new registration, you can allow overriding the current registration by setting Container.Options.AllowOverridingRegistrations to true. More info: https://simpleinjector.org/ovrrd.

堆栈跟踪:

[InvalidOperationException: Type IBus has already been registered. If your intention is to resolve a collection of IBus implementations, use the RegisterCollection overloads. More info: https://simpleinjector.org/coll1. If your intention is to replace the existing registration with this new registration, you can allow overriding the current registration by setting Container.Options.AllowOverridingRegistrations to true. More info: https://simpleinjector.org/ovrrd.]
   SimpleInjector.Internals.NonGenericRegistrationEntry.ThrowWhenTypeAlreadyRegistered(InstanceProducer producer) +102
   SimpleInjector.Internals.NonGenericRegistrationEntry.Add(InstanceProducer producer) +59
   SimpleInjector.Container.AddInstanceProducer(InstanceProducer producer) +105
   SimpleInjector.Container.AddRegistrationInternal(Type serviceType, Registration registration) +69
   SimpleInjector.Container.AddRegistration(Type serviceType, Registration registration) +131
   SimpleInjector.Container.RegisterSingleton(TService instance) +183
   Rebus.SimpleInjector.SimpleInjectorContainerAdapter.SetBus(IBus bus) +55
   Rebus.Config.RebusConfigurer.Start() +2356
   MyModule.RegisterServices(Container container) +497
   SimpleInjector.PackageExtensions.RegisterPackages(Container container, IEnumerable`1 assemblies) +50
   Myproject.SimpleInjectorInitializer.InitializeContainer(Container container) +35
   Myproject.SimpleInjectorInitializer.Initialize() +68
   Myproject.Startup.Configuration(IAppBuilder app) +28

编辑

然后我删除了第二个 Configure.With( ... ) 代码块,现在当我执行 _bus.Send( message ) 时,我在消费者进程中收到另一个错误

Unhandled exception 1 while handling message with ID fef3acca-97f4-4495-b09d-96e6c9f66c4d: SimpleInjector.ActivationException: No registration for type IEnumerable<IHandleMessages<MyMessage>> could be found. There is, however, a registration for IHandleMessages<MyMessage>; Did you mean to call GetInstance<IHandleMessages<MyMessage>>() or depend on IHandleMessages<MyMessage>? Or did you mean to register a collection of types using RegisterCollection?

堆栈跟踪:

2017-04-13 10:21:03,805 [77] WARN  Rebus.Retry.ErrorTracking.InMemErrorTracker - 
   at SimpleInjector.Container.ThrowMissingInstanceProducerException(Type serviceType)
   at SimpleInjector.Container.GetInstanceForRootType[TService]()
   at SimpleInjector.Container.GetInstance[TService]()
   at SimpleInjector.Container.GetAllInstances[TService]()
   at Rebus.SimpleInjector.SimpleInjectorContainerAdapter.<GetHandlers>d__3`1.MoveNext()

异常状态:"Type IBus has already been registered"。根据您的堆栈跟踪,第二次添加 IBus 是在 SimpleInjectorContainerAdapter 内。您必须查明它是何时首次注册的。这很容易做到;刚刚注册了一个虚拟 IBus 作为创建 Container 后的第一次注册,并查看它爆炸的堆栈跟踪。

我通常建议每个容器实例只保留一个 IBus,因为总线本身可以考虑 "an application",这恰好符合 IoC 容器是一个对象的事实可以 "host" 应用程序在其生命周期内。

Rebus 不提供 Conforming Container 抽象,因为我同意 Mark Seemann 的观点,那是一个注定要失败的项目。事实上,正如the wiki page mentions,Rebus曾经提供处理程序的自动注册,但后来证明这是有问题的。

相反,Rebus 鼓励您提供 "container adapter"(IContainerAdapter 的实现),其职责是:

  • 查找处理程序
  • 提供一种在正确的方式中注册 IBusIMessageContext 的方法

为 Autofac、Castle Windsor、SimpleInjector 等提供开箱即用的容器适配器。但是,不需要提供容器适配器 – Configure.With(...) 咆哮很高兴只收到 "handler activator"(IHandlerActivator 的实现),因此如果您只想使用 IoC 容器查找处理程序并自行注册 IBus,您也可以通过实现 IHandlerActivator 来实现并在您的容器中查找处理程序。

TL;DR:Rebus 方法是将 IoC 容器的实例视为单独的应用程序,因此只注册一个 IBus 是有意义的在里面。

如果您想在单个进程中托管多个应用程序(甚至是具有不同消息 SLA 的应用程序的多个实例),则可以新建多个容器实例。

您引用了我的 SO Answer(问题),所以我将与您分享我是如何实现它的。 正如您将看到的,使用特定的接口,我将命令与事件分开。

然后,就在队列的消费部分,我做了这样的注册:

    public static void Register()
    {
        var assemblies = AppDomain.CurrentDomain.GetAssemblies()
            .Where(i => i.FullName.StartsWith("MySolutionPrefix"))
            ;

        var container = new Container();

        // http://simpleinjector.readthedocs.io/en/latest/lifetimes.html#perexecutioncontextscope
        container.Options.DefaultScopedLifestyle = new ExecutionContextScopeLifestyle();


        var serviceType = typeof(IHandleMessages<>).Name;
        // this is extension method to get specific handlers
        IEnumerable<Type> commandHandlers = assemblies.GetHandlers(serviceType, typeof(ICommand));
        container.Register(typeof(IHandleMessages<>), commandHandlers.Concat(new List<Type> { typeof(HistorizeCommandHanlder) }));

        // NOTE Just command Handlers
        container.RegisterCollection(typeof(IHandleMessages<>), commandHandlers);


        var bus = Configure.With(new SimpleInjectorContainerAdapter(container))
            //.... logging, transport (I created my own transport on mongoDb), routing, sagas and so on
            .Options(o =>
            {
                // This simply my personal transport as Register<IOneWayClientTransport> 
                o.ConfigureDecouplingDatabase(db, settings.TopicBasedMessageQueueName);
                // this is more complicated because i want that automatically the message is copied on
                // a separate topic for each handler
                o.EnableHandlerDecoupling(settings.DecouplingHandlersRegistration);
            })
            .Start();

        container.Verify();

        // It is necessary because otherwise it sends published messages to no-one 
        commandHandlers.GetHandledSubTypes(serviceType, typeof(IVersionedEvent))
            .ToList()
            .ForEach(i => bus.Subscribe(i).Wait());

        // NOTE just events handlers
        IEnumerable<Type> eventHandlers = assemblies
            .GetHandlers(serviceType, typeof(IVersionedEvent))
            .Except(commandHandlers)
            //.Except(new List<Type> { typeof(TempHandlerLogDecorator) })
            ;
        foreach (var handler in eventHandlers)
            ConfigureServiceBus(mongoDbConnectionProvider, db, handler.FullName, new[] { handler });
    }

    private static IBus ConfigureServiceBus(MongoDbConnectionProvider mongoDbConnectionProvider, IMongoDatabase db, string inputQueueName, IEnumerable<Type> handlers)
    {
        var container = new Container();

        container.Options.DefaultScopedLifestyle = new ExecutionContextScopeLifestyle();

        container.RegisterCollection(typeof(IHandleMessages<>), handlers);

        var bus = Configure.With(new SimpleInjectorContainerAdapter(container))
            .... logging, Subscriptions
            // this is a consumer only for inputQueueName
            .Transport(t => t.UseMongoDb(db, settings.TopicBasedMessageQueueName, inputQueueName))
            .Options(o =>
            {
                o.ConfigureDecouplingDatabase(db, settings.TopicBasedMessageQueueName);
                o.EnableHandlerDecoupling(settings.DecouplingHandlersRegistration);
            })
            .Start();

        container.Verify();

        handlers.GetHandledSubTypes(typeof(IHandleMessages<>).Name, typeof(IVersionedEvent))
            .ToList()
            .ForEach(i => bus.Advanced.Topics.Subscribe(i.GetDecoupledTopic(settings.DecouplingHandlersRegistration)).Wait());

        return bus;
    }

我还在使用 Rebus 3.0.1 和 SimpleInjector 3.2.3。