解压缩 zip、解析文件并展平为 CSV

extracting zips, parsing files and flattening out to CSV

我正在尝试最大限度地提高以下任务的性能:

我想要的 TPL 布局是:

producer -> parser block -> batch block -> csv writer block

想法是单个生产者提取 zip 并找到 json 文件,将文本发送到并行 运行 的解析器块(多消费者)。批处理块分组为 200 个批次,写入器块在每次调用时将 200 行转储到 CSV 文件中。

问题:

更新1

我添加了 async,但我不清楚如何等待所有数据流块完成(c#、async 和 tpl 的新功能)。我基本上想说,"keep running until all of the queues/blocks are empty"。我添加了以下 'wait' 代码,并且似乎可以正常工作。

// wait for crawler to finish
crawlerTask.Wait(); 
// wait for the last block
flatWriterBlock.Completion.Wait(); 

简而言之,您的帖子忽略了 return 值。您有两个选择:添加一个未绑定的 BufferBlock 来保存所有传入数据或 await SendAsync,这将防止任何消息被丢弃。

static async Task ExtractJsonsInMemory(string filename, Stream stream, ITargetBlock<string> queue)
{
    var archive = new ZipArchive(stream);
    foreach (ZipArchiveEntry entry in archive.Entries)
    {
        if (entry.Name.EndsWith(".json", StringComparison.OrdinalIgnoreCase))
        {
            using (var reader = new StreamReader(entry.Open(), Encoding.UTF8))
            {
                var jsonText = reader.ReadToEnd();
                await queue.SendAsync(jsonText);
            }
        }
        else if (entry.Name.EndsWith(".zip", StringComparison.OrdinalIgnoreCase))
        {
            await ExtractJsonsInMemory(entry.FullName, entry.Open(), queue);
        }
    }
}

您需要将异步一直拉回来,但这应该让您开始。

来自 MSDN,关于 DataflowBlock.Post<TInput> 方法:

Return Value
Type: System.Boolean
true if the item was accepted by the target block; otherwise, false.

所以,这里的问题是您在发送消息时没有检查,管道是否可以接受另一条消息。发生这种情况是因为您对块的选择:

new ExecutionDataflowBlockOptions() { BoundedCapacity = 100 }

和这一行:

// this line isn't waiting for long operations and simply drops the message as it can't be accepted by the target block
queue.Post(jsonText);

这里你是说处理应该推迟,直到输入队列长度等于100。在这种情况下,MSDN 或他的 Introduction to Dataflow 系列中的@StephenCleary 建议使用简单的解决方案:

However, it’s possible to throttle a block by limiting its buffer size; in this case, you could use SendAsync to (asynchronously) wait for space to be available and then place the data into the block’s input buffer.

因此,正如@JSteward 已经建议的那样,您可以在工作人员之间引入无限缓冲区以避免消息丢失,这是通常的做法,如检查 Post 方法的结果可能会长时间阻塞生产者线程。

问题的第二部分,关于性能,是使用面向 async 的解决方案(对于此类操作,这将完全适合 SendAsync method usage), as you use I/O operations all the time. Asynchronous operation is basically a way to say the program "start doing this and notify me when it's done". And, as there is no thread,您将通过释放管道中其他操作的线程池。

PS:@JSteward 已经为您提供了该方法的良好示例代码。