Azure ServiceBus AbandonMessageAsync 在不一致的时间发布消息

Azure ServiceBus AbandonMessageAsync releasing message at inconsistent times

我需要检查死信队列,如果存在某些情况(比如超过 30 天),我想将它存档到某个数据存储(而不是删除它)。所以我要抓取消息,如果它满足这个条件,将它保存到某个存储和 complete/delete 消息,如果不满足,则放弃它。我有一个控制台应用程序,我在其中从 dlq 获取消息,它似乎可以工作,但如果我 运行 一遍又一遍,我会看到返回的消息数量不一致。它将所有这些都进行几次迭代(在我的示例中为 7),但随后它将开始仅获得 6、0 或 1,并最终返回到 dql 中的全部数量(例如 30 秒后我认为这是 peek lock 的默认锁定期)。我会假设每次我 运行 这个,我应该得到所有消息,因为我放弃了 运行 之前的消息。

我正在使用 Azure.Messaging.ServiceBus 7.8.1,您似乎只是将消息对象传递给放弃方法。如果有人有什么建议那就太好了!

github 中的代码:https://github.com/ndn2323/bustest

using Azure.Messaging.ServiceBus;
using System.Text;

namespace BusReceiver
{
    public class TaskRunner
    {
        public TaskRunner() { }
        public async Task Run() {            
            const string DLQPATH = "/$deadletterqueue";
            var maxMsgCount = 50;
            var connectionString = "[ConnectionString]";
            var topicName = "testtopic1";
            var subscriberName = "testsub1";
            var subscriberDlqName = subscriberName + DLQPATH;
            var client = new ServiceBusClient(connectionString);
            var options = new ServiceBusReceiverOptions();
            options.ReceiveMode = ServiceBusReceiveMode.PeekLock;
            var receiver = client.CreateReceiver(topicName, subscriberName, options);
            var receiverDlq = client.CreateReceiver(topicName, subscriberDlqName, options);

            Log("Starting receive from regular queue");
            var msgList = await receiver.ReceiveMessagesAsync(maxMsgCount, TimeSpan.FromMilliseconds(500));
            Log(msgList.Count.ToString() + " messages found");
            foreach (var msg in msgList)
            {
                await receiver.DeadLetterMessageAsync(msg);
            }

            Log("Starting receive from dead letter queue");
            var msgListDlq = await receiverDlq.ReceiveMessagesAsync(maxMsgCount, TimeSpan.FromMilliseconds(500));            
            Log(msgListDlq.Count.ToString() + " messages found in dlq");
            foreach (var msg in msgListDlq) {
                Log("MessageId: " + msg.MessageId + " Body: " + Encoding.ASCII.GetString(msg.Body));
                // if some condition, archieve message to some data store, else abandon it to be picked up again
                // for this test I'm abandoning all messages                
                await receiverDlq.AbandonMessageAsync(msg);
            }

            await receiver.CloseAsync();
            await receiverDlq.CloseAsync();
        }

        private void Log(string msg) {
            Console.WriteLine(DateTime.Now.ToString() + ": " + msg);
        }
    }
}

输出示例:

C:\GitHub\ndn2323\bustest\BusReceiver\bin\Debug\net6.0>BusReceiver.exe
5/29/2022 11:45:36 PM: Starting receive from regular queue
5/29/2022 11:45:37 PM: 0 messages found
5/29/2022 11:45:37 PM: Starting receive from dead letter queue
5/29/2022 11:45:37 PM: 7 messages found in dlq
5/29/2022 11:45:37 PM: MessageId: 9e9f390655af44a8b93866920a6de77c Body: TestMessage
5/29/2022 11:45:37 PM: MessageId: 3aacffe40ab5473fb34412684bcd1907 Body: TestMessage
5/29/2022 11:45:37 PM: MessageId: a47f83d4a12845088ade427e084d8e39 Body: TestMessage
5/29/2022 11:45:37 PM: MessageId: 47ff6dd4f4134661a3616a9210670be5 Body: TestMessage
5/29/2022 11:45:37 PM: MessageId: d10b3602f57047f1bf613675e35793e0 Body: TestMessage
5/29/2022 11:45:37 PM: MessageId: 08a45405375e46ffb99db9812c3e3d78 Body: TestMessage
5/29/2022 11:45:37 PM: MessageId: d21cff4ae5b6453f9077b3805ace4e09 Body: TestMessage

C:\GitHub\ndn2323\bustest\BusReceiver\bin\Debug\net6.0>BusReceiver.exe
5/29/2022 11:45:42 PM: Starting receive from regular queue
5/29/2022 11:45:43 PM: 0 messages found
5/29/2022 11:45:43 PM: Starting receive from dead letter queue
5/29/2022 11:45:43 PM: 7 messages found in dlq
5/29/2022 11:45:43 PM: MessageId: 9e9f390655af44a8b93866920a6de77c Body: TestMessage
5/29/2022 11:45:43 PM: MessageId: 3aacffe40ab5473fb34412684bcd1907 Body: TestMessage
5/29/2022 11:45:43 PM: MessageId: a47f83d4a12845088ade427e084d8e39 Body: TestMessage
5/29/2022 11:45:43 PM: MessageId: 47ff6dd4f4134661a3616a9210670be5 Body: TestMessage
5/29/2022 11:45:43 PM: MessageId: d10b3602f57047f1bf613675e35793e0 Body: TestMessage
5/29/2022 11:45:43 PM: MessageId: 08a45405375e46ffb99db9812c3e3d78 Body: TestMessage
5/29/2022 11:45:43 PM: MessageId: d21cff4ae5b6453f9077b3805ace4e09 Body: TestMessage

C:\GitHub\ndn2323\bustest\BusReceiver\bin\Debug\net6.0>BusReceiver.exe
5/29/2022 11:45:48 PM: Starting receive from regular queue
5/29/2022 11:45:49 PM: 0 messages found
5/29/2022 11:45:49 PM: Starting receive from dead letter queue
5/29/2022 11:45:49 PM: 1 messages found in dlq
5/29/2022 11:45:49 PM: MessageId: d21cff4ae5b6453f9077b3805ace4e09 Body: TestMessage

C:\GitHub\ndn2323\bustest\BusReceiver\bin\Debug\net6.0>BusReceiver.exe
5/29/2022 11:46:03 PM: Starting receive from regular queue
5/29/2022 11:46:04 PM: 0 messages found
5/29/2022 11:46:04 PM: Starting receive from dead letter queue
5/29/2022 11:46:04 PM: 1 messages found in dlq
5/29/2022 11:46:04 PM: MessageId: d21cff4ae5b6453f9077b3805ace4e09 Body: TestMessage

由于网络、服务和您的应用程序的变化,在调用 ReceiveMessagesAsync 时看到返回大小不一致的批次是正常的。

接收时,没有最小批量。接收方将向 link 添加足够的信用以允许 maxMessageCount 从服务流出,但不会等待尝试构建该大小的批次。一旦从服务中传输了任何消息,它们将作为批次返回。因为您指定了 maxWaitTime,如果在该时间内服务上没有可用的消息,将返回一个空的批次。

ServiceBusReceiverOptions 中设置 PrefetchCount 有助于平滑批量大小。也就是说,重要的是要注意预取队列中的消息持有锁并且不会自动更新,因此发现预取计数过高将导致看到过期的锁。

在您的示例中,最好的方法可能是重复执行接收循环,直到您连续看到 1 个(或更多?)个空批次。这将是队列为空的有力指标。