如何在并行循环中正确插入

How to make an insert correctly in parallel loop

我有一个文件夹,我将在其中存放文本文件(200-500mb - 不是很大,但它的文本文件很大),我想并行处理这个文件。 该文件将有

"ComnanyTestIsert", "Firs Comment", "LA 132", "222-33-22", 1
"ComnanyTestIsert1", "Seconds Comment", "LA 132", "222-33-22", 1

例如,我使用了 2 个这样的文件。我不太明白何时将 BufferedStream 与并行循环一起使用 如何设置并行操作数?以及如何正确插入

static void Main(string[] args)
        {
            //Basic usage to help you get started:
            ProcessFileTaskItem(
                new string[] { "\Insert.txt"
                                        , "\Insert1.txt" }
                , "Data Source=(localdb)\MSSQLLocalDB;Initial Catalog=test;Integrated Security=True;Connect Timeout=30;Encrypt=False;TrustServerCertificate=False;ApplicationIntent=ReadWrite;MultiSubnetFailover=False"
                , "test");
        }


        /// This will read an array of input files, process the lines in parallel, and upload
        /// everything into the database.
        public static void ProcessFileTaskItem(string[] SourceFiles, string DatabaseConnectionString, string DestinationTable)
        {
            //Make sure there's files to read
            if (SourceFiles != null && SourceFiles.Length > 0)
            {
                //Loop through the file array
                Parallel.For(0, SourceFiles.Length, x =>
                //for (int x = 0; x < SourceFiles.Length; x++)
                {
                    //Make sure the file exists and if so open it for reading.
                    if (File.Exists(SourceFiles[x]))
                    {
                        using (SqlConnection connectionDest = new SqlConnection(DatabaseConnectionString))
                        {
                            connectionDest.Open();

                            //Configure everything to upload to the database via bulk copy.
                            using (SqlBulkCopy sbc = new SqlBulkCopy(connectionDest, SqlBulkCopyOptions.TableLock, null))
                            {
                                //Configure the bulk copy settings
                                sbc.DestinationTableName = DestinationTable;
                                sbc.BulkCopyTimeout = 28800; //8 hours

                                //Now read and process the file
                                ProcessAllLinesInInputFile(SourceFiles[x], connectionDest, sbc);
                            }

                            connectionDest.Close();
                        }

                    }
                    
                } //for
                ); //End Parallel reading of files

                //Explicitly clean up before exiting
                Array.Clear(SourceFiles, 0, SourceFiles.Length);
            }
            
        } 

        /// Processes every line in the source input file.
        private static void ProcessAllLinesInInputFile(string SourceFiles, SqlConnection connectionDest, SqlBulkCopy sbc)
        {
            
            //Create a local data table. Should be the same name as the table
            //in the database you'll be uploading everything to.
            DataTable CurrentRecords = new DataTable("test");

            //The column names. They should match what's in the database table.
            string[] ColumnNames = new string[] { "Name", "Comment", "Address", "Phone", "IsActive" };

            
            using (FileStream fs = File.Open(SourceFiles, FileMode.Open, FileAccess.Read, FileShare.ReadWrite))
            using (BufferedStream bs = new BufferedStream(fs))
            using (StreamReader sr = new StreamReader(bs))
            {
                string s;
                while ((s = sr.ReadLine()) != null)
                {

                }
            }

            //Create the datatable with the column names.
            for (int x = 0; x < ColumnNames.Length; x++)
                CurrentRecords.Columns.Add(ColumnNames[x], typeof(string));


                //Now process each line in parallel.
                Parallel.For(0, SourceFiles, x =>
                {
                    List<object> values = null; //so each thread gets its own copy. 
                    


                }
        }

Parallel.For 自动调整使用的线程数,但可以在 parallelOptions 参数中指定。

您有任何理由相信并行执行此操作会提高性能吗?多线程不会神奇地让一切变得更快。像这样的 IO 操作通常不会从多线程中受益。特别是如果你有旋转媒体,其中并发 IO 可以大大 减少 吞吐量,甚至 SSD 通常也会受到非顺序 IO 的影响。此外,如果您关心性能,则应该进行适当的测量,以便判断您是否真的在改进。

除非另有说明,否则非静态框架方法不是线程安全的。所以你不应该尝试从多个线程读取同一个流。您可以使用多个流,但如果您有足够的内存,我建议使用 File.ReadAllLines 而不是 ReadLine,我怀疑这样会更快,因为它可以顺序读取所有内容。

同时对同一个 table 执行 >1 批量插入不会给你很好的加速,特别是如果你把 SqlBulkCopyOptions.TableLock

一个更好的加速策略是将您的文件组合成更大的批量插入批次,或者甚至只是 1 个批次。如果您可以将整个批次放入内存中,然后并行读取所有文件(首选使用异步方法,而不是并行方式),将每个文件组合到您的批次中,然后批量插入单个批次。

您可以让它并行上传,但您确实需要分开 table 以使其快速运行。

您的过程中较慢的部分是读取文件中的数据。通常,您的程序必须空闲地等待“硬盘”提供一大块数据。您的程序可以对已经获取的项目进行一些处理,而不是无所事事地等待。

每当你有一个程序,你的进程必须等待一些外部进程,比如写入磁盘,从数据库管理系统查询数据,或者从互联网上获取信息,考虑使用异步是明智的-等待。

如果您使用 async-await,那么,每当您的进程必须等待某个其他进程完成时,它不会空等,而是会四处看看是否可以做其他事情。

在您的情况下,您可以调用一个异步函数,异步读取一个文件并将读取的数据写入数据库。如果您启动其中几个任务,那么只要其中一个任务必须等待文件读取或数据库写入的结果,它就会环顾四周,看看是否可以为其他任务做任何事情。因此,当它在等待任务 A 中读取文件 X 的一大块数据时,它可能已经开始将数据写入任务 B 中的数据库。

当我们逐行处理文件时,我们需要一个 returns 和 IEnumerable<string> 的函数,或者等效的异步函数:IAsyncEnumable<string>。参见 iterating with AsyncEnumerable

public async IAsyncEnumerable<string> ReadLines(string fileName)
{
    using (StreamReader reader = File.OpenText(fileName)
    {
        while(!reader.EndOfStream)
        {
            yield return await reader.ReadLineAsync().ConfigureAwait(false);
        }
    }
}

File.OpenText sadly only allows synchronous I/O; the async APIs are implemented poorly in that scenario. To open a true asynchronous file, you'd need to use one of the overloads of the FileStream constructors that have a Boolean parameter isAsync or FileOptions.Asynchronous.

用法:

async Task DisplayFileContentsAsync(string fileName)
{
    await foreach(string line in ReadFileAsync(fileName))
    {
        Console.WriteLine(line);
    }
}

我们还需要一种将读取的数据写入数据库的方法。我在这里一行一行地做,如果你愿意,你可以把它改成一次写几行。

async Task SaveInDbAsync(string line, string dbConnectionString)
{
    using (SqlConnection dbConnection = new SqlConnection(dbConnectionString))
    {
        // prepare the SQL command (consider using other methods)
        const string sqlCommandText = @"Insert into ...";
        var dbCommand = dbConnection.CreateCommand();
        dbCommand.CommandText = sqlCommandText;
        dbCommand.Parameters.Add(...)

        // async execute the dbCommand
        await dbConnection.OpenAsync();
        await dbCommand.ExecuteNonQueryAsync();
        // TODO: consider to use the return value to detect problems
    }
}

放在一起:读取一个文件并将行保存在数据库中:

async Task SaveFileInDbAsync(string fileName, string dbConnectionString)
{
    await foreach(string line in ReadFileAsync(fileName))
    {
        await SaveInDbAsync(line, dbConnectionString);
    }
}

要保存所有文件:

async Task SaveFilesInDbAsync(IEnumerable<string> fileNames, string dbConnectionString)
{
    // start all Tasks, do not await yet:
    List<Task> tasks = new List<Task>();
    foreach (string fileName in fileNames)
    {
        Task task = SaveFileInDbAsync(fileName, dbConnectionString);
        tasks.Add(task);
    }

    // now that all Tasks are started and happily reading files
    // and writing the read lines to the  database
    // await until all Tasks are finished.
    await Task.WhenAll(tasks);
}

或者如果您需要同步版本:

void SaveFilesInDb(IEnumerable<string> fileNames, string dbConnectionString)
{
    // start all Tasks, do not await yet:
    List<Task> tasks = new List<Task>();
    foreach (string fileName in fileNames)
    {
        Task task = SaveFileInDbAsync(fileName, dbConnectionString);
        tasks.Add(task);
    }

    // Wait until all Tasks are finished.
    Task.WaitAll(tasks);
}