使用 ServiceBus SubscriptionClient 时出现 RenewLock 错误
RenewLock error using ServiceBus SubscriptionClient
我正在 .net Core 2.1 中编写一个控制台应用程序,我的目的是在 ServiceBus 中收听有关主题的消息并使用 NEST api 处理到达 Elasticsearch 的新消息(NEST 可能无关紧要对于我的问题,但希望保持透明)。
我在 ServiceBus 中的主题实体称为 "test",我有一个订阅也称为 "test"(完整路径为 "test/subscriptions/test")。
在我的 .net Core 控制台应用程序中,我有以下 NuGet 引用:
<PackageReference Include="Microsoft.Azure.ServiceBus" Version="3.2.1" />
<PackageReference Include="NEST" Version="6.4.1" />
<PackageReference Include="Newtonsoft.Json" Version="12.0.1" />
我在使用 .net Standard ServiceBus Api 时遇到一个非常奇怪的问题,我经常收到更新锁定错误:
Message handler encountered an exception
Microsoft.Azure.ServiceBus.MessageLockLostException
我已经将我的代码剥离回一个非常可重现的示例:
using System;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Elasticsearch.Net;
using Microsoft.Azure.ServiceBus;
using Nest;
using Newtonsoft.Json;
namespace SampleApp
{
public class Program
{
private static SubscriptionClient _subscriptionClient;
private static IElasticClient _elasticClient;
private static string ServiceBusConnectionString = "[connectionString]";
private static string TopicName = "test";
private static string SubscriptionName = "test";
public static void Main(string[] args)
{
var elasticsearchSettings = new ConnectionSettings(new SingleNodeConnectionPool(new Uri("http://does.not.exist:9200"))).DefaultIndex("DoesNotExistIndex");
_elasticClient = new ElasticClient(elasticsearchSettings);
_subscriptionClient = new SubscriptionClient(ServiceBusConnectionString, TopicName, SubscriptionName);
// Configure the message handler options in terms of exception handling, number of concurrent messages to deliver, etc.
var messageHandlerOptions = new MessageHandlerOptions(ExceptionReceivedHandler)
{
// Maximum number of concurrent calls to the callback ProcessMessagesAsync(), set to 1 for simplicity.
// Set it according to how many messages the application wants to process in parallel.
MaxConcurrentCalls = 1,
MaxAutoRenewDuration = TimeSpan.FromSeconds(400),
// Indicates whether the message pump should automatically complete the messages after returning from user callback.
// False below indicates the complete operation is handled by the user callback as in ProcessMessagesAsync().
AutoComplete = false
};
// Register the function that processes messages.
_subscriptionClient.RegisterMessageHandler(ProcessMessagesAsync, messageHandlerOptions);
Console.WriteLine("INFO: Process message handler registered, listening for messages");
Console.Read();
}
private static async Task ProcessMessagesAsync(Message message, CancellationToken token)
{
// Message received.
var content = Encoding.UTF8.GetString(message.Body);
var messageBody = JsonConvert.DeserializeObject<string[]>(content);
Console.WriteLine($"INFO: Message arrived: {message}");
Console.WriteLine($"INFO: Message body: \"{string.Join(",", messageBody)}\"");
try
{
var response = _elasticClient.Ping();
if (!response.IsValid && response.OriginalException != null)
Console.WriteLine($"ERROR: ElasticSearch could not be reached, error was \"{response.OriginalException.Message}\"");
else
Console.WriteLine("INFO: ElasticSearch was contacted successfully");
}
catch (Exception e)
{
Console.WriteLine("!ERROR!: " + e);
}
await _subscriptionClient.CompleteAsync(message.SystemProperties.LockToken);
Console.WriteLine("INFO: Message completed");
}
// Use this handler to examine the exceptions received on the message pump.
private static Task ExceptionReceivedHandler(ExceptionReceivedEventArgs exceptionReceivedEventArgs)
{
Console.WriteLine($"Message handler encountered an exception {exceptionReceivedEventArgs.Exception}: " +
$"{exceptionReceivedEventArgs.ExceptionReceivedContext.Action}: " +
$"{exceptionReceivedEventArgs.ExceptionReceivedContext.EntityPath}");
return Task.CompletedTask;
}
}
此代码与从此处获取的示例几乎相同:
https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-dotnet-how-to-use-topics-subscriptions
我故意 "pinging" 一个不存在的 Elasticsearch 实例来产生有助于我重现问题的套接字异常。
我注意到的一件事是,当我创建一个新主题并设置 EnabledPartioning = false 时,问题没有发生。
有人以前看过这个吗?似乎是 ServiceBus 代码本身深处的问题。
注意:我尝试使用 Receiver 通过 "ReceiveAsync" 读取消息,但在这种情况下我也遇到了这个错误。此外,我的测试驱动程序是从 .net Framework ServiceBus 客户端(它确实与分区一起工作)转移到 .net Core 版本。
提前感谢您的指点!!
我建议您在订阅 MaxAutoRenewDuration = TimeSpan.FromSeconds(xxxx)
上设置更长的锁定时间,或者您可以只使用 message.RenewLock()
.
希望对您有所帮助!
在我上面的案例中,问题归结为对我的配置的轻微误解。
在 Azure 中,如果您导航至:
资源组 > ServiceBusInstance > 主题 > testTopic > testSubscription
您可以找到订阅属性。发送消息时,您将在此处看到锁定的持续时间。这默认为 60 秒,但我将漫长的 运行 过程延长到最多 5 分钟,如下所示:
然后在代码中,为我的订阅客户端连接属性时,我需要确保 MaxAutoRenewDuration
属性 设置正确。
我假设这个 属性 意味着如果你为此定义了 30 秒,在内部,订阅客户端将每 30 秒更新一次锁,因此如果你的最大到期时间是 5 分钟,例如,只要您正在处理消息,锁就会被更新...
事实上,属性 实际表示的是他们在订阅客户端内部进行锁定更新的最长时间。
因此,如果您将其设置为 24 小时,例如Timespan.FromHours(24)
并且您的处理需要 12 个小时,它会被续订。但是,如果您使用 Timespan.FromHours(12)
将其设置为 12 小时,而您的代码 运行 将其设置为 24 小时,那么当您完成消息时,它会给出一个 lockLost 异常(因为我在更短的时间间隔内获得了以上!) .
我做过的一件事很容易实现,它是在运行时从订阅属性中动态提取 LockDuration
(我所有的主题都可以有不同的配置)并应用 MaxAutoRenewDuration
适当地使用它。
代码示例:
sbNamespace.Topics.GetByName(“test”).Subscriptions.GetByName(“test”).LockDurationInSeconds
注意 - 我正在使用 Azure.Management.Fluent 包构建 sbNamespace。
希望对其他人有所帮助!
我正在 .net Core 2.1 中编写一个控制台应用程序,我的目的是在 ServiceBus 中收听有关主题的消息并使用 NEST api 处理到达 Elasticsearch 的新消息(NEST 可能无关紧要对于我的问题,但希望保持透明)。
我在 ServiceBus 中的主题实体称为 "test",我有一个订阅也称为 "test"(完整路径为 "test/subscriptions/test")。
在我的 .net Core 控制台应用程序中,我有以下 NuGet 引用:
<PackageReference Include="Microsoft.Azure.ServiceBus" Version="3.2.1" />
<PackageReference Include="NEST" Version="6.4.1" />
<PackageReference Include="Newtonsoft.Json" Version="12.0.1" />
我在使用 .net Standard ServiceBus Api 时遇到一个非常奇怪的问题,我经常收到更新锁定错误:
Message handler encountered an exception Microsoft.Azure.ServiceBus.MessageLockLostException
我已经将我的代码剥离回一个非常可重现的示例:
using System;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Elasticsearch.Net;
using Microsoft.Azure.ServiceBus;
using Nest;
using Newtonsoft.Json;
namespace SampleApp
{
public class Program
{
private static SubscriptionClient _subscriptionClient;
private static IElasticClient _elasticClient;
private static string ServiceBusConnectionString = "[connectionString]";
private static string TopicName = "test";
private static string SubscriptionName = "test";
public static void Main(string[] args)
{
var elasticsearchSettings = new ConnectionSettings(new SingleNodeConnectionPool(new Uri("http://does.not.exist:9200"))).DefaultIndex("DoesNotExistIndex");
_elasticClient = new ElasticClient(elasticsearchSettings);
_subscriptionClient = new SubscriptionClient(ServiceBusConnectionString, TopicName, SubscriptionName);
// Configure the message handler options in terms of exception handling, number of concurrent messages to deliver, etc.
var messageHandlerOptions = new MessageHandlerOptions(ExceptionReceivedHandler)
{
// Maximum number of concurrent calls to the callback ProcessMessagesAsync(), set to 1 for simplicity.
// Set it according to how many messages the application wants to process in parallel.
MaxConcurrentCalls = 1,
MaxAutoRenewDuration = TimeSpan.FromSeconds(400),
// Indicates whether the message pump should automatically complete the messages after returning from user callback.
// False below indicates the complete operation is handled by the user callback as in ProcessMessagesAsync().
AutoComplete = false
};
// Register the function that processes messages.
_subscriptionClient.RegisterMessageHandler(ProcessMessagesAsync, messageHandlerOptions);
Console.WriteLine("INFO: Process message handler registered, listening for messages");
Console.Read();
}
private static async Task ProcessMessagesAsync(Message message, CancellationToken token)
{
// Message received.
var content = Encoding.UTF8.GetString(message.Body);
var messageBody = JsonConvert.DeserializeObject<string[]>(content);
Console.WriteLine($"INFO: Message arrived: {message}");
Console.WriteLine($"INFO: Message body: \"{string.Join(",", messageBody)}\"");
try
{
var response = _elasticClient.Ping();
if (!response.IsValid && response.OriginalException != null)
Console.WriteLine($"ERROR: ElasticSearch could not be reached, error was \"{response.OriginalException.Message}\"");
else
Console.WriteLine("INFO: ElasticSearch was contacted successfully");
}
catch (Exception e)
{
Console.WriteLine("!ERROR!: " + e);
}
await _subscriptionClient.CompleteAsync(message.SystemProperties.LockToken);
Console.WriteLine("INFO: Message completed");
}
// Use this handler to examine the exceptions received on the message pump.
private static Task ExceptionReceivedHandler(ExceptionReceivedEventArgs exceptionReceivedEventArgs)
{
Console.WriteLine($"Message handler encountered an exception {exceptionReceivedEventArgs.Exception}: " +
$"{exceptionReceivedEventArgs.ExceptionReceivedContext.Action}: " +
$"{exceptionReceivedEventArgs.ExceptionReceivedContext.EntityPath}");
return Task.CompletedTask;
}
}
此代码与从此处获取的示例几乎相同: https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-dotnet-how-to-use-topics-subscriptions
我故意 "pinging" 一个不存在的 Elasticsearch 实例来产生有助于我重现问题的套接字异常。
我注意到的一件事是,当我创建一个新主题并设置 EnabledPartioning = false 时,问题没有发生。
有人以前看过这个吗?似乎是 ServiceBus 代码本身深处的问题。
注意:我尝试使用 Receiver 通过 "ReceiveAsync" 读取消息,但在这种情况下我也遇到了这个错误。此外,我的测试驱动程序是从 .net Framework ServiceBus 客户端(它确实与分区一起工作)转移到 .net Core 版本。
提前感谢您的指点!!
我建议您在订阅 MaxAutoRenewDuration = TimeSpan.FromSeconds(xxxx)
上设置更长的锁定时间,或者您可以只使用 message.RenewLock()
.
希望对您有所帮助!
在我上面的案例中,问题归结为对我的配置的轻微误解。 在 Azure 中,如果您导航至:
资源组 > ServiceBusInstance > 主题 > testTopic > testSubscription
您可以找到订阅属性。发送消息时,您将在此处看到锁定的持续时间。这默认为 60 秒,但我将漫长的 运行 过程延长到最多 5 分钟,如下所示:
然后在代码中,为我的订阅客户端连接属性时,我需要确保 MaxAutoRenewDuration
属性 设置正确。
我假设这个 属性 意味着如果你为此定义了 30 秒,在内部,订阅客户端将每 30 秒更新一次锁,因此如果你的最大到期时间是 5 分钟,例如,只要您正在处理消息,锁就会被更新...
事实上,属性 实际表示的是他们在订阅客户端内部进行锁定更新的最长时间。
因此,如果您将其设置为 24 小时,例如Timespan.FromHours(24)
并且您的处理需要 12 个小时,它会被续订。但是,如果您使用 Timespan.FromHours(12)
将其设置为 12 小时,而您的代码 运行 将其设置为 24 小时,那么当您完成消息时,它会给出一个 lockLost 异常(因为我在更短的时间间隔内获得了以上!) .
我做过的一件事很容易实现,它是在运行时从订阅属性中动态提取 LockDuration
(我所有的主题都可以有不同的配置)并应用 MaxAutoRenewDuration
适当地使用它。
代码示例:
sbNamespace.Topics.GetByName(“test”).Subscriptions.GetByName(“test”).LockDurationInSeconds
注意 - 我正在使用 Azure.Management.Fluent 包构建 sbNamespace。
希望对其他人有所帮助!