这个 C# Parallel.ForEach "localFinally" 方法线程安全吗?
Is this C# Parallel.ForEach "localFinally" method thread safe?
由于 MicroSoft 所述的好处,我正在使用 Parallel.ForEach
实现大量 SQL SELECT 语句(其中数千个) (顺便说一句:我现在找不到它,但我很确定我在某处读到 Parallel.ForEach 不会 return 控制直到所有迭代都已执行。这是真的吗?)
我从 Parallel.Foreach SQL querying sometimes results in Connection 开始。我不想使用已接受的答案,因为它放弃了 Parallel.ForEach
而只使用了 Task.Run
(或 Task Factory.StartNew
for .Net 4.0)。同时,O.P。使用 "lock" 将更新同步到 DataTable
列表,据我所知这可能会降低 Parallel.ForEach
.
的效率
所以,利用文章How to: Write a Parallel.ForEach Loop with Thread-Local Variables,我写了下面的稻草人代码。目标是将 threadLocalDataTable
(每个 select 语句一个)累积到 dataTableList
,一旦所有查询完成,即 returned。它有效,但我想知道 localFinally
方法是否真的是线程安全的(请参阅带有注释 //<-- the localFinally method in question
的代码行。 注意:SqlOperation class 实现了一个连接字符串,输出数据表名称,和select查询字符串
public static IList<DataTable> GetAllData(IEnumerable<SqlOperation> sqlList)
{
IList<DataTable> dataTableList = new List<DataTable>();
dataTableList.Clear();
try
{
Parallel.ForEach<SqlOperation, DataTable>(sqlList, () => null, (s, loop, threadLocalDataTable) =>
{
DataTable dataTable = null;
using (SqlCommand sqlCommand = new SqlCommand())
{
using (SqlConnection sqlConnection = new SqlConnection(s.ConnectionString))
{
sqlConnection.Open();
sqlCommand.CommandType = CommandType.Text;
sqlCommand.Connection = sqlConnection;
sqlCommand.CommandText = s.SelectQuery;
SqlDataAdapter sqlDataAdapter = new SqlDataAdapter(sqlCommand);
dataTable = new DataTable
{
TableName = s.DataTableName
};
dataTable.Clear();
dataTable.Rows.Clear();
dataTable.Columns.Clear();
sqlDataAdapter.Fill(dataTable);
sqlDataAdapter.Dispose();
sqlConnection.Close();
}
}
return dataTable;
}, (threadLocalDataTable) => dataTableList.Add(threadLocalDataTable) //<-- the localFinally method in question
);
}
catch (Exception ex)
{
Console.WriteLine(ex.ToString(), "GetAllData error");
}
return dataTableList;
}
您提供的 link 文档明确指出(关于 localFinally):
This delegate can be invoked concurrently by multiple tasks
所以不,它不是线程安全的。但这不是唯一的问题 - 您的整个实施都不正确。
"Thread local" 传递给每个循环迭代的变量是累加器。它累积了在同一个线程上多次迭代 运行 得到的结果。您的实现完全忽略了该变量,并且始终 returns 一个数据 table。这意味着如果您的 table 多于线程的并行循环数(默认为处理器的内核数)-您实际上正在丢失结果,因为您忽略了累加器。
正确的方法是使用 List<DataTable>
作为累加器,而不是单个累加器。例如:
Parallel.ForEach<SqlOperation, List<DataTable>>(
sqlList,
() => new List<DataTable>(), // initialize accumulator
(s, loop, threadLocalDataTables) =>
{
DataTable result = ...;
// add, that's thread safe
threadLocalDataTables.Add(result);
// return accumulator to next iteration
return threadLocalDataTables;
}, (threadLocalDataTables) =>
{
//<-- the localFinally method in question
lock (dataTableList) // lock and merge results
dataTableList.AddRange(threadLocalDataTables);
});
也就是说 - 使用 Parallel.ForEach
来加速 IO-bound 工作(比如进行 SQL 查询)不是一个好主意,使用 async\await 会做得更好。但这超出了这个问题的范围。
由于 MicroSoft 所述的好处,我正在使用 Parallel.ForEach
实现大量 SQL SELECT 语句(其中数千个) (顺便说一句:我现在找不到它,但我很确定我在某处读到 Parallel.ForEach 不会 return 控制直到所有迭代都已执行。这是真的吗?)
我从 Parallel.Foreach SQL querying sometimes results in Connection 开始。我不想使用已接受的答案,因为它放弃了 Parallel.ForEach
而只使用了 Task.Run
(或 Task Factory.StartNew
for .Net 4.0)。同时,O.P。使用 "lock" 将更新同步到 DataTable
列表,据我所知这可能会降低 Parallel.ForEach
.
所以,利用文章How to: Write a Parallel.ForEach Loop with Thread-Local Variables,我写了下面的稻草人代码。目标是将 threadLocalDataTable
(每个 select 语句一个)累积到 dataTableList
,一旦所有查询完成,即 returned。它有效,但我想知道 localFinally
方法是否真的是线程安全的(请参阅带有注释 //<-- the localFinally method in question
的代码行。 注意:SqlOperation class 实现了一个连接字符串,输出数据表名称,和select查询字符串
public static IList<DataTable> GetAllData(IEnumerable<SqlOperation> sqlList)
{
IList<DataTable> dataTableList = new List<DataTable>();
dataTableList.Clear();
try
{
Parallel.ForEach<SqlOperation, DataTable>(sqlList, () => null, (s, loop, threadLocalDataTable) =>
{
DataTable dataTable = null;
using (SqlCommand sqlCommand = new SqlCommand())
{
using (SqlConnection sqlConnection = new SqlConnection(s.ConnectionString))
{
sqlConnection.Open();
sqlCommand.CommandType = CommandType.Text;
sqlCommand.Connection = sqlConnection;
sqlCommand.CommandText = s.SelectQuery;
SqlDataAdapter sqlDataAdapter = new SqlDataAdapter(sqlCommand);
dataTable = new DataTable
{
TableName = s.DataTableName
};
dataTable.Clear();
dataTable.Rows.Clear();
dataTable.Columns.Clear();
sqlDataAdapter.Fill(dataTable);
sqlDataAdapter.Dispose();
sqlConnection.Close();
}
}
return dataTable;
}, (threadLocalDataTable) => dataTableList.Add(threadLocalDataTable) //<-- the localFinally method in question
);
}
catch (Exception ex)
{
Console.WriteLine(ex.ToString(), "GetAllData error");
}
return dataTableList;
}
您提供的 link 文档明确指出(关于 localFinally):
This delegate can be invoked concurrently by multiple tasks
所以不,它不是线程安全的。但这不是唯一的问题 - 您的整个实施都不正确。
"Thread local" 传递给每个循环迭代的变量是累加器。它累积了在同一个线程上多次迭代 运行 得到的结果。您的实现完全忽略了该变量,并且始终 returns 一个数据 table。这意味着如果您的 table 多于线程的并行循环数(默认为处理器的内核数)-您实际上正在丢失结果,因为您忽略了累加器。
正确的方法是使用 List<DataTable>
作为累加器,而不是单个累加器。例如:
Parallel.ForEach<SqlOperation, List<DataTable>>(
sqlList,
() => new List<DataTable>(), // initialize accumulator
(s, loop, threadLocalDataTables) =>
{
DataTable result = ...;
// add, that's thread safe
threadLocalDataTables.Add(result);
// return accumulator to next iteration
return threadLocalDataTables;
}, (threadLocalDataTables) =>
{
//<-- the localFinally method in question
lock (dataTableList) // lock and merge results
dataTableList.AddRange(threadLocalDataTables);
});
也就是说 - 使用 Parallel.ForEach
来加速 IO-bound 工作(比如进行 SQL 查询)不是一个好主意,使用 async\await 会做得更好。但这超出了这个问题的范围。