ForEachAsync 与结果

ForEachAsync with Result

我正在尝试将 Stephen Toub's ForEachAsync<T> 扩展方法更改为 return 结果...

斯蒂芬的分机:

public static Task ForEachAsync<T>(this IEnumerable<T> source, int dop, Func<T, Task> body) 
{ 
    return Task.WhenAll( 
        from partition in Partitioner.Create(source).GetPartitions(dop) 
        select Task.Run(async delegate { 
            using (partition) 
                while (partition.MoveNext()) 
                    await body(partition.Current); 
        })); 
}

我的方法(无效;任务执行但结果错误)

public static Task<TResult[]> ForEachAsync<T, TResult>(this IList<T> source,
    int degreeOfParallelism, Func<T, Task<TResult>> body)
{
    return Task.WhenAll<TResult>(
        from partition in Partitioner.Create(source).GetPartitions(degreeOfParallelism)
        select Task.Run<TResult>(async () = 
        {
            using (partition)
                while (partition.MoveNext())
                    await body(partition.Current); // When I "return await",
                        // I get good results but only one per partition 
            return default(TResult);
        }));
}

我知道我必须 return (WhenAll?) 最后一部分的结果,但我还不知道该怎么做...

更新: 即使所有任务都已执行,我得到的结果只是 degreeOfParallelism 次 null(我猜是因为 default(TResult))。我也尝试 return await body(...) 然后结果很好,但只执行了 degreeOfParallelism 个任务。

您的 LINQ 查询只能有与分区数相同数量的结果 - 您只是将每个分区投影到一个结果中。

如果不在意顺序,只需要将每个分区的结果assemble成一个list,然后压平即可。

public static async Task<TResult[]> ExecuteInParallel<T, TResult>(this IList<T> source, int degreeOfParalleslism, Func<T, Task<TResult>> body)
{
    var lists = await Task.WhenAll<List<TResult>>(
        Partitioner.Create(source).GetPartitions(degreeOfParalleslism)
            .Select(partition => Task.Run<List<TResult>>(async () =>
                    {
                        var list = new List<TResult>();
                        using (partition)
                        {
                            while (partition.MoveNext())
                            {
                                list.Add(await body(partition.Current));
                            }
                        }
                        return list;
                   })));
     return lists.SelectMany(list => list).ToArray();
}

(我已将其从 ForEachAsync 重命名,因为 ForEach 听起来势在必行(适合原版中的 Func<T, Task>),而这是获取结果。A foreach 循环没有结果 - 这个有。)

现在 Parallel.ForEachAsync API 已成为标准库 (.NET 6) 的一部分,实现 returns 和 Task<TResult[]> 的变体是有意义的,基于此 API。这是一个实现:

public static Task<TResult[]> ForEachAsync<TSource, TResult>(
    this IEnumerable<TSource> source,
    ParallelOptions parallelOptions,
    Func<TSource, CancellationToken, ValueTask<TResult>> body)
{
    var results = new List<TResult>();
    if (source.TryGetNonEnumeratedCount(out int count)) results.Capacity = count;
    var withIndexes = source.Select((item, index) => (item, index));
    return Parallel.ForEachAsync(withIndexes, parallelOptions, async (entry, ct) =>
    {
        var (item, index) = entry;
        var result = await body(item, ct).ConfigureAwait(false);
        lock (results)
        {
            while (results.Count <= index) results.Add(default);
            results[index] = result;
        }
    }).ContinueWith(t =>
    {
        if (t.IsCanceled)
        {
            // Propagate the correct token
            CancellationToken ct = default;
            try { t.GetAwaiter().GetResult(); }
            catch (OperationCanceledException oce) { ct = oce.CancellationToken; }
            return Task.FromCanceled<TResult[]>(ct);
        }
        if (t.IsFaulted)
        {
            var tcs = new TaskCompletionSource<TResult[]>();
            tcs.SetException(t.Exception.InnerExceptions);
            return tcs.Task;
        }
        lock (results) return Task.FromResult(results.ToArray());
    }, default, TaskContinuationOptions.DenyChildAttach, TaskScheduler.Default)
        .Unwrap();
}

此实现支持具有 IEnumerable<T> 作为 sourceParallel.ForEachAsync 重载的所有选项和功能。它在错误和取消情况下的行为是相同的。结果的排列顺序与 source 序列中关联元素的顺序相同。