如何每隔 X 秒批量处理来自服务总线队列的消息

How to Bulk Process Messages from Service Bus Queue Every X Seconds

我的服务总线队列以前一次处理一条消息。 我想更改它以允许消息排队,这样我就可以每隔 X 秒批量处理它们。 我最初是这样做的:

var messagingFactorySettings = new MessagingFactorySettings
{
    NetMessagingTransportSettings = { 
        BatchFlushInterval = TimeSpan.FromMilliseconds(10000) },
        TokenProvider = credentials
};

现在我想接收消息,但似乎我必须执行这个无限的 while 循环才能接收消息:

while ((messages = myQueueClient.ReceiveBatch(1000).ToList()) != null)
{
    foreach (var message in messages)
    { ...

据我了解,BatchFlushInterval 将允许请求建立 X 时间,这样我就不会一个接一个地接收消息,而是批量接收消息。因此我不确定为什么我不能像以前那样做:

myQueueClient.OnMessage((m) =>
{

但是批量版本:

myQueueClient.OnBulkMessage((listOfMessages) =>
{

我是不是遗漏了什么,或者不断轮询是实现此目的的唯一方法?我的 BatchFlushInterval 似乎也被忽略了。我预计它只会每 10 秒检查一次新消息,但它会立即收到第一批消息,并且任何收到的新消息也会立即得到处理。

假设我想每隔 X(即:1)秒从队列中拉出 Y 条消息(即:1000)并立即处理它们,我该怎么做? 为什么 BatchFlushInterval 没有任何影响?

似乎一个简单的Thread.Sleep(x)是合适的。

我发现暂停直到某个总时间过去的问题很有趣,所以我开始并实现了一个秒表子类,使该问题可以更清晰、更易读的方式解决。

while ((var messages = myQueueClient.ReceiveBatch(1000)) != null)
{
    var sw = WaitableStopwatch.StartNew();

    // ReceiveBatch() return IEnumerable<>. No need for .ToList().
    foreach (var message in messages)
    {
        ...
    }

    // If processing took less than 10 seconds, sleep
    // for the remainder of that time span before getting
    // the next batch.
    sw.Wait(Timespan.FromSeconds(10));
}



/// <summary>
/// Extends Stopwatch with the ability to wait until a specified
/// elapsed time has been reached.
/// </summary>
public class WaitableStopwatch : Stopwatch
{
    /// <summary>
    /// Initializes a new WaitableStopwatch instance, sets the elapsed
    /// time property to zero, and starts measuring elapsed time.
    /// </summary>
    /// <returns>A WaitableStopwatch that has just begun measuring elapsed time.</returns>
    public static new WaitableStopwatch StartNew()
    {
        WaitableStopwatch sw = new WaitableStopwatch();

        sw.Start();

        return sw;
    }

    /// <summary>
    /// Waits until the ElapsedMilliseconds property reaches <paramref name="elapsedMilliseconds"/>.
    /// </summary>
    /// <param name="elapsedMilliseconds"></param>
    public void Wait(int elapsedMilliseconds)
    {
        Wait(TimeSpan.FromMilliseconds(elapsedMilliseconds));
    }

    /// <summary>
    /// Waits until when the Elapsed property reaches <paramref name="elapsed"/>.
    /// </summary>
    /// <param name="elapsed"></param>
    public void Wait(TimeSpan elapsed)
    {
        TimeSpan diff;

        while ((diff = elapsed - this.Elapsed) > TimeSpan.Zero)
        {
            Thread.Sleep(diff);
        }
    }

    /// <summary>
    /// Waits until when the ElapsedMilliseconds property reaches <paramref name="elapsedMilliseconds"/>.
    /// </summary>
    /// <param name="elapsedMilliseconds"></param>
    public Task WaitAsync(int elapsedMilliseconds)
    {
        return WaitAsync(TimeSpan.FromMilliseconds(elapsedMilliseconds));
    }

    /// <summary>
    /// Waits until when the Elapsed property reaches <paramref name="elapsed"/>.
    /// </summary>
    /// <param name="elapsed"></param>
    public async Task WaitAsync(TimeSpan elapsed)
    {
        TimeSpan diff;

        while ((diff = elapsed - this.Elapsed) > TimeSpan.Zero)
        {
            await Task.Delay(diff);
        }
    }
}