使用 return 值启动未知数量的并行任务
Launching an unknown number of parallel tasks with return values
我有以下方法:
private async Task<string> MakeRequestAsync(string id)
{
// ...
}
我有一个用户提供的 ID 列表。
我想为每个 ID 调用一次 MakeRequestAsync()
。但我想在合理的情况下使用尽可能多的并行任务。我需要每次调用的结果,我想检测异常。
我看过很多示例,但不知道如何执行此操作。例如,我找到了 Parallel.ForEach()
,但是 body
参数是一个 Action<>
而不是 return 任何值。另外,我不确定我应该创建的最大任务数是多少。
谁能告诉我你会怎么做?或者给一篇好文章提供一个link?
您可以使用 await Task.WhenAll
轻松完成,但我经常使用下面的便捷方法。您可以像这样使用它们:
(string, ArgumentException)[] results = await idList
.ForEachParallelSafe<string, ArgumentException>(MakeRequestAsync);
结果的顺序与源集合的顺序相同。
如果您对抛出第一个异常没问题,您也可以这样做:
string[] results = await idList
.ForEachParallel(MakeRequestAsync);
可选的 degreeOfParallelism
参数允许您限制并行执行的最大数量。还有一个带有可选 elementSelector
参数的重载,以防您需要将每个原始输入元素与输出组合,例如,如果您需要创建一个 Dictionary
.
这是代码(为了方便起见,我已经包含了所有的重载,你也可以只选择你需要的):
/// <summary>
/// Asynchronously executes the given <paramref name="asyncFunc" /> on each item in <paramref name="source" /> and
/// awaits the result of executions. If the execution of <paramref name="asyncFunc" /> throws an exception of type
/// <typeparamref name="TException" />, it is caught and returned in the result.
/// </summary>
public static Task<(TOut Result, TException Exception)[]>
ForeachParallelSafe<TIn, TOut, TException>(this IEnumerable<TIn> source,
Func<TIn, Task<TOut>> asyncFunc, int degreeOfParallelism = -1)
where TException : Exception
{
if (source == null) throw new ArgumentNullException(nameof(source));
if (asyncFunc == null) throw new ArgumentNullException(nameof(asyncFunc));
async Task<(TOut Result, TException Exception)> safeFunc(TIn input)
{
try
{
return (await asyncFunc(input), null);
}
catch (TException e)
{
return (default, e);
}
}
return ForeachParallel(source, safeFunc, (orig, output) => output, degreeOfParallelism);
}
/// <summary>
/// Asynchronously executes the given <paramref name="asyncFunc" /> on each item in <paramref name="source" /> and
/// awaits the result of executions. The returned items are the result of applying <paramref name="elementSelector" />
/// to each of the original items and the resulting items.
/// If a task throws an exception, only the first such exception is thrown by this method after all tasks have completed.
/// Consider using <see cref="ForeachParallelSafe{TIn,TOut,TException}"/> if all exceptions are needed.
/// </summary>
public static async Task<TResult[]> ForeachParallel<T, TOut, TResult>(this IEnumerable<T> source,
Func<T, Task<TOut>> asyncFunc, Func<T, TOut, TResult> elementSelector,
int degreeOfParallelism = -1)
{
if (source == null) throw new ArgumentNullException(nameof(source));
if (asyncFunc == null) throw new ArgumentNullException(nameof(asyncFunc));
if (elementSelector == null) throw new ArgumentNullException(nameof(elementSelector));
// Copy the source into an array to avoid multiple enumerations.
// Could be optimized to avoid copying in certain cases but this
// is usually negligible compared to the async operation.
T[] sourceCopy = source.ToArray();
SemaphoreSlim semaphore = null;
if (degreeOfParallelism > 0)
{
semaphore = new SemaphoreSlim(degreeOfParallelism, degreeOfParallelism);
}
TOut[] intermediateResults = await Task.WhenAll(sourceCopy
.Select(async x =>
{
if (semaphore != null)
{
await semaphore.WaitAsync();
}
try
{
return await asyncFunc(x);
}
finally
{
semaphore?.Release();
}
}));
TResult[] result = sourceCopy
.Select((x, index) => elementSelector(x, intermediateResults[index]))
.ToArray();
semaphore?.Dispose();
return result;
}
/// <summary>
/// Asynchronously executes the given <paramref name="asyncFunc" /> on each item in <paramref name="source" /> and
/// awaits the end of all executions.
/// If a task throws an exception, only the first such exception is thrown by this method after all tasks have completed.
/// Consider using <see cref="ForeachParallelSafe{TIn,TOut,TException}"/> if all exceptions are needed.
/// </summary>
public static Task ForeachParallel<T>(this IEnumerable<T> source,
Func<T, Task> asyncFunc, int degreeOfParallelism = -1)
{
if (source == null) throw new ArgumentNullException(nameof(source));
if (asyncFunc == null) throw new ArgumentNullException(nameof(asyncFunc));
Task<int> asyncTask(T t) => asyncFunc(t).ContinueWith(_ => 0);
return ForeachParallel(source, asyncTask, (orig, output) => output, degreeOfParallelism);
}
/// <summary>
/// Asynchronously executes the given <paramref name="asyncFunc" /> on each item in <paramref name="source" /> and
/// awaits the result of executions.
/// If a task throws an exception, only the first such exception is thrown by this method after all tasks have completed.
/// Consider using <see cref="ForeachParallelSafe{TIn,TOut,TException}"/> if all exceptions are needed.
/// </summary>
public static Task<TOut[]> ForeachParallel<T, TOut>(this IEnumerable<T> source,
Func<T, Task<TOut>> asyncFunc, int degreeOfParallelism = -1)
{
if (source == null) throw new ArgumentNullException(nameof(source));
if (asyncFunc == null) throw new ArgumentNullException(nameof(asyncFunc));
return ForeachParallel(source, asyncFunc, (orig, output) => output, degreeOfParallelism);
}
我有以下方法:
private async Task<string> MakeRequestAsync(string id)
{
// ...
}
我有一个用户提供的 ID 列表。
我想为每个 ID 调用一次 MakeRequestAsync()
。但我想在合理的情况下使用尽可能多的并行任务。我需要每次调用的结果,我想检测异常。
我看过很多示例,但不知道如何执行此操作。例如,我找到了 Parallel.ForEach()
,但是 body
参数是一个 Action<>
而不是 return 任何值。另外,我不确定我应该创建的最大任务数是多少。
谁能告诉我你会怎么做?或者给一篇好文章提供一个link?
您可以使用 await Task.WhenAll
轻松完成,但我经常使用下面的便捷方法。您可以像这样使用它们:
(string, ArgumentException)[] results = await idList
.ForEachParallelSafe<string, ArgumentException>(MakeRequestAsync);
结果的顺序与源集合的顺序相同。
如果您对抛出第一个异常没问题,您也可以这样做:
string[] results = await idList
.ForEachParallel(MakeRequestAsync);
可选的 degreeOfParallelism
参数允许您限制并行执行的最大数量。还有一个带有可选 elementSelector
参数的重载,以防您需要将每个原始输入元素与输出组合,例如,如果您需要创建一个 Dictionary
.
这是代码(为了方便起见,我已经包含了所有的重载,你也可以只选择你需要的):
/// <summary>
/// Asynchronously executes the given <paramref name="asyncFunc" /> on each item in <paramref name="source" /> and
/// awaits the result of executions. If the execution of <paramref name="asyncFunc" /> throws an exception of type
/// <typeparamref name="TException" />, it is caught and returned in the result.
/// </summary>
public static Task<(TOut Result, TException Exception)[]>
ForeachParallelSafe<TIn, TOut, TException>(this IEnumerable<TIn> source,
Func<TIn, Task<TOut>> asyncFunc, int degreeOfParallelism = -1)
where TException : Exception
{
if (source == null) throw new ArgumentNullException(nameof(source));
if (asyncFunc == null) throw new ArgumentNullException(nameof(asyncFunc));
async Task<(TOut Result, TException Exception)> safeFunc(TIn input)
{
try
{
return (await asyncFunc(input), null);
}
catch (TException e)
{
return (default, e);
}
}
return ForeachParallel(source, safeFunc, (orig, output) => output, degreeOfParallelism);
}
/// <summary>
/// Asynchronously executes the given <paramref name="asyncFunc" /> on each item in <paramref name="source" /> and
/// awaits the result of executions. The returned items are the result of applying <paramref name="elementSelector" />
/// to each of the original items and the resulting items.
/// If a task throws an exception, only the first such exception is thrown by this method after all tasks have completed.
/// Consider using <see cref="ForeachParallelSafe{TIn,TOut,TException}"/> if all exceptions are needed.
/// </summary>
public static async Task<TResult[]> ForeachParallel<T, TOut, TResult>(this IEnumerable<T> source,
Func<T, Task<TOut>> asyncFunc, Func<T, TOut, TResult> elementSelector,
int degreeOfParallelism = -1)
{
if (source == null) throw new ArgumentNullException(nameof(source));
if (asyncFunc == null) throw new ArgumentNullException(nameof(asyncFunc));
if (elementSelector == null) throw new ArgumentNullException(nameof(elementSelector));
// Copy the source into an array to avoid multiple enumerations.
// Could be optimized to avoid copying in certain cases but this
// is usually negligible compared to the async operation.
T[] sourceCopy = source.ToArray();
SemaphoreSlim semaphore = null;
if (degreeOfParallelism > 0)
{
semaphore = new SemaphoreSlim(degreeOfParallelism, degreeOfParallelism);
}
TOut[] intermediateResults = await Task.WhenAll(sourceCopy
.Select(async x =>
{
if (semaphore != null)
{
await semaphore.WaitAsync();
}
try
{
return await asyncFunc(x);
}
finally
{
semaphore?.Release();
}
}));
TResult[] result = sourceCopy
.Select((x, index) => elementSelector(x, intermediateResults[index]))
.ToArray();
semaphore?.Dispose();
return result;
}
/// <summary>
/// Asynchronously executes the given <paramref name="asyncFunc" /> on each item in <paramref name="source" /> and
/// awaits the end of all executions.
/// If a task throws an exception, only the first such exception is thrown by this method after all tasks have completed.
/// Consider using <see cref="ForeachParallelSafe{TIn,TOut,TException}"/> if all exceptions are needed.
/// </summary>
public static Task ForeachParallel<T>(this IEnumerable<T> source,
Func<T, Task> asyncFunc, int degreeOfParallelism = -1)
{
if (source == null) throw new ArgumentNullException(nameof(source));
if (asyncFunc == null) throw new ArgumentNullException(nameof(asyncFunc));
Task<int> asyncTask(T t) => asyncFunc(t).ContinueWith(_ => 0);
return ForeachParallel(source, asyncTask, (orig, output) => output, degreeOfParallelism);
}
/// <summary>
/// Asynchronously executes the given <paramref name="asyncFunc" /> on each item in <paramref name="source" /> and
/// awaits the result of executions.
/// If a task throws an exception, only the first such exception is thrown by this method after all tasks have completed.
/// Consider using <see cref="ForeachParallelSafe{TIn,TOut,TException}"/> if all exceptions are needed.
/// </summary>
public static Task<TOut[]> ForeachParallel<T, TOut>(this IEnumerable<T> source,
Func<T, Task<TOut>> asyncFunc, int degreeOfParallelism = -1)
{
if (source == null) throw new ArgumentNullException(nameof(source));
if (asyncFunc == null) throw new ArgumentNullException(nameof(asyncFunc));
return ForeachParallel(source, asyncFunc, (orig, output) => output, degreeOfParallelism);
}