TPL 数据流:在每次调用其委托时避免重复 运行 using-block(例如写入 StreamWriter)的 ActionBlock

TPL Dataflow: ActionBlock that avoids repeatedly running a using-block (such as for writing to a StreamWriter) on every invocation of its delegate

我需要从 IDataReader 读取 1M 行,并同时写入 n 个文本文件。这些文件中的每一个都将是可用列的不同子集;所有 n 个文本文件在完成后将是 1M 行。

当前计划是一个 TransformManyBlock 来迭代 IDataReader,链接到 BroadcastBlock,链接到 n BufferBlock/ActionBlock 对。

我试图避免的是让我的 ActionBlock 委托执行一个 using (StreamWriter x...) { x.WriteLine(); },这将打开和关闭每个输出文件一百万次。

我目前的想法是代替 ActionBlock,编写一个实现 ITargetBlock<> 的自定义 class。有没有更简单的方法?

编辑 1:讨论对我当前的问题很有价值,但到目前为止的答案都非常关注文件系统行为。为了未来搜索者的利益,问题的重点是 如何在 ActionBlock 委托 之外构建某种 setup/teardown。这适用于您通常用 using-block 包装的任何类型的一次性用品。

编辑 2:根据@Panagiotis Kanavos,解决方案的执行摘要是 在定义块之前设置对象,然后在块的 Completion.ContinueWith 中拆除对象.

通常在使用 TPL 时,我会自定义 classes,这样我就可以创建用于管道中块的私有成员变量和私有方法,而不是实现 ITargetBlockISourceBlock,我将在我的自定义 class 中包含我需要的任何块,然后我将 ITargetBlock 和/或 ISourceBlock 公开为 public 属性,所以其他 classes 可以使用源块和目标块将 link 东西放在一起。

即使您不必每次都打开流,一次一行地写入文件本身也很昂贵。保持文件流打开还有其他问题,因为出于性能原因,文件流总是被缓冲,从 FileStream 级别一直到文件系统驱动程序。您必须定期刷新流以确保数据已写入磁盘。

要真正提高性能,您必须对记录进行批处理,例如使用 BatchBlock。一旦这样做,打开流的成本就可以忽略不计了。

这些行也应该在最后可能的时刻生成,以避免生成需要被垃圾收集的临时字符串。在 n*1M 记录时,这些分配和垃圾收集的内存和 CPU 开销会很严重。

在写入之前记录库批处理日志条目以避免这种性能影响。

您可以尝试这样的操作:

var batchBlock=new BatchBlock<Record>(1000);
var writerBlock=new ActionBlock<Record[]>( records => {
   
    //Create or open a file for appending
    using var writer=new StreamWriter(ThePath,true);
    foreach(var record in records)
    {
        writer.WriteLine("{0} = {1} :{2}",record.Prop1, record.Prop5, record.Prop2);
    }

});

batchBlock.LinkTo(writerBlock,options);

或者,使用异步方法

var batchBlock=new BatchBlock<Record>(1000);
var writerBlock=new ActionBlock<Record[]>(async records => {
   
    //Create or open a file for appending
    await using var writer=new StreamWriter(ThePath,true);
    foreach(var record in records)
    {
        await writer.WriteLineAsync("{0} = {1} :{2}",record.Prop1, record.Prop5, record.Prop2);
    }

});

batchBlock.LinkTo(writerBlock,options);

您可以调整批处理大小和 StreamWriter 的缓冲区大小以获得最佳性能。

创建写入流的实际“块”

可以使用 Custom Dataflow block walkthrough 中显示的技术创建自定义块 - 而不是创建实际的自定义块,创建 returns LinkTo 工作所需的任何东西, 在这种情况下 ITargetBlock< T> :

ITargetBlock<Record> FileExporter(string path)
{
    var writer=new StreamWriter(path,true);
    var block=new ActionBlock<Record>(async msg=>{
        await writer.WriteLineAsync("{0} = {1} :{2}",record.Prop1, record.Prop5, record.Prop2);
    });

    //Close the stream when the block completes
    block.Completion.ContinueWith(_=>write.Close());
    return (ITargetBlock<Record>)target;
}
...


var exporter1=CreateFileExporter(path1);
previous.LinkTo(exporter,options);

这里的“技巧”是流是在块外创建的,并且在块完成之前一直保持活动状态。它不被垃圾收集,因为它被其他代码使用。当块完成时,无论发生什么,我们都需要明确地关闭它。 block.Completion.ContinueWith(_=>write.Close()); 无论块是否正常完成,都将关闭流。

这与演练中使用的代码相同,用于关闭输出 BufferBlock:

target.Completion.ContinueWith(delegate
{
   if (queue.Count > 0 && queue.Count < windowSize)
      source.Post(queue.ToArray());
   source.Complete();
});

默认情况下缓冲流,因此调用 WriteLine 并不意味着数据将实际写入磁盘。这意味着我们不知道数据何时真正写入文件。如果应用程序崩溃,一些数据可能会丢失。

内存、IO 和开销

在很长一段时间内处理 100 万行时,事情会加起来。可以使用例如 File.AppendAllLinesAsync 一次写入一批行,但这会导致分配 1M 临时字符串。在每次迭代中,运行时必须使用 至少 作为 RAM 来处理这些临时字符串作为批处理。在 GC 触发冻结线程之前,RAM 使用量将开始激增至数百 MB,然后是 GB。

对于 1M 行和大量数据,很难调试和跟踪管道中的数据。如果出现问题,事情可能会很快 崩溃。假设有 1M 条消息卡在 one 块中,因为一条消息被阻塞了。

重要的是(出于理智和性能原因)使管道中的各个组件尽可能简单。