如何在并行循环中正确插入
How to make an insert correctly in parallel loop
我有一个文件夹,我将在其中存放文本文件(200-500mb - 不是很大,但它的文本文件很大),我想并行处理这个文件。
该文件将有
"ComnanyTestIsert", "Firs Comment", "LA 132", "222-33-22", 1
"ComnanyTestIsert1", "Seconds Comment", "LA 132", "222-33-22", 1
例如,我使用了 2 个这样的文件。我不太明白何时将 BufferedStream 与并行循环一起使用
如何设置并行操作数?以及如何正确插入
static void Main(string[] args)
{
//Basic usage to help you get started:
ProcessFileTaskItem(
new string[] { "\Insert.txt"
, "\Insert1.txt" }
, "Data Source=(localdb)\MSSQLLocalDB;Initial Catalog=test;Integrated Security=True;Connect Timeout=30;Encrypt=False;TrustServerCertificate=False;ApplicationIntent=ReadWrite;MultiSubnetFailover=False"
, "test");
}
/// This will read an array of input files, process the lines in parallel, and upload
/// everything into the database.
public static void ProcessFileTaskItem(string[] SourceFiles, string DatabaseConnectionString, string DestinationTable)
{
//Make sure there's files to read
if (SourceFiles != null && SourceFiles.Length > 0)
{
//Loop through the file array
Parallel.For(0, SourceFiles.Length, x =>
//for (int x = 0; x < SourceFiles.Length; x++)
{
//Make sure the file exists and if so open it for reading.
if (File.Exists(SourceFiles[x]))
{
using (SqlConnection connectionDest = new SqlConnection(DatabaseConnectionString))
{
connectionDest.Open();
//Configure everything to upload to the database via bulk copy.
using (SqlBulkCopy sbc = new SqlBulkCopy(connectionDest, SqlBulkCopyOptions.TableLock, null))
{
//Configure the bulk copy settings
sbc.DestinationTableName = DestinationTable;
sbc.BulkCopyTimeout = 28800; //8 hours
//Now read and process the file
ProcessAllLinesInInputFile(SourceFiles[x], connectionDest, sbc);
}
connectionDest.Close();
}
}
} //for
); //End Parallel reading of files
//Explicitly clean up before exiting
Array.Clear(SourceFiles, 0, SourceFiles.Length);
}
}
/// Processes every line in the source input file.
private static void ProcessAllLinesInInputFile(string SourceFiles, SqlConnection connectionDest, SqlBulkCopy sbc)
{
//Create a local data table. Should be the same name as the table
//in the database you'll be uploading everything to.
DataTable CurrentRecords = new DataTable("test");
//The column names. They should match what's in the database table.
string[] ColumnNames = new string[] { "Name", "Comment", "Address", "Phone", "IsActive" };
using (FileStream fs = File.Open(SourceFiles, FileMode.Open, FileAccess.Read, FileShare.ReadWrite))
using (BufferedStream bs = new BufferedStream(fs))
using (StreamReader sr = new StreamReader(bs))
{
string s;
while ((s = sr.ReadLine()) != null)
{
}
}
//Create the datatable with the column names.
for (int x = 0; x < ColumnNames.Length; x++)
CurrentRecords.Columns.Add(ColumnNames[x], typeof(string));
//Now process each line in parallel.
Parallel.For(0, SourceFiles, x =>
{
List<object> values = null; //so each thread gets its own copy.
}
}
Parallel.For 自动调整使用的线程数,但可以在 parallelOptions 参数中指定。
您有任何理由相信并行执行此操作会提高性能吗?多线程不会神奇地让一切变得更快。像这样的 IO 操作通常不会从多线程中受益。特别是如果你有旋转媒体,其中并发 IO 可以大大 减少 吞吐量,甚至 SSD 通常也会受到非顺序 IO 的影响。此外,如果您关心性能,则应该进行适当的测量,以便判断您是否真的在改进。
除非另有说明,否则非静态框架方法不是线程安全的。所以你不应该尝试从多个线程读取同一个流。您可以使用多个流,但如果您有足够的内存,我建议使用 File.ReadAllLines
而不是 ReadLine
,我怀疑这样会更快,因为它可以顺序读取所有内容。
同时对同一个 table 执行 >1 批量插入不会给你很好的加速,特别是如果你把 SqlBulkCopyOptions.TableLock
一个更好的加速策略是将您的文件组合成更大的批量插入批次,或者甚至只是 1 个批次。如果您可以将整个批次放入内存中,然后并行读取所有文件(首选使用异步方法,而不是并行方式),将每个文件组合到您的批次中,然后批量插入单个批次。
您可以让它并行上传,但您确实需要分开 table 以使其快速运行。
您的过程中较慢的部分是读取文件中的数据。通常,您的程序必须空闲地等待“硬盘”提供一大块数据。您的程序可以对已经获取的项目进行一些处理,而不是无所事事地等待。
每当你有一个程序,你的进程必须等待一些外部进程,比如写入磁盘,从数据库管理系统查询数据,或者从互联网上获取信息,考虑使用异步是明智的-等待。
如果您使用 async-await,那么,每当您的进程必须等待某个其他进程完成时,它不会空等,而是会四处看看是否可以做其他事情。
在您的情况下,您可以调用一个异步函数,异步读取一个文件并将读取的数据写入数据库。如果您启动其中几个任务,那么只要其中一个任务必须等待文件读取或数据库写入的结果,它就会环顾四周,看看是否可以为其他任务做任何事情。因此,当它在等待任务 A 中读取文件 X 的一大块数据时,它可能已经开始将数据写入任务 B 中的数据库。
当我们逐行处理文件时,我们需要一个 returns 和 IEnumerable<string>
的函数,或者等效的异步函数:IAsyncEnumable<string>
。参见 iterating with AsyncEnumerable
public async IAsyncEnumerable<string> ReadLines(string fileName)
{
using (StreamReader reader = File.OpenText(fileName)
{
while(!reader.EndOfStream)
{
yield return await reader.ReadLineAsync().ConfigureAwait(false);
}
}
}
File.OpenText sadly only allows synchronous I/O; the async APIs are implemented poorly in that scenario. To open a true asynchronous file, you'd need to use one of the overloads of the FileStream constructors that have a Boolean parameter isAsync or FileOptions.Asynchronous.
用法:
async Task DisplayFileContentsAsync(string fileName)
{
await foreach(string line in ReadFileAsync(fileName))
{
Console.WriteLine(line);
}
}
我们还需要一种将读取的数据写入数据库的方法。我在这里一行一行地做,如果你愿意,你可以把它改成一次写几行。
async Task SaveInDbAsync(string line, string dbConnectionString)
{
using (SqlConnection dbConnection = new SqlConnection(dbConnectionString))
{
// prepare the SQL command (consider using other methods)
const string sqlCommandText = @"Insert into ...";
var dbCommand = dbConnection.CreateCommand();
dbCommand.CommandText = sqlCommandText;
dbCommand.Parameters.Add(...)
// async execute the dbCommand
await dbConnection.OpenAsync();
await dbCommand.ExecuteNonQueryAsync();
// TODO: consider to use the return value to detect problems
}
}
放在一起:读取一个文件并将行保存在数据库中:
async Task SaveFileInDbAsync(string fileName, string dbConnectionString)
{
await foreach(string line in ReadFileAsync(fileName))
{
await SaveInDbAsync(line, dbConnectionString);
}
}
要保存所有文件:
async Task SaveFilesInDbAsync(IEnumerable<string> fileNames, string dbConnectionString)
{
// start all Tasks, do not await yet:
List<Task> tasks = new List<Task>();
foreach (string fileName in fileNames)
{
Task task = SaveFileInDbAsync(fileName, dbConnectionString);
tasks.Add(task);
}
// now that all Tasks are started and happily reading files
// and writing the read lines to the database
// await until all Tasks are finished.
await Task.WhenAll(tasks);
}
或者如果您需要同步版本:
void SaveFilesInDb(IEnumerable<string> fileNames, string dbConnectionString)
{
// start all Tasks, do not await yet:
List<Task> tasks = new List<Task>();
foreach (string fileName in fileNames)
{
Task task = SaveFileInDbAsync(fileName, dbConnectionString);
tasks.Add(task);
}
// Wait until all Tasks are finished.
Task.WaitAll(tasks);
}
我有一个文件夹,我将在其中存放文本文件(200-500mb - 不是很大,但它的文本文件很大),我想并行处理这个文件。 该文件将有
"ComnanyTestIsert", "Firs Comment", "LA 132", "222-33-22", 1
"ComnanyTestIsert1", "Seconds Comment", "LA 132", "222-33-22", 1
例如,我使用了 2 个这样的文件。我不太明白何时将 BufferedStream 与并行循环一起使用 如何设置并行操作数?以及如何正确插入
static void Main(string[] args)
{
//Basic usage to help you get started:
ProcessFileTaskItem(
new string[] { "\Insert.txt"
, "\Insert1.txt" }
, "Data Source=(localdb)\MSSQLLocalDB;Initial Catalog=test;Integrated Security=True;Connect Timeout=30;Encrypt=False;TrustServerCertificate=False;ApplicationIntent=ReadWrite;MultiSubnetFailover=False"
, "test");
}
/// This will read an array of input files, process the lines in parallel, and upload
/// everything into the database.
public static void ProcessFileTaskItem(string[] SourceFiles, string DatabaseConnectionString, string DestinationTable)
{
//Make sure there's files to read
if (SourceFiles != null && SourceFiles.Length > 0)
{
//Loop through the file array
Parallel.For(0, SourceFiles.Length, x =>
//for (int x = 0; x < SourceFiles.Length; x++)
{
//Make sure the file exists and if so open it for reading.
if (File.Exists(SourceFiles[x]))
{
using (SqlConnection connectionDest = new SqlConnection(DatabaseConnectionString))
{
connectionDest.Open();
//Configure everything to upload to the database via bulk copy.
using (SqlBulkCopy sbc = new SqlBulkCopy(connectionDest, SqlBulkCopyOptions.TableLock, null))
{
//Configure the bulk copy settings
sbc.DestinationTableName = DestinationTable;
sbc.BulkCopyTimeout = 28800; //8 hours
//Now read and process the file
ProcessAllLinesInInputFile(SourceFiles[x], connectionDest, sbc);
}
connectionDest.Close();
}
}
} //for
); //End Parallel reading of files
//Explicitly clean up before exiting
Array.Clear(SourceFiles, 0, SourceFiles.Length);
}
}
/// Processes every line in the source input file.
private static void ProcessAllLinesInInputFile(string SourceFiles, SqlConnection connectionDest, SqlBulkCopy sbc)
{
//Create a local data table. Should be the same name as the table
//in the database you'll be uploading everything to.
DataTable CurrentRecords = new DataTable("test");
//The column names. They should match what's in the database table.
string[] ColumnNames = new string[] { "Name", "Comment", "Address", "Phone", "IsActive" };
using (FileStream fs = File.Open(SourceFiles, FileMode.Open, FileAccess.Read, FileShare.ReadWrite))
using (BufferedStream bs = new BufferedStream(fs))
using (StreamReader sr = new StreamReader(bs))
{
string s;
while ((s = sr.ReadLine()) != null)
{
}
}
//Create the datatable with the column names.
for (int x = 0; x < ColumnNames.Length; x++)
CurrentRecords.Columns.Add(ColumnNames[x], typeof(string));
//Now process each line in parallel.
Parallel.For(0, SourceFiles, x =>
{
List<object> values = null; //so each thread gets its own copy.
}
}
Parallel.For 自动调整使用的线程数,但可以在 parallelOptions 参数中指定。
您有任何理由相信并行执行此操作会提高性能吗?多线程不会神奇地让一切变得更快。像这样的 IO 操作通常不会从多线程中受益。特别是如果你有旋转媒体,其中并发 IO 可以大大 减少 吞吐量,甚至 SSD 通常也会受到非顺序 IO 的影响。此外,如果您关心性能,则应该进行适当的测量,以便判断您是否真的在改进。
除非另有说明,否则非静态框架方法不是线程安全的。所以你不应该尝试从多个线程读取同一个流。您可以使用多个流,但如果您有足够的内存,我建议使用 File.ReadAllLines
而不是 ReadLine
,我怀疑这样会更快,因为它可以顺序读取所有内容。
同时对同一个 table 执行 >1 批量插入不会给你很好的加速,特别是如果你把 SqlBulkCopyOptions.TableLock
一个更好的加速策略是将您的文件组合成更大的批量插入批次,或者甚至只是 1 个批次。如果您可以将整个批次放入内存中,然后并行读取所有文件(首选使用异步方法,而不是并行方式),将每个文件组合到您的批次中,然后批量插入单个批次。
您可以让它并行上传,但您确实需要分开 table 以使其快速运行。
您的过程中较慢的部分是读取文件中的数据。通常,您的程序必须空闲地等待“硬盘”提供一大块数据。您的程序可以对已经获取的项目进行一些处理,而不是无所事事地等待。
每当你有一个程序,你的进程必须等待一些外部进程,比如写入磁盘,从数据库管理系统查询数据,或者从互联网上获取信息,考虑使用异步是明智的-等待。
如果您使用 async-await,那么,每当您的进程必须等待某个其他进程完成时,它不会空等,而是会四处看看是否可以做其他事情。
在您的情况下,您可以调用一个异步函数,异步读取一个文件并将读取的数据写入数据库。如果您启动其中几个任务,那么只要其中一个任务必须等待文件读取或数据库写入的结果,它就会环顾四周,看看是否可以为其他任务做任何事情。因此,当它在等待任务 A 中读取文件 X 的一大块数据时,它可能已经开始将数据写入任务 B 中的数据库。
当我们逐行处理文件时,我们需要一个 returns 和 IEnumerable<string>
的函数,或者等效的异步函数:IAsyncEnumable<string>
。参见 iterating with AsyncEnumerable
public async IAsyncEnumerable<string> ReadLines(string fileName)
{
using (StreamReader reader = File.OpenText(fileName)
{
while(!reader.EndOfStream)
{
yield return await reader.ReadLineAsync().ConfigureAwait(false);
}
}
}
File.OpenText sadly only allows synchronous I/O; the async APIs are implemented poorly in that scenario. To open a true asynchronous file, you'd need to use one of the overloads of the FileStream constructors that have a Boolean parameter isAsync or FileOptions.Asynchronous.
用法:
async Task DisplayFileContentsAsync(string fileName)
{
await foreach(string line in ReadFileAsync(fileName))
{
Console.WriteLine(line);
}
}
我们还需要一种将读取的数据写入数据库的方法。我在这里一行一行地做,如果你愿意,你可以把它改成一次写几行。
async Task SaveInDbAsync(string line, string dbConnectionString)
{
using (SqlConnection dbConnection = new SqlConnection(dbConnectionString))
{
// prepare the SQL command (consider using other methods)
const string sqlCommandText = @"Insert into ...";
var dbCommand = dbConnection.CreateCommand();
dbCommand.CommandText = sqlCommandText;
dbCommand.Parameters.Add(...)
// async execute the dbCommand
await dbConnection.OpenAsync();
await dbCommand.ExecuteNonQueryAsync();
// TODO: consider to use the return value to detect problems
}
}
放在一起:读取一个文件并将行保存在数据库中:
async Task SaveFileInDbAsync(string fileName, string dbConnectionString)
{
await foreach(string line in ReadFileAsync(fileName))
{
await SaveInDbAsync(line, dbConnectionString);
}
}
要保存所有文件:
async Task SaveFilesInDbAsync(IEnumerable<string> fileNames, string dbConnectionString)
{
// start all Tasks, do not await yet:
List<Task> tasks = new List<Task>();
foreach (string fileName in fileNames)
{
Task task = SaveFileInDbAsync(fileName, dbConnectionString);
tasks.Add(task);
}
// now that all Tasks are started and happily reading files
// and writing the read lines to the database
// await until all Tasks are finished.
await Task.WhenAll(tasks);
}
或者如果您需要同步版本:
void SaveFilesInDb(IEnumerable<string> fileNames, string dbConnectionString)
{
// start all Tasks, do not await yet:
List<Task> tasks = new List<Task>();
foreach (string fileName in fileNames)
{
Task task = SaveFileInDbAsync(fileName, dbConnectionString);
tasks.Add(task);
}
// Wait until all Tasks are finished.
Task.WaitAll(tasks);
}