Linq 或 IEnumerable 在使用 Parallel.ForEach 时将 Long 设为 运行

Linq Or IEnumerable taking Long to run when using Parallel.ForEach

我有一个读取 csv (200 mb) 的应用程序。

var lines = File.ReadLines(file).ToList();

csv 存储定价信息,其中包含大约 50 万条记录。 下面的代码片段在调用 StoreValues 时大约需要 18 秒。 有没有办法加快速度?

distinctMarketIds = 54 个整数值 lines 集合将有 500k 行,每行 [0] 都有我匹配的 marketId。

IEnumerable<string[]> newList = (lines.Where(t => distinctMarketIds.Contains(t.Split(',')[0]))
                                      .Select(t => t.Split(',')));

log.Info(("Time Taken To Get Filtered Prices " + elapsed.Elapsed.TotalSeconds +" seconds."));

StoreValues(newList, file); //store the prices
log.Info(("Time Taken To Store Prices " + elapsed.Elapsed.TotalSeconds + " seconds."));

存储值方法使用Parallel.ForEach

Parallel.ForEach(finalLines, new ParallelOptions { MaxDegreeOfParallelism = MaxThreads }, (line) =>
{
});

我似乎无法找到为什么需要 18 秒才能完成此循环。 我在另一台具有类似规格的机器上进行了测试,StoreValue Method

需要 2.5 秒
#region LoadPriceDataFromCsvFile

        public int LoadPriceDataFromCsvFile(string filename, string[] marketIdList, int maxThreads)
        {
             MaxThreads = maxThreads;
             int filteredRows = 0;
             string[] files = Directory.GetFiles(filename, "*.csv");
            elapsed.Start();
            log.InfoFormat("Total Csv files to Scan {0}",files.Length);
            Parallel.ForEach(files, new ParallelOptions { MaxDegreeOfParallelism = MaxThreads }, (file) =>
            {
                try
                {
                    log.InfoFormat("About to Scan File {0}", file);
                    ScanCsvFilesAndGetPrices(file);
                }
               catch (System.OutOfMemoryException e)
               {
                  log.Info(e);
               }
                catch (Exception e)
                {
                    log.Info(e);
                }

            });


            return PriceCollection.Count;
        }

        #endregion

