TPL 数据流同步处理每个文件,但异步处理文件中的每一行

TPL Dataflow process each file synchronously but each line within a file asynchronously

所以我的用例要求我处理一个文件列表,对于列表中的每个文件,我遍历每一行并对这些行进行一些计算。现在我的问题是我的缓冲区块中不能有多个文件行,所以我基本上需要确保一个文件被完全处理(通过一系列数据流块),然后再进入第二个文件。

现在我查看了 ,其中的答案是要么完全停止使用 tpl 数据流,要么将多个处理块封装为一个以便我可以控制它。但如果我这样做,我将失去 tpl 提供的“可组合性”,将独立的块集中在一起似乎也有点浪费。还有其他方法吗?

我想在叶节点上使用 OutputAvailableAsync,以便在我 post 在另一个文件中清除所有内容时通知我。但我根本无法让 OutputAvailableAsync 工作。它只是永远等待。

编辑

在管道中,我将有一个带有状态的动作块,我计划为此使用 ConcurrentDictionary(对于文件中的每一行,我有多个注意事项)。现在我不可能为每一行编制索引,因为这意味着我必须保持 N 个文件一起处理的状态。这里 N 可能是要处理的文件数。

这就是我现在所拥有的,记住我刚刚编写了一个概念验证。

        static public IPropagatorBlock<string, string[]> CreatePipeline(int batchSize)
    {

        var fileReadingBlock = new TransformManyBlock<string, string>((filePath) =>
        {
            return File.ReadLines(filePath);
        }, new ExecutionDataflowBlockOptions { EnsureOrdered = true, MaxDegreeOfParallelism = Environment.ProcessorCount});

        var fileParsingBlock = new TransformBlock<string, string[]>((line) =>
        {
            return line.Split(",");
        }, new ExecutionDataflowBlockOptions { EnsureOrdered = true, MaxDegreeOfParallelism = Environment.ProcessorCount});

        return DataflowBlock.Encapsulate(fileReadingBlock, fileParsingBlock);

    }

您可以利用 TPL 数据流的 conditional linking capabilities,以创建部分共享和部分专用的管道。单个 reader 块和单个解析器块将由所有文件共享,同时将为每个文件创建一个专用处理器块。下面是这个概念的简单演示:

var parser = new TransformBlock<(int Id, string Line), (int Id, string[])>(line =>
{
    return (line.Id, line.Line?.Split(","));
});

var reader = new TransformManyBlock<(int Id, string Path), (int, string)>(file =>
{
    var processor = CreateProcessor(file.Id);

    // Create a conditional link from the parser block to the processor block
    var link = parser.LinkTo(processor, entry => entry.Id == file.Id);

    return File
        .ReadLines(file.Path)
        .Select(line => (file.Id, line))
        .Append((file.Id, null)); // Completion signal
});

ActionBlock<(int Id, string[] LineParts)> CreateProcessor(int fileId)
{
    var streamWriter = new StreamWriter($@"C:\{fileId}.out");

    return new ActionBlock<(int Id, string[] LineParts)>(line =>
    {
        if (line.LineParts == null)
        {
            streamWriter.Close(); // Completion signal received
            return;
        }
        streamWriter.WriteLine(String.Join("|", line.LineParts));
    });
}

reader.LinkTo(parser);

在此示例中,每个文件都与一个 int Id 相关联。此 Id 与每一行一起传递,以便能够在下游重建文件。 Value tuples 用于将每条数据与其原始文件的 Id 组合。在共享 parser 块和每个专用 processor 块之间创建条件 link。 null 有效负载用作文件结束指示符。接收到此信号后,处理器 block 理想情况下应该从 parser 中取消 link 自身,以便将条件 linking 机制的开销保持在最低限度。 unlinking 是通过处理 LinkTo 方法返回的 link 来执行的。为简单起见,上述示例中省略了这一重要步骤。

我可能应该在这里重复我在 previous related question 的回答中已经写过的内容,将单个字符串从一个块传递到另一个块将导致大量开销。分块(批处理)工作负载是可行的方法,以确保管道尽可能顺利(无摩擦)地执行。