Blocked on 数据流的块设计

Blocked on block design for data flow

我们有一个数据处理管道,我们正在尝试使用 TPL Dataflow 框架。

管道的基本要点:

  1. 遍历文件系统上的 CSV 文件 (10,000)
  2. 验证我们没有导入内容,如果我们忽略
  3. 遍历单个 CSV 文件(20,000-120,000 行)的内容并创建适合我们需要的数据结构。
  4. 将这些新的 dataStructured 项目中的 100 个批量化并将它们推送到数据库中
  5. 将 CSV 文件标记为正在导入。

现在我们有一个现有的 Python 文件,它以非常缓慢和痛苦的方式执行上述所有操作 - 代码一团糟。

我的想法是下面看 TPL Dataflow

  1. BufferBlock<string>将Post全部文件放入
  2. TransformBlock<string, SensorDataDto>谓词检测是否导入此文件
  3. TransformBlock<string, SensorDataDto> 读取 CSV 文件并创建 SensorDataDto 结构
  4. BatchBlock<SensorDataDto> 中使用 TransformBlock 委托来批量处理 100 个请求。

    4.5。 ActionBlock<SensorDataDto> 将 100 条记录推送到数据库中。

  5. ActionBlock 将 CSV 标记为已导入。

我已经创建了前几个操作并且它们正在工作(BufferBlock -> TransformBlock + Predicate && Process if hasn't)但我不确定如何继续流程,以便我可以 post 100 到 TransformBlock 中的 BatchBlock 并连接以下操作。

这看起来正确吗 - 基本要点,以及如何以 TPL 数据流畅的方式处理 BufferBlock 位?

bufferBlock.LinkTo(readCsvFile, ShouldImportFile)
bufferBlock.LinkTo(DataflowBlock.NullTarget<string>())
readCsvFile.LinkTo(normaliseData)
normaliseData.LinkTo(updateCsvImport)
updateCsvImport.LinkTo(completionBlock)

batchBlock.LinkTo(insertSensorDataBlock)

bufferBlock.Completion.ContinueWith(t => readCsvFile.Complete());
readCsvFile.Completion.ContinueWith(t => normaliseData.Complete());
normaliseData.Completion.ContinueWith(t => updateCsvImport.Complete());
updateCsvImport.Completion.ContinueWith(t => completionBlock.Complete());

batchBlock.Completion.ContinueWith(t => insertSensorDataBlock.Complete());

在我调用 BatchBlock.Post<..>(...)normaliseData 方法中,这是一个好的模式还是应该采用不同的结构?我的问题是,我只能在所有记录都被推送后才能将文件标记为正在导入。

Task.WhenAll(bufferBlock.Completion, batchBlock.Completion).Wait();

如果我们有一批100,如果压入了80怎么办,有没有办法排掉最后的80

我不确定是否应该 Link 主管道中的 BatchBlock,我会等到两者都完成。

首先,你不需要在这件事上使用Completion,你可以在link期间使用PropagateCompletion 属性:

// with predicate
bufferBlock.LinkTo(readCsvFile, new DataflowLinkOptions { PropagateCompletion = true }, ShouldImportFile);
// without predicate
readCsvFile.LinkTo(normaliseData, new DataflowLinkOptions { PropagateCompletion = true });

现在,回到你的批处理问题。也许,您可以在此处使用 JoinBlock<T1, T2> or BatchedJoinBlock<T1, T2>,将它们附加到您的管道中并收集连接结果,这样您就可以全面了解正在完成的工作。也许您可以实现自己的 ITargetBlock<TInput>,这样您就可以按照自己的方式使用消息。

根据official docs, the blocks are greedy, and gather data from linked one as soon as it becomes available, so join blocks may stuck, if one target is ready and other is not, or batch block has 80% of batch size, so you need to put that in your mind. In case of your own implementation you can use ITargetBlock<TInput>.OfferMessage方法从您的来源获取信息。

BatchBlock<T> is capable of executing in both greedy and non-greedy modes. In the default greedy mode, all messages offered to the block from any number of sources are accepted and buffered to be converted into batches.

In non-greedy mode, all messages are postponed from sources until enough sources have offered messages to the block to create a batch. Thus, a BatchBlock<T> can be used to receive 1 element from each of N sources, N elements from 1 source, and a myriad of options in between.