如何每隔 X 秒批量处理来自服务总线队列的消息
How to Bulk Process Messages from Service Bus Queue Every X Seconds
我的服务总线队列以前一次处理一条消息。 我想更改它以允许消息排队,这样我就可以每隔 X 秒批量处理它们。 我最初是这样做的:
var messagingFactorySettings = new MessagingFactorySettings
{
NetMessagingTransportSettings = {
BatchFlushInterval = TimeSpan.FromMilliseconds(10000) },
TokenProvider = credentials
};
现在我想接收消息,但似乎我必须执行这个无限的 while 循环才能接收消息:
while ((messages = myQueueClient.ReceiveBatch(1000).ToList()) != null)
{
foreach (var message in messages)
{ ...
据我了解,BatchFlushInterval 将允许请求建立 X 时间,这样我就不会一个接一个地接收消息,而是批量接收消息。因此我不确定为什么我不能像以前那样做:
myQueueClient.OnMessage((m) =>
{
但是批量版本:
myQueueClient.OnBulkMessage((listOfMessages) =>
{
我是不是遗漏了什么,或者不断轮询是实现此目的的唯一方法?我的 BatchFlushInterval 似乎也被忽略了。我预计它只会每 10 秒检查一次新消息,但它会立即收到第一批消息,并且任何收到的新消息也会立即得到处理。
假设我想每隔 X(即:1)秒从队列中拉出 Y 条消息(即:1000)并立即处理它们,我该怎么做? 为什么 BatchFlushInterval 没有任何影响?
似乎一个简单的Thread.Sleep(x)
是合适的。
我发现暂停直到某个总时间过去的问题很有趣,所以我开始并实现了一个秒表子类,使该问题可以更清晰、更易读的方式解决。
while ((var messages = myQueueClient.ReceiveBatch(1000)) != null)
{
var sw = WaitableStopwatch.StartNew();
// ReceiveBatch() return IEnumerable<>. No need for .ToList().
foreach (var message in messages)
{
...
}
// If processing took less than 10 seconds, sleep
// for the remainder of that time span before getting
// the next batch.
sw.Wait(Timespan.FromSeconds(10));
}
/// <summary>
/// Extends Stopwatch with the ability to wait until a specified
/// elapsed time has been reached.
/// </summary>
public class WaitableStopwatch : Stopwatch
{
/// <summary>
/// Initializes a new WaitableStopwatch instance, sets the elapsed
/// time property to zero, and starts measuring elapsed time.
/// </summary>
/// <returns>A WaitableStopwatch that has just begun measuring elapsed time.</returns>
public static new WaitableStopwatch StartNew()
{
WaitableStopwatch sw = new WaitableStopwatch();
sw.Start();
return sw;
}
/// <summary>
/// Waits until the ElapsedMilliseconds property reaches <paramref name="elapsedMilliseconds"/>.
/// </summary>
/// <param name="elapsedMilliseconds"></param>
public void Wait(int elapsedMilliseconds)
{
Wait(TimeSpan.FromMilliseconds(elapsedMilliseconds));
}
/// <summary>
/// Waits until when the Elapsed property reaches <paramref name="elapsed"/>.
/// </summary>
/// <param name="elapsed"></param>
public void Wait(TimeSpan elapsed)
{
TimeSpan diff;
while ((diff = elapsed - this.Elapsed) > TimeSpan.Zero)
{
Thread.Sleep(diff);
}
}
/// <summary>
/// Waits until when the ElapsedMilliseconds property reaches <paramref name="elapsedMilliseconds"/>.
/// </summary>
/// <param name="elapsedMilliseconds"></param>
public Task WaitAsync(int elapsedMilliseconds)
{
return WaitAsync(TimeSpan.FromMilliseconds(elapsedMilliseconds));
}
/// <summary>
/// Waits until when the Elapsed property reaches <paramref name="elapsed"/>.
/// </summary>
/// <param name="elapsed"></param>
public async Task WaitAsync(TimeSpan elapsed)
{
TimeSpan diff;
while ((diff = elapsed - this.Elapsed) > TimeSpan.Zero)
{
await Task.Delay(diff);
}
}
}
我的服务总线队列以前一次处理一条消息。 我想更改它以允许消息排队,这样我就可以每隔 X 秒批量处理它们。 我最初是这样做的:
var messagingFactorySettings = new MessagingFactorySettings
{
NetMessagingTransportSettings = {
BatchFlushInterval = TimeSpan.FromMilliseconds(10000) },
TokenProvider = credentials
};
现在我想接收消息,但似乎我必须执行这个无限的 while 循环才能接收消息:
while ((messages = myQueueClient.ReceiveBatch(1000).ToList()) != null)
{
foreach (var message in messages)
{ ...
据我了解,BatchFlushInterval 将允许请求建立 X 时间,这样我就不会一个接一个地接收消息,而是批量接收消息。因此我不确定为什么我不能像以前那样做:
myQueueClient.OnMessage((m) =>
{
但是批量版本:
myQueueClient.OnBulkMessage((listOfMessages) =>
{
我是不是遗漏了什么,或者不断轮询是实现此目的的唯一方法?我的 BatchFlushInterval 似乎也被忽略了。我预计它只会每 10 秒检查一次新消息,但它会立即收到第一批消息,并且任何收到的新消息也会立即得到处理。
假设我想每隔 X(即:1)秒从队列中拉出 Y 条消息(即:1000)并立即处理它们,我该怎么做? 为什么 BatchFlushInterval 没有任何影响?
似乎一个简单的Thread.Sleep(x)
是合适的。
我发现暂停直到某个总时间过去的问题很有趣,所以我开始并实现了一个秒表子类,使该问题可以更清晰、更易读的方式解决。
while ((var messages = myQueueClient.ReceiveBatch(1000)) != null)
{
var sw = WaitableStopwatch.StartNew();
// ReceiveBatch() return IEnumerable<>. No need for .ToList().
foreach (var message in messages)
{
...
}
// If processing took less than 10 seconds, sleep
// for the remainder of that time span before getting
// the next batch.
sw.Wait(Timespan.FromSeconds(10));
}
/// <summary>
/// Extends Stopwatch with the ability to wait until a specified
/// elapsed time has been reached.
/// </summary>
public class WaitableStopwatch : Stopwatch
{
/// <summary>
/// Initializes a new WaitableStopwatch instance, sets the elapsed
/// time property to zero, and starts measuring elapsed time.
/// </summary>
/// <returns>A WaitableStopwatch that has just begun measuring elapsed time.</returns>
public static new WaitableStopwatch StartNew()
{
WaitableStopwatch sw = new WaitableStopwatch();
sw.Start();
return sw;
}
/// <summary>
/// Waits until the ElapsedMilliseconds property reaches <paramref name="elapsedMilliseconds"/>.
/// </summary>
/// <param name="elapsedMilliseconds"></param>
public void Wait(int elapsedMilliseconds)
{
Wait(TimeSpan.FromMilliseconds(elapsedMilliseconds));
}
/// <summary>
/// Waits until when the Elapsed property reaches <paramref name="elapsed"/>.
/// </summary>
/// <param name="elapsed"></param>
public void Wait(TimeSpan elapsed)
{
TimeSpan diff;
while ((diff = elapsed - this.Elapsed) > TimeSpan.Zero)
{
Thread.Sleep(diff);
}
}
/// <summary>
/// Waits until when the ElapsedMilliseconds property reaches <paramref name="elapsedMilliseconds"/>.
/// </summary>
/// <param name="elapsedMilliseconds"></param>
public Task WaitAsync(int elapsedMilliseconds)
{
return WaitAsync(TimeSpan.FromMilliseconds(elapsedMilliseconds));
}
/// <summary>
/// Waits until when the Elapsed property reaches <paramref name="elapsed"/>.
/// </summary>
/// <param name="elapsed"></param>
public async Task WaitAsync(TimeSpan elapsed)
{
TimeSpan diff;
while ((diff = elapsed - this.Elapsed) > TimeSpan.Zero)
{
await Task.Delay(diff);
}
}
}