在dotnet core中批量使用Parallel.For

Use Parallel.For in batches in dotnet core

我在 dotnet 核心中使用 httptrigger 函数,我在 Json 中获取 httprequest 数据 format.I 需要将此值插入 Google Merchant Center 帐户。需要插入将近 9000 行(每次都是动态数据)。我如何实现执行速度更快的 Parallel.for 逻辑。目前我正在使用如下所示的每个循环,但它需要更多时间。 下面是代码。

string requestBody = await new StreamReader(req.Body).ReadToEndAsync();
dynamic body = JsonConvert.DeserializeObject(requestBody);
for (int i =0;i<body.Count;i++)
{
  Product newProduct = InsertProduct(merchantId, websiteUrl,body[i]);
}

这样做。

string requestBody = await new StreamReader(req.Body).ReadToEndAsync();
dynamic body = JsonConvert.DeserializeObject(requestBody);
Parallel.For(0, body.Count, i => {
    Product newProduct = InsertProduct(merchantId, websiteUrl,body[i]);
});

我创建了一个小示例,也许您可​​以找到最适合您情况的最佳方法。

dotnet fiddle Example

有3个选项:

顺序

正如标题所说,每一项都是按顺序处理的。非常节省方法但不是处理 9000 项最快的方法:)

var list = GenerateItems();
var count = list.Count();
for(var i = 0; i < count; i++) 
{
    InsertInDatabaseAsync($"{i}", list.ElementAt(i)).GetAwaiter().GetResult();
}

有Parallel.For图书馆

正如评论所说,它对 CPU 绑定处理有好处,但在异步方法上有一些不足 (here)

var list = GenerateItems();
var count = list.Count();
var options = new ParallelOptions{MaxDegreeOfParallelism = MAX_DEGREE_OF_PARALLELISM};
Parallel.For(0, count, options, (i) => 
{
    InsertInDatabaseAsync($"{i}", list.ElementAt(i)).GetAwaiter().GetResult();
});

和Async-Await

我认为在您的示例中这最适合您。每个项目都并行处理,直接开始处理并旋转 Task。 (Copied the async-extension from here)

var list = GenerateItems();
var count = list.Count();

// Extensions method see in referenced SO answer
ForEachAsync(count, list, async (item, index) => 
{
    await InsertInDatabaseAsync($"{index}", item);
}).GetAwaiter().GetResult();

...已更新

感谢您的评论。我已将 async-await 实现更新为更简单的实现:

private static async Task ForEachAsync<T>(IEnumerable<T> enumerable, Func<T, int, Task> asyncFunc)
{
    var itemsCount = enumerable.Count();
    var tasks = new Task[itemsCount];
    int i = 0;
    foreach (var t in enumerable)
    {
        tasks[i] = asyncFunc(t, i);
        i++;
    }
    await Task.WhenAll(tasks);
}

并且还添加了 MAX_DEGREE_OF_PARALLELISM 设置为 1。这对推荐中描述的并行处理有巨大影响。