TPL 数据流与普通信号量

TPL Dataflow vs plain Semaphore

我需要制作一个可扩展的流程。该过程主要有 I/O 操作和一些次要的 CPU 操作(主要是反序列化字符串)。该过程在数据库中查询 url 列表,然后从这些 url 中获取数据,将下载的数据反序列化为对象,然后将一些数据保存到 crm dynamics 中,也保存到另一个数据库中。之后我需要更新第一个处理了 url 的数据库。部分需求是让并行度可配置。

最初我想通过 await 的一系列任务来实现它,并使用信号量限制并行度 - 非常简单。然后我阅读了@Stephen Cleary 的一些帖子和答案,其中推荐使用 TPL Dataflow,我认为它可能是一个很好的候选者。但是,我想通过使用数据流来确保我 "complicating" 代码是有价值的。我还得到了使用 ForEachAsync extension method 的建议,它也很容易使用,但是我不确定它是否会因为它对集合的分区方式而导致内存开销。

对于这种情况,TPL Dataflow 是一个不错的选择吗?它比 Semaphore 或 ForEachAsync 方法有何优势 - 如果我通过 TPL DataFlow 实现它而不是其他每个选项 (Semaphore/ForEachASync),我实际上会获得什么好处 (Semaphore/ForEachASync)?

The process has mainly IO operations with some minor CPU operations (mainly deserializing strings).

差不多就是 I/O。除非这些字符串 很大 ,反序列化不值得并行化。您正在做的那种 CPU 工作会在噪音中消失。

因此,您需要关注并发异步。

    正如您所发现的,
  • SemaphoreSlim 是标准模式。
  • TPL Dataflow 也可以做并发(异步和并行形式)。

ForEachAsync 可以有多种形式;请注意,在您引用的 blog post 中,此方法有 5 个不同的实现,每个都是有效的。 “[T]这里有许多不同的语义可能用于迭代,每一个都会导致不同的设计选择和实现。”出于您的目的(不希望 CPU 并行化),您不应考虑使用 Task.Run 或分区的那些。在异步并发世界中,任何 ForEachAsync 实现都只是隐藏其实现的语义的语法糖,这就是我倾向于避免它的原因。

结果是 SemaphoreSlimActionBlock。我通常建议人们首先从 SemaphoreSlim 开始,如果他们的需求变得更加复杂(在某种程度上他们似乎会从数据流管道中受益),请考虑转向 TPL 数据流。

例如,"Part of the requirement is to make the parallelism degree configurable."

您可以从允许一定程度的并发开始 - 被限制的事情是一个完整的操作(从 url 获取数据,将下载的数据反序列化为对象,持久化到 crm 动态和另一个数据库,并更新第一个数据库)。这就是 SemaphoreSlim 完美的解决方案。

但是您可能决定要有多个旋钮:比如说,一个并发度表示您正在下载多少 url,一个单独的并发度用于持久化,另一个单独的度数更新原始数据库的并发性。然后您还需要限制这些点之间的 "queues":内存中只有这么多反序列化对象等 - 以确保快速 urls 与慢速数据库不会导致您的应用程序使用过多内存的问题。如果这些是有用的语义,那么您已经开始从数据流的角度来解决问题,这就是您可以更好地使用像 TPL Dataflow 这样的库的关键。

以下是 Semaphore 方法的卖点:

  1. 简单

下面是 TPL 数据流 方法的卖点:

  1. 数据并行之上的任务并行
  2. 资源的最佳利用(带宽、CPU、数据库连接)
  3. 每个异构操作的可配置并行度
  4. 减少内存占用

让我们回顾一下下面的信号量实现示例:

string[] urls = FetchUrlsFromDB();
var cts = new CancellationTokenSource();
var semaphore = new SemaphoreSlim(10); // Degree of parallelism (DOP)
Task[] tasks = urls.Select(url => Task.Run(async () =>
{
    await semaphore.WaitAsync(cts.Token);
    try
    {
        string rawData = DownloadData(url);
        var data = Deserialize(rawData);
        PersistToCRM(data);
        MarkAsCompleted(url);
    }
    finally
    {
        semaphore.Release();
    }
})).ToArray();
Task.WaitAll(tasks);

以上实现确保在任何给定时刻最多同时处理 10 urls。但是,这些并行工作流之间不会有任何协调。因此,例如,完全有可能在给定时刻所有 10 个并行工作流都将下载数据,在另一时刻所有 10 个都将反序列化原始数据,而在另一个时刻所有 10 个都将数据持久保存到 CRM。这远非理想。 理想情况下,您希望整个操作的瓶颈,无论是网络适配器、CPU 还是数据库服务器,都始终不间断地工作,并且不会在各种情况下未得到充分利用(或完全空闲)随机时刻。

另一个考虑因素是每个异构操作的最佳并行化程度。 10 DOP 对于与 web 的通信可能是最佳的,但对于与数据库的通信来说太低或太高。信号量方法不允许进行那种级别的微调。您唯一的选择是通过在这些最优值之间选择一个 DOP 值来妥协。

如果 url 的数量非常大,比如说 1,000,000,那么上面的信号量方法也会引发严重的内存使用问题。 url 的平均大小可能为 50 个字节,而连接到 CancellationTokenTask 可能重 10 倍或更多。当然,您可以更改实现并以更聪明的方式使用 SemaphoreSlim,不会生成那么多任务,但这会违背这种方法的主要(也是唯一)卖点,即它的简单性。

TPL Dataflow 库解决了所有这些问题,代价是为了能够驯服这个强大的工具所需的(较小的)学习曲线。