公共交通和多个总线实例

masstransit and multiple bus instances

根据 MT 文档,每个总线实例都应具有不同的队列名称。

我可以假设以下是正确的配置吗?

是什么将 WebApi 和后端队列连接在一起?

考虑更高级的场景

当我将后端管道设计为也使用消息处理时,我可以稍后将其切片并让它通过有线传输非常轻松地使用。问题是,我能否以某种方式配置 MT,以便 Azure 配置的总线将消息中继到配置了本地传输的总线?

我将在支持 Azure 服务总线和 RabbitMQ 的 MassTransit v3 的上下文中回答这个问题。如果您要使用 Azure,我强烈建议您使用 v3 而不是 v2 的传输。

首先,关于请求。它们应该发送,而不是发布。它们通常是自然命令,而不是事件。我看到人们发布请求的唯一原因是他们没有服务的端点地址。所以了解端点有很大帮助。

其次,在您的示例中,每个 WebAPI 实例都应该有自己的队列来接收响应,因为它们会被发送回请求者。使用 MT3,每个 IBus 实例都有一个专门为此目的设置的唯一自动删除队列,以处理来自请求的响应。

GitHub 上的 MassTransit 存储库中有一个示例 Sample-RequestResponse,它展示了如何使用 RabbitMQ 进行设置。 Azure 服务总线也差不多。

将它们组合在一起的 "fabric" 是虚拟主机(在 RabbitMQ 中)或名称空间(在 ASB 中)。主题和队列之间的联系决定了它们如何协同工作以形成逻辑 bus.

如果有人试图通过我们的 .NET Core + DI 注册 多个 总线:

  • 不要使用 AddBus 调用中的构建
    • 无论您做什么,它都不会注册超过一辆到期的公交车
    • 这是因为它在内部调用 TryAddSingleton 调用
    • TryAddSingleton 如果还没有为接口注册的实例,则只向 DI 容器添加一个新实例
    • 注意:
      • 没有抛出异常或错误

我们使用的解决方案

由于所需的各种接口不是通用的:

  • 围绕内置接口创建通用包装器
  • 创建了唯一标识每个 RegisteredBus 的接口(使用通用参数)
  • 当创建一个新的 Wrapper 实例时,我们将内置接口的实例传递给它的构造函数
  • Wrapper 然后将内置内部接口的实例保存在 public 属性 Instance
  • 而不是注入例如。 IBus,我们现在注入 IBus<MyRegisteredBus>
    • 然后我们使用包装器的 Instance 属性 来访问内置接口实例并将其存储以备后用(之后包装器不再起作用)

我们希望不必使用某种带有奇怪 Instance 属性 的包装器,但如果内置接口变得通用或使用类似 DynamicProxies 的东西,我们无法想出更优雅的解决方案。

非常欢迎提出想法/反馈。

代码

通用 AddBus 调用(否则签名与内置调用 100% 相同):

        public static void AddBus<TBusType>(this IServiceCollection services, Func<IServiceProvider, IBusControl> busFactory)
            where TBusType : class, IBusType
        {
            IBusControl<TBusType> BusFactory(IServiceProvider serviceProvider)
            {
                return new BusControl<TBusType>(busFactory(serviceProvider));
            }

            services.AddSingleton<IBusControl<TBusType>>(BusFactory);
            services.AddSingleton<IBus<TBusType>>(provider => new Bus<TBusType>(provider.GetRequiredService<IBusControl<TBusType>>().Instance));
        }

我们为实现这一目标而创建的各种接口/类:

    // the only purpose of the interfaces derived from `IBusType` is to uniquely idnetify a registered Bus
    public interface IBusType { }
    public interface IHosted : IBusType { }
    public interface ILocal : IBusType { }

    public interface IBusTypeWrapper<TBusType, TInterface>
        where TBusType : IBusType
    {
        public TInterface Instance { get; }
    }

    public class BusTypeWrapper<TBusType, TInterface> : IBusTypeWrapper<TBusType, TInterface>
        where TBusType : IBusType
    {
        public TInterface Instance { get; }

        public BusTypeWrapper(TInterface instance)
        {
            Instance = instance;
        }
    }

    public interface IBusControl<T> : IBusTypeWrapper<T, IBusControl> where T : IBusType { }
    public class BusControl<T> : BusTypeWrapper<T, IBusControl>, IBusControl<T> where T : IBusType
    {
        public BusControl(IBusControl instance) : base(instance) { }
    }

    public interface IBus<T> : IBusTypeWrapper<T, IBus> where T : IBusType { }
    public class Bus<T> : BusTypeWrapper<T, IBus>, IBus<T> where T : IBusType
    {
        public Bus(IBus instance) : base(instance) { }
    }

    public interface ISendEndpointProvider<T> : IBusTypeWrapper<T, ISendEndpointProvider> where T : IBusType { }
    public class SendEndpointProvider<T> : BusTypeWrapper<T, ISendEndpointProvider>, ISendEndpointProvider<T> where T : IBusType
    {
        public SendEndpointProvider(ISendEndpointProvider instance) : base(instance) { }
    }

如何注册泛型 ISendEndpointProvider:

services.AddSingleton<ISendEndpointProvider<ILocal>>(provider => new SendEndpointProvider<ILocal>(provider.GetRequiredService<IBusControl<ILocal>>().Instance));

更新

到每个总线类型的 IHosted 服务: - 创建通用 HostedService<BusType> 服务 - 在构造函数中注入 IBusControl<BusType> - 并使用注入的实例开始停止特定的总线

然后为每种总线类型注册一个IHostedService

services.AddSingleton<IHostedService, HostedService<ILocal>>(); services.AddSingleton<IHostedService, HostedService<IHosted>>();`