在 Azure 服务总线上完成死信队列中的消息
Complete a message in a dead letter queue on Azure Service Bus
我希望能够从我的死信队列中删除选定的消息。
这是如何实现的?
我经常收到错误消息:
操作无法完成,因为 RecieveContext 为 Null
我已经尝试了所有我能想到和阅读的方法,这就是我现在的处境:
public void DeleteMessageFromDeadletterQueue<T>(string queueName, long sequenceNumber)
{
var client = GetQueueClient(queueName, true);
var messages = GetMessages(client);
foreach(var m in messages)
{
if(m.SequenceNumber == sequenceNumber)
{
m.Complete();
}
else
{
m.Abandon();
}
}
}
/// <summary>
/// Return a list of all messages in a Queue
/// </summary>
/// <param name="client"></param>
/// <returns></returns>
private IEnumerable<BrokeredMessage> GetMessages(QueueClient client)
{
var peekedMessages = client.PeekBatch(0, peekedMessageBatchCount).ToList();
bool getmore = peekedMessages.Count() == peekedMessageBatchCount ? true : false;
while (getmore)
{
var moreMessages = client.PeekBatch(peekedMessages.Last().SequenceNumber, peekedMessageBatchCount);
peekedMessages.AddRange(moreMessages);
getmore = moreMessages.Count() == peekedMessageBatchCount ? true : false;
}
return peekedMessages;
}
不知道为什么这似乎是一项如此困难的任务。
这里的问题是您调用了 PeekBatch,其中 return 是刚刚查看的消息。没有接收上下文,您可以使用它来完成或放弃消息。 Peek 和 PeekBatch 仅操作 return 消息并且根本不锁定它们,即使 receivedMode 设置为 PeekLock。这主要是为了浏览队列,但你不能对它们采取行动。请注意放弃和完成状态的文档,"must only be called on a message that has been received by using a receiver operating in Peek-Lock ReceiveMode." 这里并不清楚,但是 Peek 和 PeekBatch 操作不算在内,因为它们实际上没有获得接收上下文。这就是当您尝试调用放弃时它失败的原因。如果您确实找到了您正在寻找的那个,当您调用 Complete 时它会抛出一个不同的错误。
您想要做的是在 PeekBatch RecieveMode 中使用 ReceiveBatch 操作。这实际上会拉回一批消息,然后当您浏览它们以找到您想要的消息时,您实际上可以影响消息的完成。当您触发放弃时,它会立即将不是您想要的消息释放回队列。
如果您的死信队列非常小,通常这不会很糟糕。如果它真的很大,那么采用这种方法并不是最有效的。您将死信队列更像是一个堆并对其进行挖掘,而不是处理消息 "in order"。在处理需要手动干预的死信队列时,这种情况并不少见,但如果你有很多死信队列,那么最好将一些处理死信队列的东西放到不同类型的商店中,这样你可以更容易地找到和销毁消息,但仍然可以重新创建可以推送到不同队列以重新处理的消息。
可能还有其他选择,比如使用 Defer,如果你是手动写死字的话。参见 How to use the MessageReceiver.Receive method by sequenceNumber on ServiceBus。
我没有成功采纳 MikeWo 的建议,因为当我结合使用 ReceiveMode.PeekLock 实例化 DLQ QueueClient 和使用 ReceiveBatch 提取消息时,我使用的是 Receive/ReceiveBatch 的版本来请求消息它的序列号。
[旁白:在我的应用程序中,我查看所有消息并列出它们,并让另一个处理程序根据它的特定序列号将死信消息重新排队到主队列...]
但是在 DLQClient 上调用 Receive(long sequenceNumber) 或 ReceiveBatch(IEnumerable sequenceNumber) 总是抛出异常,"Failed to lock one or more specified messages. The message does not exist."(即使我只传递了 1 并且它肯定在队列中)。
此外,由于不明确的原因,使用 ReceiveBatch(int messageCount),无论使用什么值作为 messageCount,总是 returns 队列中的下 1 条消息。
最终对我有用的是以下内容:
QueueClient queueClient, deadLetterClient;
GetQueueClients(qname, ReceiveMode.PeekLock, out queueClient, out deadLetterClient);
BrokeredMessage msg = null;
var mapSequenceNumberToBrokeredMessage = new Dictionary<long, BrokeredMessage>();
while (msg == null)
{
#if UseReceive
var message = deadLetterClient.Receive();
#elif UseReceiveBatch
var messageEnumerable = deadLetterClient.ReceiveBatch(CnCountOfMessagesToPeek).ToList();
if ((messageEnumerable == null) || (messageEnumerable.Count == 0))
break;
else if (messageEnumerable.Count != 1)
throw new ApplicationException("Invalid expectation that there'd always be only 1 msg returned by ReceiveBatch");
// I've yet to get back more than one in the deadletter queue, but...
var message = messageEnumerable.First();
#endif
if (message.SequenceNumber == lMessageId)
{
msg = message;
break;
}
else if (mapSequenceNumberToBrokeredMessage.ContainsKey(message.SequenceNumber))
{
// this means it's started the list over, so we didn't find it...
break;
}
else
mapSequenceNumberToBrokeredMessage.Add(message.SequenceNumber, message);
message.Abandon();
}
if (msg == null)
throw new ApplicationException("Unable to find a message in the deadletter queue with the SequenceNumber: " + msgid);
var strMessage = GetMessage(msg);
var newMsg = new BrokeredMessage(strMessage);
queueClient.Send(newMsg);
msg.Complete();
我希望能够从我的死信队列中删除选定的消息。
这是如何实现的?
我经常收到错误消息:
操作无法完成,因为 RecieveContext 为 Null
我已经尝试了所有我能想到和阅读的方法,这就是我现在的处境:
public void DeleteMessageFromDeadletterQueue<T>(string queueName, long sequenceNumber)
{
var client = GetQueueClient(queueName, true);
var messages = GetMessages(client);
foreach(var m in messages)
{
if(m.SequenceNumber == sequenceNumber)
{
m.Complete();
}
else
{
m.Abandon();
}
}
}
/// <summary>
/// Return a list of all messages in a Queue
/// </summary>
/// <param name="client"></param>
/// <returns></returns>
private IEnumerable<BrokeredMessage> GetMessages(QueueClient client)
{
var peekedMessages = client.PeekBatch(0, peekedMessageBatchCount).ToList();
bool getmore = peekedMessages.Count() == peekedMessageBatchCount ? true : false;
while (getmore)
{
var moreMessages = client.PeekBatch(peekedMessages.Last().SequenceNumber, peekedMessageBatchCount);
peekedMessages.AddRange(moreMessages);
getmore = moreMessages.Count() == peekedMessageBatchCount ? true : false;
}
return peekedMessages;
}
不知道为什么这似乎是一项如此困难的任务。
这里的问题是您调用了 PeekBatch,其中 return 是刚刚查看的消息。没有接收上下文,您可以使用它来完成或放弃消息。 Peek 和 PeekBatch 仅操作 return 消息并且根本不锁定它们,即使 receivedMode 设置为 PeekLock。这主要是为了浏览队列,但你不能对它们采取行动。请注意放弃和完成状态的文档,"must only be called on a message that has been received by using a receiver operating in Peek-Lock ReceiveMode." 这里并不清楚,但是 Peek 和 PeekBatch 操作不算在内,因为它们实际上没有获得接收上下文。这就是当您尝试调用放弃时它失败的原因。如果您确实找到了您正在寻找的那个,当您调用 Complete 时它会抛出一个不同的错误。
您想要做的是在 PeekBatch RecieveMode 中使用 ReceiveBatch 操作。这实际上会拉回一批消息,然后当您浏览它们以找到您想要的消息时,您实际上可以影响消息的完成。当您触发放弃时,它会立即将不是您想要的消息释放回队列。
如果您的死信队列非常小,通常这不会很糟糕。如果它真的很大,那么采用这种方法并不是最有效的。您将死信队列更像是一个堆并对其进行挖掘,而不是处理消息 "in order"。在处理需要手动干预的死信队列时,这种情况并不少见,但如果你有很多死信队列,那么最好将一些处理死信队列的东西放到不同类型的商店中,这样你可以更容易地找到和销毁消息,但仍然可以重新创建可以推送到不同队列以重新处理的消息。
可能还有其他选择,比如使用 Defer,如果你是手动写死字的话。参见 How to use the MessageReceiver.Receive method by sequenceNumber on ServiceBus。
我没有成功采纳 MikeWo 的建议,因为当我结合使用 ReceiveMode.PeekLock 实例化 DLQ QueueClient 和使用 ReceiveBatch 提取消息时,我使用的是 Receive/ReceiveBatch 的版本来请求消息它的序列号。
[旁白:在我的应用程序中,我查看所有消息并列出它们,并让另一个处理程序根据它的特定序列号将死信消息重新排队到主队列...]
但是在 DLQClient 上调用 Receive(long sequenceNumber) 或 ReceiveBatch(IEnumerable sequenceNumber) 总是抛出异常,"Failed to lock one or more specified messages. The message does not exist."(即使我只传递了 1 并且它肯定在队列中)。
此外,由于不明确的原因,使用 ReceiveBatch(int messageCount),无论使用什么值作为 messageCount,总是 returns 队列中的下 1 条消息。
最终对我有用的是以下内容:
QueueClient queueClient, deadLetterClient;
GetQueueClients(qname, ReceiveMode.PeekLock, out queueClient, out deadLetterClient);
BrokeredMessage msg = null;
var mapSequenceNumberToBrokeredMessage = new Dictionary<long, BrokeredMessage>();
while (msg == null)
{
#if UseReceive
var message = deadLetterClient.Receive();
#elif UseReceiveBatch
var messageEnumerable = deadLetterClient.ReceiveBatch(CnCountOfMessagesToPeek).ToList();
if ((messageEnumerable == null) || (messageEnumerable.Count == 0))
break;
else if (messageEnumerable.Count != 1)
throw new ApplicationException("Invalid expectation that there'd always be only 1 msg returned by ReceiveBatch");
// I've yet to get back more than one in the deadletter queue, but...
var message = messageEnumerable.First();
#endif
if (message.SequenceNumber == lMessageId)
{
msg = message;
break;
}
else if (mapSequenceNumberToBrokeredMessage.ContainsKey(message.SequenceNumber))
{
// this means it's started the list over, so we didn't find it...
break;
}
else
mapSequenceNumberToBrokeredMessage.Add(message.SequenceNumber, message);
message.Abandon();
}
if (msg == null)
throw new ApplicationException("Unable to find a message in the deadletter queue with the SequenceNumber: " + msgid);
var strMessage = GetMessage(msg);
var newMsg = new BrokeredMessage(strMessage);
queueClient.Send(newMsg);
msg.Complete();