Launching an unknown number of parallel tasks with return values

I have the following method:

private async Task<string> MakeRequestAsync(string id)
{
    // ...
}

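I have a user-supplied list of IDs.

I want to call MakeRequestAsync() once for each ID, using as many parallel tasks as is reasonable. I need the result of each call, and I want to detect exceptions.

I've looked at many examples but can't figure out how to do this. For example, I found Parallel.ForEach(), but its body parameter is an Action<>, which doesn't return a value. I'm also not sure what the maximum number of tasks I should create is.

Can anyone show me how you would do this, or point me to a good article?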

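You can do this easily with await Task.WhenAll, but I often use the convenience methods below. You can use them like this:

(string, ArgumentException)[] results = await idList
    .ForeachParallelSafe<string, string, ArgumentException>(MakeRequestAsync);

The results are in the same order as the source collection.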
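For comparison, here is a minimal sketch of the plain await Task.WhenAll approach with no concurrency limit. WhenAllSketch and FakeRequestAsync are hypothetical stand-ins for the question's MakeRequestAsync. The fake request makes longer IDs finish sooner, so completion order differs from input order, yet Task.WhenAll still returns the results in input order.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class WhenAllSketch
{
    // Hypothetical stand-in for MakeRequestAsync: longer ids complete sooner,
    // so tasks finish in a different order than they were started.
    public static async Task<string> FakeRequestAsync(string id)
    {
        await Task.Delay(50 / Math.Max(1, id.Length));
        return "result-" + id;
    }

    // Starts one task per id with no limit on concurrency. Task.WhenAll's result
    // array is ordered like the input sequence, not by completion time.
    public static Task<string[]> RunAllAsync(IEnumerable<string> ids) =>
        Task.WhenAll(ids.Select(FakeRequestAsync));
}
```

This launches every request at once, which is fine for a handful of IDs. For a large user-supplied list, the degreeOfParallelism option of the helpers is what caps the number of requests in flight.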

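If you're fine with only the first exception being thrown (it is rethrown after all tasks have completed), you can also do this:

string[] results = await idList
    .ForeachParallel(MakeRequestAsync);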
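To illustrate what "only the first exception" means, here is a minimal sketch using the hypothetical FirstExceptionSketch and FakeRequestAsync (not part of the helpers below). Two of three calls fail. await rethrows a single exception, but the combined task's Exception property (an AggregateException) still holds both.

```csharp
using System;
using System.Linq;
using System.Threading.Tasks;

public static class FirstExceptionSketch
{
    // Hypothetical request that fails for ids starting with "bad".
    static async Task<string> FakeRequestAsync(string id)
    {
        await Task.Delay(10);
        if (id.StartsWith("bad", StringComparison.Ordinal))
            throw new ArgumentException("invalid id: " + id);
        return id;
    }

    // Returns the exception that 'await' surfaces and how many exceptions
    // the combined Task.WhenAll task actually collected.
    public static async Task<(Exception Thrown, int Total)> RunAsync(string[] ids)
    {
        Task<string[]> all = Task.WhenAll(ids.Select(FakeRequestAsync));
        try
        {
            await all;
            return (null, 0);
        }
        catch (Exception e)
        {
            // 'await' rethrows only the first inner exception; the others
            // remain available on all.Exception.InnerExceptions.
            return (e, all.Exception.InnerExceptions.Count);
        }
    }
}
```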

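The optional degreeOfParallelism parameter lets you cap the number of concurrent executions. The default of -1 (or any value of 0 or less) means no limit. There is also an overload that takes an elementSelector parameter, in case you need to combine each original input element with its output, for example to build a Dictionary.

Here is the code. I've included all the overloads for convenience, but you can pick just the ones you need. They are extension methods, so they must live in a non-generic static class, and they need using directives for System, System.Collections.Generic, System.Linq, System.Threading and System.Threading.Tasks.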
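Here is a minimal, self-contained sketch of what a degreeOfParallelism limit and an elementSelector-style pairing amount to. It uses the same SemaphoreSlim technique as the helpers below. ThrottleSketch, RunAsync and FakeRequestAsync are hypothetical names used only for illustration. The fake request records how many calls overlap, so you can check that the limit holds.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class ThrottleSketch
{
    private static int _current;    // requests currently in flight
    public static int MaxObserved;  // highest in-flight count seen so far

    // Hypothetical request that tracks overlap, then returns the id upper-cased.
    private static async Task<string> FakeRequestAsync(string id)
    {
        int now = Interlocked.Increment(ref _current);
        int seen;
        // Lock-free "max" update of MaxObserved.
        while (now > (seen = Volatile.Read(ref MaxObserved)) &&
               Interlocked.CompareExchange(ref MaxObserved, now, seen) != seen) { }
        await Task.Delay(20);
        Interlocked.Decrement(ref _current);
        return id.ToUpperInvariant();
    }

    // Runs at most maxParallel requests at once, then pairs each input id
    // with its result, which is the kind of combination an elementSelector performs.
    public static async Task<Dictionary<string, string>> RunAsync(
        IReadOnlyList<string> ids, int maxParallel)
    {
        using var gate = new SemaphoreSlim(maxParallel, maxParallel);
        string[] results = await Task.WhenAll(ids.Select(async id =>
        {
            await gate.WaitAsync();
            try { return await FakeRequestAsync(id); }
            finally { gate.Release(); }
        }));

        // Task.WhenAll preserves input order, so results[i] belongs to ids[i].
        var map = new Dictionary<string, string>();
        for (int i = 0; i < ids.Count; i++) map[ids[i]] = results[i];
        return map;
    }
}
```

All tasks are created up front, but only maxParallel of them get past WaitAsync at any moment. That is why the number of tasks matters less than the number of concurrent requests.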


        /// <summary>
        /// Asynchronously executes the given <paramref name="asyncFunc" /> on each item in <paramref name="source" /> and
        /// awaits the result of executions. If the execution of <paramref name="asyncFunc" /> throws an exception of type
        /// <typeparamref name="TException" />, it is caught and returned in the result.
        /// </summary>
        public static Task<(TOut Result, TException Exception)[]>
            ForeachParallelSafe<TIn, TOut, TException>(this IEnumerable<TIn> source,
                Func<TIn, Task<TOut>> asyncFunc, int degreeOfParallelism = -1)
            where TException : Exception
        {
            if (source == null) throw new ArgumentNullException(nameof(source));
            if (asyncFunc == null) throw new ArgumentNullException(nameof(asyncFunc));

            async Task<(TOut Result, TException Exception)> safeFunc(TIn input)
            {
                try
                {
                    return (await asyncFunc(input), null);
                }
                catch (TException e)
                {
                    return (default, e);
                }
            }

            return ForeachParallel(source, safeFunc, (orig, output) => output, degreeOfParallelism);
        }

        /// <summary>
        /// Asynchronously executes the given <paramref name="asyncFunc" /> on each item in <paramref name="source" /> and
        /// awaits the result of executions. The returned items are the result of applying <paramref name="elementSelector" />
        /// to each of the original items and the resulting items.
        /// If a task throws an exception, only the first such exception is thrown by this method after all tasks have completed.
        /// Consider using <see cref="ForeachParallelSafe{TIn,TOut,TException}"/> if all exceptions are needed.
        /// </summary>
        public static async Task<TResult[]> ForeachParallel<T, TOut, TResult>(this IEnumerable<T> source,
            Func<T, Task<TOut>> asyncFunc, Func<T, TOut, TResult> elementSelector,
            int degreeOfParallelism = -1)
        {
            if (source == null) throw new ArgumentNullException(nameof(source));
            if (asyncFunc == null) throw new ArgumentNullException(nameof(asyncFunc));
            if (elementSelector == null) throw new ArgumentNullException(nameof(elementSelector));

            // Copy the source into an array to avoid multiple enumerations.
            // Could be optimized to avoid copying in certain cases but this
            // is usually negligible compared to the async operation.
            T[] sourceCopy = source.ToArray();

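            SemaphoreSlim semaphore = null;
            if (degreeOfParallelism > 0)
            {
                semaphore = new SemaphoreSlim(degreeOfParallelism, degreeOfParallelism);
            }

            try
            {
                // Start one task per item. With a semaphore, at most
                // degreeOfParallelism of them run asyncFunc at the same time.
                // Task.WhenAll returns the outputs in source order.
                TOut[] intermediateResults = await Task.WhenAll(sourceCopy
                    .Select(async x =>
                    {
                        if (semaphore != null)
                        {
                            await semaphore.WaitAsync();
                        }

                        try
                        {
                            return await asyncFunc(x);
                        }
                        finally
                        {
                            semaphore?.Release();
                        }
                    }));

                // Combine each original item with its output.
                return sourceCopy
                    .Select((x, index) => elementSelector(x, intermediateResults[index]))
                    .ToArray();
            }
            finally
            {
                // Dispose the semaphore even when one of the tasks has faulted.
                semaphore?.Dispose();
            }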
        }

        /// <summary>
        /// Asynchronously executes the given <paramref name="asyncFunc" /> on each item in <paramref name="source" /> and
        /// awaits the end of all executions.
        /// If a task throws an exception, only the first such exception is thrown by this method after all tasks have completed.
        /// Consider using <see cref="ForeachParallelSafe{TIn,TOut,TException}"/> if all exceptions are needed.
        /// </summary>
        public static Task ForeachParallel<T>(this IEnumerable<T> source,
            Func<T, Task> asyncFunc, int degreeOfParallelism = -1)
        {
            if (source == null) throw new ArgumentNullException(nameof(source));
            if (asyncFunc == null) throw new ArgumentNullException(nameof(asyncFunc));

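            // Await inside a wrapper so exceptions from asyncFunc propagate.
            // A ContinueWith(_ => 0) continuation runs even when asyncFunc fails
            // and would silently swallow the exception.
            async Task<int> asyncTask(T t)
            {
                await asyncFunc(t);
                return 0;
            }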

            return ForeachParallel(source, asyncTask, (orig, output) => output, degreeOfParallelism);
        }

        /// <summary>
        /// Asynchronously executes the given <paramref name="asyncFunc" /> on each item in <paramref name="source" /> and
        /// awaits the result of executions.
        /// If a task throws an exception, only the first such exception is thrown by this method after all tasks have completed.
        /// Consider using <see cref="ForeachParallelSafe{TIn,TOut,TException}"/> if all exceptions are needed.
        /// </summary>
        public static Task<TOut[]> ForeachParallel<T, TOut>(this IEnumerable<T> source,
            Func<T, Task<TOut>> asyncFunc, int degreeOfParallelism = -1)
        {
            if (source == null) throw new ArgumentNullException(nameof(source));
            if (asyncFunc == null) throw new ArgumentNullException(nameof(asyncFunc));
            return ForeachParallel(source, asyncFunc, (orig, output) => output, degreeOfParallelism);
        }
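A note on adapting a Func<T, Task> into a Func<T, Task<int>>, as the non-generic overload needs to do: a continuation attached with ContinueWith(_ => 0) runs whether or not the antecedent task faulted. The resulting Task<int> therefore completes successfully and the exception is lost, while awaiting inside an async wrapper rethrows it. A minimal sketch, where AdapterSketch and FailAsync are hypothetical names:

```csharp
using System;
using System.Threading.Tasks;

public static class AdapterSketch
{
    // Hypothetical operation that always fails asynchronously.
    static async Task FailAsync()
    {
        await Task.Yield();
        throw new InvalidOperationException("boom");
    }

    // The continuation runs on any completion, so this task succeeds with 0
    // and the InvalidOperationException is never observed by the caller.
    public static Task<int> ViaContinueWith() => FailAsync().ContinueWith(_ => 0);

    // Awaiting the antecedent rethrows its exception to the caller.
    public static async Task<int> ViaAwait()
    {
        await FailAsync();
        return 0;
    }
}
```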