Extracting zips, parsing files and flattening out to CSV
I'm trying to maximize the performance of the following task:
- enumerate a directory of zip files
- extract the zips in memory, looking for .json files (handling nested zips)
- parse the json files
- write the properties from the json files to an aggregated .CSV file
The TPL layout I want is:
producer -> parser block -> batch block -> csv writer block
The idea is a single producer extracting the zips and finding the json files, sending the text to a parser block running in parallel (multiple consumers). The batch block groups items into batches of 200, and the writer block dumps 200 rows to the CSV file on each call.
Questions:
- The longer the jsonParseBlock TransformBlock takes, the more messages are dropped. How can I prevent this?
- How can I make better use of the TPL to maximize performance?
class Item
{
public string ID { get; set; }
public string Name { get; set; }
}
class Demo
{
const string OUT_FILE = @"c:\temp\tplflat.csv";
const string DATA_DIR = @"c:\temp\tpldata";
static ExecutionDataflowBlockOptions parseOpts = new ExecutionDataflowBlockOptions() { SingleProducerConstrained=true, MaxDegreeOfParallelism = 8, BoundedCapacity = 100 };
static ExecutionDataflowBlockOptions writeOpts = new ExecutionDataflowBlockOptions() { BoundedCapacity = 100 };
public static void Run()
{
Console.WriteLine($"{Environment.ProcessorCount} processors available");
_InitTest(); // reset csv file, generate test data if needed
// start TPL stuff
var sw = Stopwatch.StartNew();
// transformer
var jsonParseBlock = new TransformBlock<string, Item>(rawstr =>
{
var item = Newtonsoft.Json.JsonConvert.DeserializeObject<Item>(rawstr);
System.Threading.Thread.Sleep(15); // the more sleep here, the more messages lost
return item;
}, parseOpts);
// batch block
var jsonBatchBlock = new BatchBlock<Item>(200);
// writer block
var flatWriterBlock = new ActionBlock<Item[]>(items =>
{
//Console.WriteLine($"writing {items.Length} to csv");
StringBuilder sb = new StringBuilder();
foreach (var item in items)
{
sb.AppendLine($"{item.ID},{item.Name}");
}
File.AppendAllText(OUT_FILE, sb.ToString());
});
jsonParseBlock.LinkTo(jsonBatchBlock, new DataflowLinkOptions { PropagateCompletion = true });
jsonBatchBlock.LinkTo(flatWriterBlock, new DataflowLinkOptions { PropagateCompletion = true });
// start doing the work
var crawlerTask = GetJsons(DATA_DIR, jsonParseBlock);
crawlerTask.Wait();
flatWriterBlock.Completion.Wait();
Console.WriteLine($"ALERT: tplflat.csv row count should match the test data");
Console.WriteLine($"Completed in {sw.ElapsedMilliseconds / 1000.0} secs");
}
static async Task GetJsons(string filepath, ITargetBlock<string> queue)
{
int count = 1;
foreach (var zip in Directory.EnumerateFiles(filepath, "*.zip"))
{
Console.WriteLine($"working on zip #{count++}");
var zipStream = new FileStream(zip, FileMode.Open);
await ExtractJsonsInMemory(zip, zipStream, queue);
}
queue.Complete();
}
static async Task ExtractJsonsInMemory(string filename, Stream stream, ITargetBlock<string> queue)
{
ZipArchive archive = new ZipArchive(stream);
foreach (ZipArchiveEntry entry in archive.Entries)
{
if (entry.Name.EndsWith(".json", StringComparison.OrdinalIgnoreCase))
{
using (TextReader reader = new StreamReader(entry.Open(), Encoding.UTF8))
{
var jsonText = reader.ReadToEnd();
await queue.SendAsync(jsonText);
}
}
else if (entry.Name.EndsWith(".zip", StringComparison.OrdinalIgnoreCase))
{
await ExtractJsonsInMemory(entry.FullName, entry.Open(), queue);
}
}
}
}
Update 1
I added async, but it's not clear to me how to wait for all the dataflow blocks to complete (new to c#, async and tpl). I basically want to say, "keep running until all of the queues/blocks are empty". I added the following 'wait' code, and it appears to work.
// wait for crawler to finish
crawlerTask.Wait();
// wait for the last block
flatWriterBlock.Completion.Wait();
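For reference, the same wait can be expressed with await once the calling method itself is made async; a minimal sketch (the async wrapper around these two lines is an assumption, not part of the question's code):

// a minimal sketch, assuming Run is rewritten as an async method
await crawlerTask;                    // producer has sent everything and called Complete()
await flatWriterBlock.Completion;     // completion has propagated through the linked blocks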
In short, your post is ignoring the return value. You have two options: add an unbounded BufferBlock to hold all your incoming data, or await SendAsync, which will prevent any messages from being dropped.
static async Task ExtractJsonsInMemory(string filename, Stream stream, ITargetBlock<string> queue)
{
var archive = new ZipArchive(stream);
foreach (ZipArchiveEntry entry in archive.Entries)
{
if (entry.Name.EndsWith(".json", StringComparison.OrdinalIgnoreCase))
{
using (var reader = new StreamReader(entry.Open(), Encoding.UTF8))
{
var jsonText = reader.ReadToEnd();
await queue.SendAsync(jsonText);
}
}
else if (entry.Name.EndsWith(".zip", StringComparison.OrdinalIgnoreCase))
{
await ExtractJsonsInMemory(entry.FullName, entry.Open(), queue);
}
}
}
You'll need to pull async all the way back up the call chain, but this should get you started.
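The other option, an unbounded BufferBlock in front of the bounded parser, might look like this sketch (the linking shown here is an assumption layered onto the question's code, not code from the original post):

// an unbounded buffer accepts every Post immediately, so nothing is dropped;
// the bounded parser then pulls from it as its capacity frees up
var bufferBlock = new BufferBlock<string>(); // default capacity is unbounded
bufferBlock.LinkTo(jsonParseBlock, new DataflowLinkOptions { PropagateCompletion = true });
// the producer now targets bufferBlock instead of jsonParseBlock
var crawlerTask = GetJsons(DATA_DIR, bufferBlock);

Note the trade-off: this avoids message loss at the cost of unbounded memory growth if the producer outruns the parser.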
From MSDN, about the DataflowBlock.Post<TInput> method:
Return Value
Type: System.Boolean
true if the item was accepted by the target block; otherwise, false.
So the problem here is that when you send a message you're not checking whether the pipeline can accept another one. This happens because of your choice of block options:
new ExecutionDataflowBlockOptions() { BoundedCapacity = 100 }
and this line:
// this line isn't waiting for long operations and simply drops the message as it can't be accepted by the target block
queue.Post(jsonText);
Here you're saying that processing should be postponed until the input queue length reaches 100. In this case, both MSDN and @StephenCleary in his Introduction to Dataflow series suggest a simple solution:
However, it’s possible to throttle a block by limiting its buffer size; in this case, you could use SendAsync to (asynchronously) wait for space to be available and then place the data into the block’s input buffer.
So, as @JSteward already suggested, you can introduce an unbounded buffer between your workers to avoid message loss, and this is the usual practice, since checking the result of the Post method could block the producer thread for a long time.
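To make that concrete, a busy-retry around Post (a hypothetical sketch, shown only to illustrate the blocking, not as a recommendation) would look like this:

// hypothetical busy-retry: whenever the bounded block is full, the
// producer thread stalls here instead of doing useful work
while (!queue.Post(jsonText))
{
    System.Threading.Thread.Sleep(10);
}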
The second part of your question, about performance, is to use an async-oriented solution (which fits perfectly with SendAsync usage), since you're doing I/O operations all the time. An asynchronous operation is basically a way to tell the program "start doing this and notify me when it's done", and because no thread is blocked while such an operation runs, you gain by freeing up the thread pool for the other operations in your pipeline.
PS: @JSteward has already given you good sample code for that approach.
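As one illustration of that async-oriented direction, the CSV writer could use asynchronous file I/O so no thread-pool thread sits blocked on the disk write; a sketch layered onto the question's OUT_FILE, not the original code:

// async writer sketch: WriteAsync releases the thread while the OS completes the I/O
var flatWriterBlock = new ActionBlock<Item[]>(async items =>
{
    var sb = new StringBuilder();
    foreach (var item in items)
    {
        sb.AppendLine($"{item.ID},{item.Name}");
    }
    using (var writer = new StreamWriter(OUT_FILE, append: true))
    {
        await writer.WriteAsync(sb.ToString());
    }
});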