未续订 Azure 服务总线消息锁?
Azure Service Bus message lock not being renewed?
我在 Azure 服务总线中构建了一个支持多个队列订阅的服务,但我遇到了一些奇怪的行为。
我的订阅单例 class 有一个如下所示的方法:
public void Subscribe<TMessage>(Func<TMessage, Task> execution, int maxDop = 1, int ttl = 60) where TMessage : IServiceBusMessage
{
try
{
var messageLifespan = TimeSpan.FromSeconds(ttl);
var messageType = typeof(TMessage);
if (!_activeSubscriptionClients.TryGetValue(messageType, out var subscriptionClient))
{
subscriptionClient = _subscriptionClientFactory.Create(typeof(TMessage)).GetAwaiter().GetResult();
if (subscriptionClient.OperationTimeout < messageLifespan) subscriptionClient.OperationTimeout = messageLifespan;
if (subscriptionClient.ServiceBusConnection.OperationTimeout < messageLifespan)
subscriptionClient.ServiceBusConnection.OperationTimeout = messageLifespan;
_activeSubscriptionClients.AddOrUpdate(messageType, subscriptionClient, (key, value) => value);
}
var messageHandlerOptions = new MessageHandlerOptions(OnException)
{
MaxConcurrentCalls = maxDop,
AutoComplete = false,
MaxAutoRenewDuration = messageLifespan,
};
subscriptionClient.RegisterMessageHandler(
async (azureMessage, cancellationToken) =>
{
try
{
var textPayload = _encoding.GetString(azureMessage.Body);
var message = JsonConvert.DeserializeObject<TMessage>(textPayload);
if (message == null)
throw new FormatException($"Cannot deserialize the message payload to type '{typeof(TMessage).FullName}'.");
await execution.Invoke(message);
await subscriptionClient.CompleteAsync(azureMessage.SystemProperties.LockToken);
}
catch (Exception ex)
{
_logger.LogError(ex, "ProcessMessagesAsync(Message, CancellationToken)");
await subscriptionClient.AbandonAsync(azureMessage.SystemProperties.LockToken);
}
}
, messageHandlerOptions);
}
catch (Exception ex)
{
_logger.LogError(ex, "Subscribe(Action<TMessage>)");
throw;
}
}
想法是,您订阅 Azure 服务总线以获取特定类型的消息,它直接对应于一个队列。在您的订阅中,您传递一个委托来决定如何处理消息。
这似乎有效...但有一个警告。
无论我为 MaxAutoRenewDuration
或 OperationTimeout
设置了什么 ttl
,对于任何给定消息的长 运行 进程,一分钟后消息从队列中解锁,另一个订阅者拿起它并开始处理它。
我的理解是,这正是 MaxAutoRenewDuration
应该阻止的...但它似乎并没有阻止任何事情。
任何人都可以告诉我我需要做些什么来确保消费者拥有消息直到完成吗?
我能想到一些您可能想要查看的选项。
不要在 SubscriptionClient 中使用默认值 ReceiveMode = PeekLock
,而是将其设置为 ReceiveAndDelete,这样一旦消息被使用,它就会从队列中移除,并且不会被任何人使用其他客户端,这确实意味着你必须优雅地处理异常并自己执行重试;
看看 OperationTimeout
根据 doco 是 Duration after which individual operations will timeout
事实证明,消费者 运行 的远程进程静默失败并且没有返回失败状态代码(或其他任何内容);自动刷新机制挂起等待结果,所以消息超时。
我不清楚如何防止这种情况发生,但是一旦我解决了远程进程上的问题,问题就不再重现了。
故事的寓意:如果一切正常,但仍然超时,似乎自动刷新机制与您正在等待的异步操作共享一些资源。这可能是另一个寻找失败的地方。
我在 Azure 服务总线中构建了一个支持多个队列订阅的服务,但我遇到了一些奇怪的行为。
我的订阅单例 class 有一个如下所示的方法:
public void Subscribe<TMessage>(Func<TMessage, Task> execution, int maxDop = 1, int ttl = 60) where TMessage : IServiceBusMessage
{
try
{
var messageLifespan = TimeSpan.FromSeconds(ttl);
var messageType = typeof(TMessage);
if (!_activeSubscriptionClients.TryGetValue(messageType, out var subscriptionClient))
{
subscriptionClient = _subscriptionClientFactory.Create(typeof(TMessage)).GetAwaiter().GetResult();
if (subscriptionClient.OperationTimeout < messageLifespan) subscriptionClient.OperationTimeout = messageLifespan;
if (subscriptionClient.ServiceBusConnection.OperationTimeout < messageLifespan)
subscriptionClient.ServiceBusConnection.OperationTimeout = messageLifespan;
_activeSubscriptionClients.AddOrUpdate(messageType, subscriptionClient, (key, value) => value);
}
var messageHandlerOptions = new MessageHandlerOptions(OnException)
{
MaxConcurrentCalls = maxDop,
AutoComplete = false,
MaxAutoRenewDuration = messageLifespan,
};
subscriptionClient.RegisterMessageHandler(
async (azureMessage, cancellationToken) =>
{
try
{
var textPayload = _encoding.GetString(azureMessage.Body);
var message = JsonConvert.DeserializeObject<TMessage>(textPayload);
if (message == null)
throw new FormatException($"Cannot deserialize the message payload to type '{typeof(TMessage).FullName}'.");
await execution.Invoke(message);
await subscriptionClient.CompleteAsync(azureMessage.SystemProperties.LockToken);
}
catch (Exception ex)
{
_logger.LogError(ex, "ProcessMessagesAsync(Message, CancellationToken)");
await subscriptionClient.AbandonAsync(azureMessage.SystemProperties.LockToken);
}
}
, messageHandlerOptions);
}
catch (Exception ex)
{
_logger.LogError(ex, "Subscribe(Action<TMessage>)");
throw;
}
}
想法是,您订阅 Azure 服务总线以获取特定类型的消息,它直接对应于一个队列。在您的订阅中,您传递一个委托来决定如何处理消息。
这似乎有效...但有一个警告。
无论我为 MaxAutoRenewDuration
或 OperationTimeout
设置了什么 ttl
,对于任何给定消息的长 运行 进程,一分钟后消息从队列中解锁,另一个订阅者拿起它并开始处理它。
我的理解是,这正是 MaxAutoRenewDuration
应该阻止的...但它似乎并没有阻止任何事情。
任何人都可以告诉我我需要做些什么来确保消费者拥有消息直到完成吗?
我能想到一些您可能想要查看的选项。
不要在 SubscriptionClient 中使用默认值
ReceiveMode = PeekLock
,而是将其设置为 ReceiveAndDelete,这样一旦消息被使用,它就会从队列中移除,并且不会被任何人使用其他客户端,这确实意味着你必须优雅地处理异常并自己执行重试;看看
OperationTimeout
根据 doco 是Duration after which individual operations will timeout
事实证明,消费者 运行 的远程进程静默失败并且没有返回失败状态代码(或其他任何内容);自动刷新机制挂起等待结果,所以消息超时。
我不清楚如何防止这种情况发生,但是一旦我解决了远程进程上的问题,问题就不再重现了。
故事的寓意:如果一切正常,但仍然超时,似乎自动刷新机制与您正在等待的异步操作共享一些资源。这可能是另一个寻找失败的地方。