在并行处理期间维护 DataTable 行的顺序
Maintaining order of DataTable rows during parallel processing
以下是当前代码:
Parallel.ForEach(dataTable.AsEnumerable(),row => {
// Code to process the data row to Dictionary<object,object>
// Unique Column name is the Dictionary Key
// ConcurrentDictionary is used for thread safety
});
这里我使用Parallel.ForEach
将DataTable
的行处理成Dictionary<object,object>
类型的对象,最终结果是List<Dictionary<object,object>>
类型的对象,使用中间线程实现安全结构 ConcurrentQueue<Dictionary<object,object>>
,DataTable
的来源按给定顺序对数据进行排序,但在并行处理期间总是会丢失。由于顺序很重要,所以我想出了以下解决方法:
Parallel.For(0,RowCount,index => {
int rowIndex = index;
// Access the rows using the Index
// Final structure will be of type ConcurrentDictionary<Custom>,
// with the RowIndex assigned based on original index
});
Class Custom
{
public int RowIndex { get; set; }
public Dictionary<object,object> DataDictionary {get; set;}
}
ConcurrentQueue<Dictionary<Custom>> customObj
类型的最终结果使用以下代码处理:
customObj.OrderBy(x=>x.RowIndex).Select(y=>y.DataDictionary).ToList()
以下是我的问题:
有没有更好的方法来实现相同的并行处理,我可以保持原始顺序,这是最重要的业务需求
在最终解决方案中,我需要局部变量 rowIndex
,我的理解是 index
是并行循环的一部分,不会导致关闭问题
有什么指点吗?
首先,您可以在 Parallel.ForEach 中获取 Index 而不是使用 Parallel.For
Parallel.ForEach(dataTable.AsEnumerable(), (line, state, index) =>
{
Console.WriteLine("{0} : {1}", index, line);
});
如我所见,主要目的是避免OrderBy。
为此,请在 ForLoop
之前创建您的
var lines = new YourClass[NumberOfElemnts] ;
在此之后,您可以使用您想要的任何循环填充此列表。让我们使用 Parallel.For
Parallel.For(0, NumberOfElemnts, i =>
{
lines[i]=dataTable[i];
});
根据@Panagiotis Kanavos 的备注编辑
这个呢
var items = new ConcurrentDictionary<DataRow, Dictionary<object,object>>;
Parallel.ForEach(dataTable.AsEnumerable(),row => {
var result = ...;
items.Add(row, result);
});
var finalResult = dataTable.Rows.Cast<DataRow>().Select(r => items[r]).ToList());
您可以利用 PLINQ
和
的 ParallelEnumerable.AsOrdered
扩展方法
Enables treatment of a data source as if it was ordered, overriding the default of unordered.
在您的示例中,您可以按以下方式使用它:
var result = dataTable.AsEnumerable().AsParallel().AsOrdered()
.Select(/*Process the row to dictionary*/).ToList();
以下是当前代码:
Parallel.ForEach(dataTable.AsEnumerable(),row => {
// Code to process the data row to Dictionary<object,object>
// Unique Column name is the Dictionary Key
// ConcurrentDictionary is used for thread safety
});
这里我使用Parallel.ForEach
将DataTable
的行处理成Dictionary<object,object>
类型的对象,最终结果是List<Dictionary<object,object>>
类型的对象,使用中间线程实现安全结构 ConcurrentQueue<Dictionary<object,object>>
,DataTable
的来源按给定顺序对数据进行排序,但在并行处理期间总是会丢失。由于顺序很重要,所以我想出了以下解决方法:
Parallel.For(0,RowCount,index => {
int rowIndex = index;
// Access the rows using the Index
// Final structure will be of type ConcurrentDictionary<Custom>,
// with the RowIndex assigned based on original index
});
Class Custom
{
public int RowIndex { get; set; }
public Dictionary<object,object> DataDictionary {get; set;}
}
ConcurrentQueue<Dictionary<Custom>> customObj
类型的最终结果使用以下代码处理:
customObj.OrderBy(x=>x.RowIndex).Select(y=>y.DataDictionary).ToList()
以下是我的问题:
有没有更好的方法来实现相同的并行处理,我可以保持原始顺序,这是最重要的业务需求
在最终解决方案中,我需要局部变量
rowIndex
,我的理解是index
是并行循环的一部分,不会导致关闭问题
有什么指点吗?
首先,您可以在 Parallel.ForEach 中获取 Index 而不是使用 Parallel.For
Parallel.ForEach(dataTable.AsEnumerable(), (line, state, index) =>
{
Console.WriteLine("{0} : {1}", index, line);
});
如我所见,主要目的是避免OrderBy。 为此,请在 ForLoop
之前创建您的var lines = new YourClass[NumberOfElemnts] ;
在此之后,您可以使用您想要的任何循环填充此列表。让我们使用 Parallel.For
Parallel.For(0, NumberOfElemnts, i =>
{
lines[i]=dataTable[i];
});
根据@Panagiotis Kanavos 的备注编辑
这个呢
var items = new ConcurrentDictionary<DataRow, Dictionary<object,object>>;
Parallel.ForEach(dataTable.AsEnumerable(),row => {
var result = ...;
items.Add(row, result);
});
var finalResult = dataTable.Rows.Cast<DataRow>().Select(r => items[r]).ToList());
您可以利用 PLINQ
和
ParallelEnumerable.AsOrdered
扩展方法
Enables treatment of a data source as if it was ordered, overriding the default of unordered.
在您的示例中,您可以按以下方式使用它:
var result = dataTable.AsEnumerable().AsParallel().AsOrdered()
.Select(/*Process the row to dictionary*/).ToList();