#region ScanCsvFilesAndGetPrices
        private void ScanCsvFilesAndGetPrices(string file)
        {
            try
            {

                log.Info(("Time Taken " + elapsed.Elapsed.TotalSeconds + " seconds."));
                var lines = File.ReadLines(file).ToList();
                log.Info(("Time Taken To Read csv " + elapsed.Elapsed.TotalSeconds + " seconds."));

                if (lines.Any())
                {
                    log.Info(("Time Taken To Read Any " + elapsed.Elapsed.TotalSeconds + " seconds."));
                    var firstLine = lines.ElementAt(1); // This is the First Line with Headers
                    log.Info(("Time Taken To Read First Line " + elapsed.Elapsed.TotalSeconds + " seconds."));
                    var lastLine = lines.Last(); // This is the Last line in the csv file
                    log.Info(("Time Taken To Read Last Line " + elapsed.Elapsed.TotalSeconds + " seconds."));
                    var header = lines.First().Split(',');
                    log.Info(("Time Taken To Split Header Line " + elapsed.Elapsed.TotalSeconds + " seconds."));

                    GetIndexOfFields(header);
                    log.Info(("Time Taken To Read Header " + elapsed.Elapsed.TotalSeconds + " seconds."));
                    // Get the Publish Date Time
                    if (PublishedDatetime_Index != -1)
                    {
                        var fLine = firstLine.Split(',');
                        var lLine = lastLine.Split(',');

                        var firstLineDatetime = (fLine[PublishedDatetime_Index].Contains("+"))? fLine[PublishedDatetime_Index].Remove(fLine[PublishedDatetime_Index].IndexOf("+",StringComparison.Ordinal)): fLine[PublishedDatetime_Index];
                        var publishDateTimeFirstLine =FileNameGenerator.GetCorrectTime(Convert.ToDateTime(firstLineDatetime));

                        string lastLineDatetime = (lLine[PublishedDatetime_Index].Contains("+"))? lLine[PublishedDatetime_Index].Remove(lLine[PublishedDatetime_Index].IndexOf("+",StringComparison.Ordinal)): lLine[PublishedDatetime_Index];
                        var publishDateTimeLastLine =FileNameGenerator.GetCorrectTime(Convert.ToDateTime(lastLineDatetime));
                        // check if the order execution date time of any order lieas between the date time of first and last line of csv so we can add that csv to our filtered list


                        string[] distinctMarketIds = OrderEntityColection.FindAll(obj =>obj.OrderLastChangeDateTimeUtc >= publishDateTimeFirstLine &&obj.OrderLastChangeDateTimeUtc <= publishDateTimeLastLine).Select(obj => obj.MarketId.ToString())
                                    .Distinct()
                                    .ToArray();

                        log.InfoFormat("Total Markets Identified {0}",distinctMarketIds.Length);

                        List<OrderEntity> foundOrdersList = OrderEntityColection.FindAll(obj =>obj.OrderLastChangeDateTimeUtc >= publishDateTimeFirstLine &&obj.OrderLastChangeDateTimeUtc <= publishDateTimeLastLine);

                        lock (FoundOrdersList)
                        {
                            FoundOrdersList.AddRange(foundOrdersList);
                        }
                        log.InfoFormat("Total Orders Identified {0}", FoundOrdersList.Count());

                        log.Info(("Time Taken To Read Execution Times and Market " + elapsed.Elapsed.TotalSeconds +" seconds."));
                        if (distinctMarketIds.Length != 0)
                        {
                            IEnumerable<string[]> newList =
                                                            (lines.Where(
                                                                t => distinctMarketIds.Contains(t.Split(',')[0]))
                                                                .Select(t => t.Split(','))
                                                                );

                            log.Info(("Time Taken To Get Filtered Prices " + elapsed.Elapsed.TotalSeconds +" seconds."));
                            // this is taking longer than expected. Somthing to do with IEnumerable<string[]> 
                            StoreValues(newList, file); //store the prices
                            log.Info(("Time Taken To Store Prices " + elapsed.Elapsed.TotalSeconds + " seconds."));



                        }
                    }

                }
            }
            catch (Exception e)
            {
                log.Info(e);
            }

         }


        #endregion

#region GetIndexOfFields

        // These are the fields we want to Look for from the headers and accordingly get their location
        private void GetIndexOfFields(IEnumerable<string> lineHeader)
        {
            int index = 0;
            foreach (var column in lineHeader)
            {
                if (column == "MarketId")
                {
                   MarketId_Index= index;
                }
                if (column == "Bid")
                {
                    Bid_Index = index; ;
                }
                 if (column == "Ask")
                {
                    Ask_Index = index; 
                }
                 if (column == "Mid")
                {
                    Mid_Index = index;
                }
                 if (column == "Is_Indicative")
                {
                    Is_Indicative_Index = index;
                }
                 if (column == "Price_Engine")
                {
                    Price_Engine_Index = index; 
                }
                 if (column == "PublishedDatetime")
                {
                    PublishedDatetime_Index = index; 
                }
                 if (column == "Market_Is_Open")
                {
                    Market_Is_Open_Index = index; 
                }
                 if (column == "AuditId")
                {
                    AuditId_Index = index; 
                }
                 if (column == "Row_Update_Version")
                {
                    Row_Update_Version_Index = index; 
                }
                 if (column == "DontPublish")
                {
                    DontPublish_Index = index; ;
                }
                index++;
            }


        }

        #endregion

        #region StoreValues

        private void StoreValues(IEnumerable<string[]> finalLines, string file)
        {

            log.InfoFormat("total Finel Lines Sent for Storing {0}", finalLines.Count());

            Parallel.ForEach(finalLines, new ParallelOptions { MaxDegreeOfParallelism = MaxThreads }, (line) =>
               {
                   var prices = new Prices();
                  // the code that you want to measure comes here
                  var datetime = (line[PublishedDatetime_Index].Contains("+")) ? line[PublishedDatetime_Index].Remove(line[PublishedDatetime_Index].IndexOf("+", StringComparison.Ordinal)) : line[PublishedDatetime_Index];


                   if (!IsNullOrEmpty(datetime))
                   {
                       prices.PublishedDatetime = Convert.ToDateTime(datetime);
                   }

                   if (!IsNullOrEmpty(line[MarketId_Index]))
                   {
                       prices.MarketId = Convert.ToInt32(line[MarketId_Index]);
                   }
                   if (!IsNullOrEmpty(line[Bid_Index]))
                   {
                       prices.Bid = Convert.ToDecimal(line[Bid_Index]);
                   }
                   if (!IsNullOrEmpty(line[Ask_Index]))
                   {
                       prices.Ask = Convert.ToDecimal(line[Ask_Index]);
                   }
                   if (!IsNullOrEmpty(line[Mid_Index]))
                   {
                       prices.Mid = Convert.ToDecimal(line[Mid_Index]);
                   }
                   if (!IsNullOrEmpty(line[Is_Indicative_Index]))
                   {
                       prices.Is_Indicative = Convert.ToBoolean(line[Is_Indicative_Index]);
                   }
                   else
                   {
                       prices.Is_Indicative = false;
                   }
                   if (!IsNullOrEmpty(line[Price_Engine_Index]))
                   {
                       prices.Price_Engine = Convert.ToString(line[Price_Engine_Index]);
                   }

                   if (!IsNullOrEmpty(line[Market_Is_Open_Index]))
                   {
                       prices.Market_Is_Open = line[Market_Is_Open_Index] == "1";
                   }
                   if (!IsNullOrEmpty(line[AuditId_Index]))
                   {
                       prices.AuditId = Convert.ToString(line[AuditId_Index]);
                   }
                   if (!IsNullOrEmpty(line[Row_Update_Version_Index]))
                   {
                       prices.Row_Update_Version = Convert.ToString(line[Row_Update_Version_Index]);
                   }
                   if (!IsNullOrEmpty(line[DontPublish_Index]))
                   {
                       if (DontPublish_Index != 0)
                       {
                           prices.DontPublish = line[DontPublish_Index] == "1";
                       }
                   }
                   prices.SbProdFile = file;

                   lock (PriceCollection)
                   {
                       PriceCollection.Add(prices);
                   }
               });

        }
