提供的锁无效。锁已过期,或者消息已从队列中删除

The lock supplied is invalid. Either the lock expired, or the message has already been removed from the queue

我正在使用 Microsoft azure 服务总线队列来处理计算,我的程序可以正常运行几个小时,但是从那时起我处理的每条消息都会出现此异常。我不知道从哪里开始,因为前几个小时一切正常。我的代码似乎也很准确。我将 post 处理 azure 服务总线消息的方法。

public static async Task processCalculations(BrokeredMessage message)
    {
        try
        {
            if (message != null)
            {
                if (connection == null || !connection.IsConnected)
                {
                    connection = await ConnectionMultiplexer.ConnectAsync("connection,SyncTimeout=10000,ConnectTimeout=10000");
                    //connection = ConnectionMultiplexer.Connect("connection,SyncTimeout=10000,ConnectTimeout=10000");
                }

                cache = connection.GetDatabase();

                string sandpKey = message.Properties["sandp"].ToString();
                string dateKey = message.Properties["date"].ToString();
                string symbolclassKey = message.Properties["symbolclass"].ToString();
                string stockdataKey = message.Properties["stockdata"].ToString();
                string stockcomparedataKey = message.Properties["stockcomparedata"].ToString();

                var sandpTask = cache.GetAsync<List<StockData>>(sandpKey);
                var dateTask = cache.GetAsync<DateTime>(dateKey);
                var symbolinfoTask = cache.GetAsync<SymbolInfo>(symbolclassKey);
                var stockdataTask = cache.GetAsync<List<StockData>>(stockdataKey);
                var stockcomparedataTask = cache.GetAsync<List<StockMarketCompare>>(stockcomparedataKey);

                await Task.WhenAll(sandpTask, dateTask, symbolinfoTask,
                    stockdataTask, stockcomparedataTask);

                List<StockData> sandp = sandpTask.Result;
                DateTime date = dateTask.Result;
                SymbolInfo symbolinfo = symbolinfoTask.Result;
                List<StockData> stockdata = stockdataTask.Result;
                List<StockMarketCompare> stockcomparedata = stockcomparedataTask.Result;

                StockRating rating = performCalculations(symbolinfo, date, sandp, stockdata, stockcomparedata);

                if (rating != null)
                {
                    saveToTable(rating);
                    if (message.LockedUntilUtc.Minute <= 1)
                    {
                        await message.RenewLockAsync();
                    }
                    await message.CompleteAsync(); // getting exception here
                }
                else
                {
                    Console.WriteLine("Message " + message.MessageId + " Completed!");
                    await message.CompleteAsync();
                }
            }
        }
        catch (TimeoutException time)
        {
            Console.WriteLine(time.Message);
        }
        catch (MessageLockLostException locks)
        {
            Console.WriteLine(locks.Message);
        }
        catch (RedisConnectionException redis)
        {
            Console.WriteLine("Start the redis server service!");
        }
        catch (MessagingCommunicationException communication)
        {
            Console.WriteLine(communication.Message);
        }
        catch (Exception ex)
        {
            Console.WriteLine(ex.Message);
            Console.WriteLine(ex.StackTrace);
        }
    }

更新:我检查了锁到期前的时间,如果需要它,我调用锁更新,但它更新锁没有错误,但我仍然收到此异常。

timeLeft = message.LockedUntilUtc - DateTime.UtcNow;
  if (timeLeft.TotalMinutes <= 2)
                    {
                        //Console.WriteLine("Renewed lock! " + ((TimeSpan)(message.LockedUntilUtc - DateTime.UtcNow)).TotalMinutes);
                        message.RenewLock();
                    }

catch (MessageLockLostException locks)
        {
            Console.WriteLine("Delivery Count: " + message.DeliveryCount);
            Console.WriteLine("Enqueued Time: " + message.EnqueuedTimeUtc);
            Console.WriteLine("Expires Time: " + message.ExpiresAtUtc);
            Console.WriteLine("Locked Until Time: " + message.LockedUntilUtc);
            Console.WriteLine("Scheduled Enqueue Time: " + message.ScheduledEnqueueTimeUtc);
            Console.WriteLine("Current Time: " + DateTime.UtcNow);
            Console.WriteLine("Time Left: " + timeLeft);
        }

到目前为止,我所知道的是我的代码可以正常运行一段时间,更新锁被调用并工作,但我仍然遇到锁异常,在该异常中,我输出了 timeleft 并且它不断增加时间代码运行时的差异让我相信锁过期之前的时间不会以某种方式改变?

创建客户端订阅时,不要手动更新锁,而是尝试使用客户端的 OnMessage Options() 自动更新它,如下所示:

        OnMessageOptions options = new OnMessageOptions();

        options.AutoRenewTimeout = TimeSpan.FromMinutes(1);

        try
        {
            client = Subscription.CreateClient();

            client.OnMessageAsync(MessageReceivedComplete, options);
        }
        catch (Exception ex)
        {
            throw new Exception (ex);
        }

我花了几个小时试图理解为什么我得到 MessageLockLostException。我的原因是 AutoComplete 默认为 true。

如果您要调用 messsage.Complete()(或 CompleteAsync()),那么您应该实例化一个 OnMessageOptions 对象,将 AutoComplete 设置为 false,然后将其传递给你的 OnMessage 电话。

var options = new OnMessageOptions();
options.AutoComplete = false;

client.OnMessage(processCalculations, options);

我遇到了类似的问题。消息已成功处理,但当它们完成时,服务总线不再具有有效锁。原来我的TopicClient.PrefetchCount偏高了

似乎所有预取消息一被提取就开始锁定。如果您的累积消息处理时间超过锁定超时,则所有其他预取消息都将无法完成。它将 return 到服务总线。

就我而言,只是我在本地机器上使用 V2,而我已经部署了 V1-运行。

由于 V1 部署在 Azure 中(更接近同一个队列)并在发布模式下编译(相对于我在调试模式下的本地版本),部署的版本总是“赢得”队列资源的并发性。

这就是消息不再在队列中的原因:它被我的代码的部署版本消耗了。我知道它有点愚蠢。

我花了 2 天时间解决类似问题 - 同样的异常。
此异常可能有多种原因,我将描述几个可能对您陌生人有所帮助的配置选项...

ServiceBus 队列或主题订阅配置:

  • 队列/主题订阅的消息锁定持续时间 太低,将其设置为大约。消息处理时间

ServiceBusClient 选项配置:

  • tryTimeout太短,设置为~10s用于诊断

ServiceBusProcessor 选项配置:

  • AutoCompleteMessages 默认设置为 true,设置为 false
  • PrefetchCount 太高,为了诊断将其设置为 0
  • ReceiveMode 设置为 ServiceBusReceiveMode.PeekLock
  • MaxConcurrentCalls 用于诊断将其设置为 1

找到正确的值(针对给定系统优化)后,我不再发现任何问题。