TPL 数据流:在每次调用其委托时避免重复 运行 using-block(例如写入 StreamWriter)的 ActionBlock
TPL Dataflow: ActionBlock that avoids repeatedly running a using-block (such as for writing to a StreamWriter) on every invocation of its delegate
我需要从 IDataReader 读取 1M 行,并同时写入 n 个文本文件。这些文件中的每一个都将是可用列的不同子集;所有 n 个文本文件在完成后将是 1M 行。
当前计划是一个 TransformManyBlock 来迭代 IDataReader,链接到 BroadcastBlock,链接到 n BufferBlock/ActionBlock 对。
我试图避免的是让我的 ActionBlock 委托执行一个 using (StreamWriter x...) { x.WriteLine(); }
,这将打开和关闭每个输出文件一百万次。
我目前的想法是代替 ActionBlock,编写一个实现 ITargetBlock<> 的自定义 class。有没有更简单的方法?
编辑 1:讨论对我当前的问题很有价值,但到目前为止的答案都非常关注文件系统行为。为了未来搜索者的利益,问题的重点是 如何在 ActionBlock 委托 之外构建某种 setup/teardown。这适用于您通常用 using-block 包装的任何类型的一次性用品。
编辑 2:根据@Panagiotis Kanavos,解决方案的执行摘要是 在定义块之前设置对象,然后在块的 Completion.ContinueWith 中拆除对象.
通常在使用 TPL 时,我会自定义 classes,这样我就可以创建用于管道中块的私有成员变量和私有方法,而不是实现 ITargetBlock
或 ISourceBlock
,我将在我的自定义 class 中包含我需要的任何块,然后我将 ITargetBlock
和/或 ISourceBlock
公开为 public 属性,所以其他 classes 可以使用源块和目标块将 link 东西放在一起。
即使您不必每次都打开流,一次一行地写入文件本身也很昂贵。保持文件流打开还有其他问题,因为出于性能原因,文件流总是被缓冲,从 FileStream
级别一直到文件系统驱动程序。您必须定期刷新流以确保数据已写入磁盘。
要真正提高性能,您必须对记录进行批处理,例如使用 BatchBlock。一旦这样做,打开流的成本就可以忽略不计了。
这些行也应该在最后可能的时刻生成,以避免生成需要被垃圾收集的临时字符串。在 n*1M 记录时,这些分配和垃圾收集的内存和 CPU 开销会很严重。
在写入之前记录库批处理日志条目以避免这种性能影响。
您可以尝试这样的操作:
var batchBlock=new BatchBlock<Record>(1000);
var writerBlock=new ActionBlock<Record[]>( records => {
//Create or open a file for appending
using var writer=new StreamWriter(ThePath,true);
foreach(var record in records)
{
writer.WriteLine("{0} = {1} :{2}",record.Prop1, record.Prop5, record.Prop2);
}
});
batchBlock.LinkTo(writerBlock,options);
或者,使用异步方法
var batchBlock=new BatchBlock<Record>(1000);
var writerBlock=new ActionBlock<Record[]>(async records => {
//Create or open a file for appending
await using var writer=new StreamWriter(ThePath,true);
foreach(var record in records)
{
await writer.WriteLineAsync("{0} = {1} :{2}",record.Prop1, record.Prop5, record.Prop2);
}
});
batchBlock.LinkTo(writerBlock,options);
您可以调整批处理大小和 StreamWriter 的缓冲区大小以获得最佳性能。
创建写入流的实际“块”
可以使用 Custom Dataflow block walkthrough 中显示的技术创建自定义块 - 而不是创建实际的自定义块,创建 returns LinkTo
工作所需的任何东西, 在这种情况下 ITargetBlock< T>
:
ITargetBlock<Record> FileExporter(string path)
{
var writer=new StreamWriter(path,true);
var block=new ActionBlock<Record>(async msg=>{
await writer.WriteLineAsync("{0} = {1} :{2}",record.Prop1, record.Prop5, record.Prop2);
});
//Close the stream when the block completes
block.Completion.ContinueWith(_=>write.Close());
return (ITargetBlock<Record>)target;
}
...
var exporter1=CreateFileExporter(path1);
previous.LinkTo(exporter,options);
这里的“技巧”是流是在块外创建的,并且在块完成之前一直保持活动状态。它不被垃圾收集,因为它被其他代码使用。当块完成时,无论发生什么,我们都需要明确地关闭它。 block.Completion.ContinueWith(_=>write.Close());
无论块是否正常完成,都将关闭流。
这与演练中使用的代码相同,用于关闭输出 BufferBlock:
target.Completion.ContinueWith(delegate
{
if (queue.Count > 0 && queue.Count < windowSize)
source.Post(queue.ToArray());
source.Complete();
});
默认情况下缓冲流,因此调用 WriteLine
并不意味着数据将实际写入磁盘。这意味着我们不知道数据何时真正写入文件。如果应用程序崩溃,一些数据可能会丢失。
内存、IO 和开销
在很长一段时间内处理 100 万行时,事情会加起来。可以使用例如 File.AppendAllLinesAsync
一次写入一批行,但这会导致分配 1M 临时字符串。在每次迭代中,运行时必须使用 至少 作为 RAM 来处理这些临时字符串作为批处理。在 GC 触发冻结线程之前,RAM 使用量将开始激增至数百 MB,然后是 GB。
对于 1M 行和大量数据,很难调试和跟踪管道中的数据。如果出现问题,事情可能会很快 崩溃。假设有 1M 条消息卡在 one 块中,因为一条消息被阻塞了。
重要的是(出于理智和性能原因)使管道中的各个组件尽可能简单。
我需要从 IDataReader 读取 1M 行,并同时写入 n 个文本文件。这些文件中的每一个都将是可用列的不同子集;所有 n 个文本文件在完成后将是 1M 行。
当前计划是一个 TransformManyBlock 来迭代 IDataReader,链接到 BroadcastBlock,链接到 n BufferBlock/ActionBlock 对。
我试图避免的是让我的 ActionBlock 委托执行一个 using (StreamWriter x...) { x.WriteLine(); }
,这将打开和关闭每个输出文件一百万次。
我目前的想法是代替 ActionBlock,编写一个实现 ITargetBlock<> 的自定义 class。有没有更简单的方法?
编辑 1:讨论对我当前的问题很有价值,但到目前为止的答案都非常关注文件系统行为。为了未来搜索者的利益,问题的重点是 如何在 ActionBlock 委托 之外构建某种 setup/teardown。这适用于您通常用 using-block 包装的任何类型的一次性用品。
编辑 2:根据@Panagiotis Kanavos,解决方案的执行摘要是 在定义块之前设置对象,然后在块的 Completion.ContinueWith 中拆除对象.
通常在使用 TPL 时,我会自定义 classes,这样我就可以创建用于管道中块的私有成员变量和私有方法,而不是实现 ITargetBlock
或 ISourceBlock
,我将在我的自定义 class 中包含我需要的任何块,然后我将 ITargetBlock
和/或 ISourceBlock
公开为 public 属性,所以其他 classes 可以使用源块和目标块将 link 东西放在一起。
即使您不必每次都打开流,一次一行地写入文件本身也很昂贵。保持文件流打开还有其他问题,因为出于性能原因,文件流总是被缓冲,从 FileStream
级别一直到文件系统驱动程序。您必须定期刷新流以确保数据已写入磁盘。
要真正提高性能,您必须对记录进行批处理,例如使用 BatchBlock。一旦这样做,打开流的成本就可以忽略不计了。
这些行也应该在最后可能的时刻生成,以避免生成需要被垃圾收集的临时字符串。在 n*1M 记录时,这些分配和垃圾收集的内存和 CPU 开销会很严重。
在写入之前记录库批处理日志条目以避免这种性能影响。
您可以尝试这样的操作:
var batchBlock=new BatchBlock<Record>(1000);
var writerBlock=new ActionBlock<Record[]>( records => {
//Create or open a file for appending
using var writer=new StreamWriter(ThePath,true);
foreach(var record in records)
{
writer.WriteLine("{0} = {1} :{2}",record.Prop1, record.Prop5, record.Prop2);
}
});
batchBlock.LinkTo(writerBlock,options);
或者,使用异步方法
var batchBlock=new BatchBlock<Record>(1000);
var writerBlock=new ActionBlock<Record[]>(async records => {
//Create or open a file for appending
await using var writer=new StreamWriter(ThePath,true);
foreach(var record in records)
{
await writer.WriteLineAsync("{0} = {1} :{2}",record.Prop1, record.Prop5, record.Prop2);
}
});
batchBlock.LinkTo(writerBlock,options);
您可以调整批处理大小和 StreamWriter 的缓冲区大小以获得最佳性能。
创建写入流的实际“块”
可以使用 Custom Dataflow block walkthrough 中显示的技术创建自定义块 - 而不是创建实际的自定义块,创建 returns LinkTo
工作所需的任何东西, 在这种情况下 ITargetBlock< T>
:
ITargetBlock<Record> FileExporter(string path)
{
var writer=new StreamWriter(path,true);
var block=new ActionBlock<Record>(async msg=>{
await writer.WriteLineAsync("{0} = {1} :{2}",record.Prop1, record.Prop5, record.Prop2);
});
//Close the stream when the block completes
block.Completion.ContinueWith(_=>write.Close());
return (ITargetBlock<Record>)target;
}
...
var exporter1=CreateFileExporter(path1);
previous.LinkTo(exporter,options);
这里的“技巧”是流是在块外创建的,并且在块完成之前一直保持活动状态。它不被垃圾收集,因为它被其他代码使用。当块完成时,无论发生什么,我们都需要明确地关闭它。 block.Completion.ContinueWith(_=>write.Close());
无论块是否正常完成,都将关闭流。
这与演练中使用的代码相同,用于关闭输出 BufferBlock:
target.Completion.ContinueWith(delegate
{
if (queue.Count > 0 && queue.Count < windowSize)
source.Post(queue.ToArray());
source.Complete();
});
默认情况下缓冲流,因此调用 WriteLine
并不意味着数据将实际写入磁盘。这意味着我们不知道数据何时真正写入文件。如果应用程序崩溃,一些数据可能会丢失。
内存、IO 和开销
在很长一段时间内处理 100 万行时,事情会加起来。可以使用例如 File.AppendAllLinesAsync
一次写入一批行,但这会导致分配 1M 临时字符串。在每次迭代中,运行时必须使用 至少 作为 RAM 来处理这些临时字符串作为批处理。在 GC 触发冻结线程之前,RAM 使用量将开始激增至数百 MB,然后是 GB。
对于 1M 行和大量数据,很难调试和跟踪管道中的数据。如果出现问题,事情可能会很快 崩溃。假设有 1M 条消息卡在 one 块中,因为一条消息被阻塞了。
重要的是(出于理智和性能原因)使管道中的各个组件尽可能简单。