在 DI 的上下文中,我如何在 MassTransit 中使用 `AddMediator()` 扩展方法将 `IConsumeMessageObserver` 连接到消费者?

How can i connnect `IConsumeMessageObserver` to the consumer, in the context of DI, using `AddMediator()` extension method, in MassTransit?

我遇到了一个难题,我不知道如何使用 AddMediator() 扩展方法在 DI 上下文中添加 IConsumeMessageObserver 实体。在该部分 (https://masstransit-project.com/advanced/observers.html#received-messages) 中,仅显示如何手动操作。

我尝试在 AddMediator() 调用后解析 IMediator,如下所示:

private static void BindInMemoryBroker ( IServiceCollection services )
        {
            services.AddMediator ( configuration =>
              {
                  AddInMemoryAddOnBroker ( configuration );
              } );

            var serviceProvider = services.BuildServiceProvider ();

            var mediator = serviceProvider.GetService<IMediator> ();

            var observer1 = serviceProvider.GetService<MailingObserver<RetrieveAddOnDownloadLinkContract>> ();

            var observer2 = serviceProvider.GetService<IncrementAddOnDownloadCounterObserver<RetrieveAddOnDownloadLinkContract>> ();

            mediator.ConnectConsumeMessageObserver ( observer1 );

            mediator.ConnectConsumeMessageObserver ( observer2 );
        }

        private static void AddInMemoryAddOnBroker ( IServiceCollectionMediatorConfigurator configuration )
        {
            AddInMemoryConsumers ( configuration );

            AddInMemoryContracts ( configuration );
        }

        private static void AddInMemoryConsumers ( IServiceCollectionMediatorConfigurator configuration )
        {

            configuration.AddConsumer<RetrieveAddOnDownloadLinkConsumer> ();

        }

        private static void AddInMemoryContracts ( IServiceCollectionMediatorConfigurator configuration )
        {
            configuration.AddRequestClient<RetrieveAddOnDownloadLinkContract> ();
        }

但是没有用,请帮助我

消费观察者有两种类型,一种是传递所有消息类型(IConsumeObserver),另一种是只传递单一消息类型(IConsumeMessageObserver)。后者只能连接到 IBusIMediator,因此它不能在 AddMediatorAddMassTransit 配置中配置。

您可以添加 IConsumeObserver 并使用模式匹配 select 仅您需要的消息类型作为替代。