如何在每个实现的异步中抛出取消异常?

How to throw cancellation exception in an async for each implementation?

我为每个定义和使用如下的实现自定义异步:

public static Task ForEachAsync<T>(this IEnumerable<T> source, int partitionCount, Func<T, Task> body)
{
    return Task.WhenAll(
        from partition in Partitioner.Create(source).GetPartitions(partitionCount)
        select Task.Run(async delegate
        {
            using (partition)
            {
                while (partition.MoveNext())
                {
                    await body(partition.Current).ConfigureAwait(false);
                }
            }
        })
    );
}

...

List<long> ids = new List...

await ids.ForEachAsync(8,
    async (id) =>
    {
        await myTask(id);
    }
);

效果很好,但现在我需要修改它以允许传入取消标记。我已经尝试过像这样简单的方法:

List<long> ids = new List...

await ids.ForEachAsync(8,
    async (id) =>
    {
        myToken.ThrowIfCancellationRequested();
        await myTask(id);
    }
);

但这失败了。正如我所预料的那样,我没有出现 OperationCanceledException 冒泡,而是收到了一个异常,该异常是由于取消而由其中一个线程抛出的。我也尝试过将令牌传递给异步扩展方法,但这似乎也没有用。有人可以提供有关如何完成此操作的指导吗?谢谢

要使异常冒泡,您需要将令牌传递给 Task.Run 它只需要对您的代码进行少量修改。

public static Task ForEachAsync<T>(this IEnumerable<T> source, int partitionCount, Func<T, Task> body, CancellationToken token = default(CancellationToken))
{
    return Task.WhenAll(
        from partition in Partitioner.Create(source).GetPartitions(partitionCount)
        select Task.Run(async delegate
        {
            using (partition)
            {
                while (partition.MoveNext())
                {
                    await body(partition.Current).ConfigureAwait(false);
                }
            }
        }, token) //token passed in
    );
}

一样使用
await ids.ForEachAsync(8,
    async (id) =>
    {
        myToken.ThrowIfCancellationRequested();
        await myTask(id);
    },
    myToken //token passed in
);