Azure.Messaging.ServiceBus 完成或放弃消息时出错
Azure.Messaging.ServiceBus Error when completing or abandoning message
我目前正在考虑将我们非常简单的服务总线服务更新到最新版本 (Asure.Messaging.Servicebus),我 运行 遇到了一个较小的问题。
问题是我想通过将消息委托回我的服务中的方法来手动完成或放弃接收到或达到峰值的消息 class 来处理工作。
到目前为止,这是我的简单 class,由接口公开。
using myProject.Interfaces;
using myProject.Utilities;
using Azure.Messaging.ServiceBus;
using System;
namespace myProject.Services
{
/// <summary>
/// Service bus service controller
/// </summary>
public class ServiceBusService: IServiceBusService
{
private IConfigurationUtility _configurationUtility;
static string connectionString;
static ServiceBusClient client;
static ServiceBusSender sender;
static ServiceBusReceiver receiver;
public ServiceBusService(IConfigurationUtility configurationUtility)
{
_configurationUtility = configurationUtility;
connectionString = _configurationUtility.GetSetting("ServiceBusConnectionString");
client = new ServiceBusClient(connectionString);
}
/// <summary>
/// Sending message.
/// </summary>
/// <param name="messageContent"></param>
/// <param name="queueName"></param>
public void SendMessage(string messageContent, string queueName)
{
sender = client.CreateSender(queueName);
ServiceBusMessage message = new ServiceBusMessage(messageContent);
sender.SendMessageAsync(message).Wait();
}
/// <summary>
/// Receive message.
/// </summary>
/// <returns></returns>
/// <param name="queueName"></param>
public ServiceBusReceivedMessage ReceiveMessage(string queueName)
{
receiver = client.CreateReceiver(queueName);
ServiceBusReceivedMessage receivedMessage = receiver.ReceiveMessageAsync().Result;
return receivedMessage;
}
/// <summary>
/// Peek message.
/// </summary>
/// <returns></returns>
public ServiceBusReceivedMessage PeekMessage(string queueName)
{
receiver = client.CreateReceiver(queueName);
ServiceBusReceivedMessage peekedMessage = receiver.PeekMessageAsync().Result;
return peekedMessage;
}
/// <summary>
/// Complete message.
/// </summary>
/// <param name="message"></param>
public void CompleteMessage(ServiceBusReceivedMessage message)
{
receiver.CompleteMessageAsync(message).Wait();
}
/// <summary>
/// Abandon message.
/// </summary>
/// <param name="message"></param>
public void AbandonMessage(ServiceBusReceivedMessage message)
{
receiver.AbandonMessageAsync(message).Wait();
}
}
}
当我一次处理一条消息时一切正常,但是如果我一次处理两条消息,我会收到此错误:
"The lock supplied is invalid. Either the lock expired, or the message has already been removed from the queue, or was received by a different receiver instance"
而且我认为 “或被不同的接收方实例接收” 部分是我的问题。
我是不是想错了?我不应该能够处理例如:一次放弃多个 单个 收到的消息吗?
提前致谢。
更新:
所以我想我设法让它工作,关于 Jesse Squire 被建议做的事情,以反映微软提供的新服务总线文档。
我创建了一个名为 ServicebusFactory 的新 class,添加了下面 Jesse 提供的代码以对其进行测试并更改了“= new();”部分对于示例顶部的初始化“ConcurrentDictionary's”,像这样初始化它们会导致以下错误:
CS8400 Feature 'target-typed object creation' is not available in C# 8.0. Please use language version 9.0 or greater.
结果:
public class ServiceBusFactory : IAsyncDisposable, IServiceBusFactory
{
private readonly ServiceBusClient _client;
private readonly ConcurrentDictionary<string, ServiceBusSender> _senders = new ConcurrentDictionary<string, ServiceBusSender>();
private readonly ConcurrentDictionary<string, ServiceBusReceiver> _receivers = new ConcurrentDictionary<string, ServiceBusReceiver>();
public ServiceBusFactory(string fullyQualifiedNamespace, TokenCredential credential) => _client = new ServiceBusClient(fullyQualifiedNamespace, credential);
public ServiceBusFactory(string connectionString) => _client = new ServiceBusClient(connectionString);
public ServiceBusSender GetSender(string entity) =>
_senders.GetOrAdd(entity, entity => _client.CreateSender(entity));
public ServiceBusReceiver GetReceiver(string entity) =>
_receivers.GetOrAdd(entity, entity => _client.CreateReceiver(entity));
public async ValueTask DisposeAsync()
{
await _client.DisposeAsync().ConfigureAwait(false);
GC.SuppressFinalize(this);
}
}
}
然后我为我的服务总线工厂创建了一个接口,并将其作为单例添加到我的“[=”中“ConfigurationService”方法的底部73=]" class,用我的服务总线连接字符串初始化它。
接口:
public interface IServiceBusFactory
{
public ServiceBusSender GetSender(string entity);
public ServiceBusReceiver GetReceiver(string entity);
public ValueTask DisposeAsync();
}
启动:
var serviceBusConnectionString = "[CONNECTION STRING]";
services.AddSingleton<IServiceBusFactory>(new ServiceBusFactory(serviceBusConnectionString));
最后,这是一个依赖问题,将 servicebusFactory 接口注入我的控制器构造函数,然后使用它来获取“sender”并使用它向我的队列发送消息称为“添加”。
构造函数:
private readonly IServiceBusFactory _serviceBusFactory;
public ServicebusController(IServiceBusFactory serviceBusFactory)
{
_serviceBusFactory = serviceBusFactory;
}
控制器Method/Action实现:
var test = new List<ServiceBusMessage>();
test.Add(new ServiceBusMessage("This is a test message."));
var sender = _serviceBusFactory.GetSender("Add");
sender.SendMessagesAsync(test);
服务总线将消息锁与从中接收消息的 AMQP link 相关联。对于 SDK,这意味着您必须使用您用来接收消息的同一 ServiceBusReceiver
实例来结算消息。
在您的代码中,您正在为每个 ReceiveMessage
调用创建一个新的接收者 - 因此当您尝试完成或放弃该消息时,您使用的是一个 link 消息如果对 ReceiveMessage
的任何其他调用已发生,则无效。
通常,您希望避免创建 short-lived 服务总线客户端对象的模式。它们 intended to be long-lived 并在应用程序的整个生命周期内重复使用。在您的代码中,您还隐含地放弃了 senders/receivers 而不关闭它们。这将孤立 AMQP link 直到服务 force-closes 它在 20 分钟后处于空闲状态。
我建议合并您的 senders/receivers 并将每个作为关联队列的单例。对于给定队列,对 SendMessage
或 ReceiveMessage
的每次调用都应使用相同的 sender/receiver 实例。
当您的应用程序关闭时,请务必关闭或处置 ServiceBusClient
,这将确保其所有子 senders/receivers 也得到适当清理。
我也非常强烈建议将您的 class 重构为异步。您正在使用的 sync-over-async 模式会给线程池带来额外的压力,并可能导致线程饥饿 and/or 负载下的死锁。
更新
要添加一些额外的上下文,我建议不要包装服务总线操作,而是拥有一个专注于管理客户端并让调用者直接与它们交互的工厂。
这确保了客户端被合并并且它们的生命周期得到正确管理,同时也让调用者可以灵活地保留 sender/receiver 引用并用于多个操作,而不是支付检索它的费用。
例如,下面是一个简单的工厂 class,您可以在应用程序中将其作为单例创建和管理。调用者可以为特定的 queue/topic/subscription 请求 sender/receiver,它们将根据需要创建,然后合并以供重用。
// This class is intended to be treated as a singleton for most
// scenarios. Each instance created will open an independent connection
// to the Service Bus namespace, shared by senders and receivers spawned from it.
public class ServiceBusFactory : IAsyncDisposable
{
// If throughput needs scale beyond a single connection, the factory can
// manage multiple clients and ensure that child entities are evenly distributed
// among them.
private readonly ServiceBusClient _client;
private readonly ConcurrentDictionary<string, ServiceBusSender> _senders = new();
private readonly ConcurrentDictionary<string, ServiceBusReceiver> _receivers = new();
public ServiceBusFactory(string fullyQualifiedNamespace, TokenCredential credential) => _client = new ServiceBusClient(fullyQualifiedNamespace, credential);
public ServiceBusFactory(string connectionString) => _client = new ServiceBusClient(connectionString);
public ServiceBusSender GetSender(string entity) =>
_senders.GetOrAdd(entity, entity => _client.CreateSender(entity));
public ServiceBusReceiver GetReceiver(string entity) =>
_receivers.GetOrAdd(entity, entity => _client.CreateReceiver(entity));
public async ValueTask DisposeAsync()
{
await _client.DisposeAsync().ConfigureAwait(false);
GC.SuppressFinalize(this);
}
}
- 请在 Jesse Squire 的回答之前或代替我的回答投票。
我创建了一个略有不同的版本。
我需要多个 service-bus(完全限定的命名空间)而不是一个。 (也就是说,我需要多个服务总线连接字符串)。
而且我需要 dead-letter 在使用 SubQueues 的“new-style”下的支持。
这是我修改后的代码版本。
在撰写本文时,我正在使用
<PackageReference Include="Azure.Messaging.ServiceBus" Version="7.7.0" />
..
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;
using Azure.Core;
using Azure.Messaging.ServiceBus;
namespace Me.MyServiceBusFactories
{
// This class is intended to be treated as a singleton for most
// scenarios. Each instance created will open an independent connection
// to the Service Bus namespace, shared by senders and receivers spawned from it.
public class ServiceBusMultipleConnectionsFactory : IAsyncDisposable
{
/* create a (hopefully) thread safe ConcurrentDictionary of Lazy<ServiceBusClient>> */
private static readonly ConcurrentDictionary<string, Lazy<ServiceBusClient>> _clients =
new ConcurrentDictionary<string, Lazy<ServiceBusClient>>();
private static readonly ConcurrentDictionary<string, ServiceBusSender> _senders =
new ConcurrentDictionary<string, ServiceBusSender>();
private static readonly ConcurrentDictionary<string, ServiceBusReceiver> _receivers =
new ConcurrentDictionary<string, ServiceBusReceiver>();
public ServiceBusSender GetSender(string connectionString, string entity) =>
_senders.GetOrAdd(entity,
vf => this.LazyInitAndFindServiceBusClient(connectionString).CreateSender(entity));
public ServiceBusReceiver GetReceiver(string fullyQualifiedNamespace, TokenCredential credential, string entity) =>
_receivers.GetOrAdd(this.ComputeReceiverKeyName(entity, SubQueue.None, ServiceBusReceiveMode.PeekLock),
vf => this.LazyInitAndFindServiceBusClient(fullyQualifiedNamespace, credential).CreateReceiver(entity));
public ServiceBusReceiver GetReceiver(string connectionString, string entity) =>
_receivers.GetOrAdd(this.ComputeReceiverKeyName(entity, SubQueue.None, ServiceBusReceiveMode.PeekLock),
vf => this.LazyInitAndFindServiceBusClient(connectionString).CreateReceiver(entity));
public ServiceBusReceiver GetReceiver(string connectionString, string entity, SubQueue sq,
ServiceBusReceiveMode receiveMode) =>
_receivers.GetOrAdd(this.ComputeReceiverKeyName(entity, sq, receiveMode),
vf => this.LazyInitAndFindServiceBusClient(connectionString).CreateReceiver(entity,
new ServiceBusReceiverOptions() {SubQueue = sq, ReceiveMode = receiveMode}));
private ServiceBusClient LazyInitAndFindServiceBusClient(string fullyQualifiedNamespace,
TokenCredential credential)
{
Lazy<ServiceBusClient> valueFound = _clients.GetOrAdd(
this.ComputeServiceBusClientDictionaryKeyName(null, fullyQualifiedNamespace, credential),
x => new Lazy<ServiceBusClient>(
() => new ServiceBusClient(fullyQualifiedNamespace, credential)));
return valueFound.Value;
}
private ServiceBusClient LazyInitAndFindServiceBusClient(string connectionString)
{
Lazy<ServiceBusClient> valueFound = _clients.GetOrAdd(
this.ComputeServiceBusClientDictionaryKeyName(connectionString, null, null),
x => new Lazy<ServiceBusClient>(
() => new ServiceBusClient(connectionString)));
return valueFound.Value;
}
public async ValueTask DisposeAsync()
{
foreach (KeyValuePair<string, Lazy<ServiceBusClient>> currentClient in _clients)
{
await currentClient.Value.Value.DisposeAsync().ConfigureAwait(false);
}
GC.SuppressFinalize(this);
}
/* Create a Key-Name ConcurrentDictionary based on the multiple pieces of information of uniqueness, some will be null at times based on connection-string VS (namespace and token-credential) */
private string ComputeServiceBusClientDictionaryKeyName(string connectionString, string fullyQualifiedNamespace,
TokenCredential tc)
{
string returnValue =
$"{connectionString?.GetHashCode()}{fullyQualifiedNamespace?.GetHashCode()}{tc?.GetHashCode()}";
return returnValue;
}
/* Create a Key-Name ConcurrentDictionary based on the multiple pieces of information of uniqueness */
private string ComputeReceiverKeyName(string entity, SubQueue sq, ServiceBusReceiveMode receiveMode)
{
string returnValue = $"{entity}{sq.GetHashCode()}{receiveMode.GetHashCode()}";
return returnValue;
}
}
}
...
“用法”
using System;
using System.Collections.Generic;
using System.Diagnostics.CodeAnalysis;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Azure.Messaging.ServiceBus;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
public class MyThing : IMyThing
{
private readonly ILogger<MyThing> _logger;
private readonly ServiceBusMultipleConnectionsFactory _busFactory;
public DeadLetterProcessor(ILoggerFactory loggerFactory,
ServiceBusMultipleConnectionsFactory busFactory)
{
this._busFactory = busFactory;
/* not shown, create ILogger from ILoggerFactory */
}
public void DoSomething(string myQueueName)
{
/* I actually inject the POCO that has "MyConnectionString" , but beyond the scope of this answer */
ServiceBusReceiver myReceiver = this._busFactory.GetReceiver("MyConnectionString", myQueueName, SubQueue.DeadLetter,
ServiceBusReceiveMode.ReceiveAndDelete);
}
}
然后我直接注入工厂。
private static IServiceProvider BuildDi(IConfiguration configuration)
{
////setup our DI
IServiceCollection servColl = new ServiceCollection()
.AddLogging()
.AddSingleton<IMyThing, MyThing>()
.AddSingleton<ServiceBusMultipleConnectionsFactory>()
;
其他感兴趣的文章:
https://andrewlock.net/making-getoradd-on-concurrentdictionary-thread-safe-using-lazy/
EntityNameFormatter(和 FormatDeadLetterPath)发生了什么???
我目前正在考虑将我们非常简单的服务总线服务更新到最新版本 (Asure.Messaging.Servicebus),我 运行 遇到了一个较小的问题。
问题是我想通过将消息委托回我的服务中的方法来手动完成或放弃接收到或达到峰值的消息 class 来处理工作。
到目前为止,这是我的简单 class,由接口公开。
using myProject.Interfaces;
using myProject.Utilities;
using Azure.Messaging.ServiceBus;
using System;
namespace myProject.Services
{
/// <summary>
/// Service bus service controller
/// </summary>
public class ServiceBusService: IServiceBusService
{
private IConfigurationUtility _configurationUtility;
static string connectionString;
static ServiceBusClient client;
static ServiceBusSender sender;
static ServiceBusReceiver receiver;
public ServiceBusService(IConfigurationUtility configurationUtility)
{
_configurationUtility = configurationUtility;
connectionString = _configurationUtility.GetSetting("ServiceBusConnectionString");
client = new ServiceBusClient(connectionString);
}
/// <summary>
/// Sending message.
/// </summary>
/// <param name="messageContent"></param>
/// <param name="queueName"></param>
public void SendMessage(string messageContent, string queueName)
{
sender = client.CreateSender(queueName);
ServiceBusMessage message = new ServiceBusMessage(messageContent);
sender.SendMessageAsync(message).Wait();
}
/// <summary>
/// Receive message.
/// </summary>
/// <returns></returns>
/// <param name="queueName"></param>
public ServiceBusReceivedMessage ReceiveMessage(string queueName)
{
receiver = client.CreateReceiver(queueName);
ServiceBusReceivedMessage receivedMessage = receiver.ReceiveMessageAsync().Result;
return receivedMessage;
}
/// <summary>
/// Peek message.
/// </summary>
/// <returns></returns>
public ServiceBusReceivedMessage PeekMessage(string queueName)
{
receiver = client.CreateReceiver(queueName);
ServiceBusReceivedMessage peekedMessage = receiver.PeekMessageAsync().Result;
return peekedMessage;
}
/// <summary>
/// Complete message.
/// </summary>
/// <param name="message"></param>
public void CompleteMessage(ServiceBusReceivedMessage message)
{
receiver.CompleteMessageAsync(message).Wait();
}
/// <summary>
/// Abandon message.
/// </summary>
/// <param name="message"></param>
public void AbandonMessage(ServiceBusReceivedMessage message)
{
receiver.AbandonMessageAsync(message).Wait();
}
}
}
当我一次处理一条消息时一切正常,但是如果我一次处理两条消息,我会收到此错误:
"The lock supplied is invalid. Either the lock expired, or the message has already been removed from the queue, or was received by a different receiver instance"
而且我认为 “或被不同的接收方实例接收” 部分是我的问题。
我是不是想错了?我不应该能够处理例如:一次放弃多个 单个 收到的消息吗?
提前致谢。
更新:
所以我想我设法让它工作,关于 Jesse Squire 被建议做的事情,以反映微软提供的新服务总线文档。
我创建了一个名为 ServicebusFactory 的新 class,添加了下面 Jesse 提供的代码以对其进行测试并更改了“= new();”部分对于示例顶部的初始化“ConcurrentDictionary's”,像这样初始化它们会导致以下错误:
CS8400 Feature 'target-typed object creation' is not available in C# 8.0. Please use language version 9.0 or greater.
结果:
public class ServiceBusFactory : IAsyncDisposable, IServiceBusFactory
{
private readonly ServiceBusClient _client;
private readonly ConcurrentDictionary<string, ServiceBusSender> _senders = new ConcurrentDictionary<string, ServiceBusSender>();
private readonly ConcurrentDictionary<string, ServiceBusReceiver> _receivers = new ConcurrentDictionary<string, ServiceBusReceiver>();
public ServiceBusFactory(string fullyQualifiedNamespace, TokenCredential credential) => _client = new ServiceBusClient(fullyQualifiedNamespace, credential);
public ServiceBusFactory(string connectionString) => _client = new ServiceBusClient(connectionString);
public ServiceBusSender GetSender(string entity) =>
_senders.GetOrAdd(entity, entity => _client.CreateSender(entity));
public ServiceBusReceiver GetReceiver(string entity) =>
_receivers.GetOrAdd(entity, entity => _client.CreateReceiver(entity));
public async ValueTask DisposeAsync()
{
await _client.DisposeAsync().ConfigureAwait(false);
GC.SuppressFinalize(this);
}
}
}
然后我为我的服务总线工厂创建了一个接口,并将其作为单例添加到我的“[=”中“ConfigurationService”方法的底部73=]" class,用我的服务总线连接字符串初始化它。
接口:
public interface IServiceBusFactory
{
public ServiceBusSender GetSender(string entity);
public ServiceBusReceiver GetReceiver(string entity);
public ValueTask DisposeAsync();
}
启动:
var serviceBusConnectionString = "[CONNECTION STRING]";
services.AddSingleton<IServiceBusFactory>(new ServiceBusFactory(serviceBusConnectionString));
最后,这是一个依赖问题,将 servicebusFactory 接口注入我的控制器构造函数,然后使用它来获取“sender”并使用它向我的队列发送消息称为“添加”。
构造函数:
private readonly IServiceBusFactory _serviceBusFactory;
public ServicebusController(IServiceBusFactory serviceBusFactory)
{
_serviceBusFactory = serviceBusFactory;
}
控制器Method/Action实现:
var test = new List<ServiceBusMessage>();
test.Add(new ServiceBusMessage("This is a test message."));
var sender = _serviceBusFactory.GetSender("Add");
sender.SendMessagesAsync(test);
服务总线将消息锁与从中接收消息的 AMQP link 相关联。对于 SDK,这意味着您必须使用您用来接收消息的同一 ServiceBusReceiver
实例来结算消息。
在您的代码中,您正在为每个 ReceiveMessage
调用创建一个新的接收者 - 因此当您尝试完成或放弃该消息时,您使用的是一个 link 消息如果对 ReceiveMessage
的任何其他调用已发生,则无效。
通常,您希望避免创建 short-lived 服务总线客户端对象的模式。它们 intended to be long-lived 并在应用程序的整个生命周期内重复使用。在您的代码中,您还隐含地放弃了 senders/receivers 而不关闭它们。这将孤立 AMQP link 直到服务 force-closes 它在 20 分钟后处于空闲状态。
我建议合并您的 senders/receivers 并将每个作为关联队列的单例。对于给定队列,对 SendMessage
或 ReceiveMessage
的每次调用都应使用相同的 sender/receiver 实例。
当您的应用程序关闭时,请务必关闭或处置 ServiceBusClient
,这将确保其所有子 senders/receivers 也得到适当清理。
我也非常强烈建议将您的 class 重构为异步。您正在使用的 sync-over-async 模式会给线程池带来额外的压力,并可能导致线程饥饿 and/or 负载下的死锁。
更新
要添加一些额外的上下文,我建议不要包装服务总线操作,而是拥有一个专注于管理客户端并让调用者直接与它们交互的工厂。
这确保了客户端被合并并且它们的生命周期得到正确管理,同时也让调用者可以灵活地保留 sender/receiver 引用并用于多个操作,而不是支付检索它的费用。
例如,下面是一个简单的工厂 class,您可以在应用程序中将其作为单例创建和管理。调用者可以为特定的 queue/topic/subscription 请求 sender/receiver,它们将根据需要创建,然后合并以供重用。
// This class is intended to be treated as a singleton for most
// scenarios. Each instance created will open an independent connection
// to the Service Bus namespace, shared by senders and receivers spawned from it.
public class ServiceBusFactory : IAsyncDisposable
{
// If throughput needs scale beyond a single connection, the factory can
// manage multiple clients and ensure that child entities are evenly distributed
// among them.
private readonly ServiceBusClient _client;
private readonly ConcurrentDictionary<string, ServiceBusSender> _senders = new();
private readonly ConcurrentDictionary<string, ServiceBusReceiver> _receivers = new();
public ServiceBusFactory(string fullyQualifiedNamespace, TokenCredential credential) => _client = new ServiceBusClient(fullyQualifiedNamespace, credential);
public ServiceBusFactory(string connectionString) => _client = new ServiceBusClient(connectionString);
public ServiceBusSender GetSender(string entity) =>
_senders.GetOrAdd(entity, entity => _client.CreateSender(entity));
public ServiceBusReceiver GetReceiver(string entity) =>
_receivers.GetOrAdd(entity, entity => _client.CreateReceiver(entity));
public async ValueTask DisposeAsync()
{
await _client.DisposeAsync().ConfigureAwait(false);
GC.SuppressFinalize(this);
}
}
- 请在 Jesse Squire 的回答之前或代替我的回答投票。
我创建了一个略有不同的版本。
我需要多个 service-bus(完全限定的命名空间)而不是一个。 (也就是说,我需要多个服务总线连接字符串)。
而且我需要 dead-letter 在使用 SubQueues 的“new-style”下的支持。
这是我修改后的代码版本。
在撰写本文时,我正在使用
<PackageReference Include="Azure.Messaging.ServiceBus" Version="7.7.0" />
..
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;
using Azure.Core;
using Azure.Messaging.ServiceBus;
namespace Me.MyServiceBusFactories
{
// This class is intended to be treated as a singleton for most
// scenarios. Each instance created will open an independent connection
// to the Service Bus namespace, shared by senders and receivers spawned from it.
public class ServiceBusMultipleConnectionsFactory : IAsyncDisposable
{
/* create a (hopefully) thread safe ConcurrentDictionary of Lazy<ServiceBusClient>> */
private static readonly ConcurrentDictionary<string, Lazy<ServiceBusClient>> _clients =
new ConcurrentDictionary<string, Lazy<ServiceBusClient>>();
private static readonly ConcurrentDictionary<string, ServiceBusSender> _senders =
new ConcurrentDictionary<string, ServiceBusSender>();
private static readonly ConcurrentDictionary<string, ServiceBusReceiver> _receivers =
new ConcurrentDictionary<string, ServiceBusReceiver>();
public ServiceBusSender GetSender(string connectionString, string entity) =>
_senders.GetOrAdd(entity,
vf => this.LazyInitAndFindServiceBusClient(connectionString).CreateSender(entity));
public ServiceBusReceiver GetReceiver(string fullyQualifiedNamespace, TokenCredential credential, string entity) =>
_receivers.GetOrAdd(this.ComputeReceiverKeyName(entity, SubQueue.None, ServiceBusReceiveMode.PeekLock),
vf => this.LazyInitAndFindServiceBusClient(fullyQualifiedNamespace, credential).CreateReceiver(entity));
public ServiceBusReceiver GetReceiver(string connectionString, string entity) =>
_receivers.GetOrAdd(this.ComputeReceiverKeyName(entity, SubQueue.None, ServiceBusReceiveMode.PeekLock),
vf => this.LazyInitAndFindServiceBusClient(connectionString).CreateReceiver(entity));
public ServiceBusReceiver GetReceiver(string connectionString, string entity, SubQueue sq,
ServiceBusReceiveMode receiveMode) =>
_receivers.GetOrAdd(this.ComputeReceiverKeyName(entity, sq, receiveMode),
vf => this.LazyInitAndFindServiceBusClient(connectionString).CreateReceiver(entity,
new ServiceBusReceiverOptions() {SubQueue = sq, ReceiveMode = receiveMode}));
private ServiceBusClient LazyInitAndFindServiceBusClient(string fullyQualifiedNamespace,
TokenCredential credential)
{
Lazy<ServiceBusClient> valueFound = _clients.GetOrAdd(
this.ComputeServiceBusClientDictionaryKeyName(null, fullyQualifiedNamespace, credential),
x => new Lazy<ServiceBusClient>(
() => new ServiceBusClient(fullyQualifiedNamespace, credential)));
return valueFound.Value;
}
private ServiceBusClient LazyInitAndFindServiceBusClient(string connectionString)
{
Lazy<ServiceBusClient> valueFound = _clients.GetOrAdd(
this.ComputeServiceBusClientDictionaryKeyName(connectionString, null, null),
x => new Lazy<ServiceBusClient>(
() => new ServiceBusClient(connectionString)));
return valueFound.Value;
}
public async ValueTask DisposeAsync()
{
foreach (KeyValuePair<string, Lazy<ServiceBusClient>> currentClient in _clients)
{
await currentClient.Value.Value.DisposeAsync().ConfigureAwait(false);
}
GC.SuppressFinalize(this);
}
/* Create a Key-Name ConcurrentDictionary based on the multiple pieces of information of uniqueness, some will be null at times based on connection-string VS (namespace and token-credential) */
private string ComputeServiceBusClientDictionaryKeyName(string connectionString, string fullyQualifiedNamespace,
TokenCredential tc)
{
string returnValue =
$"{connectionString?.GetHashCode()}{fullyQualifiedNamespace?.GetHashCode()}{tc?.GetHashCode()}";
return returnValue;
}
/* Create a Key-Name ConcurrentDictionary based on the multiple pieces of information of uniqueness */
private string ComputeReceiverKeyName(string entity, SubQueue sq, ServiceBusReceiveMode receiveMode)
{
string returnValue = $"{entity}{sq.GetHashCode()}{receiveMode.GetHashCode()}";
return returnValue;
}
}
}
...
“用法”
using System;
using System.Collections.Generic;
using System.Diagnostics.CodeAnalysis;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Azure.Messaging.ServiceBus;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
public class MyThing : IMyThing
{
private readonly ILogger<MyThing> _logger;
private readonly ServiceBusMultipleConnectionsFactory _busFactory;
public DeadLetterProcessor(ILoggerFactory loggerFactory,
ServiceBusMultipleConnectionsFactory busFactory)
{
this._busFactory = busFactory;
/* not shown, create ILogger from ILoggerFactory */
}
public void DoSomething(string myQueueName)
{
/* I actually inject the POCO that has "MyConnectionString" , but beyond the scope of this answer */
ServiceBusReceiver myReceiver = this._busFactory.GetReceiver("MyConnectionString", myQueueName, SubQueue.DeadLetter,
ServiceBusReceiveMode.ReceiveAndDelete);
}
}
然后我直接注入工厂。
private static IServiceProvider BuildDi(IConfiguration configuration)
{
////setup our DI
IServiceCollection servColl = new ServiceCollection()
.AddLogging()
.AddSingleton<IMyThing, MyThing>()
.AddSingleton<ServiceBusMultipleConnectionsFactory>()
;
其他感兴趣的文章:
https://andrewlock.net/making-getoradd-on-concurrentdictionary-thread-safe-using-lazy/
EntityNameFormatter(和 FormatDeadLetterPath)发生了什么???