SubscriptionClient.RecieveBatch 未检索所有代理消息

SubscriptionClient.RecieveBatch not retrieving all the brokered messages

我有一个控制台应用程序可以读取 Azure 服务总线上订阅中存在的所有中转消息。我在那里有大约 3500 条消息。这是我读取消息的代码:

SubscriptionClient client = messagingFactory.CreateSubscriptionClient(topic, subscription);   
long count = namespaceManager.GetSubscription(topic, subscription).MessageCountDetails.ActiveMessageCount;
Console.WriteLine("Total messages to process : {0}", count.ToString()); //Here the number is showing correctly
IEnumerable<BrokeredMessage> dlIE = null;
dlIE = client.ReceiveBatch(Convert.ToInt32(count));

当我执行代码时,在 dlIE 中,我只能看到 256 条消息。我也试过像这样 client.PrefetchCount 给出预取计数,但它也只是 returns 256 条消息。

我认为可以在 time.However 检索的消息数量有一些限制 RecieveBatch 方法的 msdn 页面上没有提到这样的事情。如何一次检索所有消息?

注:

  1. 我只想读取消息,然后让它存在于服务总线上。因此我不使用message.complete方法。

  2. 我无法从服务总线中删除并重新创建 topic/subscription。

编辑:

我像这样使用 PeekBatch 而不是 ReceiveBatch:

    IEnumerable<BrokeredMessage> dlIE = null;
                            List<BrokeredMessage> bmList = new List<BrokeredMessage>();
  long i = 0;
   dlIE = subsciptionClient.PeekBatch(Convert.ToInt32(count)); // count is the total number of messages in the subscription.
  bmList.AddRange(dlIE);
  i = dlIE.Count();
 if(i < count)
  {           
 while(i < count)
  {
  IEnumerable<BrokeredMessage> dlTemp = null;
   dlTemp = subsciptionClient.PeekBatch(i, Convert.ToInt32(count));
    bmList.AddRange(dlTemp);
    i = i + dlTemp.Count();
    }
    }

我的订阅中有 3255 条消息。第一次调用 peekBatch 时,它会收到 250 条消息。所以它进入 PeekBatch(250,3225) 的 while 循环。每次只收到250条消息。我在输出列表中得到的最终消息总数是 3500,其中有重复项。我无法理解这是怎么发生的。

您写的主题是偶然分区的吗?当您从分区实体接收消息时,它一次只会从一个分区中获取。 From MSDN:

"When a client wants to receive a message from a partitioned queue, or from a subscription of a partitioned topic, Service Bus queries all fragments for messages, then returns the first message that is returned from any of the messaging stores to the receiver. Service Bus caches the other messages and returns them when it receives additional receive requests. A receiving client is not aware of the partitioning; the client-facing behavior of a partitioned queue or topic (for example, read, complete, defer, deadletter, prefetching) is identical to the behavior of a regular entity."

假设即使使用非分区实体,您也可以使用 Receive 或 Peek 方法一次获得所有消息,这可能不是一个好主意。以更小的批次循环遍历消息会更有效,特别是如果您的消息具有合适的大小或大小不确定。

由于您实际上并不想从队列中删除消息,因此我建议使用 PeekBatch 而不是 ReceiveBatch。这使您可以获取消息的副本而不锁定它。我强烈建议将相同的 SubscriptionClient 与 PeekBatch 结合使用的循环。通过在引擎盖下使用相同的 SubscriptionClient 和 PeekBatch,最后提取的序列号将被保留,因为您循环遍历它应该跟踪并遍历整个队列。这基本上可以让您通读整个队列。

我想通了。订阅客户端会记住它检索的最后一批,并在再次调用时检索下一批。

所以代码是:

    IEnumerable<BrokeredMessage> dlIE = null;
List<BrokeredMessage> bmList = new List<BrokeredMessage>();
  long i = 0;  
  while (i < count)
  {
   dlIE = subsciptionClient.PeekBatch(Convert.ToInt32(count));
   bmList.AddRange(dlIE);
   i = i + dlIE.Count();
  }

感谢麦克沃指导

注意: 一次可以查看的消息数量似乎有某种大小限制。我尝试了不同的订阅,每次获取的消息数量都不同。

我遇到了一个类似的问题,client.ReceiveBatchAsync(....) 不会从 azure 服务总线中的订阅检索任何数据。

经过一番挖掘,我发现每个订阅者都有一个位来启用批处理操作。这只能通过 powershell 启用。下面是我使用的命令:

$subObject = Get-AzureRmServiceBusSubscription -ResourceGroup '#resourceName' -NamespaceName '#namespaceName' -Topic '#topicName' -SubscriptionName '#subscriptionName' 
$subObject.EnableBatchedOperations = $True

Set-AzureRmServiceBusSubscription -ResourceGroup '#resourceName' -NamespaceName '#namespaceName' -Topic '#topicName'-SubscriptionObj $subObject

可以找到更多详细信息 here。虽然它仍然没有加载所有消息,但至少它开始清除队列。据我所知,批量大小参数只是作为对服务总线的建议而不是规则。

希望对您有所帮助!