Linq 或 IEnumerable 在使用 Parallel.ForEach 时将 Long 设为 运行
Linq Or IEnumerable taking Long to run when using Parallel.ForEach
我有一个读取 csv (200 mb) 的应用程序。
var lines = File.ReadLines(file).ToList();
csv 存储定价信息,其中包含大约 50 万条记录。
下面的代码片段在调用 StoreValues 时大约需要 18 秒。
有没有办法加快速度?
distinctMarketIds = 54 个整数值
lines 集合将有 500k 行,每行 [0] 都有我匹配的 marketId。
IEnumerable<string[]> newList = (lines.Where(t => distinctMarketIds.Contains(t.Split(',')[0]))
.Select(t => t.Split(',')));
log.Info(("Time Taken To Get Filtered Prices " + elapsed.Elapsed.TotalSeconds +" seconds."));
StoreValues(newList, file); //store the prices
log.Info(("Time Taken To Store Prices " + elapsed.Elapsed.TotalSeconds + " seconds."));
存储值方法使用Parallel.ForEach
Parallel.ForEach(finalLines, new ParallelOptions { MaxDegreeOfParallelism = MaxThreads }, (line) =>
{
});
我似乎无法找到为什么需要 18 秒才能完成此循环。
我在另一台具有类似规格的机器上进行了测试,StoreValue Method
需要 2.5 秒
#region LoadPriceDataFromCsvFile
public int LoadPriceDataFromCsvFile(string filename, string[] marketIdList, int maxThreads)
{
MaxThreads = maxThreads;
int filteredRows = 0;
string[] files = Directory.GetFiles(filename, "*.csv");
elapsed.Start();
log.InfoFormat("Total Csv files to Scan {0}",files.Length);
Parallel.ForEach(files, new ParallelOptions { MaxDegreeOfParallelism = MaxThreads }, (file) =>
{
try
{
log.InfoFormat("About to Scan File {0}", file);
ScanCsvFilesAndGetPrices(file);
}
catch (System.OutOfMemoryException e)
{
log.Info(e);
}
catch (Exception e)
{
log.Info(e);
}
});
return PriceCollection.Count;
}
#endregion
#region ScanCsvFilesAndGetPrices
private void ScanCsvFilesAndGetPrices(string file)
{
try
{
log.Info(("Time Taken " + elapsed.Elapsed.TotalSeconds + " seconds."));
var lines = File.ReadLines(file).ToList();
log.Info(("Time Taken To Read csv " + elapsed.Elapsed.TotalSeconds + " seconds."));
if (lines.Any())
{
log.Info(("Time Taken To Read Any " + elapsed.Elapsed.TotalSeconds + " seconds."));
var firstLine = lines.ElementAt(1); // This is the First Line with Headers
log.Info(("Time Taken To Read First Line " + elapsed.Elapsed.TotalSeconds + " seconds."));
var lastLine = lines.Last(); // This is the Last line in the csv file
log.Info(("Time Taken To Read Last Line " + elapsed.Elapsed.TotalSeconds + " seconds."));
var header = lines.First().Split(',');
log.Info(("Time Taken To Split Header Line " + elapsed.Elapsed.TotalSeconds + " seconds."));
GetIndexOfFields(header);
log.Info(("Time Taken To Read Header " + elapsed.Elapsed.TotalSeconds + " seconds."));
// Get the Publish Date Time
if (PublishedDatetime_Index != -1)
{
var fLine = firstLine.Split(',');
var lLine = lastLine.Split(',');
var firstLineDatetime = (fLine[PublishedDatetime_Index].Contains("+"))? fLine[PublishedDatetime_Index].Remove(fLine[PublishedDatetime_Index].IndexOf("+",StringComparison.Ordinal)): fLine[PublishedDatetime_Index];
var publishDateTimeFirstLine =FileNameGenerator.GetCorrectTime(Convert.ToDateTime(firstLineDatetime));
string lastLineDatetime = (lLine[PublishedDatetime_Index].Contains("+"))? lLine[PublishedDatetime_Index].Remove(lLine[PublishedDatetime_Index].IndexOf("+",StringComparison.Ordinal)): lLine[PublishedDatetime_Index];
var publishDateTimeLastLine =FileNameGenerator.GetCorrectTime(Convert.ToDateTime(lastLineDatetime));
// check if the order execution date time of any order lieas between the date time of first and last line of csv so we can add that csv to our filtered list
string[] distinctMarketIds = OrderEntityColection.FindAll(obj =>obj.OrderLastChangeDateTimeUtc >= publishDateTimeFirstLine &&obj.OrderLastChangeDateTimeUtc <= publishDateTimeLastLine).Select(obj => obj.MarketId.ToString())
.Distinct()
.ToArray();
log.InfoFormat("Total Markets Identified {0}",distinctMarketIds.Length);
List<OrderEntity> foundOrdersList = OrderEntityColection.FindAll(obj =>obj.OrderLastChangeDateTimeUtc >= publishDateTimeFirstLine &&obj.OrderLastChangeDateTimeUtc <= publishDateTimeLastLine);
lock (FoundOrdersList)
{
FoundOrdersList.AddRange(foundOrdersList);
}
log.InfoFormat("Total Orders Identified {0}", FoundOrdersList.Count());
log.Info(("Time Taken To Read Execution Times and Market " + elapsed.Elapsed.TotalSeconds +" seconds."));
if (distinctMarketIds.Length != 0)
{
IEnumerable<string[]> newList =
(lines.Where(
t => distinctMarketIds.Contains(t.Split(',')[0]))
.Select(t => t.Split(','))
);
log.Info(("Time Taken To Get Filtered Prices " + elapsed.Elapsed.TotalSeconds +" seconds."));
// this is taking longer than expected. Somthing to do with IEnumerable<string[]>
StoreValues(newList, file); //store the prices
log.Info(("Time Taken To Store Prices " + elapsed.Elapsed.TotalSeconds + " seconds."));
}
}
}
}
catch (Exception e)
{
log.Info(e);
}
}
#endregion
#region GetIndexOfFields
// These are the fields we want to Look for from the headers and accordingly get their location
private void GetIndexOfFields(IEnumerable<string> lineHeader)
{
int index = 0;
foreach (var column in lineHeader)
{
if (column == "MarketId")
{
MarketId_Index= index;
}
if (column == "Bid")
{
Bid_Index = index; ;
}
if (column == "Ask")
{
Ask_Index = index;
}
if (column == "Mid")
{
Mid_Index = index;
}
if (column == "Is_Indicative")
{
Is_Indicative_Index = index;
}
if (column == "Price_Engine")
{
Price_Engine_Index = index;
}
if (column == "PublishedDatetime")
{
PublishedDatetime_Index = index;
}
if (column == "Market_Is_Open")
{
Market_Is_Open_Index = index;
}
if (column == "AuditId")
{
AuditId_Index = index;
}
if (column == "Row_Update_Version")
{
Row_Update_Version_Index = index;
}
if (column == "DontPublish")
{
DontPublish_Index = index; ;
}
index++;
}
}
#endregion
#region StoreValues
private void StoreValues(IEnumerable<string[]> finalLines, string file)
{
log.InfoFormat("total Finel Lines Sent for Storing {0}", finalLines.Count());
Parallel.ForEach(finalLines, new ParallelOptions { MaxDegreeOfParallelism = MaxThreads }, (line) =>
{
var prices = new Prices();
// the code that you want to measure comes here
var datetime = (line[PublishedDatetime_Index].Contains("+")) ? line[PublishedDatetime_Index].Remove(line[PublishedDatetime_Index].IndexOf("+", StringComparison.Ordinal)) : line[PublishedDatetime_Index];
if (!IsNullOrEmpty(datetime))
{
prices.PublishedDatetime = Convert.ToDateTime(datetime);
}
if (!IsNullOrEmpty(line[MarketId_Index]))
{
prices.MarketId = Convert.ToInt32(line[MarketId_Index]);
}
if (!IsNullOrEmpty(line[Bid_Index]))
{
prices.Bid = Convert.ToDecimal(line[Bid_Index]);
}
if (!IsNullOrEmpty(line[Ask_Index]))
{
prices.Ask = Convert.ToDecimal(line[Ask_Index]);
}
if (!IsNullOrEmpty(line[Mid_Index]))
{
prices.Mid = Convert.ToDecimal(line[Mid_Index]);
}
if (!IsNullOrEmpty(line[Is_Indicative_Index]))
{
prices.Is_Indicative = Convert.ToBoolean(line[Is_Indicative_Index]);
}
else
{
prices.Is_Indicative = false;
}
if (!IsNullOrEmpty(line[Price_Engine_Index]))
{
prices.Price_Engine = Convert.ToString(line[Price_Engine_Index]);
}
if (!IsNullOrEmpty(line[Market_Is_Open_Index]))
{
prices.Market_Is_Open = line[Market_Is_Open_Index] == "1";
}
if (!IsNullOrEmpty(line[AuditId_Index]))
{
prices.AuditId = Convert.ToString(line[AuditId_Index]);
}
if (!IsNullOrEmpty(line[Row_Update_Version_Index]))
{
prices.Row_Update_Version = Convert.ToString(line[Row_Update_Version_Index]);
}
if (!IsNullOrEmpty(line[DontPublish_Index]))
{
if (DontPublish_Index != 0)
{
prices.DontPublish = line[DontPublish_Index] == "1";
}
}
prices.SbProdFile = file;
lock (PriceCollection)
{
PriceCollection.Add(prices);
}
});
}
static void Main()
{
//var lines = File.ReadLines(file).ToList();
// this is just a fast generation for sample data
var lines = Enumerable.Range(0, 500000)
.Select(i => string.Join(",", i % 7, i, i & 2))
.ToList();
// HashSet will work as an indexed store and will match faster in your loop
var distinctMarketIds = new HashSet<string>{
"0", "2", "3", "5"
};
// Do this if you are to use the method syntax instead of the query syntax
// var newList = lines.Select(l=>l.Split(','))
// .Where(ps=>distinctMarketIds.Contains(ps[0]));
var newList = from l in lines
// this will parse the string once versus twice as you were doing before
let ps = l.Split(',')
where distinctMarketIds.Contains(ps[0])
select ps;
// can't see the content of your `StoreValues` method but writing to a
// file in parallel will never work as expected.
using (var stream = new StreamWriter("outfile.txt"))
foreach (var l in newList)
stream.WriteLine(string.Join(";", l));
}
- 当您需要处理单个文件时,我看不出
Parallel.ForEach
如何帮助提高性能
- 不要使用
File.ReadLines(file).ToList()
,如果你想要内存中的所有行使用ReadAllLines
或者如果你想一个接一个地处理这些行使用ReadLines
- 为什么要多次拆分线?
- 对
distinctMarketIds
使用HashSet<string>
:
这应该更有效率:
var marketIdSet = new HashSet<string>(OrderEntityColection.FindAll(obj =>obj.OrderLastChangeDateTimeUtc >= publishDateTimeFirstLine &&obj.OrderLastChangeDateTimeUtc <= publishDateTimeLastLine).Select(obj => obj.MarketId.ToString()));
IEnumerable<string[]> allFields = File.ReadLines(file)
.Select(line => line.Split(','))
.Where(arr => marketIdSet.Contains(arr[0]));
请注意,由于 Select
和 Where
的延迟执行,这只是一个查询,尚未执行。因此,每当您使用 allFields
时,您将再次执行此查询。所以创建一个集合是个好主意,f.e。使用 allFields.ToList()
传递给 StoreValues
:
StoreValues(allFields.ToList(), file); //store the prices
如果您传递一个集合,您可以从在 StoreValues
中使用 Parallel.ForEach
真正受益。
我有一个读取 csv (200 mb) 的应用程序。
var lines = File.ReadLines(file).ToList();
csv 存储定价信息,其中包含大约 50 万条记录。 下面的代码片段在调用 StoreValues 时大约需要 18 秒。 有没有办法加快速度?
distinctMarketIds = 54 个整数值 lines 集合将有 500k 行,每行 [0] 都有我匹配的 marketId。
IEnumerable<string[]> newList = (lines.Where(t => distinctMarketIds.Contains(t.Split(',')[0]))
.Select(t => t.Split(',')));
log.Info(("Time Taken To Get Filtered Prices " + elapsed.Elapsed.TotalSeconds +" seconds."));
StoreValues(newList, file); //store the prices
log.Info(("Time Taken To Store Prices " + elapsed.Elapsed.TotalSeconds + " seconds."));
存储值方法使用Parallel.ForEach
Parallel.ForEach(finalLines, new ParallelOptions { MaxDegreeOfParallelism = MaxThreads }, (line) =>
{
});
我似乎无法找到为什么需要 18 秒才能完成此循环。 我在另一台具有类似规格的机器上进行了测试,StoreValue Method
需要 2.5 秒#region LoadPriceDataFromCsvFile
public int LoadPriceDataFromCsvFile(string filename, string[] marketIdList, int maxThreads)
{
MaxThreads = maxThreads;
int filteredRows = 0;
string[] files = Directory.GetFiles(filename, "*.csv");
elapsed.Start();
log.InfoFormat("Total Csv files to Scan {0}",files.Length);
Parallel.ForEach(files, new ParallelOptions { MaxDegreeOfParallelism = MaxThreads }, (file) =>
{
try
{
log.InfoFormat("About to Scan File {0}", file);
ScanCsvFilesAndGetPrices(file);
}
catch (System.OutOfMemoryException e)
{
log.Info(e);
}
catch (Exception e)
{
log.Info(e);
}
});
return PriceCollection.Count;
}
#endregion
#region ScanCsvFilesAndGetPrices
private void ScanCsvFilesAndGetPrices(string file)
{
try
{
log.Info(("Time Taken " + elapsed.Elapsed.TotalSeconds + " seconds."));
var lines = File.ReadLines(file).ToList();
log.Info(("Time Taken To Read csv " + elapsed.Elapsed.TotalSeconds + " seconds."));
if (lines.Any())
{
log.Info(("Time Taken To Read Any " + elapsed.Elapsed.TotalSeconds + " seconds."));
var firstLine = lines.ElementAt(1); // This is the First Line with Headers
log.Info(("Time Taken To Read First Line " + elapsed.Elapsed.TotalSeconds + " seconds."));
var lastLine = lines.Last(); // This is the Last line in the csv file
log.Info(("Time Taken To Read Last Line " + elapsed.Elapsed.TotalSeconds + " seconds."));
var header = lines.First().Split(',');
log.Info(("Time Taken To Split Header Line " + elapsed.Elapsed.TotalSeconds + " seconds."));
GetIndexOfFields(header);
log.Info(("Time Taken To Read Header " + elapsed.Elapsed.TotalSeconds + " seconds."));
// Get the Publish Date Time
if (PublishedDatetime_Index != -1)
{
var fLine = firstLine.Split(',');
var lLine = lastLine.Split(',');
var firstLineDatetime = (fLine[PublishedDatetime_Index].Contains("+"))? fLine[PublishedDatetime_Index].Remove(fLine[PublishedDatetime_Index].IndexOf("+",StringComparison.Ordinal)): fLine[PublishedDatetime_Index];
var publishDateTimeFirstLine =FileNameGenerator.GetCorrectTime(Convert.ToDateTime(firstLineDatetime));
string lastLineDatetime = (lLine[PublishedDatetime_Index].Contains("+"))? lLine[PublishedDatetime_Index].Remove(lLine[PublishedDatetime_Index].IndexOf("+",StringComparison.Ordinal)): lLine[PublishedDatetime_Index];
var publishDateTimeLastLine =FileNameGenerator.GetCorrectTime(Convert.ToDateTime(lastLineDatetime));
// check if the order execution date time of any order lieas between the date time of first and last line of csv so we can add that csv to our filtered list
string[] distinctMarketIds = OrderEntityColection.FindAll(obj =>obj.OrderLastChangeDateTimeUtc >= publishDateTimeFirstLine &&obj.OrderLastChangeDateTimeUtc <= publishDateTimeLastLine).Select(obj => obj.MarketId.ToString())
.Distinct()
.ToArray();
log.InfoFormat("Total Markets Identified {0}",distinctMarketIds.Length);
List<OrderEntity> foundOrdersList = OrderEntityColection.FindAll(obj =>obj.OrderLastChangeDateTimeUtc >= publishDateTimeFirstLine &&obj.OrderLastChangeDateTimeUtc <= publishDateTimeLastLine);
lock (FoundOrdersList)
{
FoundOrdersList.AddRange(foundOrdersList);
}
log.InfoFormat("Total Orders Identified {0}", FoundOrdersList.Count());
log.Info(("Time Taken To Read Execution Times and Market " + elapsed.Elapsed.TotalSeconds +" seconds."));
if (distinctMarketIds.Length != 0)
{
IEnumerable<string[]> newList =
(lines.Where(
t => distinctMarketIds.Contains(t.Split(',')[0]))
.Select(t => t.Split(','))
);
log.Info(("Time Taken To Get Filtered Prices " + elapsed.Elapsed.TotalSeconds +" seconds."));
// this is taking longer than expected. Somthing to do with IEnumerable<string[]>
StoreValues(newList, file); //store the prices
log.Info(("Time Taken To Store Prices " + elapsed.Elapsed.TotalSeconds + " seconds."));
}
}
}
}
catch (Exception e)
{
log.Info(e);
}
}
#endregion
#region GetIndexOfFields
// These are the fields we want to Look for from the headers and accordingly get their location
private void GetIndexOfFields(IEnumerable<string> lineHeader)
{
int index = 0;
foreach (var column in lineHeader)
{
if (column == "MarketId")
{
MarketId_Index= index;
}
if (column == "Bid")
{
Bid_Index = index; ;
}
if (column == "Ask")
{
Ask_Index = index;
}
if (column == "Mid")
{
Mid_Index = index;
}
if (column == "Is_Indicative")
{
Is_Indicative_Index = index;
}
if (column == "Price_Engine")
{
Price_Engine_Index = index;
}
if (column == "PublishedDatetime")
{
PublishedDatetime_Index = index;
}
if (column == "Market_Is_Open")
{
Market_Is_Open_Index = index;
}
if (column == "AuditId")
{
AuditId_Index = index;
}
if (column == "Row_Update_Version")
{
Row_Update_Version_Index = index;
}
if (column == "DontPublish")
{
DontPublish_Index = index; ;
}
index++;
}
}
#endregion
#region StoreValues
private void StoreValues(IEnumerable<string[]> finalLines, string file)
{
log.InfoFormat("total Finel Lines Sent for Storing {0}", finalLines.Count());
Parallel.ForEach(finalLines, new ParallelOptions { MaxDegreeOfParallelism = MaxThreads }, (line) =>
{
var prices = new Prices();
// the code that you want to measure comes here
var datetime = (line[PublishedDatetime_Index].Contains("+")) ? line[PublishedDatetime_Index].Remove(line[PublishedDatetime_Index].IndexOf("+", StringComparison.Ordinal)) : line[PublishedDatetime_Index];
if (!IsNullOrEmpty(datetime))
{
prices.PublishedDatetime = Convert.ToDateTime(datetime);
}
if (!IsNullOrEmpty(line[MarketId_Index]))
{
prices.MarketId = Convert.ToInt32(line[MarketId_Index]);
}
if (!IsNullOrEmpty(line[Bid_Index]))
{
prices.Bid = Convert.ToDecimal(line[Bid_Index]);
}
if (!IsNullOrEmpty(line[Ask_Index]))
{
prices.Ask = Convert.ToDecimal(line[Ask_Index]);
}
if (!IsNullOrEmpty(line[Mid_Index]))
{
prices.Mid = Convert.ToDecimal(line[Mid_Index]);
}
if (!IsNullOrEmpty(line[Is_Indicative_Index]))
{
prices.Is_Indicative = Convert.ToBoolean(line[Is_Indicative_Index]);
}
else
{
prices.Is_Indicative = false;
}
if (!IsNullOrEmpty(line[Price_Engine_Index]))
{
prices.Price_Engine = Convert.ToString(line[Price_Engine_Index]);
}
if (!IsNullOrEmpty(line[Market_Is_Open_Index]))
{
prices.Market_Is_Open = line[Market_Is_Open_Index] == "1";
}
if (!IsNullOrEmpty(line[AuditId_Index]))
{
prices.AuditId = Convert.ToString(line[AuditId_Index]);
}
if (!IsNullOrEmpty(line[Row_Update_Version_Index]))
{
prices.Row_Update_Version = Convert.ToString(line[Row_Update_Version_Index]);
}
if (!IsNullOrEmpty(line[DontPublish_Index]))
{
if (DontPublish_Index != 0)
{
prices.DontPublish = line[DontPublish_Index] == "1";
}
}
prices.SbProdFile = file;
lock (PriceCollection)
{
PriceCollection.Add(prices);
}
});
}
static void Main()
{
//var lines = File.ReadLines(file).ToList();
// this is just a fast generation for sample data
var lines = Enumerable.Range(0, 500000)
.Select(i => string.Join(",", i % 7, i, i & 2))
.ToList();
// HashSet will work as an indexed store and will match faster in your loop
var distinctMarketIds = new HashSet<string>{
"0", "2", "3", "5"
};
// Do this if you are to use the method syntax instead of the query syntax
// var newList = lines.Select(l=>l.Split(','))
// .Where(ps=>distinctMarketIds.Contains(ps[0]));
var newList = from l in lines
// this will parse the string once versus twice as you were doing before
let ps = l.Split(',')
where distinctMarketIds.Contains(ps[0])
select ps;
// can't see the content of your `StoreValues` method but writing to a
// file in parallel will never work as expected.
using (var stream = new StreamWriter("outfile.txt"))
foreach (var l in newList)
stream.WriteLine(string.Join(";", l));
}
- 当您需要处理单个文件时,我看不出
Parallel.ForEach
如何帮助提高性能 - 不要使用
File.ReadLines(file).ToList()
,如果你想要内存中的所有行使用ReadAllLines
或者如果你想一个接一个地处理这些行使用ReadLines
- 为什么要多次拆分线?
- 对
distinctMarketIds
使用HashSet<string>
:
这应该更有效率:
var marketIdSet = new HashSet<string>(OrderEntityColection.FindAll(obj =>obj.OrderLastChangeDateTimeUtc >= publishDateTimeFirstLine &&obj.OrderLastChangeDateTimeUtc <= publishDateTimeLastLine).Select(obj => obj.MarketId.ToString()));
IEnumerable<string[]> allFields = File.ReadLines(file)
.Select(line => line.Split(','))
.Where(arr => marketIdSet.Contains(arr[0]));
请注意,由于 Select
和 Where
的延迟执行,这只是一个查询,尚未执行。因此,每当您使用 allFields
时,您将再次执行此查询。所以创建一个集合是个好主意,f.e。使用 allFields.ToList()
传递给 StoreValues
:
StoreValues(allFields.ToList(), file); //store the prices
如果您传递一个集合,您可以从在 StoreValues
中使用 Parallel.ForEach
真正受益。