公共交通和多个总线实例
masstransit and multiple bus instances
根据 MT 文档,每个总线实例都应具有不同的队列名称。
我可以假设以下是正确的配置吗?
是什么将 WebApi 和后端队列连接在一起?
考虑更高级的场景
当我将后端管道设计为也使用消息处理时,我可以稍后将其切片并让它通过有线传输非常轻松地使用。问题是,我能否以某种方式配置 MT,以便 Azure 配置的总线将消息中继到配置了本地传输的总线?
我将在支持 Azure 服务总线和 RabbitMQ 的 MassTransit v3 的上下文中回答这个问题。如果您要使用 Azure,我强烈建议您使用 v3 而不是 v2 的传输。
首先,关于请求。它们应该发送,而不是发布。它们通常是自然命令,而不是事件。我看到人们发布请求的唯一原因是他们没有服务的端点地址。所以了解端点有很大帮助。
其次,在您的示例中,每个 WebAPI 实例都应该有自己的队列来接收响应,因为它们会被发送回请求者。使用 MT3,每个 IBus
实例都有一个专门为此目的设置的唯一自动删除队列,以处理来自请求的响应。
GitHub 上的 MassTransit 存储库中有一个示例 Sample-RequestResponse,它展示了如何使用 RabbitMQ 进行设置。 Azure 服务总线也差不多。
将它们组合在一起的 "fabric" 是虚拟主机(在 RabbitMQ 中)或名称空间(在 ASB 中)。主题和队列之间的联系决定了它们如何协同工作以形成逻辑 bus.
如果有人试图通过我们的 .NET Core + DI 注册 多个 总线:
- 不要使用
AddBus
调用中的构建
- 无论您做什么,它都不会注册超过一辆到期的公交车
- 这是因为它在内部调用
TryAddSingleton
调用
TryAddSingleton
如果还没有为接口注册的实例,则只向 DI 容器添加一个新实例
- 注意:
- 没有抛出异常或错误
我们使用的解决方案
由于所需的各种接口不是通用的:
- 围绕内置接口创建通用包装器
- 创建了唯一标识每个 RegisteredBus 的接口(使用通用参数)
- 当创建一个新的 Wrapper 实例时,我们将内置接口的实例传递给它的构造函数
- Wrapper 然后将内置内部接口的实例保存在 public 属性
Instance
- 而不是注入例如。
IBus
,我们现在注入 IBus<MyRegisteredBus>
- 然后我们使用包装器的
Instance
属性 来访问内置接口实例并将其存储以备后用(之后包装器不再起作用)
我们希望不必使用某种带有奇怪 Instance
属性 的包装器,但如果内置接口变得通用或使用类似 DynamicProxies
的东西,我们无法想出更优雅的解决方案。
非常欢迎提出想法/反馈。
代码
通用 AddBus
调用(否则签名与内置调用 100% 相同):
public static void AddBus<TBusType>(this IServiceCollection services, Func<IServiceProvider, IBusControl> busFactory)
where TBusType : class, IBusType
{
IBusControl<TBusType> BusFactory(IServiceProvider serviceProvider)
{
return new BusControl<TBusType>(busFactory(serviceProvider));
}
services.AddSingleton<IBusControl<TBusType>>(BusFactory);
services.AddSingleton<IBus<TBusType>>(provider => new Bus<TBusType>(provider.GetRequiredService<IBusControl<TBusType>>().Instance));
}
我们为实现这一目标而创建的各种接口/类:
// the only purpose of the interfaces derived from `IBusType` is to uniquely idnetify a registered Bus
public interface IBusType { }
public interface IHosted : IBusType { }
public interface ILocal : IBusType { }
public interface IBusTypeWrapper<TBusType, TInterface>
where TBusType : IBusType
{
public TInterface Instance { get; }
}
public class BusTypeWrapper<TBusType, TInterface> : IBusTypeWrapper<TBusType, TInterface>
where TBusType : IBusType
{
public TInterface Instance { get; }
public BusTypeWrapper(TInterface instance)
{
Instance = instance;
}
}
public interface IBusControl<T> : IBusTypeWrapper<T, IBusControl> where T : IBusType { }
public class BusControl<T> : BusTypeWrapper<T, IBusControl>, IBusControl<T> where T : IBusType
{
public BusControl(IBusControl instance) : base(instance) { }
}
public interface IBus<T> : IBusTypeWrapper<T, IBus> where T : IBusType { }
public class Bus<T> : BusTypeWrapper<T, IBus>, IBus<T> where T : IBusType
{
public Bus(IBus instance) : base(instance) { }
}
public interface ISendEndpointProvider<T> : IBusTypeWrapper<T, ISendEndpointProvider> where T : IBusType { }
public class SendEndpointProvider<T> : BusTypeWrapper<T, ISendEndpointProvider>, ISendEndpointProvider<T> where T : IBusType
{
public SendEndpointProvider(ISendEndpointProvider instance) : base(instance) { }
}
如何注册泛型 ISendEndpointProvider
:
services.AddSingleton<ISendEndpointProvider<ILocal>>(provider => new SendEndpointProvider<ILocal>(provider.GetRequiredService<IBusControl<ILocal>>().Instance));
更新
到每个总线类型的 IHosted 服务:
- 创建通用 HostedService<BusType>
服务
- 在构造函数中注入 IBusControl<BusType>
- 并使用注入的实例开始停止特定的总线
然后为每种总线类型注册一个IHostedService
。
services.AddSingleton<IHostedService, HostedService<ILocal>>(); services.AddSingleton<IHostedService, HostedService<IHosted>>();`
根据 MT 文档,每个总线实例都应具有不同的队列名称。
我可以假设以下是正确的配置吗?
是什么将 WebApi 和后端队列连接在一起?
考虑更高级的场景
当我将后端管道设计为也使用消息处理时,我可以稍后将其切片并让它通过有线传输非常轻松地使用。问题是,我能否以某种方式配置 MT,以便 Azure 配置的总线将消息中继到配置了本地传输的总线?
我将在支持 Azure 服务总线和 RabbitMQ 的 MassTransit v3 的上下文中回答这个问题。如果您要使用 Azure,我强烈建议您使用 v3 而不是 v2 的传输。
首先,关于请求。它们应该发送,而不是发布。它们通常是自然命令,而不是事件。我看到人们发布请求的唯一原因是他们没有服务的端点地址。所以了解端点有很大帮助。
其次,在您的示例中,每个 WebAPI 实例都应该有自己的队列来接收响应,因为它们会被发送回请求者。使用 MT3,每个 IBus
实例都有一个专门为此目的设置的唯一自动删除队列,以处理来自请求的响应。
GitHub 上的 MassTransit 存储库中有一个示例 Sample-RequestResponse,它展示了如何使用 RabbitMQ 进行设置。 Azure 服务总线也差不多。
将它们组合在一起的 "fabric" 是虚拟主机(在 RabbitMQ 中)或名称空间(在 ASB 中)。主题和队列之间的联系决定了它们如何协同工作以形成逻辑 bus.
如果有人试图通过我们的 .NET Core + DI 注册 多个 总线:
- 不要使用
AddBus
调用中的构建- 无论您做什么,它都不会注册超过一辆到期的公交车
- 这是因为它在内部调用
TryAddSingleton
调用 TryAddSingleton
如果还没有为接口注册的实例,则只向 DI 容器添加一个新实例- 注意:
- 没有抛出异常或错误
我们使用的解决方案
由于所需的各种接口不是通用的:
- 围绕内置接口创建通用包装器
- 创建了唯一标识每个 RegisteredBus 的接口(使用通用参数)
- 当创建一个新的 Wrapper 实例时,我们将内置接口的实例传递给它的构造函数
- Wrapper 然后将内置内部接口的实例保存在 public 属性
Instance
- 而不是注入例如。
IBus
,我们现在注入IBus<MyRegisteredBus>
- 然后我们使用包装器的
Instance
属性 来访问内置接口实例并将其存储以备后用(之后包装器不再起作用)
- 然后我们使用包装器的
我们希望不必使用某种带有奇怪 Instance
属性 的包装器,但如果内置接口变得通用或使用类似 DynamicProxies
的东西,我们无法想出更优雅的解决方案。
非常欢迎提出想法/反馈。
代码
通用 AddBus
调用(否则签名与内置调用 100% 相同):
public static void AddBus<TBusType>(this IServiceCollection services, Func<IServiceProvider, IBusControl> busFactory)
where TBusType : class, IBusType
{
IBusControl<TBusType> BusFactory(IServiceProvider serviceProvider)
{
return new BusControl<TBusType>(busFactory(serviceProvider));
}
services.AddSingleton<IBusControl<TBusType>>(BusFactory);
services.AddSingleton<IBus<TBusType>>(provider => new Bus<TBusType>(provider.GetRequiredService<IBusControl<TBusType>>().Instance));
}
我们为实现这一目标而创建的各种接口/类:
// the only purpose of the interfaces derived from `IBusType` is to uniquely idnetify a registered Bus
public interface IBusType { }
public interface IHosted : IBusType { }
public interface ILocal : IBusType { }
public interface IBusTypeWrapper<TBusType, TInterface>
where TBusType : IBusType
{
public TInterface Instance { get; }
}
public class BusTypeWrapper<TBusType, TInterface> : IBusTypeWrapper<TBusType, TInterface>
where TBusType : IBusType
{
public TInterface Instance { get; }
public BusTypeWrapper(TInterface instance)
{
Instance = instance;
}
}
public interface IBusControl<T> : IBusTypeWrapper<T, IBusControl> where T : IBusType { }
public class BusControl<T> : BusTypeWrapper<T, IBusControl>, IBusControl<T> where T : IBusType
{
public BusControl(IBusControl instance) : base(instance) { }
}
public interface IBus<T> : IBusTypeWrapper<T, IBus> where T : IBusType { }
public class Bus<T> : BusTypeWrapper<T, IBus>, IBus<T> where T : IBusType
{
public Bus(IBus instance) : base(instance) { }
}
public interface ISendEndpointProvider<T> : IBusTypeWrapper<T, ISendEndpointProvider> where T : IBusType { }
public class SendEndpointProvider<T> : BusTypeWrapper<T, ISendEndpointProvider>, ISendEndpointProvider<T> where T : IBusType
{
public SendEndpointProvider(ISendEndpointProvider instance) : base(instance) { }
}
如何注册泛型 ISendEndpointProvider
:
services.AddSingleton<ISendEndpointProvider<ILocal>>(provider => new SendEndpointProvider<ILocal>(provider.GetRequiredService<IBusControl<ILocal>>().Instance));
更新
到每个总线类型的 IHosted 服务:
- 创建通用 HostedService<BusType>
服务
- 在构造函数中注入 IBusControl<BusType>
- 并使用注入的实例开始停止特定的总线
然后为每种总线类型注册一个IHostedService
。
services.AddSingleton<IHostedService, HostedService<ILocal>>(); services.AddSingleton<IHostedService, HostedService<IHosted>>();`