C# 在 asp.net 核心托管服务消费者中使用多个并行任务处理 IAsyncEnumerable 项目
C# process IAsyncEnumerable items with several parallel tasks in asp.net core hosted service consumer
我正在使用托管服务在 asp.net 核心应用程序中实施生产者-消费者。我能够让它工作到消费者同步处理来自 _recordProcessingChannel.ReadAllAsync()
的项目。
我正在尝试将 _recordProcessingChannel.ReadAllAsync()
的结果拆分为多个并行任务。
例如:我从频道读取了 10000 个项目,我想将这项工作分成 4 个单独的任务,并为每个 ICMService 处理 2500 个项目。
消费者:
await foreach (var record in _recordProcessingChannel.ReadAllAsync())
{
using var scope = _serviceProvider.CreateScope();
var processor = scope.ServiceProvider.GetRequiredService<ICMService>();
processor.UploadRecord(record);
}
reader:
public IAsyncEnumerable<RecordData> ReadAllAsync(CancellationToken ct = default) => _channel.Reader.ReadAllAsync(ct);
提前感谢您提供的任何帮助
您可以启动所需数量的处理任务并使用BlockingCollection
到enqueue
工作。像这样:
// my dummy async enumerable
public async IAsyncEnumerable<int> ReadAllAsync()
{
for (int i = 0; i < 3; i++)
{
yield return i*3 + 1;
yield return i*3 + 2;
yield return i*3 + 3;
await Task.Delay(200);
}
yield return 777;
}
var collection = new BlockingCollection<int>();
// start "processors"
var tasks = Enumerable.Range(0, 4)
.Select(i =>
Task.Run(() =>
{
while (!collection.IsCompleted)
{
int? data = null;
try
{
data = collection.Take();
}
catch (InvalidOperationException) { }
if (data != null)
{
// simulate processing
Thread.Sleep(400);
Console.WriteLine(data.Value);
}
}
Console.WriteLine("No more items to take.");
}))
.ToArray();
await foreach (var record in ReadAllAsync())
{
collection.Add(record);
}
collection.CompleteAdding(); // signal that enqueuing has finished
await Task.WhenAll(tasks);
引入一些异步信号(例如 SemaphoreSlim.WaitAsync
或 AsyncManualResetEvent.WaitAsync
)可以改进这一点,因此消费者线程在等待新项目时不会消耗 CPU。例如:
var collection = new ConcurrentQueue<int>();
var semaphore = new SemaphoreSlim(0, 4);
var cts = new CancellationTokenSource(); // to signal that queueing is completed
var tasks = Enumerable.Range(0, 4)
.Select(i =>
Task.Run(async () =>
{
while (true)
{
if (cts.Token.IsCancellationRequested && !collection.Any())
{
Console.WriteLine("No more items to take.");
break;
}
else if (!cts.Token.IsCancellationRequested)
{
try
{
await semaphore.WaitAsync(cts.Token);
}
catch (OperationCanceledException)
{
//ignore
}
}
if(collection.TryDequeue(out var data))
{
//simulate work
Thread.Sleep(400);
Console.WriteLine(data);
}
}
}))
.ToArray();
await foreach (var record in ReadAllAsync())
{
collection.Enqueue(record);
semaphore.Release();
}
cts.Cancel(); // addition completed.
await Task.WhenAll(tasks);
Console.WriteLine("end");
我正在使用托管服务在 asp.net 核心应用程序中实施生产者-消费者。我能够让它工作到消费者同步处理来自 _recordProcessingChannel.ReadAllAsync()
的项目。
我正在尝试将 _recordProcessingChannel.ReadAllAsync()
的结果拆分为多个并行任务。
例如:我从频道读取了 10000 个项目,我想将这项工作分成 4 个单独的任务,并为每个 ICMService 处理 2500 个项目。
消费者:
await foreach (var record in _recordProcessingChannel.ReadAllAsync())
{
using var scope = _serviceProvider.CreateScope();
var processor = scope.ServiceProvider.GetRequiredService<ICMService>();
processor.UploadRecord(record);
}
reader:
public IAsyncEnumerable<RecordData> ReadAllAsync(CancellationToken ct = default) => _channel.Reader.ReadAllAsync(ct);
提前感谢您提供的任何帮助
您可以启动所需数量的处理任务并使用BlockingCollection
到enqueue
工作。像这样:
// my dummy async enumerable
public async IAsyncEnumerable<int> ReadAllAsync()
{
for (int i = 0; i < 3; i++)
{
yield return i*3 + 1;
yield return i*3 + 2;
yield return i*3 + 3;
await Task.Delay(200);
}
yield return 777;
}
var collection = new BlockingCollection<int>();
// start "processors"
var tasks = Enumerable.Range(0, 4)
.Select(i =>
Task.Run(() =>
{
while (!collection.IsCompleted)
{
int? data = null;
try
{
data = collection.Take();
}
catch (InvalidOperationException) { }
if (data != null)
{
// simulate processing
Thread.Sleep(400);
Console.WriteLine(data.Value);
}
}
Console.WriteLine("No more items to take.");
}))
.ToArray();
await foreach (var record in ReadAllAsync())
{
collection.Add(record);
}
collection.CompleteAdding(); // signal that enqueuing has finished
await Task.WhenAll(tasks);
引入一些异步信号(例如 SemaphoreSlim.WaitAsync
或 AsyncManualResetEvent.WaitAsync
)可以改进这一点,因此消费者线程在等待新项目时不会消耗 CPU。例如:
var collection = new ConcurrentQueue<int>();
var semaphore = new SemaphoreSlim(0, 4);
var cts = new CancellationTokenSource(); // to signal that queueing is completed
var tasks = Enumerable.Range(0, 4)
.Select(i =>
Task.Run(async () =>
{
while (true)
{
if (cts.Token.IsCancellationRequested && !collection.Any())
{
Console.WriteLine("No more items to take.");
break;
}
else if (!cts.Token.IsCancellationRequested)
{
try
{
await semaphore.WaitAsync(cts.Token);
}
catch (OperationCanceledException)
{
//ignore
}
}
if(collection.TryDequeue(out var data))
{
//simulate work
Thread.Sleep(400);
Console.WriteLine(data);
}
}
}))
.ToArray();
await foreach (var record in ReadAllAsync())
{
collection.Enqueue(record);
semaphore.Release();
}
cts.Cancel(); // addition completed.
await Task.WhenAll(tasks);
Console.WriteLine("end");