Azure ServiceBus AbandonMessageAsync 在不一致的时间发布消息
Azure ServiceBus AbandonMessageAsync releasing message at inconsistent times
我需要检查死信队列,如果存在某些情况(比如超过 30 天),我想将它存档到某个数据存储(而不是删除它)。所以我要抓取消息,如果它满足这个条件,将它保存到某个存储和 complete/delete 消息,如果不满足,则放弃它。我有一个控制台应用程序,我在其中从 dlq 获取消息,它似乎可以工作,但如果我 运行 一遍又一遍,我会看到返回的消息数量不一致。它将所有这些都进行几次迭代(在我的示例中为 7),但随后它将开始仅获得 6、0 或 1,并最终返回到 dql 中的全部数量(例如 30 秒后我认为这是 peek lock 的默认锁定期)。我会假设每次我 运行 这个,我应该得到所有消息,因为我放弃了 运行 之前的消息。
我正在使用 Azure.Messaging.ServiceBus 7.8.1,您似乎只是将消息对象传递给放弃方法。如果有人有什么建议那就太好了!
github 中的代码:https://github.com/ndn2323/bustest
using Azure.Messaging.ServiceBus;
using System.Text;
namespace BusReceiver
{
public class TaskRunner
{
public TaskRunner() { }
public async Task Run() {
const string DLQPATH = "/$deadletterqueue";
var maxMsgCount = 50;
var connectionString = "[ConnectionString]";
var topicName = "testtopic1";
var subscriberName = "testsub1";
var subscriberDlqName = subscriberName + DLQPATH;
var client = new ServiceBusClient(connectionString);
var options = new ServiceBusReceiverOptions();
options.ReceiveMode = ServiceBusReceiveMode.PeekLock;
var receiver = client.CreateReceiver(topicName, subscriberName, options);
var receiverDlq = client.CreateReceiver(topicName, subscriberDlqName, options);
Log("Starting receive from regular queue");
var msgList = await receiver.ReceiveMessagesAsync(maxMsgCount, TimeSpan.FromMilliseconds(500));
Log(msgList.Count.ToString() + " messages found");
foreach (var msg in msgList)
{
await receiver.DeadLetterMessageAsync(msg);
}
Log("Starting receive from dead letter queue");
var msgListDlq = await receiverDlq.ReceiveMessagesAsync(maxMsgCount, TimeSpan.FromMilliseconds(500));
Log(msgListDlq.Count.ToString() + " messages found in dlq");
foreach (var msg in msgListDlq) {
Log("MessageId: " + msg.MessageId + " Body: " + Encoding.ASCII.GetString(msg.Body));
// if some condition, archieve message to some data store, else abandon it to be picked up again
// for this test I'm abandoning all messages
await receiverDlq.AbandonMessageAsync(msg);
}
await receiver.CloseAsync();
await receiverDlq.CloseAsync();
}
private void Log(string msg) {
Console.WriteLine(DateTime.Now.ToString() + ": " + msg);
}
}
}
输出示例:
C:\GitHub\ndn2323\bustest\BusReceiver\bin\Debug\net6.0>BusReceiver.exe
5/29/2022 11:45:36 PM: Starting receive from regular queue
5/29/2022 11:45:37 PM: 0 messages found
5/29/2022 11:45:37 PM: Starting receive from dead letter queue
5/29/2022 11:45:37 PM: 7 messages found in dlq
5/29/2022 11:45:37 PM: MessageId: 9e9f390655af44a8b93866920a6de77c Body: TestMessage
5/29/2022 11:45:37 PM: MessageId: 3aacffe40ab5473fb34412684bcd1907 Body: TestMessage
5/29/2022 11:45:37 PM: MessageId: a47f83d4a12845088ade427e084d8e39 Body: TestMessage
5/29/2022 11:45:37 PM: MessageId: 47ff6dd4f4134661a3616a9210670be5 Body: TestMessage
5/29/2022 11:45:37 PM: MessageId: d10b3602f57047f1bf613675e35793e0 Body: TestMessage
5/29/2022 11:45:37 PM: MessageId: 08a45405375e46ffb99db9812c3e3d78 Body: TestMessage
5/29/2022 11:45:37 PM: MessageId: d21cff4ae5b6453f9077b3805ace4e09 Body: TestMessage
C:\GitHub\ndn2323\bustest\BusReceiver\bin\Debug\net6.0>BusReceiver.exe
5/29/2022 11:45:42 PM: Starting receive from regular queue
5/29/2022 11:45:43 PM: 0 messages found
5/29/2022 11:45:43 PM: Starting receive from dead letter queue
5/29/2022 11:45:43 PM: 7 messages found in dlq
5/29/2022 11:45:43 PM: MessageId: 9e9f390655af44a8b93866920a6de77c Body: TestMessage
5/29/2022 11:45:43 PM: MessageId: 3aacffe40ab5473fb34412684bcd1907 Body: TestMessage
5/29/2022 11:45:43 PM: MessageId: a47f83d4a12845088ade427e084d8e39 Body: TestMessage
5/29/2022 11:45:43 PM: MessageId: 47ff6dd4f4134661a3616a9210670be5 Body: TestMessage
5/29/2022 11:45:43 PM: MessageId: d10b3602f57047f1bf613675e35793e0 Body: TestMessage
5/29/2022 11:45:43 PM: MessageId: 08a45405375e46ffb99db9812c3e3d78 Body: TestMessage
5/29/2022 11:45:43 PM: MessageId: d21cff4ae5b6453f9077b3805ace4e09 Body: TestMessage
C:\GitHub\ndn2323\bustest\BusReceiver\bin\Debug\net6.0>BusReceiver.exe
5/29/2022 11:45:48 PM: Starting receive from regular queue
5/29/2022 11:45:49 PM: 0 messages found
5/29/2022 11:45:49 PM: Starting receive from dead letter queue
5/29/2022 11:45:49 PM: 1 messages found in dlq
5/29/2022 11:45:49 PM: MessageId: d21cff4ae5b6453f9077b3805ace4e09 Body: TestMessage
C:\GitHub\ndn2323\bustest\BusReceiver\bin\Debug\net6.0>BusReceiver.exe
5/29/2022 11:46:03 PM: Starting receive from regular queue
5/29/2022 11:46:04 PM: 0 messages found
5/29/2022 11:46:04 PM: Starting receive from dead letter queue
5/29/2022 11:46:04 PM: 1 messages found in dlq
5/29/2022 11:46:04 PM: MessageId: d21cff4ae5b6453f9077b3805ace4e09 Body: TestMessage
由于网络、服务和您的应用程序的变化,在调用 ReceiveMessagesAsync
时看到返回大小不一致的批次是正常的。
接收时,没有最小批量。接收方将向 link 添加足够的信用以允许 maxMessageCount
从服务流出,但不会等待尝试构建该大小的批次。一旦从服务中传输了任何消息,它们将作为批次返回。因为您指定了 maxWaitTime
,如果在该时间内服务上没有可用的消息,将返回一个空的批次。
在 ServiceBusReceiverOptions
中设置 PrefetchCount 有助于平滑批量大小。也就是说,重要的是要注意预取队列中的消息持有锁并且不会自动更新,因此发现预取计数过高将导致看到过期的锁。
在您的示例中,最好的方法可能是重复执行接收循环,直到您连续看到 1 个(或更多?)个空批次。这将是队列为空的有力指标。
我需要检查死信队列,如果存在某些情况(比如超过 30 天),我想将它存档到某个数据存储(而不是删除它)。所以我要抓取消息,如果它满足这个条件,将它保存到某个存储和 complete/delete 消息,如果不满足,则放弃它。我有一个控制台应用程序,我在其中从 dlq 获取消息,它似乎可以工作,但如果我 运行 一遍又一遍,我会看到返回的消息数量不一致。它将所有这些都进行几次迭代(在我的示例中为 7),但随后它将开始仅获得 6、0 或 1,并最终返回到 dql 中的全部数量(例如 30 秒后我认为这是 peek lock 的默认锁定期)。我会假设每次我 运行 这个,我应该得到所有消息,因为我放弃了 运行 之前的消息。
我正在使用 Azure.Messaging.ServiceBus 7.8.1,您似乎只是将消息对象传递给放弃方法。如果有人有什么建议那就太好了!
github 中的代码:https://github.com/ndn2323/bustest
using Azure.Messaging.ServiceBus;
using System.Text;
namespace BusReceiver
{
public class TaskRunner
{
public TaskRunner() { }
public async Task Run() {
const string DLQPATH = "/$deadletterqueue";
var maxMsgCount = 50;
var connectionString = "[ConnectionString]";
var topicName = "testtopic1";
var subscriberName = "testsub1";
var subscriberDlqName = subscriberName + DLQPATH;
var client = new ServiceBusClient(connectionString);
var options = new ServiceBusReceiverOptions();
options.ReceiveMode = ServiceBusReceiveMode.PeekLock;
var receiver = client.CreateReceiver(topicName, subscriberName, options);
var receiverDlq = client.CreateReceiver(topicName, subscriberDlqName, options);
Log("Starting receive from regular queue");
var msgList = await receiver.ReceiveMessagesAsync(maxMsgCount, TimeSpan.FromMilliseconds(500));
Log(msgList.Count.ToString() + " messages found");
foreach (var msg in msgList)
{
await receiver.DeadLetterMessageAsync(msg);
}
Log("Starting receive from dead letter queue");
var msgListDlq = await receiverDlq.ReceiveMessagesAsync(maxMsgCount, TimeSpan.FromMilliseconds(500));
Log(msgListDlq.Count.ToString() + " messages found in dlq");
foreach (var msg in msgListDlq) {
Log("MessageId: " + msg.MessageId + " Body: " + Encoding.ASCII.GetString(msg.Body));
// if some condition, archieve message to some data store, else abandon it to be picked up again
// for this test I'm abandoning all messages
await receiverDlq.AbandonMessageAsync(msg);
}
await receiver.CloseAsync();
await receiverDlq.CloseAsync();
}
private void Log(string msg) {
Console.WriteLine(DateTime.Now.ToString() + ": " + msg);
}
}
}
输出示例:
C:\GitHub\ndn2323\bustest\BusReceiver\bin\Debug\net6.0>BusReceiver.exe
5/29/2022 11:45:36 PM: Starting receive from regular queue
5/29/2022 11:45:37 PM: 0 messages found
5/29/2022 11:45:37 PM: Starting receive from dead letter queue
5/29/2022 11:45:37 PM: 7 messages found in dlq
5/29/2022 11:45:37 PM: MessageId: 9e9f390655af44a8b93866920a6de77c Body: TestMessage
5/29/2022 11:45:37 PM: MessageId: 3aacffe40ab5473fb34412684bcd1907 Body: TestMessage
5/29/2022 11:45:37 PM: MessageId: a47f83d4a12845088ade427e084d8e39 Body: TestMessage
5/29/2022 11:45:37 PM: MessageId: 47ff6dd4f4134661a3616a9210670be5 Body: TestMessage
5/29/2022 11:45:37 PM: MessageId: d10b3602f57047f1bf613675e35793e0 Body: TestMessage
5/29/2022 11:45:37 PM: MessageId: 08a45405375e46ffb99db9812c3e3d78 Body: TestMessage
5/29/2022 11:45:37 PM: MessageId: d21cff4ae5b6453f9077b3805ace4e09 Body: TestMessage
C:\GitHub\ndn2323\bustest\BusReceiver\bin\Debug\net6.0>BusReceiver.exe
5/29/2022 11:45:42 PM: Starting receive from regular queue
5/29/2022 11:45:43 PM: 0 messages found
5/29/2022 11:45:43 PM: Starting receive from dead letter queue
5/29/2022 11:45:43 PM: 7 messages found in dlq
5/29/2022 11:45:43 PM: MessageId: 9e9f390655af44a8b93866920a6de77c Body: TestMessage
5/29/2022 11:45:43 PM: MessageId: 3aacffe40ab5473fb34412684bcd1907 Body: TestMessage
5/29/2022 11:45:43 PM: MessageId: a47f83d4a12845088ade427e084d8e39 Body: TestMessage
5/29/2022 11:45:43 PM: MessageId: 47ff6dd4f4134661a3616a9210670be5 Body: TestMessage
5/29/2022 11:45:43 PM: MessageId: d10b3602f57047f1bf613675e35793e0 Body: TestMessage
5/29/2022 11:45:43 PM: MessageId: 08a45405375e46ffb99db9812c3e3d78 Body: TestMessage
5/29/2022 11:45:43 PM: MessageId: d21cff4ae5b6453f9077b3805ace4e09 Body: TestMessage
C:\GitHub\ndn2323\bustest\BusReceiver\bin\Debug\net6.0>BusReceiver.exe
5/29/2022 11:45:48 PM: Starting receive from regular queue
5/29/2022 11:45:49 PM: 0 messages found
5/29/2022 11:45:49 PM: Starting receive from dead letter queue
5/29/2022 11:45:49 PM: 1 messages found in dlq
5/29/2022 11:45:49 PM: MessageId: d21cff4ae5b6453f9077b3805ace4e09 Body: TestMessage
C:\GitHub\ndn2323\bustest\BusReceiver\bin\Debug\net6.0>BusReceiver.exe
5/29/2022 11:46:03 PM: Starting receive from regular queue
5/29/2022 11:46:04 PM: 0 messages found
5/29/2022 11:46:04 PM: Starting receive from dead letter queue
5/29/2022 11:46:04 PM: 1 messages found in dlq
5/29/2022 11:46:04 PM: MessageId: d21cff4ae5b6453f9077b3805ace4e09 Body: TestMessage
由于网络、服务和您的应用程序的变化,在调用 ReceiveMessagesAsync
时看到返回大小不一致的批次是正常的。
接收时,没有最小批量。接收方将向 link 添加足够的信用以允许 maxMessageCount
从服务流出,但不会等待尝试构建该大小的批次。一旦从服务中传输了任何消息,它们将作为批次返回。因为您指定了 maxWaitTime
,如果在该时间内服务上没有可用的消息,将返回一个空的批次。
在 ServiceBusReceiverOptions
中设置 PrefetchCount 有助于平滑批量大小。也就是说,重要的是要注意预取队列中的消息持有锁并且不会自动更新,因此发现预取计数过高将导致看到过期的锁。
在您的示例中,最好的方法可能是重复执行接收循环,直到您连续看到 1 个(或更多?)个空批次。这将是队列为空的有力指标。