在并行处理期间维护 DataTable 行的顺序

Maintaining order of DataTable rows during parallel processing

以下是当前代码:

 Parallel.ForEach(dataTable.AsEnumerable(),row => {

     // Code to process the data row to Dictionary<object,object>
     // Unique Column name is the Dictionary Key
     // ConcurrentDictionary is used for thread safety      
     });

这里我使用Parallel.ForEachDataTable的行处理成Dictionary<object,object>类型的对象,最终结果是List<Dictionary<object,object>>类型的对象,使用中间线程实现安全结构 ConcurrentQueue<Dictionary<object,object>>DataTable 的来源按给定顺序对数据进行排序,但在并行处理期间总是会丢失。由于顺序很重要,所以我想出了以下解决方法:

Parallel.For(0,RowCount,index => {

  int rowIndex = index;

  // Access the rows using the Index
  // Final structure will be of type ConcurrentDictionary<Custom>, 
  // with the RowIndex assigned based on original index
});

Class Custom
{
  public int RowIndex { get; set; }

  public Dictionary<object,object> DataDictionary {get; set;}
}

ConcurrentQueue<Dictionary<Custom>> customObj 类型的最终结果使用以下代码处理:

customObj.OrderBy(x=>x.RowIndex).Select(y=>y.DataDictionary).ToList()

以下是我的问题:

  1. 有没有更好的方法来实现相同的并行处理,我可以保持原始顺序,这是最重要的业务需求

  2. 在最终解决方案中,我需要局部变量 rowIndex,我的理解是 index 是并行循环的一部分,不会导致关闭问题

有什么指点吗?

首先,您可以在 Parallel.ForEach 中获取 Index 而不是使用 Parallel.For

Parallel.ForEach(dataTable.AsEnumerable(), (line, state, index) =>
{
    Console.WriteLine("{0} : {1}", index, line);
});

如我所见,主要目的是避免OrderBy。 为此,请在 ForLoop

之前创建您的
var lines =  new YourClass[NumberOfElemnts] ;

在此之后,您可以使用您想要的任何循环填充此列表。让我们使用 Parallel.For

Parallel.For(0, NumberOfElemnts, i =>
    {
        lines[i]=dataTable[i];
    });

根据@Panagiotis Kanavos 的备注编辑

这个呢

var items = new ConcurrentDictionary<DataRow, Dictionary<object,object>>;

Parallel.ForEach(dataTable.AsEnumerable(),row => {
    var result = ...; 
    items.Add(row, result);
});

var finalResult = dataTable.Rows.Cast<DataRow>().Select(r => items[r]).ToList());

您可以利用 PLINQ

ParallelEnumerable.AsOrdered 扩展方法

Enables treatment of a data source as if it was ordered, overriding the default of unordered.

在您的示例中,您可以按以下方式使用它:

var result = dataTable.AsEnumerable().AsParallel().AsOrdered()
                      .Select(/*Process the row to dictionary*/).ToList();