使用 async/await 快速处理队列
Processing a queue quickly with async/await
我经常有这样的场景,我想把需要处理的数据排队,然后在后台处理,这样主程序才能继续。我写了以下 class 来尝试帮助解决这个问题:
public class ProcessQueue<T>
{
#region Properties
public Task ProcessingComplete => tcs.Task;
#endregion
#region Variables
private ConcurrentQueue<T> queue = new ConcurrentQueue<T>();
private bool processingQueue = false;
private bool stopped = false;
private Func<T, Task> processItemFunc;
private TaskCompletionSource tcs = new TaskCompletionSource();
#endregion
public ProcessQueue(Func<T, Task> processItemFunc)
{
this.processItemFunc = processItemFunc;
}
public void EnqueueAndProcess(T data)
{
if (stopped)
{
return;
}
queue.Enqueue(data);
ProcessQueuedItems();
}
public void Stop()
{
stopped = true;
queue.Clear();
tcs.SetResult();
}
private void ProcessQueuedItems()
{
if (processingQueue)
{
return;
}
processingQueue = true;
tcs = new TaskCompletionSource();
new TaskFactory().StartNew(async () =>
{
while (queue.Count > 0)
{
if (queue.TryDequeue(out var item))
{
await processItemFunc(item);
}
}
processingQueue = false;
tcs.SetResult();
}, TaskCreationOptions.LongRunning);
}
}
看起来相当简单,所以我为它写了一些单元测试,例如:
[TestClass]
public class ProcessQueueTests
{
private ProcessQueue<int> sut;
private List<int> processedOutput = new();
[TestInitialize]
public void Initialise()
{
sut = new ProcessQueue<int>(ProcessAsync);
}
[TestMethod]
public async Task Test1()
{
for (int i = 0; i < 1000; i++)
{
sut.EnqueueAndProcess(i);
}
await sut.ProcessingComplete;
Assert.AreEqual(1000, processedOutput.Count);
for (int i = 0; i < processedOutput.Count; i++)
{
Assert.AreEqual(i, processedOutput[i]);
}
}
private async Task ProcessAsync(int value)
{
await Task.Delay(1);
processedOutput.Add(value);
}
}
我在那里设置了延迟以查看它有何影响,事实上,对于上述测试,它一直需要将近 16 秒才能完成,而我本希望稍微超过 1 秒。
为什么要花这么长时间?怎样才能更快?
Task.Delay
只是非常低的分辨率。正如 documentation 所述:
This method depends on the system clock. This means that the time
delay will approximately equal the resolution of the system clock if
the millisecondsDelay argument is less than the resolution of the
system clock, which is approximately 15 milliseconds on Windows
systems.
您的论点 (1
) 少于 15 毫秒,因此会对其进行调整。你可以这样做:
var watch = Stopwatch.StartNew();
for (int i = 0; i < 1000; i++) {
await Task.Delay(1);
}
watch.Stop();
Console.WriteLine($"Took {watch.ElapsedMilliseconds}ms");
重现。 1000 * 15ms = 15 秒,这与您报告的内容有关。
我经常有这样的场景,我想把需要处理的数据排队,然后在后台处理,这样主程序才能继续。我写了以下 class 来尝试帮助解决这个问题:
public class ProcessQueue<T>
{
#region Properties
public Task ProcessingComplete => tcs.Task;
#endregion
#region Variables
private ConcurrentQueue<T> queue = new ConcurrentQueue<T>();
private bool processingQueue = false;
private bool stopped = false;
private Func<T, Task> processItemFunc;
private TaskCompletionSource tcs = new TaskCompletionSource();
#endregion
public ProcessQueue(Func<T, Task> processItemFunc)
{
this.processItemFunc = processItemFunc;
}
public void EnqueueAndProcess(T data)
{
if (stopped)
{
return;
}
queue.Enqueue(data);
ProcessQueuedItems();
}
public void Stop()
{
stopped = true;
queue.Clear();
tcs.SetResult();
}
private void ProcessQueuedItems()
{
if (processingQueue)
{
return;
}
processingQueue = true;
tcs = new TaskCompletionSource();
new TaskFactory().StartNew(async () =>
{
while (queue.Count > 0)
{
if (queue.TryDequeue(out var item))
{
await processItemFunc(item);
}
}
processingQueue = false;
tcs.SetResult();
}, TaskCreationOptions.LongRunning);
}
}
看起来相当简单,所以我为它写了一些单元测试,例如:
[TestClass]
public class ProcessQueueTests
{
private ProcessQueue<int> sut;
private List<int> processedOutput = new();
[TestInitialize]
public void Initialise()
{
sut = new ProcessQueue<int>(ProcessAsync);
}
[TestMethod]
public async Task Test1()
{
for (int i = 0; i < 1000; i++)
{
sut.EnqueueAndProcess(i);
}
await sut.ProcessingComplete;
Assert.AreEqual(1000, processedOutput.Count);
for (int i = 0; i < processedOutput.Count; i++)
{
Assert.AreEqual(i, processedOutput[i]);
}
}
private async Task ProcessAsync(int value)
{
await Task.Delay(1);
processedOutput.Add(value);
}
}
我在那里设置了延迟以查看它有何影响,事实上,对于上述测试,它一直需要将近 16 秒才能完成,而我本希望稍微超过 1 秒。
为什么要花这么长时间?怎样才能更快?
Task.Delay
只是非常低的分辨率。正如 documentation 所述:
This method depends on the system clock. This means that the time delay will approximately equal the resolution of the system clock if the millisecondsDelay argument is less than the resolution of the system clock, which is approximately 15 milliseconds on Windows systems.
您的论点 (1
) 少于 15 毫秒,因此会对其进行调整。你可以这样做:
var watch = Stopwatch.StartNew();
for (int i = 0; i < 1000; i++) {
await Task.Delay(1);
}
watch.Stop();
Console.WriteLine($"Took {watch.ElapsedMilliseconds}ms");
重现。 1000 * 15ms = 15 秒,这与您报告的内容有关。