管理多个 queue/topic 客户?
Managing multiple queue/topic clients?
这里的挑战是我试图创建一个单一的外观来处理队列和主题,同时保持 Send
与 Publish
的语义
例如:
public interface IServiceBus
{
Task Send<T>(T message, string destination, SendOptions options = null);
Task Publish<T>(T message, string topic, SendOptions options = null);
}
Send()
会将消息发送到队列,Publish()
会将消息发布到主题。所以我需要有一个 IQueueClient
和 ITopicClient
的实例来实现这些;我会将这些作为依赖项注入到我的 IServiceBus
实现中,并相应地调用它们。
问题是 QueueClient 和 TopicClient 要求您在更新客户端时指定它们的目的地,这使我无法将其作为我的 IServiceBus
实现的参数。
我可以在创建消息时创建一个客户端,但这样效率极低。我四处寻找至少一个可以充当客户端工厂的连接管理器,但 MessagingFactory
似乎不在此 SDK 中(Microsoft.Azure.ServiceBus 3.4.0)。
所以问题是
- 是否有某种工厂可供我使用,它可以让我按需创建合适的客户,其效率与重用客户所获得的效率相同?
- 我应该使用某种覆盖或替代客户端对象来实现这种效果吗?这两个客户端真的很有限。
因为我相信我们可以假设 QueueClient and TopicClient are thread safe 的实例,您可以做的是将已解析的 IServiceBus
具体 class 注册为您的 IoC 容器中的单例。
在具体的 ServiceBus 中,您可以创建以前看到的主题和队列客户端的缓存:
private readonly ConcurrentDictionary<string, ITopicClient> _topicClientCache
= new ConcurrentDictionary<string, ITopicClient>();
private readonly ConcurrentDictionary<string, IQueueClient> _queueClientCache
= new ConcurrentDictionary<string, IQueueClient>();
然后在你的Publish
方法中
public async Task Publish<T>(T message, string destination, ...)
{
// i.e. destination is the topic
if (!_topicClientCache.TryGetValue(destination, out var topicClient))
{
topicClient = new TopicClient(_myServiceBusConnectionString, destination);
_topicClientCache.TryAdd(destination, topicClient);
}
... create and serialize message into azureMessage here
await topicClient.SendAsync(azureMessage);
}
这同样适用于您的 Send
实现 - 它会检查 _queueClientCache
的目标(队列名称),并在它第一次看到它时创建并缓存它。
我终于遇到了有类似问题的人。事实证明,他们删除了 MessagingFactory,但使连接可重用。每个客户端都有一个接受连接的构造函数重载,所以我将连接注册为单例并注入它而不是客户端,然后按需创建客户端。
参见:https://github.com/Azure/azure-service-bus-dotnet/issues/556
我的解决方案看起来有点像这样(为简洁起见省略了完整实现)
public class AzureServiceBus : IServiceBus
{
public AzureServiceBus(ServiceBusConnection connection, string replyTo)
{
_connection = connection;
_replyTo = replyTo;
_retryPolicy = new RetryExponential(
TimeSpan.FromSeconds(1),
TimeSpan.FromMinutes(1),
10);
}
public async Task Send<T>(T message, string destination)
{
var client = new QueueClient(_connection, destination, ReceiveMode.PeekLock, _retryPolicy);
// ... do work
}
public async Task Publish<T>(T message, string topic, SendOptions options = null)
{
var client = new TopicClient(_connection, topic, _retryPolicy);
// ... do work
}
}
这里的挑战是我试图创建一个单一的外观来处理队列和主题,同时保持 Send
与 Publish
例如:
public interface IServiceBus
{
Task Send<T>(T message, string destination, SendOptions options = null);
Task Publish<T>(T message, string topic, SendOptions options = null);
}
Send()
会将消息发送到队列,Publish()
会将消息发布到主题。所以我需要有一个 IQueueClient
和 ITopicClient
的实例来实现这些;我会将这些作为依赖项注入到我的 IServiceBus
实现中,并相应地调用它们。
问题是 QueueClient 和 TopicClient 要求您在更新客户端时指定它们的目的地,这使我无法将其作为我的 IServiceBus
实现的参数。
我可以在创建消息时创建一个客户端,但这样效率极低。我四处寻找至少一个可以充当客户端工厂的连接管理器,但 MessagingFactory
似乎不在此 SDK 中(Microsoft.Azure.ServiceBus 3.4.0)。
所以问题是 - 是否有某种工厂可供我使用,它可以让我按需创建合适的客户,其效率与重用客户所获得的效率相同? - 我应该使用某种覆盖或替代客户端对象来实现这种效果吗?这两个客户端真的很有限。
因为我相信我们可以假设 QueueClient and TopicClient are thread safe 的实例,您可以做的是将已解析的 IServiceBus
具体 class 注册为您的 IoC 容器中的单例。
在具体的 ServiceBus 中,您可以创建以前看到的主题和队列客户端的缓存:
private readonly ConcurrentDictionary<string, ITopicClient> _topicClientCache
= new ConcurrentDictionary<string, ITopicClient>();
private readonly ConcurrentDictionary<string, IQueueClient> _queueClientCache
= new ConcurrentDictionary<string, IQueueClient>();
然后在你的Publish
方法中
public async Task Publish<T>(T message, string destination, ...)
{
// i.e. destination is the topic
if (!_topicClientCache.TryGetValue(destination, out var topicClient))
{
topicClient = new TopicClient(_myServiceBusConnectionString, destination);
_topicClientCache.TryAdd(destination, topicClient);
}
... create and serialize message into azureMessage here
await topicClient.SendAsync(azureMessage);
}
这同样适用于您的 Send
实现 - 它会检查 _queueClientCache
的目标(队列名称),并在它第一次看到它时创建并缓存它。
我终于遇到了有类似问题的人。事实证明,他们删除了 MessagingFactory,但使连接可重用。每个客户端都有一个接受连接的构造函数重载,所以我将连接注册为单例并注入它而不是客户端,然后按需创建客户端。
参见:https://github.com/Azure/azure-service-bus-dotnet/issues/556
我的解决方案看起来有点像这样(为简洁起见省略了完整实现)
public class AzureServiceBus : IServiceBus
{
public AzureServiceBus(ServiceBusConnection connection, string replyTo)
{
_connection = connection;
_replyTo = replyTo;
_retryPolicy = new RetryExponential(
TimeSpan.FromSeconds(1),
TimeSpan.FromMinutes(1),
10);
}
public async Task Send<T>(T message, string destination)
{
var client = new QueueClient(_connection, destination, ReceiveMode.PeekLock, _retryPolicy);
// ... do work
}
public async Task Publish<T>(T message, string topic, SendOptions options = null)
{
var client = new TopicClient(_connection, topic, _retryPolicy);
// ... do work
}
}