这个 C# Parallel.ForEach "localFinally" 方法线程安全吗?

Is this C# Parallel.ForEach "localFinally" method thread safe?

由于 MicroSoft 所述的好处,我正在使用 Parallel.ForEach 实现大量 SQL SELECT 语句(其中数千个) (顺便说一句:我现在找不到它,但我很确定我在某处读到 Parallel.ForEach 不会 return 控制直到所有迭代都已执行。这是真的吗?)

我从 Parallel.Foreach SQL querying sometimes results in Connection 开始。我不想使用已接受的答案,因为它放弃了 Parallel.ForEach 而只使用了 Task.Run (或 Task Factory.StartNew for .Net 4.0)。同时,O.P。使用 "lock" 将更新同步到 DataTable 列表,据我所知这可能会降低 Parallel.ForEach.

的效率

所以,利用文章How to: Write a Parallel.ForEach Loop with Thread-Local Variables,我写了下面的稻草人代码。目标是将 threadLocalDataTable(每个 select 语句一个)累积到 dataTableList,一旦所有查询完成,即 returned。它有效,但我想知道 localFinally 方法是否真的是线程安全的(请参阅带有注释 //<-- the localFinally method in question 的代码行。 注意:SqlOperation class 实现了一个连接字符串,输出数据表名称,和select查询字符串

public static IList<DataTable> GetAllData(IEnumerable<SqlOperation> sqlList)
{
    IList<DataTable> dataTableList = new List<DataTable>();
    dataTableList.Clear();

    try
    {

        Parallel.ForEach<SqlOperation, DataTable>(sqlList, () => null, (s, loop, threadLocalDataTable) =>
        {

            DataTable dataTable = null;

            using (SqlCommand sqlCommand = new SqlCommand())
            {
                using (SqlConnection sqlConnection = new SqlConnection(s.ConnectionString))
                {
                    sqlConnection.Open();
                    sqlCommand.CommandType = CommandType.Text;
                    sqlCommand.Connection = sqlConnection;
                    sqlCommand.CommandText = s.SelectQuery;
                    SqlDataAdapter sqlDataAdapter = new SqlDataAdapter(sqlCommand);

                    dataTable = new DataTable
                    {
                        TableName = s.DataTableName
                    };

                    dataTable.Clear();
                    dataTable.Rows.Clear();
                    dataTable.Columns.Clear();

                    sqlDataAdapter.Fill(dataTable);
                    sqlDataAdapter.Dispose();
                    sqlConnection.Close();
                }
            }

            return dataTable;


        }, (threadLocalDataTable) => dataTableList.Add(threadLocalDataTable) //<-- the localFinally method in question
        );
    }
    catch (Exception ex)
    {
        Console.WriteLine(ex.ToString(), "GetAllData error");
    }

    return dataTableList;
}

您提供的 link 文档明确指出(关于 localFinally):

This delegate can be invoked concurrently by multiple tasks

所以不,它不是线程安全的。但这不是唯一的问题 - 您的整个实施都不正确。

"Thread local" 传递给每个循环迭代的变量是累加器。它累积了在同一个线程上多次迭代 运行 得到的结果。您的实现完全忽略了该变量,并且始终 returns 一个数据 table。这意味着如果您的 table 多于线程的并行循环数(默认为处理器的内核数)-您实际上正在丢失结果,因为您忽略了累加器。

正确的方法是使用 List<DataTable> 作为累加器,而不是单个累加器。例如:

Parallel.ForEach<SqlOperation, List<DataTable>>(
    sqlList, 
    () => new List<DataTable>(), // initialize accumulator
    (s, loop, threadLocalDataTables) =>
    {
        DataTable result = ...;
        // add, that's thread safe
        threadLocalDataTables.Add(result);
        // return accumulator to next iteration
        return threadLocalDataTables;
    }, (threadLocalDataTables) =>
    {
        //<-- the localFinally method in question
        lock (dataTableList) // lock and merge results
           dataTableList.AddRange(threadLocalDataTables);
    });

也就是说 - 使用 Parallel.ForEach 来加速 IO-bound 工作(比如进行 SQL 查询)不是一个好主意,使用 async\await 会做得更好。但这超出了这个问题的范围。