Task.WhenAll 对于 ValueTask

Task.WhenAll for ValueTask

是否有 Task.WhenAll 接受 ValueTask 的等价物?

我可以使用

解决这个问题
Task.WhenAll(tasks.Select(t => t.AsTask()))

如果它们都包装了一个 Task 就没问题了,但它会强制为真正的 ValueTask.

分配一个无用的 Task 对象

按照设计,没有。来自 the docs:

Methods may return an instance of this value type when it's likely that the result of their operations will be available synchronously and when the method is expected to be invoked so frequently that the cost of allocating a new Task for each call will be prohibitive.

For example, consider a method that could return either a Task<TResult> with a cached task as a common result or a ValueTask<TResult>. If the consumer of the result wants to use it as a Task<TResult>, such as to use with in methods like Task.WhenAll and Task.WhenAny, the ValueTask<TResult> would first need to be converted into a Task<TResult> using AsTask, which leads to an allocation that would have been avoided if a cached Task<TResult> had been used in the first place.

正如@stuartd 所指出的,它不受设计支持,我不得不手动实现:

public static async Task<IReadOnlyCollection<T>> WhenAll<T>(this IEnumerable<ValueTask<T>> tasks)
{
    var results = new List<T>();
    var toAwait = new List<Task<T>>();

    foreach (var valueTask in tasks)
    {
        if (valueTask.IsCompletedSuccessfully)
            results.Add(valueTask.Result);
        else
            toAwait.Add(valueTask.AsTask());
    }

    results.AddRange(await Task.WhenAll(toAwait).ConfigureAwait(false));

    return results;
}

当然,这只会在高吞吐量和大量 ValueTask 方面有所帮助,因为它会增加一些其他开销。

注意:正如@StephenCleary 指出的那样,这不会像 Task.WhenAll 那样保持顺序,如果需要,可以轻松更改以实现它。

尝试进行一些优化,以正确的顺序返回结果并正确处理异常。

public static ValueTask<T[]> WhenAll<T>(IEnumerable<ValueTask<T>> tasks)
    {
        var list = tasks.ToList();
        var length = list.Count;
        var result = new T[length];
        var i = 0;

        for (; i < length; i ++)
        {
            if (list[i].IsCompletedSuccessfully)
            {
                result[i] = list[i].Result;
            }
            else
            {
                return WhenAllAsync();
            }
        }

        return new ValueTask<T[]>(result);

        async ValueTask<T[]> WhenAllAsync()
        {
            for (; i < length; i ++)
            {
                try
                {
                    result[i] = await list[i];
                }
                catch
                {
                    for (i ++; i < length; i ++)
                    {
                        try
                        {
                            await list[i];
                        }
                        catch
                        {
                            // ignored
                        }
                    }

                    throw;
                }
            }

            return result;
        }
    }

我正在使用这个扩展方法:

internal static class ValueTaskExtensions
{
    public static Task WhenAll(this IEnumerable<ValueTask> tasks)
    {
        return Task.WhenAll(tasks.Select(v => v.AsTask()));
    }
}

除非我遗漏了什么,否则我们应该能够等待循环中的所有任务:

public static async ValueTask<T[]> WhenAll<T>(params ValueTask<T>[] tasks)
{
    ArgumentNullException.ThrowIfNull(tasks);
    if (tasks.Length == 0)
        return Array.Empty<T>();

    var results = new T[tasks.Length];
    for (var i = 0; i < tasks.Length; i++)
        results[i] = await tasks[i].ConfigureAwait(false);

    return results;
}

分配
等待同步完成的 ValueTask 不应导致分配 Task。所以这里发生的唯一“额外”分配是我们用于返回结果的数组。

订单
返回项的顺序与生成它们的给定任务的顺序相同。

并发
尽管看起来我们按顺序执行任务,但实际情况并非如此,因为调用此方法时任务已经启动(即处于热状态)。因此我们只等待数组中最长的任务(感谢 Sergey 在评论中提出这个问题)。

异常
当一个任务抛出异常时,上面的代码将停止等待其余任务并抛出异常。如果这是不可取的,我们可以这样做:

public static async ValueTask<T[]> WhenAll<T>(params ValueTask<T>[] tasks)
{
    ArgumentNullException.ThrowIfNull(tasks);
    if (tasks.Length == 0)
        return Array.Empty<T>();

    // We don't allocate the list if no task throws
    List<Exception>? exceptions = null;

    var results = new T[tasks.Length];
    for (var i = 0; i < tasks.Length; i++)
        try
        {
            results[i] = await tasks[i].ConfigureAwait(false);
        }
        catch (Exception ex)
        {
            exceptions ??= new(tasks.Length);
            exceptions.Add(ex);
        }

    return exceptions is null
        ? results
        : throw new AggregateException(exceptions);
}

额外注意事项

  • 我们可以将其作为扩展方法。
  • 我们可以有接受 IEnumerable<ValueTask<T>>IReadOnlyList<ValueTask<T>> 的重载以实现更广泛的兼容性。

示例签名:

// There are some collections (e.g. hash-sets, queues/stacks,
// linked lists, etc) that only implement I*Collection interfaces
// and not I*List ones, but A) we're not likely to have our tasks
// in them and B) even if we do, IEnumerable accepting overload
// below should handle them. Allocation-wise; it's a ToList there
// vs GetEnumerator here.
public static async ValueTask<T[]> WhenAll<T>(
    IReadOnlyList<ValueTask<T>> tasks)
{
    // Our implementation above.
}

// ToList call below ensures that all tasks are initialized, so
// calling this with an iterator wouldn't cause the tasks to run
// sequentially.
public static ValueTask<T[]> WhenAll<T>(
    IEnumerable<ValueTask<T>> tasks)
{
    return WhenAll(tasks?.ToList());
}

// Arrays already implement IReadOnlyList<T>, but this overload
// is still useful because as the `params` keyword allows callers 
// to pass individual tasks like they are different arguments.
public static ValueTask<T[]> WhenAll<T>(
    params ValueTask<T>[] tasks)
{
    return WhenAll(tasks as IReadOnlyList<ValueTask<T>>);
}

Theodor 在评论中提到了将结果 array/list 作为参数传递的方法,因此 我们的 实现将没有所有额外分配,但调用者仍然会必须创建它,如果他们批量等待任务,这可能是有意义的,但这听起来像是一个相当专业的场景,所以如果你发现自己需要它,你可能不需要这个答案