管理多个 queue/topic 客户?

Managing multiple queue/topic clients?

这里的挑战是我试图创建一个单一的外观来处理队列和主题,同时保持 SendPublish

的语义

例如:

public interface IServiceBus
{
    Task Send<T>(T message, string destination, SendOptions options = null);
    Task Publish<T>(T message, string topic, SendOptions options = null);
}

Send() 会将消息发送到队列,Publish() 会将消息发布到主题。所以我需要有一个 IQueueClientITopicClient 的实例来实现这些;我会将这些作为依赖项注入到我的 IServiceBus 实现中,并相应地调用它们。

问题是 QueueClient 和 TopicClient 要求您在更新客户端时指定它们的目的地,这使我无法将其作为我的 IServiceBus 实现的参数。

我可以在创建消息时创建一个客户端,但这样效率极低。我四处寻找至少一个可以充当客户端工厂的连接管理器,但 MessagingFactory 似乎不在此 SDK 中(Microsoft.Azure.ServiceBus 3.4.0)。

所以问题是 - 是否有某种工厂可供我使用,它可以让我按需创建合适的客户,其效率与重用客户所获得的效率相同? - 我应该使用某种覆盖或替代客户端对象来实现这种效果吗?这两个客户端真的很有限。

因为我相信我们可以假设 QueueClient and TopicClient are thread safe 的实例,您可以做的是将已解析的 IServiceBus 具体 class 注册为您的 IoC 容器中的单例。

在具体的 ServiceBus 中,您可以创建以前看到的主题和队列客户端的缓存:

private readonly ConcurrentDictionary<string, ITopicClient> _topicClientCache
    = new ConcurrentDictionary<string, ITopicClient>();
private readonly ConcurrentDictionary<string, IQueueClient> _queueClientCache
    = new ConcurrentDictionary<string, IQueueClient>();

然后在你的Publish方法中

public async Task Publish<T>(T message, string destination, ...)
{
    // i.e. destination is the topic
    if (!_topicClientCache.TryGetValue(destination, out var topicClient))
    {
        topicClient = new TopicClient(_myServiceBusConnectionString, destination);
        _topicClientCache.TryAdd(destination, topicClient);
    }
    ... create and serialize message into azureMessage here
    await topicClient.SendAsync(azureMessage);
}

这同样适用于您的 Send 实现 - 它会检查 _queueClientCache 的目标(队列名称),并在它第一次看到它时创建并缓存它。

我终于遇到了有类似问题的人。事实证明,他们删除了 MessagingFactory,但使连接可重用。每个客户端都有一个接受连接的构造函数重载,所以我将连接注册为单例并注入它而不是客户端,然后按需创建客户端。

参见:https://github.com/Azure/azure-service-bus-dotnet/issues/556

我的解决方案看起来有点像这样(为简洁起见省略了完整实现)

public class AzureServiceBus : IServiceBus
{
    public AzureServiceBus(ServiceBusConnection connection, string replyTo)
    {
        _connection = connection;
        _replyTo = replyTo;
        _retryPolicy = new RetryExponential(
            TimeSpan.FromSeconds(1),
            TimeSpan.FromMinutes(1),
            10);
    }

    public async Task Send<T>(T message, string destination)
    {
        var client = new QueueClient(_connection, destination, ReceiveMode.PeekLock, _retryPolicy);

        // ... do work
    }

    public async Task Publish<T>(T message, string topic, SendOptions options = null)
    {
        var client = new TopicClient(_connection, topic, _retryPolicy);

        // ... do work
    }
}