static void Main()
{
    //var lines = File.ReadLines(file).ToList();

    // this is just a fast generation for sample data
    var lines = Enumerable.Range(0, 500000)
                            .Select(i => string.Join(",", i % 7, i, i & 2))
                            .ToList();

    // HashSet will work as an indexed store and will match faster in your loop
    var distinctMarketIds = new HashSet<string>{
        "0", "2", "3", "5"
    };

    // Do this if you are to use the method syntax instead of the query syntax
    // var newList = lines.Select(l=>l.Split(','))
    //                    .Where(ps=>distinctMarketIds.Contains(ps[0]));

    var newList = from l in lines
                  // this will parse the string once versus twice as you were doing before
                  let ps = l.Split(',')
                  where distinctMarketIds.Contains(ps[0])
                  select ps;

    // can't see the content of your `StoreValues` method but writing to a 
    //    file in parallel will never work as expected.  
    using (var stream = new StreamWriter("outfile.txt"))
        foreach (var l in newList)
            stream.WriteLine(string.Join(";", l));

}
  • 当您需要处理单个文件时,我看不出 Parallel.ForEach 如何帮助提高性能
  • 不要使用File.ReadLines(file).ToList(),如果你想要内存中的所有行使用ReadAllLines或者如果你想一个接一个地处理这些行使用ReadLines
  • 为什么要多次拆分线?
  • distinctMarketIds使用HashSet<string>:

这应该更有效率:

var marketIdSet = new HashSet<string>(OrderEntityColection.FindAll(obj =>obj.OrderLastChangeDateTimeUtc >= publishDateTimeFirstLine &&obj.OrderLastChangeDateTimeUtc <= publishDateTimeLastLine).Select(obj => obj.MarketId.ToString()));
IEnumerable<string[]> allFields = File.ReadLines(file)
    .Select(line => line.Split(','))
    .Where(arr => marketIdSet.Contains(arr[0]));

请注意,由于 SelectWhere 的延迟执行,这只是一个查询,尚未执行。因此,每当您使用 allFields 时,您将再次执行此查询。所以创建一个集合是个好主意,f.e。使用 allFields.ToList() 传递给 StoreValues:

StoreValues(allFields.ToList(), file); //store the prices

如果您传递一个集合,您可以从在 StoreValues 中使用 Parallel.ForEach 真正受益。