ForEachAsync 与结果
ForEachAsync with Result
我正在尝试将 Stephen Toub's ForEachAsync<T>
扩展方法更改为 return 结果...
斯蒂芬的分机:
public static Task ForEachAsync<T>(this IEnumerable<T> source, int dop, Func<T, Task> body)
{
return Task.WhenAll(
from partition in Partitioner.Create(source).GetPartitions(dop)
select Task.Run(async delegate {
using (partition)
while (partition.MoveNext())
await body(partition.Current);
}));
}
我的方法(无效;任务执行但结果错误)
public static Task<TResult[]> ForEachAsync<T, TResult>(this IList<T> source,
int degreeOfParallelism, Func<T, Task<TResult>> body)
{
return Task.WhenAll<TResult>(
from partition in Partitioner.Create(source).GetPartitions(degreeOfParallelism)
select Task.Run<TResult>(async () =
{
using (partition)
while (partition.MoveNext())
await body(partition.Current); // When I "return await",
// I get good results but only one per partition
return default(TResult);
}));
}
我知道我必须 return (WhenAll
?) 最后一部分的结果,但我还不知道该怎么做...
更新: 即使所有任务都已执行,我得到的结果只是 degreeOfParallelism
次 null(我猜是因为 default(TResult)
)。我也尝试 return await body(...)
然后结果很好,但只执行了 degreeOfParallelism
个任务。
您的 LINQ 查询只能有与分区数相同数量的结果 - 您只是将每个分区投影到一个结果中。
如果不在意顺序,只需要将每个分区的结果assemble成一个list,然后压平即可。
public static async Task<TResult[]> ExecuteInParallel<T, TResult>(this IList<T> source, int degreeOfParalleslism, Func<T, Task<TResult>> body)
{
var lists = await Task.WhenAll<List<TResult>>(
Partitioner.Create(source).GetPartitions(degreeOfParalleslism)
.Select(partition => Task.Run<List<TResult>>(async () =>
{
var list = new List<TResult>();
using (partition)
{
while (partition.MoveNext())
{
list.Add(await body(partition.Current));
}
}
return list;
})));
return lists.SelectMany(list => list).ToArray();
}
(我已将其从 ForEachAsync
重命名,因为 ForEach
听起来势在必行(适合原版中的 Func<T, Task>
),而这是获取结果。A foreach
循环没有结果 - 这个有。)
现在 Parallel.ForEachAsync
API 已成为标准库 (.NET 6) 的一部分,实现 returns 和 Task<TResult[]>
的变体是有意义的,基于此 API。这是一个实现:
public static Task<TResult[]> ForEachAsync<TSource, TResult>(
this IEnumerable<TSource> source,
ParallelOptions parallelOptions,
Func<TSource, CancellationToken, ValueTask<TResult>> body)
{
var results = new List<TResult>();
if (source.TryGetNonEnumeratedCount(out int count)) results.Capacity = count;
var withIndexes = source.Select((item, index) => (item, index));
return Parallel.ForEachAsync(withIndexes, parallelOptions, async (entry, ct) =>
{
var (item, index) = entry;
var result = await body(item, ct).ConfigureAwait(false);
lock (results)
{
while (results.Count <= index) results.Add(default);
results[index] = result;
}
}).ContinueWith(t =>
{
if (t.IsCanceled)
{
// Propagate the correct token
CancellationToken ct = default;
try { t.GetAwaiter().GetResult(); }
catch (OperationCanceledException oce) { ct = oce.CancellationToken; }
return Task.FromCanceled<TResult[]>(ct);
}
if (t.IsFaulted)
{
var tcs = new TaskCompletionSource<TResult[]>();
tcs.SetException(t.Exception.InnerExceptions);
return tcs.Task;
}
lock (results) return Task.FromResult(results.ToArray());
}, default, TaskContinuationOptions.DenyChildAttach, TaskScheduler.Default)
.Unwrap();
}
此实现支持具有 IEnumerable<T>
作为 source
的 Parallel.ForEachAsync
重载的所有选项和功能。它在错误和取消情况下的行为是相同的。结果的排列顺序与 source
序列中关联元素的顺序相同。
我正在尝试将 Stephen Toub's ForEachAsync<T>
扩展方法更改为 return 结果...
斯蒂芬的分机:
public static Task ForEachAsync<T>(this IEnumerable<T> source, int dop, Func<T, Task> body)
{
return Task.WhenAll(
from partition in Partitioner.Create(source).GetPartitions(dop)
select Task.Run(async delegate {
using (partition)
while (partition.MoveNext())
await body(partition.Current);
}));
}
我的方法(无效;任务执行但结果错误)
public static Task<TResult[]> ForEachAsync<T, TResult>(this IList<T> source,
int degreeOfParallelism, Func<T, Task<TResult>> body)
{
return Task.WhenAll<TResult>(
from partition in Partitioner.Create(source).GetPartitions(degreeOfParallelism)
select Task.Run<TResult>(async () =
{
using (partition)
while (partition.MoveNext())
await body(partition.Current); // When I "return await",
// I get good results but only one per partition
return default(TResult);
}));
}
我知道我必须 return (WhenAll
?) 最后一部分的结果,但我还不知道该怎么做...
更新: 即使所有任务都已执行,我得到的结果只是 degreeOfParallelism
次 null(我猜是因为 default(TResult)
)。我也尝试 return await body(...)
然后结果很好,但只执行了 degreeOfParallelism
个任务。
您的 LINQ 查询只能有与分区数相同数量的结果 - 您只是将每个分区投影到一个结果中。
如果不在意顺序,只需要将每个分区的结果assemble成一个list,然后压平即可。
public static async Task<TResult[]> ExecuteInParallel<T, TResult>(this IList<T> source, int degreeOfParalleslism, Func<T, Task<TResult>> body)
{
var lists = await Task.WhenAll<List<TResult>>(
Partitioner.Create(source).GetPartitions(degreeOfParalleslism)
.Select(partition => Task.Run<List<TResult>>(async () =>
{
var list = new List<TResult>();
using (partition)
{
while (partition.MoveNext())
{
list.Add(await body(partition.Current));
}
}
return list;
})));
return lists.SelectMany(list => list).ToArray();
}
(我已将其从 ForEachAsync
重命名,因为 ForEach
听起来势在必行(适合原版中的 Func<T, Task>
),而这是获取结果。A foreach
循环没有结果 - 这个有。)
现在 Parallel.ForEachAsync
API 已成为标准库 (.NET 6) 的一部分,实现 returns 和 Task<TResult[]>
的变体是有意义的,基于此 API。这是一个实现:
public static Task<TResult[]> ForEachAsync<TSource, TResult>(
this IEnumerable<TSource> source,
ParallelOptions parallelOptions,
Func<TSource, CancellationToken, ValueTask<TResult>> body)
{
var results = new List<TResult>();
if (source.TryGetNonEnumeratedCount(out int count)) results.Capacity = count;
var withIndexes = source.Select((item, index) => (item, index));
return Parallel.ForEachAsync(withIndexes, parallelOptions, async (entry, ct) =>
{
var (item, index) = entry;
var result = await body(item, ct).ConfigureAwait(false);
lock (results)
{
while (results.Count <= index) results.Add(default);
results[index] = result;
}
}).ContinueWith(t =>
{
if (t.IsCanceled)
{
// Propagate the correct token
CancellationToken ct = default;
try { t.GetAwaiter().GetResult(); }
catch (OperationCanceledException oce) { ct = oce.CancellationToken; }
return Task.FromCanceled<TResult[]>(ct);
}
if (t.IsFaulted)
{
var tcs = new TaskCompletionSource<TResult[]>();
tcs.SetException(t.Exception.InnerExceptions);
return tcs.Task;
}
lock (results) return Task.FromResult(results.ToArray());
}, default, TaskContinuationOptions.DenyChildAttach, TaskScheduler.Default)
.Unwrap();
}
此实现支持具有 IEnumerable<T>
作为 source
的 Parallel.ForEachAsync
重载的所有选项和功能。它在错误和取消情况下的行为是相同的。结果的排列顺序与 source
序列中关联元素的顺序相同。