为什么 Peek() 和 Receive() returns 来自服务总线队列的不同消息?
Why Peek() and Receive() returns different messages from Service Bus Queue?
我正在使用 Peek() 方法查看来自 Azure 服务总线队列的消息。
然后我根据我的应用程序级别条件“延迟”消息。以下是我的操作片段:
msgClient = clientMessagingFactory.CreateQueueClient(QueueClient.FormatDeadLetterPath(sourceQueue.Name), ReceiveMode.PeekLock);
message = msgClient.Peek(); // Got a message
if (message.MessageId == "xxx")
{
if(message.State == MessageState.Active)
{
BrokeredMessage _message = null;
_message = msgClient.Receive(); // Getting a different message though there are no more than 1 listener :(
if (_message != null){
_message.Defer();
_message.Abandon();
}
}
else
{
// Perform some operation with deferred state message
}
}
这里,msgClient.Peek() 和 msgClient.Receive() returns 不同的消息。所以我被推到了推迟错误信息的境地。我该如何解决?
最可能的原因是由于多个并发接收者在查看和接收之间,另一个消费者调用 receive() 呈现当前接收以获取下一条消息。当使用带有 non-partitioned 队列的单个侦听器时,您的代码对我来说按预期工作。另一个原因可能是队列是 'partitioned',其中可以查看和接收来自不同片段的精选消息。
像下面那样修改您的代码应该可以解决您在并发竞争消费者场景中的问题。查找内嵌评论进行解释。
var msgClient = clientMessagingFactory.CreateQueueClient(QueueClient.FormatDeadLetterPath(sourceQueue.Name), ReceiveMode.PeekLock);
var message = msgClient.Receive();
if (message != null)
{
// we got here means, there was one active message found and now message is locked from others view
if (message.MessageId == "xxx")
{
//with id 'xxx', defer
message.Defer();
}
// regardless of condition match, abandoning
message.Abandon();
}
else
{
// got here means there was no active Message above
// now see if we have deferred Message
var peeked = msgClient.Peek();
// The reason of checking state here again, in case there was new message arrived in the queue by now
if (peeked != null && peeked.MessageId == "xxx" && peeked.State == MessageState.Deferred)
{
try
{
// now receive it to lock
// yes, you can receive 'deferred' message by sequence number, but not 'active' message :)
message = msgClient.Receive(peeked.SequenceNumber);
// Perform some operation with deferred state message
message.Complete(); // don't forget it
}
catch (MessageNotFoundException)
{
// just in case message was peeked by competing consumer and received
}
}
}
注意:对 non-partitioned 实体的查看操作总是 returns 最旧的消息,但不是对分区实体。相反,它 returns 消息代理最先响应的分区之一中最旧的消息。无法保证返回的消息是所有分区中最旧的消息。
我正在使用 Peek() 方法查看来自 Azure 服务总线队列的消息。
然后我根据我的应用程序级别条件“延迟”消息。以下是我的操作片段:
msgClient = clientMessagingFactory.CreateQueueClient(QueueClient.FormatDeadLetterPath(sourceQueue.Name), ReceiveMode.PeekLock);
message = msgClient.Peek(); // Got a message
if (message.MessageId == "xxx")
{
if(message.State == MessageState.Active)
{
BrokeredMessage _message = null;
_message = msgClient.Receive(); // Getting a different message though there are no more than 1 listener :(
if (_message != null){
_message.Defer();
_message.Abandon();
}
}
else
{
// Perform some operation with deferred state message
}
}
这里,msgClient.Peek() 和 msgClient.Receive() returns 不同的消息。所以我被推到了推迟错误信息的境地。我该如何解决?
最可能的原因是由于多个并发接收者在查看和接收之间,另一个消费者调用 receive() 呈现当前接收以获取下一条消息。当使用带有 non-partitioned 队列的单个侦听器时,您的代码对我来说按预期工作。另一个原因可能是队列是 'partitioned',其中可以查看和接收来自不同片段的精选消息。
像下面那样修改您的代码应该可以解决您在并发竞争消费者场景中的问题。查找内嵌评论进行解释。
var msgClient = clientMessagingFactory.CreateQueueClient(QueueClient.FormatDeadLetterPath(sourceQueue.Name), ReceiveMode.PeekLock);
var message = msgClient.Receive();
if (message != null)
{
// we got here means, there was one active message found and now message is locked from others view
if (message.MessageId == "xxx")
{
//with id 'xxx', defer
message.Defer();
}
// regardless of condition match, abandoning
message.Abandon();
}
else
{
// got here means there was no active Message above
// now see if we have deferred Message
var peeked = msgClient.Peek();
// The reason of checking state here again, in case there was new message arrived in the queue by now
if (peeked != null && peeked.MessageId == "xxx" && peeked.State == MessageState.Deferred)
{
try
{
// now receive it to lock
// yes, you can receive 'deferred' message by sequence number, but not 'active' message :)
message = msgClient.Receive(peeked.SequenceNumber);
// Perform some operation with deferred state message
message.Complete(); // don't forget it
}
catch (MessageNotFoundException)
{
// just in case message was peeked by competing consumer and received
}
}
}
注意:对 non-partitioned 实体的查看操作总是 returns 最旧的消息,但不是对分区实体。相反,它 returns 消息代理最先响应的分区之一中最旧的消息。无法保证返回的消息是所有分区中最旧的消息。