CsvHelper - 异步读取流
CsvHelper - Reading Stream Asynchronously
我有一项服务接受包含需要批量插入数据库的 CSV 数据的输入流,我的应用程序尽可能使用 async/await。
过程是:使用CsvHelper的CsvParser解析流,将每一行添加到DataTable,使用SqlBulkCopy将DataTable复制到数据库中。
数据可以是任意大小,所以我想避免一次将整个数据读入内存 - 显然我最终会在 DataTable 中拥有所有数据,因此基本上有 2 个副本内存。
我想尽可能异步地完成所有这些操作,但 CsvHelper 没有任何异步方法,所以我想出了以下解决方法:
using (var inputStreamReader = new StreamReader(inputStream))
{
while (!inputStreamReader.EndOfStream)
{
// Read line from the input stream
string line = await inputStreamReader.ReadLineAsync();
using (var memoryStream = new MemoryStream())
using (var streamWriter = new StreamWriter(memoryStream))
using (var memoryStreamReader = new StreamReader(memoryStream))
using (var csvParser = new CsvParser(memoryStreamReader))
{
await streamWriter.WriteLineAsync(line);
await streamWriter.FlushAsync();
memoryStream.Position = 0;
// Loop through all the rows (should only be one as we only read a single line...)
while (true)
{
var row = csvParser.Read();
// No more rows to process
if (row == null)
{
break;
}
// Add row to DataTable
}
}
}
}
这个解决方案有什么问题吗?甚至有必要吗?我已经看到 CsvHelper 开发人员特别没有添加异步功能 (https://github.com/JoshClose/CsvHelper/issues/202),但我并没有真正理解不这样做的原因。
编辑:我刚刚意识到这个解决方案不适用于列包含换行符的情况:(我猜我只需要将整个输入流复制到 MemoryStream 或其他东西
EDIT2:更多信息。
这是一个库中的异步方法,我在其中尝试一直执行异步操作。它可能会被 MVC 控制器使用(如果我只是想从 UI 线程卸载它,我会 Task.Run() 它)。大多数情况下,该方法将等待外部资源,例如数据库/DFS,我希望在线程处于释放状态时将其释放。
CsvParser.Read() 会阻塞,即使阻塞的是读取流(例如,如果我试图读取的数据驻留在世界另一端的服务器上),而如果CsvHelper 将实现一个使用 TextReader.ReadAsync() 的异步方法,这样我就不会因为等待我的数据从迪拜到达而被阻塞。据我所知,我并没有要求围绕同步方法的异步包装器。
EDIT3:从遥远的未来更新!异步功能实际上早在 2017 年就被添加到 CsvHelper 中。我希望我工作的公司的某个人从那时起已经升级到更新的版本!
这是一篇关于在同步方法上公开异步包装器的好文章,以及为什么 CsvHelper 没有这样做。 http://blogs.msdn.com/b/pfxteam/archive/2012/03/24/10287244.aspx
如果您不想阻塞 UI 线程,运行 在后台线程上处理。
CsvHelper 拉入数据缓冲区。缓冲区的大小是一个设置,您可以根据需要进行更改。如果你的服务器在世界的另一边,它会缓冲一些数据,然后读取它。更有可能的是,在使用缓冲区之前需要多次读取。
CsvHelper 也生成记录,因此如果您实际上没有得到一行,则不会读取任何内容。如果您只读取几行,则只会读取那么多的文件(实际上是缓冲区大小)。
如果您担心内存问题,有几个简单的选项。
- 缓冲数据。您可以一次批量复制 100 行或 1000 行,而不是整个文件。继续这样做,直到文件完成。
- 使用文件流。如果出于某种原因需要一次读取整个文件,请改用 FileStream 并将整个文件写入光盘。它会更慢,但你不会使用一堆内存。
Eric lippert 使用 a metaphor of cooking a meal in a restaurant 解释了 async-await 的用途。根据他的解释,如果您的线程无事可做,那么异步执行某些操作是没有用的。
此外,请注意,当您的线程正在做某事时,它不能做其他事情。只有当您的线程正在等待某事时,它才能做其他事情。您在流程中等待的事情之一是读取文件。当线程逐行读取文件时,它必须等待几次才能读取行。在此等待期间,它可以做其他事情,例如解析读取的 CSV 数据并将解析后的数据发送到您的目的地。
解析数据不是您的线程必须等待其他进程完成的过程,就像读取文件或将数据发送到数据库时必须做的那样。这就是为什么没有解析过程的异步版本。普通的异步等待不会帮助您的线程保持忙碌,因为在解析过程中没有什么可等待的,因此在解析过程中您的线程将没有时间做其他事情。
您当然可以使用 Task.Run ( () => ParseReadData(...)) 将解析过程转换为可等待的任务,并等待此任务完成,但在 Eric 的类比中Lippert 的餐厅,这将由一名厨师来解冻,而你却坐在柜台后面什么都不做。
但是,如果您的线程有一些有意义的事情要做,同时正在解析读取的 CSV 数据,例如响应用户输入,那么在单独的任务中开始解析可能会很有用。
如果你完整的读取-解析-更新数据库过程不需要与用户交互,但你需要你的线程在做这个过程的同时可以自由地做其他事情,可以考虑将完整的过程放在一个单独的任务中, 并开始任务而不等待它。在那种情况下,您只需要使用您的界面线程来启动其他任务,并且您的界面线程可以自由地做其他事情。与您的流程的总时间相比,开始这个新任务的成本相对较小。
再说一遍:如果你的线程没有别的事可做,让这个线程去做处理,不要启动其他任务去做。
我有一项服务接受包含需要批量插入数据库的 CSV 数据的输入流,我的应用程序尽可能使用 async/await。
过程是:使用CsvHelper的CsvParser解析流,将每一行添加到DataTable,使用SqlBulkCopy将DataTable复制到数据库中。
数据可以是任意大小,所以我想避免一次将整个数据读入内存 - 显然我最终会在 DataTable 中拥有所有数据,因此基本上有 2 个副本内存。
我想尽可能异步地完成所有这些操作,但 CsvHelper 没有任何异步方法,所以我想出了以下解决方法:
using (var inputStreamReader = new StreamReader(inputStream))
{
while (!inputStreamReader.EndOfStream)
{
// Read line from the input stream
string line = await inputStreamReader.ReadLineAsync();
using (var memoryStream = new MemoryStream())
using (var streamWriter = new StreamWriter(memoryStream))
using (var memoryStreamReader = new StreamReader(memoryStream))
using (var csvParser = new CsvParser(memoryStreamReader))
{
await streamWriter.WriteLineAsync(line);
await streamWriter.FlushAsync();
memoryStream.Position = 0;
// Loop through all the rows (should only be one as we only read a single line...)
while (true)
{
var row = csvParser.Read();
// No more rows to process
if (row == null)
{
break;
}
// Add row to DataTable
}
}
}
}
这个解决方案有什么问题吗?甚至有必要吗?我已经看到 CsvHelper 开发人员特别没有添加异步功能 (https://github.com/JoshClose/CsvHelper/issues/202),但我并没有真正理解不这样做的原因。
编辑:我刚刚意识到这个解决方案不适用于列包含换行符的情况:(我猜我只需要将整个输入流复制到 MemoryStream 或其他东西
EDIT2:更多信息。
这是一个库中的异步方法,我在其中尝试一直执行异步操作。它可能会被 MVC 控制器使用(如果我只是想从 UI 线程卸载它,我会 Task.Run() 它)。大多数情况下,该方法将等待外部资源,例如数据库/DFS,我希望在线程处于释放状态时将其释放。
CsvParser.Read() 会阻塞,即使阻塞的是读取流(例如,如果我试图读取的数据驻留在世界另一端的服务器上),而如果CsvHelper 将实现一个使用 TextReader.ReadAsync() 的异步方法,这样我就不会因为等待我的数据从迪拜到达而被阻塞。据我所知,我并没有要求围绕同步方法的异步包装器。
EDIT3:从遥远的未来更新!异步功能实际上早在 2017 年就被添加到 CsvHelper 中。我希望我工作的公司的某个人从那时起已经升级到更新的版本!
这是一篇关于在同步方法上公开异步包装器的好文章,以及为什么 CsvHelper 没有这样做。 http://blogs.msdn.com/b/pfxteam/archive/2012/03/24/10287244.aspx
如果您不想阻塞 UI 线程,运行 在后台线程上处理。
CsvHelper 拉入数据缓冲区。缓冲区的大小是一个设置,您可以根据需要进行更改。如果你的服务器在世界的另一边,它会缓冲一些数据,然后读取它。更有可能的是,在使用缓冲区之前需要多次读取。
CsvHelper 也生成记录,因此如果您实际上没有得到一行,则不会读取任何内容。如果您只读取几行,则只会读取那么多的文件(实际上是缓冲区大小)。
如果您担心内存问题,有几个简单的选项。
- 缓冲数据。您可以一次批量复制 100 行或 1000 行,而不是整个文件。继续这样做,直到文件完成。
- 使用文件流。如果出于某种原因需要一次读取整个文件,请改用 FileStream 并将整个文件写入光盘。它会更慢,但你不会使用一堆内存。
Eric lippert 使用 a metaphor of cooking a meal in a restaurant 解释了 async-await 的用途。根据他的解释,如果您的线程无事可做,那么异步执行某些操作是没有用的。
此外,请注意,当您的线程正在做某事时,它不能做其他事情。只有当您的线程正在等待某事时,它才能做其他事情。您在流程中等待的事情之一是读取文件。当线程逐行读取文件时,它必须等待几次才能读取行。在此等待期间,它可以做其他事情,例如解析读取的 CSV 数据并将解析后的数据发送到您的目的地。
解析数据不是您的线程必须等待其他进程完成的过程,就像读取文件或将数据发送到数据库时必须做的那样。这就是为什么没有解析过程的异步版本。普通的异步等待不会帮助您的线程保持忙碌,因为在解析过程中没有什么可等待的,因此在解析过程中您的线程将没有时间做其他事情。
您当然可以使用 Task.Run ( () => ParseReadData(...)) 将解析过程转换为可等待的任务,并等待此任务完成,但在 Eric 的类比中Lippert 的餐厅,这将由一名厨师来解冻,而你却坐在柜台后面什么都不做。
但是,如果您的线程有一些有意义的事情要做,同时正在解析读取的 CSV 数据,例如响应用户输入,那么在单独的任务中开始解析可能会很有用。
如果你完整的读取-解析-更新数据库过程不需要与用户交互,但你需要你的线程在做这个过程的同时可以自由地做其他事情,可以考虑将完整的过程放在一个单独的任务中, 并开始任务而不等待它。在那种情况下,您只需要使用您的界面线程来启动其他任务,并且您的界面线程可以自由地做其他事情。与您的流程的总时间相比,开始这个新任务的成本相对较小。
再说一遍:如果你的线程没有别的事可做,让这个线程去做处理,不要启动其他任务去做。