服务总线会话 ReceiveBatchAsync 仅接收 1 条消息

Service Bus Session ReceiveBatchAsync only receiving 1 message

我正在使用启用了会话的服务总线队列,并且我正在发送 5 条具有相同 SessionId 的消息。我的接收代码使用 AcceptMessageSessionAsync 来获取会话锁,以便它将接收该会话的所有消息。然后它使用 session.ReceiveBatchAsync 尝试获取会话的所有消息。但是,它似乎只收到第一条消息,然后在进行另一次尝试时,它会收到所有其他消息。您应该能够看到,尽管所有这些消息都是一次发送的,但两个批次之间有将近一分钟的间隔:

Session started:AE8DC914-8693-4110-8BAE-244E42A302D5
Message received:AE8DC914-8693-4110-8BAE-244E42A302D5_1_08:03:03.36523
Session started:AE8DC914-8693-4110-8BAE-244E42A302D5
Message received:AE8DC914-8693-4110-8BAE-244E42A302D5_2_08:03:04.22964
Message received:AE8DC914-8693-4110-8BAE-244E42A302D5_3_08:03:04.29515
Message received:AE8DC914-8693-4110-8BAE-244E42A302D5_4_08:03:04.33959
Message received:AE8DC914-8693-4110-8BAE-244E42A302D5_5_08:03:04.39587

我处理这些的代码是 WebJob 中的一个函数:

[NoAutomaticTrigger]
public static async Task MessageHandlingLoop(TextWriter log, CancellationToken cancellationToken)
{
    var connectionString = ConfigurationManager.ConnectionStrings["ServiceBusListen"].ConnectionString;
    var client = QueueClient.CreateFromConnectionString(connectionString, "myqueue");

    while (!cancellationToken.IsCancellationRequested)
    {
        MessageSession session = null;

        try
        {
            session = await client.AcceptMessageSessionAsync(TimeSpan.FromMinutes(1));

            log.WriteLine("Session started:" + session.SessionId);
            foreach (var msg in await session.ReceiveBatchAsync(100, TimeSpan.FromSeconds(5)))
            {
                log.WriteLine("Message received:" + msg.MessageId);
                msg.Complete();
            }
        }
        catch (TimeoutException)
        {
            log.WriteLine("Timeout occurred");
            await Task.Delay(5000, cancellationToken);
        }
        catch (Exception ex)
        {
            log.WriteLine("Error:" + ex);
        }
    }
}

这是从我的 WebJob Main 调用的,使用:

JobHost host = new JobHost();
host.Start();
var task = host.CallAsync(typeof(Functions).GetMethod("MessageHandlingLoop"));
task.Wait();
host.Stop();

为什么我在 ReceiveBatchAsync 的第一次通话中没有收到所有消息?

Hillary Caituiro Monge 在 MSDN 论坛上回答了这个问题:https://social.msdn.microsoft.com/Forums/azure/en-US/9a84f319-7bc6-4ff8-b142-4fc1d5f1e2fa/service-bus-session-receivebatchasync-only-receiving-1-message?forum=servbus

Service Bus does not guarantee you will receive the message count you specify in receive batch even if your queue has them or more. Having say that, you can change your code to try to get the 100 messages in the first call, buy remember that your application should not assume that as a guaranteed behavior.

Below this line of code varclient = QueueClient.CreateFromConnectionString(connectionString, "myqueue");
add client.PrefetchCount = 100;

The reason that you are getting only 1 message at all times in the first call is due to that when you accept a session it may be also getting 1 prefetched message with it. Then when you do receive batch, the SB client will give you that 1 message.

不幸的是,我发现设置 PrefetchCount 没有影响,但给出的只收到一条消息的原因似乎很可能,所以我接受了它作为答案。