由于未知原因,带有异步代码的并行 foreach 循环无法正确完成
Parallel foreach loop with async code doesn't complete correctly for unknown reason
我正在尝试重写一个 foreach
循环以使用 Parallel.ForEach
因为我需要处理的每个文档都可以作为单独的实体处理,因此没有任何依赖关系。
代码相当简单,如下所示:
- 查询数据库
- 循环遍历每个文档
- 对每个文档进行两次网络调用并将结果添加到文档
- 将更新的文档添加到列表
- 批量导入列表到数据库
由于网络延迟,Web API 调用是最慢的部分,我想并行处理它们以节省时间,所以我写了这段代码:
/// <summary>
/// Queries the collection named by cosmosDBCollectionNameRawData for documents of the given brand,
/// adds the results of two web calls to each document, and collects the documents serialized as JSON strings.
/// </summary>
/// <param name="brand">Brand value used as the partition key and in the query's WHERE clause.</param>
/// <returns>
/// The list of serialized documents.
/// NOTE(review): because the async lambda passed to Parallel.ForEach is not awaited (see below),
/// this list can be returned before any entry has been added.
/// </returns>
private async Task<List<String>> FetchDocumentsAndBuildList(string brand)
{
using (var client = new DocumentClient(new Uri(cosmosDBEndpointUrl), cosmosDBPrimaryKey))
{
List<string> formattedList = new List<string>();
FeedOptions queryOptions = new FeedOptions
{
MaxItemCount = -1,
PartitionKey = new PartitionKey(brand)
};
// NOTE(review): brand is interpolated directly into the SQL text; if it can come from
// untrusted input, a parameterized query would be safer.
var query = client.CreateDocumentQuery<Document>(UriFactory.CreateDocumentCollectionUri(cosmosDBName, cosmosDBCollectionNameRawData), $"SELECT TOP 2 * from c where c.brand = '{brand}'", queryOptions).AsDocumentQuery();
// Process the query results one page at a time.
while(query.HasMoreResults)
{
var options = new ParallelOptions { MaxDegreeOfParallelism = Environment.ProcessorCount * 10 };
// NOTE(review): Parallel.ForEach does not await the async lambda. Each iteration is treated as
// finished as soon as the lambda reaches its first incomplete await, so the loop (and this
// method) can finish before the code after the awaits has run.
Parallel.ForEach(await query.ExecuteNextAsync<Document>(), options, async singleDocument =>
{
JObject originalData = singleDocument.GetPropertyValue<JObject>("BasicData");
if (originalData != null)
{
var artNo = originalData.GetValue("artno");
if (artNo != null)
{
string strArtNo = artNo.ToString();
// The first 7 characters are used as the product number and the next 3 as the color number;
// Substring throws if the article number is shorter than 10 characters.
string productNumber = strArtNo.Substring(0, 7);
string colorNumber = strArtNo.Substring(7, 3);
string HmGoeUrl = $"https://xxx,xom/Online/{strArtNo}/en";
string sisApiUrl = $"https:/yyy.com/{productNumber}/{colorNumber}?&maxnumberofstores=10&brand=000&channel=02";
string HttpFetchMethod = "GET";
// The two web calls are awaited one after the other.
JObject detailedDataResponse = await DataFetcherAsync(HmGoeUrl, HttpFetchMethod);
JObject inventoryData = await DataFetcherAsync(sisApiUrl, HttpFetchMethod);
if (detailedDataResponse != null)
{
JObject productList = (JObject)detailedDataResponse["product"];
if (productList != null)
{
// Find the index of the article whose "code" equals strArtNo;
// Single throws unless exactly one entry matches.
var selectedIndex = productList["articlesList"].Select((x, index) => new { code = x.Value<string>("code"), Node = x, Index = index })
.Single(x => x.code == strArtNo)
.Index;
// Replace the full response with just the matching article entry.
detailedDataResponse = (JObject)productList["articlesList"][selectedIndex];
}
}
singleDocument.SetPropertyValue("DetailedData", detailedDataResponse);
singleDocument.SetPropertyValue("InventoryData", inventoryData);
singleDocument.SetPropertyValue("consumer", "NWS");
}
}
// NOTE(review): List<string>.Add is invoked from concurrently running iterations;
// List<T> does not support concurrent writers.
formattedList.Add(Newtonsoft.Json.JsonConvert.SerializeObject(singleDocument));
});
// Original sequential loop, kept for reference:
//foreach (Document singleDocument in await query.ExecuteNextAsync<Document>())
//{
// JObject originalData = singleDocument.GetPropertyValue<JObject>("BasicData");
// if(originalData != null)
// {
// var artNo = originalData.GetValue("artno");
// if(artNo != null)
// {
// string strArtNo = artNo.ToString();
// string productNumber = strArtNo.Substring(0, 7);
// string colorNumber = strArtNo.Substring(7, 3);
// string HmGoeUrl = $"https:/xxx.xom/Online/{strArtNo}/en";
// string sisApiUrl = $"https://yyy.xom&maxnumberofstores=10&brand=000&channel=02";
// string HttpFetchMethod = "GET";
// JObject detailedDataResponse = await DataFetcherAsync(HmGoeUrl, HttpFetchMethod);
// JObject inventoryData = await DataFetcherAsync(sisApiUrl, HttpFetchMethod);
// if(detailedDataResponse != null)
// {
// JObject productList = (JObject)detailedDataResponse["product"];
// if(productList != null)
// {
// var selectedIndex = productList["articlesList"].Select((x, index) => new { code = x.Value<string>("code"), Node = x, Index = index })
// .Single(x => x.code == strArtNo)
// .Index;
// detailedDataResponse = (JObject)productList["articlesList"][selectedIndex];
// }
// }
// singleDocument.SetPropertyValue("DetailedData", detailedDataResponse);
// singleDocument.SetPropertyValue("InventoryData", inventoryData);
// singleDocument.SetPropertyValue("consumer", "NWS");
// }
// }
// formattedList.Add(Newtonsoft.Json.JsonConvert.SerializeObject(singleDocument));
//}
}
return formattedList;
}
}
如果我在循环中添加断点,我可以看到为每个变量分配了正确的值,但由于某种原因,返回的 formattedList
始终为 0 个条目,我无法弄清楚原因。
被注释掉的是原始的 foreach
循环,它工作得很好但是很慢
--- 编辑 ---
这就是我从父方法调用此代码的方式
// Caller: awaits FetchDocumentsAndBuildList and logs how many entries the returned list contains.
log.LogInformation($"Starting creation of DocumentList for BulkImport at: {DateTime.Now}");
var documentsToImportInBatch = await FetchDocumentsAndBuildList(brand);
log.LogInformation($"BulkExecutor DocumentList has: {documentsToImportInBatch.Count} entries, created at: {DateTime.Now}");
这里的问题是 Parallel.ForEach
并不知道 lambda 每次调用都会返回一个 Task
,而这个 Task 必须先被等待(await),ForEach
才能被视为完成。
因此,在您的函数退出之前不会调用 await 之后的延续,这就是为什么 formattedList
中有零个元素。
您可以使用代码示例轻松证明这一点,例如:
// Parallel.ForEach does not await the async lambda, so the loop returns without waiting for the delays to elapse.
Parallel.ForEach(Enumerable.Range(0, 100), async singleDocument => await Task.Delay(9999));
Console.WriteLine("Done!");
Done
几乎会立即打印出来。
对于 I/O 绑定并行性,您可以改用 Task.WhenAll
来并行化您的异步网络抓取调用
// Fetch the next page of documents from the query.
var myDocuments = await query.ExecuteNextAsync<Document>();
// Map each document to a task that produces its serialized JSON.
var myScrapingTasks = myDocuments.Select(async singleDocument =>
{
// ... all of your web scraping code here
// return the amended (mutated) document
return JsonConvert.SerializeObject(singleDocument);
});
// Await all tasks together, then append their results to the list.
var results = await Task.WhenAll(myScrapingTasks);
formattedList.AddRange(results);
关于 MaxDegreeOfParallelism
,如果您发现需要限制并发抓取调用的数量,最简单的方法是将传入的文档分组为大小适中的块,然后一次处理一个较小的块 - Select(x, i)
重载和 GroupBy
在这里非常好用。
我正在尝试重写一个 foreach
循环以使用 Parallel.ForEach
因为我需要处理的每个文档都可以作为单独的实体处理,因此没有任何依赖关系。
代码相当简单,如下所示:
- 查询数据库
- 循环遍历每个文档
- 对每个文档进行两次网络调用并将结果添加到文档
- 将更新的文档添加到列表
- 批量导入列表到数据库
由于网络延迟,Web API 调用是最慢的部分,我想并行处理它们以节省时间,所以我写了这段代码:
/// <summary>
/// Queries the collection named by cosmosDBCollectionNameRawData for documents of the given brand,
/// adds the results of two web calls to each document, and collects the documents serialized as JSON strings.
/// </summary>
/// <param name="brand">Brand value used as the partition key and in the query's WHERE clause.</param>
/// <returns>
/// The list of serialized documents.
/// NOTE(review): because the async lambda passed to Parallel.ForEach is not awaited (see below),
/// this list can be returned before any entry has been added.
/// </returns>
private async Task<List<String>> FetchDocumentsAndBuildList(string brand)
{
using (var client = new DocumentClient(new Uri(cosmosDBEndpointUrl), cosmosDBPrimaryKey))
{
List<string> formattedList = new List<string>();
FeedOptions queryOptions = new FeedOptions
{
MaxItemCount = -1,
PartitionKey = new PartitionKey(brand)
};
// NOTE(review): brand is interpolated directly into the SQL text; if it can come from
// untrusted input, a parameterized query would be safer.
var query = client.CreateDocumentQuery<Document>(UriFactory.CreateDocumentCollectionUri(cosmosDBName, cosmosDBCollectionNameRawData), $"SELECT TOP 2 * from c where c.brand = '{brand}'", queryOptions).AsDocumentQuery();
// Process the query results one page at a time.
while(query.HasMoreResults)
{
var options = new ParallelOptions { MaxDegreeOfParallelism = Environment.ProcessorCount * 10 };
// NOTE(review): Parallel.ForEach does not await the async lambda. Each iteration is treated as
// finished as soon as the lambda reaches its first incomplete await, so the loop (and this
// method) can finish before the code after the awaits has run.
Parallel.ForEach(await query.ExecuteNextAsync<Document>(), options, async singleDocument =>
{
JObject originalData = singleDocument.GetPropertyValue<JObject>("BasicData");
if (originalData != null)
{
var artNo = originalData.GetValue("artno");
if (artNo != null)
{
string strArtNo = artNo.ToString();
// The first 7 characters are used as the product number and the next 3 as the color number;
// Substring throws if the article number is shorter than 10 characters.
string productNumber = strArtNo.Substring(0, 7);
string colorNumber = strArtNo.Substring(7, 3);
string HmGoeUrl = $"https://xxx,xom/Online/{strArtNo}/en";
string sisApiUrl = $"https:/yyy.com/{productNumber}/{colorNumber}?&maxnumberofstores=10&brand=000&channel=02";
string HttpFetchMethod = "GET";
// The two web calls are awaited one after the other.
JObject detailedDataResponse = await DataFetcherAsync(HmGoeUrl, HttpFetchMethod);
JObject inventoryData = await DataFetcherAsync(sisApiUrl, HttpFetchMethod);
if (detailedDataResponse != null)
{
JObject productList = (JObject)detailedDataResponse["product"];
if (productList != null)
{
// Find the index of the article whose "code" equals strArtNo;
// Single throws unless exactly one entry matches.
var selectedIndex = productList["articlesList"].Select((x, index) => new { code = x.Value<string>("code"), Node = x, Index = index })
.Single(x => x.code == strArtNo)
.Index;
// Replace the full response with just the matching article entry.
detailedDataResponse = (JObject)productList["articlesList"][selectedIndex];
}
}
singleDocument.SetPropertyValue("DetailedData", detailedDataResponse);
singleDocument.SetPropertyValue("InventoryData", inventoryData);
singleDocument.SetPropertyValue("consumer", "NWS");
}
}
// NOTE(review): List<string>.Add is invoked from concurrently running iterations;
// List<T> does not support concurrent writers.
formattedList.Add(Newtonsoft.Json.JsonConvert.SerializeObject(singleDocument));
});
// Original sequential loop, kept for reference:
//foreach (Document singleDocument in await query.ExecuteNextAsync<Document>())
//{
// JObject originalData = singleDocument.GetPropertyValue<JObject>("BasicData");
// if(originalData != null)
// {
// var artNo = originalData.GetValue("artno");
// if(artNo != null)
// {
// string strArtNo = artNo.ToString();
// string productNumber = strArtNo.Substring(0, 7);
// string colorNumber = strArtNo.Substring(7, 3);
// string HmGoeUrl = $"https:/xxx.xom/Online/{strArtNo}/en";
// string sisApiUrl = $"https://yyy.xom&maxnumberofstores=10&brand=000&channel=02";
// string HttpFetchMethod = "GET";
// JObject detailedDataResponse = await DataFetcherAsync(HmGoeUrl, HttpFetchMethod);
// JObject inventoryData = await DataFetcherAsync(sisApiUrl, HttpFetchMethod);
// if(detailedDataResponse != null)
// {
// JObject productList = (JObject)detailedDataResponse["product"];
// if(productList != null)
// {
// var selectedIndex = productList["articlesList"].Select((x, index) => new { code = x.Value<string>("code"), Node = x, Index = index })
// .Single(x => x.code == strArtNo)
// .Index;
// detailedDataResponse = (JObject)productList["articlesList"][selectedIndex];
// }
// }
// singleDocument.SetPropertyValue("DetailedData", detailedDataResponse);
// singleDocument.SetPropertyValue("InventoryData", inventoryData);
// singleDocument.SetPropertyValue("consumer", "NWS");
// }
// }
// formattedList.Add(Newtonsoft.Json.JsonConvert.SerializeObject(singleDocument));
//}
}
return formattedList;
}
}
如果我在循环中添加断点,我可以看到为每个变量分配了正确的值,但由于某种原因,返回的 formattedList
始终为 0 个条目,我无法弄清楚原因。
被注释掉的是原始的 foreach
循环,它工作得很好但是很慢
--- 编辑 --- 这就是我从父方法调用此代码的方式
// Caller: awaits FetchDocumentsAndBuildList and logs how many entries the returned list contains.
log.LogInformation($"Starting creation of DocumentList for BulkImport at: {DateTime.Now}");
var documentsToImportInBatch = await FetchDocumentsAndBuildList(brand);
log.LogInformation($"BulkExecutor DocumentList has: {documentsToImportInBatch.Count} entries, created at: {DateTime.Now}");
这里的问题是 Parallel.ForEach
并不知道 lambda 每次调用都会返回一个 Task
,而这个 Task 必须先被等待(await),ForEach
才能被视为完成。
因此,在您的函数退出之前不会调用 await 之后的延续,这就是为什么 formattedList
中有零个元素。
您可以使用代码示例轻松证明这一点,例如:
// Parallel.ForEach does not await the async lambda, so the loop returns without waiting for the delays to elapse.
Parallel.ForEach(Enumerable.Range(0, 100), async singleDocument => await Task.Delay(9999));
Console.WriteLine("Done!");
Done
几乎会立即打印出来。
对于 I/O 绑定并行性,您可以改用 Task.WhenAll
来并行化您的异步网络抓取调用
// Fetch the next page of documents from the query.
var myDocuments = await query.ExecuteNextAsync<Document>();
// Map each document to a task that produces its serialized JSON.
var myScrapingTasks = myDocuments.Select(async singleDocument =>
{
// ... all of your web scraping code here
// return the amended (mutated) document
return JsonConvert.SerializeObject(singleDocument);
});
// Await all tasks together, then append their results to the list.
var results = await Task.WhenAll(myScrapingTasks);
formattedList.AddRange(results);
关于 MaxDegreeOfParallelism
,如果您发现需要限制并发抓取调用的数量,最简单的方法是将传入的文档分组为大小适中的块,然后一次处理一个较小的块 - Select(x, i)
重载和 GroupBy
在这里非常好用。