Azure 服务总线队列消息在 Message.Abandon 后变成死信
Azure service bus queue Message deadlettered after Message.Abandon
我正在试用 Azure 服务总线队列。我有以下代码:
队列发送:
string strConnectionString = ConfigurationManager.AppSettings["Microsoft.ServiceBus.ConnectionString"];
var namespaceManager = NamespaceManager.CreateFromConnectionString(strConnectionString);
if (!namespaceManager.QueueExists("Test"))
{
QueueDescription qD = new QueueDescription("Test");
qD.DefaultMessageTimeToLive = new TimeSpan(05, 00, 00);
qD.LockDuration = new TimeSpan(00, 02, 30);
qD.MaxSizeInMegabytes = 5120;
namespaceManager.CreateQueue(qD);
}
if (namespaceManager.QueueExists("Test"))
{
QueueClient client = QueueClient.CreateFromConnectionString(strConnectionString, "Test", ReceiveMode.PeekLock);
var qMessage = Console.ReadLine();
using (MemoryStream strm = new MemoryStream(Encoding.UTF8.GetBytes(qMessage)))
{
BrokeredMessage bMsg = new BrokeredMessage(strm);
bMsg.MessageId = Guid.NewGuid().ToString();
bMsg.TimeToLive = new TimeSpan(05, 00, 00);
client.Send(bMsg);
Console.WriteLine("Message sent");
}
}
Console.ReadLine();
接收码:
string strConnectionString = ConfigurationManager.AppSettings["Microsoft.ServiceBus.ConnectionString"];
var namespaceManager = NamespaceManager.CreateFromConnectionString(strConnectionString);
if (namespaceManager.QueueExists("Test"))
{
QueueClient client = QueueClient.CreateFromConnectionString(strConnectionString, "Test",ReceiveMode.PeekLock);
if (client != null)
{
OnMessageOptions options = new OnMessageOptions();
options.AutoComplete = false;
options.AutoRenewTimeout = TimeSpan.FromSeconds(31);
client.OnMessage((message) =>
{
Console.WriteLine(message.State.ToString());
Console.WriteLine("Message Id: " + message.MessageId);
Stream stream = message.GetBody<Stream>();
StreamReader reader = new StreamReader(stream);
Console.WriteLine("Message: " + reader.ReadToEnd());
Console.WriteLine("***************");
message.Abandon();
});
Console.ReadLine();
}
}
我发现每当我调用 Abandon 时,邮件都会变成 DeadLettered。我的假设是它应该激活并且可以被另一个客户端接收。
您对BrokeredMessage.Abandon Api的理解是正确的。它旨在放弃在消息上获得的窥视锁定(但 不 放弃消息本身),因此,其他接收者可以使用它来接收消息。
以下是我们设想的查看锁定消息的不同状态:
基础知识优先
'Why':如果客户需要竞争消费者(作业队列)语义——他们需要多个工作人员同时处理来自队列的不同消息,Exactly -Once 保证 - 然后他们使用 ReceiveMode.PeekLock。在这个模型中,每个工作人员(队列接收者)都需要一种方式来将其当前消息(作业)的进度传达给其他工作人员。因此,brokeredMessage 提供 4 个函数 来表达状态。
'What':
- 如果消息被当前 Worker 成功处理 - 调用 BrokeredMessage.Complete()
- 如果 BrokeredMessage 不能被当前 worker 处理,并希望在另一个 Worker 上重试处理 - 那么,放弃消息。但是,这里的问题是:假设有 2 个工作人员,他们每个人都认为另一个可以处理此消息并调用放弃 - 很快他们将陷入重试处理该消息的无限循环!因此,为了避免这种情况,我们提供了一个名为 MaxDeliveryCount on QueueDescription 的配置。此设置保护消息从队列传递到接收方的次数限制。 在上面的示例中,每次您收到(和放弃)消息时,ServiceBus 服务上的 'deliveryCount' 都会递增。当它达到 10 时 - 消息已达到最大编号。交付,因此,将被死信。
- 如果当前接收者(工作人员)确定无法处理此消息,BrokeredMessage.DeadLetter()。这里的目标是让消费应用程序定期审核死信消息。
- 如果当前接收者(工作人员)无法处理此消息,但是,知道可以在稍后的某个时间点处理此消息BrokeredMessage.Defer()。
喂!
斯里
我正在试用 Azure 服务总线队列。我有以下代码:
队列发送:
string strConnectionString = ConfigurationManager.AppSettings["Microsoft.ServiceBus.ConnectionString"];
var namespaceManager = NamespaceManager.CreateFromConnectionString(strConnectionString);
if (!namespaceManager.QueueExists("Test"))
{
QueueDescription qD = new QueueDescription("Test");
qD.DefaultMessageTimeToLive = new TimeSpan(05, 00, 00);
qD.LockDuration = new TimeSpan(00, 02, 30);
qD.MaxSizeInMegabytes = 5120;
namespaceManager.CreateQueue(qD);
}
if (namespaceManager.QueueExists("Test"))
{
QueueClient client = QueueClient.CreateFromConnectionString(strConnectionString, "Test", ReceiveMode.PeekLock);
var qMessage = Console.ReadLine();
using (MemoryStream strm = new MemoryStream(Encoding.UTF8.GetBytes(qMessage)))
{
BrokeredMessage bMsg = new BrokeredMessage(strm);
bMsg.MessageId = Guid.NewGuid().ToString();
bMsg.TimeToLive = new TimeSpan(05, 00, 00);
client.Send(bMsg);
Console.WriteLine("Message sent");
}
}
Console.ReadLine();
接收码:
string strConnectionString = ConfigurationManager.AppSettings["Microsoft.ServiceBus.ConnectionString"];
var namespaceManager = NamespaceManager.CreateFromConnectionString(strConnectionString);
if (namespaceManager.QueueExists("Test"))
{
QueueClient client = QueueClient.CreateFromConnectionString(strConnectionString, "Test",ReceiveMode.PeekLock);
if (client != null)
{
OnMessageOptions options = new OnMessageOptions();
options.AutoComplete = false;
options.AutoRenewTimeout = TimeSpan.FromSeconds(31);
client.OnMessage((message) =>
{
Console.WriteLine(message.State.ToString());
Console.WriteLine("Message Id: " + message.MessageId);
Stream stream = message.GetBody<Stream>();
StreamReader reader = new StreamReader(stream);
Console.WriteLine("Message: " + reader.ReadToEnd());
Console.WriteLine("***************");
message.Abandon();
});
Console.ReadLine();
}
}
我发现每当我调用 Abandon 时,邮件都会变成 DeadLettered。我的假设是它应该激活并且可以被另一个客户端接收。
您对BrokeredMessage.Abandon Api的理解是正确的。它旨在放弃在消息上获得的窥视锁定(但 不 放弃消息本身),因此,其他接收者可以使用它来接收消息。
以下是我们设想的查看锁定消息的不同状态:
基础知识优先
'Why':如果客户需要竞争消费者(作业队列)语义——他们需要多个工作人员同时处理来自队列的不同消息,Exactly -Once 保证 - 然后他们使用 ReceiveMode.PeekLock。在这个模型中,每个工作人员(队列接收者)都需要一种方式来将其当前消息(作业)的进度传达给其他工作人员。因此,brokeredMessage 提供 4 个函数 来表达状态。
'What':
- 如果消息被当前 Worker 成功处理 - 调用 BrokeredMessage.Complete()
- 如果 BrokeredMessage 不能被当前 worker 处理,并希望在另一个 Worker 上重试处理 - 那么,放弃消息。但是,这里的问题是:假设有 2 个工作人员,他们每个人都认为另一个可以处理此消息并调用放弃 - 很快他们将陷入重试处理该消息的无限循环!因此,为了避免这种情况,我们提供了一个名为 MaxDeliveryCount on QueueDescription 的配置。此设置保护消息从队列传递到接收方的次数限制。 在上面的示例中,每次您收到(和放弃)消息时,ServiceBus 服务上的 'deliveryCount' 都会递增。当它达到 10 时 - 消息已达到最大编号。交付,因此,将被死信。
- 如果当前接收者(工作人员)确定无法处理此消息,BrokeredMessage.DeadLetter()。这里的目标是让消费应用程序定期审核死信消息。
- 如果当前接收者(工作人员)无法处理此消息,但是,知道可以在稍后的某个时间点处理此消息BrokeredMessage.Defer()。
喂! 斯里