连接总线时断开消费者
Disconnect consumer while bus is connected
我们有以下用例:
我们有两条总线(内部和外部)。这个想法是我们自己的服务使用内部总线,所有第三方服务使用外部总线。我们创建了一个充当消息路由器的服务(恰如其分地命名为 MessageRouter)。
当在内部总线上发布消息时,MessageRouter 可以拾取该消息并将其放置在外部总线上,反之亦然。通过配置我们可以知道哪些消息被允许从内部传递到外部或从外部传递到内部。这本身就可以正常工作。
我们对消息进行了分组,我们有事件、命令和请求。每个服务都有三个 ReceiveEnpoint,每个消息类型一个。这意味着所有事件都在事件队列等中排队。服务 'subscribes' 获取其自己的消费者的每条消息。
var queues = new Dictionary<string, IEnumerable<Type>>
{
// ReSharper disable once PossibleMultipleEnumeration
{ "Events", consumerTypes.Where(ct => ct.GetGenericArguments().Any(ga => typeof(IEvent).IsAssignableFrom(ga))) },
// ReSharper disable once PossibleMultipleEnumeration
{ "Commands", consumerTypes.Where(ct => ct.GetGenericArguments().Any(ga => typeof(ICommand).IsAssignableFrom(ga))) },
// ReSharper disable once PossibleMultipleEnumeration
{ "Queries", consumerTypes.Where(ct => ct.GetGenericArguments().Any(ga => typeof(IQuery).IsAssignableFrom(ga))) }
};
foreach (var queue in queues)
{
config.ReceiveEndpoint(GetConsumerQueueName(queue.Key), cfg =>
{
foreach (var consumerType in queue.Value)
{
cfg.Consumer(consumerType, consumerContainer.Resolve);
}
configurator?.Invoke((T)cfg);
});
}
其中 config
是 IBusFactoryConfigurator
。此代码在服务启动时调用。
我们希望能够在我们的 MessageRouter 中做的是 'dynamically' 添加,更重要的是,从 ReceiveEndpoint 中删除消费者。
到目前为止,我们还没有运气。我们尝试通过在 BusControl
实例上使用 ConnectConsumer
方法来添加消费者。这给了我们一个 ConnectHandle
,它有一个 Disconnect
方法。但是,使用这种方法时,我们的消费者不会接收到消息。查看句柄向我们显示它是 MultipleConnectHandle
,但是句柄没有 'internal' 个句柄。
有什么方法可以使用 Consumer
方法来注册不同的消费者并获取他们的 ConnectHandle
,以便我们可以 Disconnect
如果需要的话?
如前所述,理想情况下,我们希望能够动态地向 ReceiveEndpoint 添加和删除消费者。
当总线启动时,您不能 add/remove 接收端点上的消费者,这在任何方式、形状或形式上均不受支持。
但是,您可以将单独队列上的新接收端点连接到一个或多个消费者。在你上面的例子中,你似乎没有在连接接收端点之前让你的消费者注册,这就是为什么你在句柄集合中看不到任何东西。
我们有以下用例:
我们有两条总线(内部和外部)。这个想法是我们自己的服务使用内部总线,所有第三方服务使用外部总线。我们创建了一个充当消息路由器的服务(恰如其分地命名为 MessageRouter)。
当在内部总线上发布消息时,MessageRouter 可以拾取该消息并将其放置在外部总线上,反之亦然。通过配置我们可以知道哪些消息被允许从内部传递到外部或从外部传递到内部。这本身就可以正常工作。
我们对消息进行了分组,我们有事件、命令和请求。每个服务都有三个 ReceiveEnpoint,每个消息类型一个。这意味着所有事件都在事件队列等中排队。服务 'subscribes' 获取其自己的消费者的每条消息。
var queues = new Dictionary<string, IEnumerable<Type>>
{
// ReSharper disable once PossibleMultipleEnumeration
{ "Events", consumerTypes.Where(ct => ct.GetGenericArguments().Any(ga => typeof(IEvent).IsAssignableFrom(ga))) },
// ReSharper disable once PossibleMultipleEnumeration
{ "Commands", consumerTypes.Where(ct => ct.GetGenericArguments().Any(ga => typeof(ICommand).IsAssignableFrom(ga))) },
// ReSharper disable once PossibleMultipleEnumeration
{ "Queries", consumerTypes.Where(ct => ct.GetGenericArguments().Any(ga => typeof(IQuery).IsAssignableFrom(ga))) }
};
foreach (var queue in queues)
{
config.ReceiveEndpoint(GetConsumerQueueName(queue.Key), cfg =>
{
foreach (var consumerType in queue.Value)
{
cfg.Consumer(consumerType, consumerContainer.Resolve);
}
configurator?.Invoke((T)cfg);
});
}
其中 config
是 IBusFactoryConfigurator
。此代码在服务启动时调用。
我们希望能够在我们的 MessageRouter 中做的是 'dynamically' 添加,更重要的是,从 ReceiveEndpoint 中删除消费者。
到目前为止,我们还没有运气。我们尝试通过在 BusControl
实例上使用 ConnectConsumer
方法来添加消费者。这给了我们一个 ConnectHandle
,它有一个 Disconnect
方法。但是,使用这种方法时,我们的消费者不会接收到消息。查看句柄向我们显示它是 MultipleConnectHandle
,但是句柄没有 'internal' 个句柄。
有什么方法可以使用 Consumer
方法来注册不同的消费者并获取他们的 ConnectHandle
,以便我们可以 Disconnect
如果需要的话?
如前所述,理想情况下,我们希望能够动态地向 ReceiveEndpoint 添加和删除消费者。
当总线启动时,您不能 add/remove 接收端点上的消费者,这在任何方式、形状或形式上均不受支持。
但是,您可以将单独队列上的新接收端点连接到一个或多个消费者。在你上面的例子中,你似乎没有在连接接收端点之前让你的消费者注册,这就是为什么你在句柄集合中看不到任何东西。