Parallel.ForEach MaxDegreeOfParallelism 奇怪的行为增加 "Chunking"
Parallel.ForEach MaxDegreeOfParallelism Strange Behavior with Increasing "Chunking"
我不确定这个标题是否有意义,这是我能想到的最好的,所以这是我的场景。
我有一个 ASP.NET 核心应用程序,我更多地用作 shell 和 DI 配置。在 Startup
中,它添加了一堆 IHostedService
作为单例,以及它们的依赖项,也作为单例,除了 SqlConnection
和 DbContext
的小例外,我们将之后。托管服务是一组类似的服务:
- 侦听来自 GPS 设备的传入报告并将其放入侦听缓冲区。
- 从侦听缓冲区中解析项目并放入解析缓冲区。
最终只有一个服务可以读取已解析的缓冲区并实际处理已解析的报告。它通过将从缓冲区中取出的报告传递给处理程序并等待它完成以移动到下一个处理程序来完成此操作。这在过去的一年里运作良好,但现在我们 运行 似乎陷入了可伸缩性问题,因为它一次处理一个报告,服务器上的平均处理时间为 62 毫秒,其中包括 Dapper 访问获取所需数据的数据库和保存更改的 EF Core 行程。
但是,如果处理程序决定报告的信息需要触发后台作业,那么我怀疑它需要 100 毫秒或更长时间才能完成。随着时间的推移,缓冲区填满的速度比处理程序处理的速度要快,以至于在处理数以千计的报告之前,即使不是数十万,也只能保留 10 秒。这是一个问题,因为通知被延迟,并且如果服务器在午夜重新启动时缓冲区仍然已满,则可能会丢失数据。
综上所述,我正在尝试弄清楚如何使处理并行进行。经过昨天的大量实验,我决定在使用 GetConsumingEnumerable()
的缓冲区上使用 Parallel.ForEach
。这很好用,除了我不知道该怎么做甚至打电话的奇怪行为。当缓冲区被填满并且 ForEach
对其进行迭代时,它将开始将处理“分块”为不断增加的二的倍数。分块的大小受 MaxDegreeOfParallelism
设置的影响。例如(N# = 缓冲区中的下一份报告):
MDP = 1
- 一次N3 = 1个
- N6 = 一次2个
- N12 = 一次4个
- ...
MDP = 2
- N6 = 一次1个
- N12 = 一次2个
- N24 = 一次4个
- ...
MDP = 4
- N12 = 一次1个
- N24 = 一次2个
- N48 = 一次4个
- ...
MDP = 8(我的 CPU 核心数)
- N24 = 一次1个
- N48 = 一次2个
- N96 = 一次4个
- ...
这可以说比我现在的串行执行更糟糕,因为到一天结束时它会缓冲并等待,比方说,在实际处理它们之前有 50 万份报告。
有办法解决这个问题吗?我对 Parallel.ForEach
不是很有经验,所以从我的角度来看这是一种奇怪的行为。最终,我正在寻找一种方法来并行处理报告,只要它们在缓冲区中,所以如果有其他方法可以实现这一点,我会洗耳恭听。这大致就是我的代码。处理报告的处理程序确实使用 IServiceProvider
创建范围并获取 SqlConnection
和 DbContext
的实例。在此先感谢您的任何建议!
public sealed class GpsReportService :
IHostedService {
private readonly GpsReportBuffer _buffer;
private readonly Config _config;
private readonly GpsReportHandler _handler;
private readonly ILogger _logger;
public GpsReportService(
GpsReportBuffer buffer,
Config config,
GpsReportHandler handler,
ILogger<GpsReportService> logger) {
_buffer = buffer;
_config = config;
_handler = handler;
_logger = logger;
}
public Task StartAsync(
CancellationToken cancellationToken) {
_logger.LogInformation("GPS Report Service => starting");
Task.Run(Process, cancellationToken).ConfigureAwait(false);// Is ConfigureAwait here correct usage?
_logger.LogInformation("GPS Report Service => started");
return Task.CompletedTask;
}
public Task StopAsync(
CancellationToken cancellationToken) {
_logger.LogInformation("GPS Parsing Service => stopping");
_buffer.CompleteAdding();
_logger.LogInformation("GPS Parsing Service => stopped");
return Task.CompletedTask;
}
// ========================================================================
// Utilities
// ========================================================================
private void Process() {
var options = new ParallelOptions {
MaxDegreeOfParallelism = 8,
CancellationToken = CancellationToken.None
};
Parallel.ForEach(_buffer.GetConsumingEnumerable(), options, async report => {
try {
await _handler.ProcessAsync(report).ConfigureAwait(false);
} catch (Exception e) {
if (_config.IsDevelopment) {
throw;
}
_logger.LogError(e, "GPS Report Service");
}
});
}
private async Task ProcessAsync() {
while (!_buffer.IsCompleted) {
try {
var took = _buffer.TryTake(out var report, 10);
if (!took) {
continue;
}
await _handler.ProcessAsync(report!).ConfigureAwait(false);
} catch (Exception e) {
if (_config.IsDevelopment) {
throw;
}
_logger.LogError(e, "GPS Report Service");
}
}
}
}
public sealed class GpsReportBuffer :
BlockingCollection<GpsReport> {
}
您不能将 Parallel
方法与 async
委托一起使用 - 至少现在还不能。
由于您已经拥有“管道”风格的体系结构,我建议查看 TPL 数据流。一个 ActionBlock
可能就是您所需要的全部,一旦您开始工作,TPL 数据流中的其他块可能会替换管道的其他部分。
如果您更愿意坚持使用现有缓冲区,那么您应该使用异步并发而不是 Parallel
:
private void Process() {
var throttler = new SemaphoreSlim(8);
var tasks = _buffer.GetConsumingEnumerable()
.Select(async report =>
{
await throttler.WaitAsync();
try {
await _handler.ProcessAsync(report).ConfigureAwait(false);
} catch (Exception e) {
if (_config.IsDevelopment) {
throw;
}
_logger.LogError(e, "GPS Report Service");
}
finally {
throttler.Release();
}
})
.ToList();
await Task.WhenAll(tasks);
}
Parallel.ForEach
默认采用块分区,旨在减少 CPU 密集型应用程序中的同步开销,但在某些使用场景中可能会导致问题行为。块分区可以通过作为参数传递 Partitioner<T>
而不是 IEnumerable<T>
:
来禁用
Parallel.ForEach(Partitioner.Create(_buffer.GetConsumingEnumerable(),
EnumerablePartitionerOptions.NoBuffering), options, ...
您还可以在本文中找到专门为 BlockingCollection<T>
定制的自定义分区程序:ParallelExtensionsExtras Tour – #4 – BlockingCollectionExtensions
也就是说,Parallel.ForEach
不是异步友好的,这意味着它不理解异步委托。传递的 lambda 是 async void
,而是 something to avoid. So I would recommend using an ActionBlock<T>
。
您有事件流 processing/dataflow 问题,而不是并行问题。如果你使用合适的classes,比如Dataflow blocks, Channels, or Reactive Extensions,问题就简化了很多。
即使您想使用单个缓冲区和胖工作者方法,合适的缓冲区 class 是异步 Channel,而不是 BlockingCollection。代码可以变得如此简单:
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
await foreach(GpsMessage msg in _reader.ReadAllAsync(stopppingToken))
{
await _handler.ProcessAsync(msg);
}
}
第一个选项展示了如何使用数据流创建管道。二、如何使用Channel
代替BlockingCollection
并发处理多个排队项
带数据流的管道
一旦将流程分解为独立的方法,就可以轻松地使用任何库创建处理步骤的管道。
Task<IEnumerable<GpsMessage>> Poller(DateTime time,IList<Device> devices,CancellationToken token=default)
{
foreach(var device in devices)
{
if(token.IsCancellationRequested)
{
break;
}
var msg=await device.ReadMessage();
yield return msg;
}
}
GpsReport Parser(GpsMessage msg)
{
//Do some parsing magic.
return report;
}
async Task<GpsReport> Enrich(GpsReport report,string connectionString,CancellationToken token=default)
{
//Depend on connection pooling to eliminate the cost of connections
//We may have to use a pool of opened connections otherwise
using var con=new SqlConnection(connectionString);
var extraData=await con.QueryAsync<Extra>(sql,new {deviceId=report.DeviceId},token);
report.Extra=extraData;
return report;
}
async Task BulkImport(SqlReport[] reports,CancellationToken token=default)
{
using var bcp=new SqlBulkCopy(...);
using var reader=ObjectReader.Create(reports);
...
await bcp.WriteToServerAsync(reader,token);
}
在 BulkImport 方法中,我使用 FasMember's ObjectReader 在报告上创建一个 IDataReader 包装器,以便我可以将它们与 SqlBulkCopy 一起使用。另一种选择是将它们转换为数据表,但这会在内存中创建一个额外的数据副本。
将所有这些与 Dataflow 结合起来相对容易。
var execOptions=new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = 10
}
_poller = new TransformManyBlock<DateTime,GpsBuffer>(time=>Poller(time,devices));
_parser = new TransformBlock<GpsBuffer,GpsReport>(b=>Parser(b),execOptions);
var enricher = new TransformBlock<GpsReport,GpsReport>(rpt=>Enrich(rpt,connStr),execOptions);
_batch = new BatchBlock<GpsReport>(50);
_bcpBlock = new ActionBlock<GpsReport[]>(reports=>BulkImport(reports));
每个块都有一个输入和输出缓冲区(ActionBlock 除外)。每个块负责处理其输入缓冲区中的消息并对其进行处理。默认情况下,每个块只使用一个工作任务,但可以更改。消息顺序保持不变,因此如果我们为 parser
块使用 10 个工作任务,消息仍将按照接收顺序发出。
接下来是链接块。
var linkOptions=new DataflowLinkOptions {PropagateCompletion=true};
_poller.LinkTo(_parser,options);
_parser.LinkTo(_enricher,options);
_enricher.LinkTo(_batch,options);
_batch.LinkTo(_bcpBlock,options);
之后,可以使用计时器“ping”头块,即轮询器,只要我们需要:
private void Ping(object state)
{
_poller.Post(DateTime.Now);
}
public Task StartAsync(CancellationToken stoppingToken)
{
_logger.LogInformation("Timed Hosted Service running.");
_timer = new Timer(Ping, null, TimeSpan.Zero,
TimeSpan.FromSeconds(5));
return Task.CompletedTask;
}
为了优雅地停止管道,我们在头块上调用 Complete()
并在最后一个块上等待 Completion
任务。假设托管服务类似于 timed background service example:
public Task StopAsync(CancellationToken cancellationToken)
{
....
_timer?.Change(Timeout.Infinite, 0);
_poller.Complete();
await _bcpBlock.Completion;
...
}
将通道用作异步队列
对于异步 publisher/subscriber 场景,Channel 是比 BlockingCollection 更好的选择。粗略地说,这是一个异步队列,它通过强制调用者使用 ChannelWriter 和 ChannelReader [=128] 来极端地 阻止 发布者读取或订阅者写入=]是的。事实上,只传递那些 class 是很常见的,从不传递 Channel 实例本身。
在您的发布代码中,您可以创建一个 Channel<T>
并将其 Reader 传递给 GpsReportService
服务。假设发布者是另一个实现 IGpsPublisher
接口的服务:
public interface IGpsPublisher
{
ChannelReader<GspMessage> Reader{get;}
}
和实施
Channel<GpsMessage> _channel=Channel.CreateUnbounded<GpsMessage>();
public ChannelReader<GspMessage> Reader=>_channel;
private async void Ping(object state)
{
foreach(var device in devices)
{
if(token.IsCancellationRequested)
{
break;
}
var msg=await device.ReadMessage();
await _channel.Writer.WriteAsync(msg);
}
}
public Task StartAsync(CancellationToken stoppingToken)
{
_timer = new Timer(Ping, null, TimeSpan.Zero,
TimeSpan.FromSeconds(5));
return Task.CompletedTask;
}
public Task StopAsync(CancellationToken cancellationToken)
{
_timer?.Change(Timeout.Infinite, 0);
_channel.Writer.Complete();
}
这可以作为将由 DI 容器解析的依赖项传递给 GpsReportService:
public sealed class GpsReportService : BackgroundService
{
private readonly ChannelReader<GpsMessage> _reader;
public GpsReportService(
IGpsPublisher publisher,
...)
{
_reader = publisher.Reader;
...
}
并使用
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
await foreach(GpsMessage msg in _reader.ReadAllAsync(stopppingToken))
{
await _handler.ProcessAsync(msg);
}
}
一旦发布者完成,订阅者循环也将在处理完所有消息后完成。
要并行处理,可以同时启动多个循环:
async Task Process(ChannelReader<GgpsMessage> reader,CancellationToken token)
{
await foreach(GpsMessage msg in reader.ReadAllAsync(token))
{
await _handler.ProcessAsync(msg);
}
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
var tasks=Enumerable.Range(0,10)
.Select(_=>ProcessReader(_reader,stoppingToken))
.ToArray();
await Task.WhenAll(tasks);
}
解释管道
我有类似的情况:每隔15分钟我就向航空公司请求机票销售报告(实际上是GDSs),解析它们以提取数据和机票号码,下载每张机票的机票记录以获得一些额外的数据并将所有内容保存到数据库中。我必须为 20 多个城市(每个城市的票据报告)执行此操作,每个报告包含 10 到超过 100,000 张票。
这几乎是在乞求管道。使用您的示例,您可以使用以下 steps/blocks:
创建管道
- 侦听 GPS 消息并发出 未解析的 消息。
- 解析消息并发出已解析消息
- 加载每条消息所需的任何额外数据并发出组合记录
- 处理组合记录并发出结果
- (可选)批量结果
- 将结果保存到数据库
所有三个选项(Dataflow、Channels、Rx)都负责步骤之间的缓冲。 Dataflow 是用于管道处理独立事件的一些组件所需的库,Rx 是现成的,用于分析时间很重要的事件流(例如计算滑动的平均速度 window),Channels 是乐高积木可以做任何事情,但需要放在一起。
为什么不Parallel.ForEach
Parallel.ForEach
用于数据并行性,而不是异步操作。它旨在处理彼此独立的大块内存数据。 Amdah's Law 解释说,并行化的好处受到操作的 同步 部分的限制,因此所有数据并行化库都试图通过分区来减少并行化,并使用一个 core/machine/node 来处理每个分区。
Parallel.ForEach
还通过对数据进行分区并为每个 CPU 内核使用大约一个工作任务来工作,以减少内核之间的同步。它甚至会使用当前线程,这会导致错误的假设它正在阻塞。当所有核心都忙时,为什么不使用线程呢?无论如何它都无法运行。
我不确定这个标题是否有意义,这是我能想到的最好的,所以这是我的场景。
我有一个 ASP.NET 核心应用程序,我更多地用作 shell 和 DI 配置。在 Startup
中,它添加了一堆 IHostedService
作为单例,以及它们的依赖项,也作为单例,除了 SqlConnection
和 DbContext
的小例外,我们将之后。托管服务是一组类似的服务:
- 侦听来自 GPS 设备的传入报告并将其放入侦听缓冲区。
- 从侦听缓冲区中解析项目并放入解析缓冲区。
最终只有一个服务可以读取已解析的缓冲区并实际处理已解析的报告。它通过将从缓冲区中取出的报告传递给处理程序并等待它完成以移动到下一个处理程序来完成此操作。这在过去的一年里运作良好,但现在我们 运行 似乎陷入了可伸缩性问题,因为它一次处理一个报告,服务器上的平均处理时间为 62 毫秒,其中包括 Dapper 访问获取所需数据的数据库和保存更改的 EF Core 行程。
但是,如果处理程序决定报告的信息需要触发后台作业,那么我怀疑它需要 100 毫秒或更长时间才能完成。随着时间的推移,缓冲区填满的速度比处理程序处理的速度要快,以至于在处理数以千计的报告之前,即使不是数十万,也只能保留 10 秒。这是一个问题,因为通知被延迟,并且如果服务器在午夜重新启动时缓冲区仍然已满,则可能会丢失数据。
综上所述,我正在尝试弄清楚如何使处理并行进行。经过昨天的大量实验,我决定在使用 GetConsumingEnumerable()
的缓冲区上使用 Parallel.ForEach
。这很好用,除了我不知道该怎么做甚至打电话的奇怪行为。当缓冲区被填满并且 ForEach
对其进行迭代时,它将开始将处理“分块”为不断增加的二的倍数。分块的大小受 MaxDegreeOfParallelism
设置的影响。例如(N# = 缓冲区中的下一份报告):
MDP = 1
- 一次N3 = 1个
- N6 = 一次2个
- N12 = 一次4个
- ...
MDP = 2
- N6 = 一次1个
- N12 = 一次2个
- N24 = 一次4个
- ...
MDP = 4
- N12 = 一次1个
- N24 = 一次2个
- N48 = 一次4个
- ...
MDP = 8(我的 CPU 核心数)
- N24 = 一次1个
- N48 = 一次2个
- N96 = 一次4个
- ...
这可以说比我现在的串行执行更糟糕,因为到一天结束时它会缓冲并等待,比方说,在实际处理它们之前有 50 万份报告。
有办法解决这个问题吗?我对 Parallel.ForEach
不是很有经验,所以从我的角度来看这是一种奇怪的行为。最终,我正在寻找一种方法来并行处理报告,只要它们在缓冲区中,所以如果有其他方法可以实现这一点,我会洗耳恭听。这大致就是我的代码。处理报告的处理程序确实使用 IServiceProvider
创建范围并获取 SqlConnection
和 DbContext
的实例。在此先感谢您的任何建议!
public sealed class GpsReportService :
IHostedService {
private readonly GpsReportBuffer _buffer;
private readonly Config _config;
private readonly GpsReportHandler _handler;
private readonly ILogger _logger;
public GpsReportService(
GpsReportBuffer buffer,
Config config,
GpsReportHandler handler,
ILogger<GpsReportService> logger) {
_buffer = buffer;
_config = config;
_handler = handler;
_logger = logger;
}
public Task StartAsync(
CancellationToken cancellationToken) {
_logger.LogInformation("GPS Report Service => starting");
Task.Run(Process, cancellationToken).ConfigureAwait(false);// Is ConfigureAwait here correct usage?
_logger.LogInformation("GPS Report Service => started");
return Task.CompletedTask;
}
public Task StopAsync(
CancellationToken cancellationToken) {
_logger.LogInformation("GPS Parsing Service => stopping");
_buffer.CompleteAdding();
_logger.LogInformation("GPS Parsing Service => stopped");
return Task.CompletedTask;
}
// ========================================================================
// Utilities
// ========================================================================
private void Process() {
var options = new ParallelOptions {
MaxDegreeOfParallelism = 8,
CancellationToken = CancellationToken.None
};
Parallel.ForEach(_buffer.GetConsumingEnumerable(), options, async report => {
try {
await _handler.ProcessAsync(report).ConfigureAwait(false);
} catch (Exception e) {
if (_config.IsDevelopment) {
throw;
}
_logger.LogError(e, "GPS Report Service");
}
});
}
private async Task ProcessAsync() {
while (!_buffer.IsCompleted) {
try {
var took = _buffer.TryTake(out var report, 10);
if (!took) {
continue;
}
await _handler.ProcessAsync(report!).ConfigureAwait(false);
} catch (Exception e) {
if (_config.IsDevelopment) {
throw;
}
_logger.LogError(e, "GPS Report Service");
}
}
}
}
public sealed class GpsReportBuffer :
BlockingCollection<GpsReport> {
}
您不能将 Parallel
方法与 async
委托一起使用 - 至少现在还不能。
由于您已经拥有“管道”风格的体系结构,我建议查看 TPL 数据流。一个 ActionBlock
可能就是您所需要的全部,一旦您开始工作,TPL 数据流中的其他块可能会替换管道的其他部分。
如果您更愿意坚持使用现有缓冲区,那么您应该使用异步并发而不是 Parallel
:
private void Process() {
var throttler = new SemaphoreSlim(8);
var tasks = _buffer.GetConsumingEnumerable()
.Select(async report =>
{
await throttler.WaitAsync();
try {
await _handler.ProcessAsync(report).ConfigureAwait(false);
} catch (Exception e) {
if (_config.IsDevelopment) {
throw;
}
_logger.LogError(e, "GPS Report Service");
}
finally {
throttler.Release();
}
})
.ToList();
await Task.WhenAll(tasks);
}
Parallel.ForEach
默认采用块分区,旨在减少 CPU 密集型应用程序中的同步开销,但在某些使用场景中可能会导致问题行为。块分区可以通过作为参数传递 Partitioner<T>
而不是 IEnumerable<T>
:
Parallel.ForEach(Partitioner.Create(_buffer.GetConsumingEnumerable(),
EnumerablePartitionerOptions.NoBuffering), options, ...
您还可以在本文中找到专门为 BlockingCollection<T>
定制的自定义分区程序:ParallelExtensionsExtras Tour – #4 – BlockingCollectionExtensions
也就是说,Parallel.ForEach
不是异步友好的,这意味着它不理解异步委托。传递的 lambda 是 async void
,而是 something to avoid. So I would recommend using an ActionBlock<T>
。
您有事件流 processing/dataflow 问题,而不是并行问题。如果你使用合适的classes,比如Dataflow blocks, Channels, or Reactive Extensions,问题就简化了很多。
即使您想使用单个缓冲区和胖工作者方法,合适的缓冲区 class 是异步 Channel,而不是 BlockingCollection。代码可以变得如此简单:
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
await foreach(GpsMessage msg in _reader.ReadAllAsync(stopppingToken))
{
await _handler.ProcessAsync(msg);
}
}
第一个选项展示了如何使用数据流创建管道。二、如何使用Channel
代替BlockingCollection
并发处理多个排队项
带数据流的管道
一旦将流程分解为独立的方法,就可以轻松地使用任何库创建处理步骤的管道。
Task<IEnumerable<GpsMessage>> Poller(DateTime time,IList<Device> devices,CancellationToken token=default)
{
foreach(var device in devices)
{
if(token.IsCancellationRequested)
{
break;
}
var msg=await device.ReadMessage();
yield return msg;
}
}
GpsReport Parser(GpsMessage msg)
{
//Do some parsing magic.
return report;
}
async Task<GpsReport> Enrich(GpsReport report,string connectionString,CancellationToken token=default)
{
//Depend on connection pooling to eliminate the cost of connections
//We may have to use a pool of opened connections otherwise
using var con=new SqlConnection(connectionString);
var extraData=await con.QueryAsync<Extra>(sql,new {deviceId=report.DeviceId},token);
report.Extra=extraData;
return report;
}
async Task BulkImport(SqlReport[] reports,CancellationToken token=default)
{
using var bcp=new SqlBulkCopy(...);
using var reader=ObjectReader.Create(reports);
...
await bcp.WriteToServerAsync(reader,token);
}
在 BulkImport 方法中,我使用 FasMember's ObjectReader 在报告上创建一个 IDataReader 包装器,以便我可以将它们与 SqlBulkCopy 一起使用。另一种选择是将它们转换为数据表,但这会在内存中创建一个额外的数据副本。
将所有这些与 Dataflow 结合起来相对容易。
var execOptions=new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = 10
}
_poller = new TransformManyBlock<DateTime,GpsBuffer>(time=>Poller(time,devices));
_parser = new TransformBlock<GpsBuffer,GpsReport>(b=>Parser(b),execOptions);
var enricher = new TransformBlock<GpsReport,GpsReport>(rpt=>Enrich(rpt,connStr),execOptions);
_batch = new BatchBlock<GpsReport>(50);
_bcpBlock = new ActionBlock<GpsReport[]>(reports=>BulkImport(reports));
每个块都有一个输入和输出缓冲区(ActionBlock 除外)。每个块负责处理其输入缓冲区中的消息并对其进行处理。默认情况下,每个块只使用一个工作任务,但可以更改。消息顺序保持不变,因此如果我们为 parser
块使用 10 个工作任务,消息仍将按照接收顺序发出。
接下来是链接块。
var linkOptions=new DataflowLinkOptions {PropagateCompletion=true};
_poller.LinkTo(_parser,options);
_parser.LinkTo(_enricher,options);
_enricher.LinkTo(_batch,options);
_batch.LinkTo(_bcpBlock,options);
之后,可以使用计时器“ping”头块,即轮询器,只要我们需要:
private void Ping(object state)
{
_poller.Post(DateTime.Now);
}
public Task StartAsync(CancellationToken stoppingToken)
{
_logger.LogInformation("Timed Hosted Service running.");
_timer = new Timer(Ping, null, TimeSpan.Zero,
TimeSpan.FromSeconds(5));
return Task.CompletedTask;
}
为了优雅地停止管道,我们在头块上调用 Complete()
并在最后一个块上等待 Completion
任务。假设托管服务类似于 timed background service example:
public Task StopAsync(CancellationToken cancellationToken)
{
....
_timer?.Change(Timeout.Infinite, 0);
_poller.Complete();
await _bcpBlock.Completion;
...
}
将通道用作异步队列
对于异步 publisher/subscriber 场景,Channel 是比 BlockingCollection 更好的选择。粗略地说,这是一个异步队列,它通过强制调用者使用 ChannelWriter 和 ChannelReader [=128] 来极端地 阻止 发布者读取或订阅者写入=]是的。事实上,只传递那些 class 是很常见的,从不传递 Channel 实例本身。
在您的发布代码中,您可以创建一个 Channel<T>
并将其 Reader 传递给 GpsReportService
服务。假设发布者是另一个实现 IGpsPublisher
接口的服务:
public interface IGpsPublisher
{
ChannelReader<GspMessage> Reader{get;}
}
和实施
Channel<GpsMessage> _channel=Channel.CreateUnbounded<GpsMessage>();
public ChannelReader<GspMessage> Reader=>_channel;
private async void Ping(object state)
{
foreach(var device in devices)
{
if(token.IsCancellationRequested)
{
break;
}
var msg=await device.ReadMessage();
await _channel.Writer.WriteAsync(msg);
}
}
public Task StartAsync(CancellationToken stoppingToken)
{
_timer = new Timer(Ping, null, TimeSpan.Zero,
TimeSpan.FromSeconds(5));
return Task.CompletedTask;
}
public Task StopAsync(CancellationToken cancellationToken)
{
_timer?.Change(Timeout.Infinite, 0);
_channel.Writer.Complete();
}
这可以作为将由 DI 容器解析的依赖项传递给 GpsReportService:
public sealed class GpsReportService : BackgroundService
{
private readonly ChannelReader<GpsMessage> _reader;
public GpsReportService(
IGpsPublisher publisher,
...)
{
_reader = publisher.Reader;
...
}
并使用
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
await foreach(GpsMessage msg in _reader.ReadAllAsync(stopppingToken))
{
await _handler.ProcessAsync(msg);
}
}
一旦发布者完成,订阅者循环也将在处理完所有消息后完成。
要并行处理,可以同时启动多个循环:
async Task Process(ChannelReader<GgpsMessage> reader,CancellationToken token)
{
await foreach(GpsMessage msg in reader.ReadAllAsync(token))
{
await _handler.ProcessAsync(msg);
}
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
var tasks=Enumerable.Range(0,10)
.Select(_=>ProcessReader(_reader,stoppingToken))
.ToArray();
await Task.WhenAll(tasks);
}
解释管道
我有类似的情况:每隔15分钟我就向航空公司请求机票销售报告(实际上是GDSs),解析它们以提取数据和机票号码,下载每张机票的机票记录以获得一些额外的数据并将所有内容保存到数据库中。我必须为 20 多个城市(每个城市的票据报告)执行此操作,每个报告包含 10 到超过 100,000 张票。
这几乎是在乞求管道。使用您的示例,您可以使用以下 steps/blocks:
创建管道- 侦听 GPS 消息并发出 未解析的 消息。
- 解析消息并发出已解析消息
- 加载每条消息所需的任何额外数据并发出组合记录
- 处理组合记录并发出结果
- (可选)批量结果
- 将结果保存到数据库
所有三个选项(Dataflow、Channels、Rx)都负责步骤之间的缓冲。 Dataflow 是用于管道处理独立事件的一些组件所需的库,Rx 是现成的,用于分析时间很重要的事件流(例如计算滑动的平均速度 window),Channels 是乐高积木可以做任何事情,但需要放在一起。
为什么不Parallel.ForEach
Parallel.ForEach
用于数据并行性,而不是异步操作。它旨在处理彼此独立的大块内存数据。 Amdah's Law 解释说,并行化的好处受到操作的 同步 部分的限制,因此所有数据并行化库都试图通过分区来减少并行化,并使用一个 core/machine/node 来处理每个分区。
Parallel.ForEach
还通过对数据进行分区并为每个 CPU 内核使用大约一个工作任务来工作,以减少内核之间的同步。它甚至会使用当前线程,这会导致错误的假设它正在阻塞。当所有核心都忙时,为什么不使用线程呢?无论如何它都无法运行。