如何强制 IAsyncEnumerable 遵守 CancellationToken

How to force an IAsyncEnumerable to respect a CancellationToken

编辑:这道题的要求变了。请参阅下面的 更新 部分。

我有一个异步迭代器方法,它生成一个 IAsyncEnumerable<int>(数字流),每 200 毫秒一个数字。此方法的调用者使用流,但希望在 1000 毫秒后停止枚举。所以使用了 CancellationTokenSource,令牌被传递为 WithCancellation 扩展方法的参数。但是令牌不受尊重。枚举一直持续到消耗完所有数字:

static async IAsyncEnumerable<int> GetSequence()
{
    for (int i = 1; i <= 10; i++)
    {
        await Task.Delay(200);
        yield return i;
    }
}

var cts = new CancellationTokenSource(1000);
await foreach (var i in GetSequence().WithCancellation(cts.Token))
{
    Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff} > {i}");
}

输出:

12:55:17.506 > 1
12:55:17.739 > 2
12:55:17.941 > 3
12:55:18.155 > 4
12:55:18.367 > 5
12:55:18.570 > 6
12:55:18.772 > 7
12:55:18.973 > 8
12:55:19.174 > 9
12:55:19.376 > 10

预期的输出是在数字 5 之后出现的 TaskCanceledException。看来我误解了 WithCancellation 实际在做什么。该方法只是将提供的标记传递给迭代器方法,如果该方法接受一个标记。否则,就像我示例中的方法 GetSequence() 一样,令牌将被忽略。我想我的解决方案是手动询问枚举的 body 内的令牌:

var cts = new CancellationTokenSource(1000);
await foreach (var i in GetSequence())
{
    cts.Token.ThrowIfCancellationRequested();
    Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff} > {i}");
}

这很简单而且效果很好。但无论如何,我想知道是否有可能创建一个扩展方法来执行我期望 WithCancellation 执行的操作,以在随后的枚举中烘焙令牌。这是所需方法的签名:

public static IAsyncEnumerable<T> WithEnforcedCancellation<T>(
    this IAsyncEnumerable<T> source, CancellationToken cancellationToken)
{
    // Is it possible?
}

更新:看来我问这个问题的时候对整个取消概念的目的理解不正确。我的印象是取消是为了在MoveNextAsync的等待之后打破循环,而真正的目的是取消等待本身。在我的简单示例中,等待仅持续 200 毫秒,但在现实世界的示例中,等待时间可能更长,甚至无限。意识到这一点后,我当前形式的问题几乎没有价值,我必须删除它并打开一个具有相同标题的新问题,或者更改现有问题的要求。这两种选择都以某种方式不好。

我决定选择第二个选项。所以我是 un-accepting 当前接受的答案,我正在寻求一个新的解决方案,以解决以立即生效的方式执行取消的更困难的问题。换句话说,取消令牌应该导致在几毫秒内完成异步枚举。让我们举一个实际的例子来区分可取的和不可取的行为:

var cts = new CancellationTokenSource(500);
var stopwatch = Stopwatch.StartNew();
try
{
    await foreach (var i in GetSequence().WithEnforcedCancellation(cts.Token))
    {
        Console.WriteLine($"{stopwatch.Elapsed:m':'ss'.'fff} > {i}");
    }
}
catch (OperationCanceledException)
{
    Console.WriteLine($"{stopwatch.Elapsed:m':'ss'.'fff} > Canceled");
}

输出(理想):

0:00.242 > 1
0:00.467 > 2
0:00.500 > Canceled

输出(不理想):

0:00.242 > 1
0:00.467 > 2
0:00.707 > Canceled

GetSequence 与初始示例中的方法相同,每 200 毫秒传输一个数字。这个方法不支持取消,前提是我们不能改变它。 WithEnforcedCancellation 是解决此问题所需的扩展方法。

您可以像这样将逻辑提取到扩展方法中:

public static async IAsyncEnumerable<T> WithEnforcedCancellation<T>(
    this IAsyncEnumerable<T> source, CancellationToken cancellationToken)
{
    if (source == null)
        throw new ArgumentNullException(nameof(source));

    cancellationToken.ThrowIfCancellationRequested();

    await foreach (var item in source)
    {
        cancellationToken.ThrowIfCancellationRequested();
        yield return item;
    }
}

IAsyncEnumerable 使用 EnumeratorCancellation 属性明确提供此机制:

static async IAsyncEnumerable<int> GetSequence([EnumeratorCancellation] CancellationToken ct = default) {
    for (int i = 1; i <= 10; i++) {
        ct.ThrowIfCancellationRequested();
        await Task.Delay(200);    // or `Task.Delay(200, ct)` if this wasn't an example
        yield return i;
    }
}

事实上,如果您给方法一个 CancellationToken 参数,但不添加属性,编译器会发出警告。

请注意,传递给 .WithCancellation 的令牌将覆盖传递给该方法的任何本地令牌。 specs 有这方面的详细信息。

当然,这仍然只有在枚举实际接受 CancellationToken 时才有效——但取消只有在合作完成时才真正有效这一事实对任何 async 工作都是如此。 适用于 "forcing" 对不支持它的枚举进行某些取消措施,但首选解决方案应该是修改枚举以支持自行取消——编译器会尽一切努力提供帮助你出来了。

我认为重申您应该这样做很重要。让异步方法支持取消标记总是更好,这样取消就会像您期望的那样立即进行。如果那不可能,我仍然建议在尝试这个答案之前尝试其他答案之一。

话虽如此,如果您不能向异步方法添加取消支持,并且您绝对需要立即 终止 foreach,然后你就可以绕过它了。

一个技巧是使用带有两个参数的Task.WhenAny

  1. 你从IAsyncEnumerator.MoveNextAsync()
  2. 得到的任务
  3. 另一个支持取消的任务

这是简短的版本

// Start the 'await foreach' without the new syntax
// because we need access to the ValueTask returned by MoveNextAsync()
var enumerator = source.GetAsyncEnumerator(cancellationToken);

// Combine MoveNextAsync() with another Task that can be awaited indefinitely,
// until it throws OperationCanceledException
var untilCanceled = UntilCanceled(cancellationToken);
while (await await Task.WhenAny(enumerator.MoveNextAsync().AsTask(), untilCanceled))
{
    yield return enumerator.Current;
}

带有 ConfigureAwait(false)DisposeAsync() 的完整版本,如果您在本地 运行 它应该可以工作。

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

public static class AsyncStreamHelper
{
    public static async IAsyncEnumerable<T> WithEnforcedCancellation<T>(this IAsyncEnumerable<T> source, [EnumeratorCancellation] CancellationToken cancellationToken)
    {
        if (source == null)
            throw new ArgumentNullException(nameof(source));
        cancellationToken.ThrowIfCancellationRequested();

        // Start the 'await foreach' without the new syntax
        // because we need access to the ValueTask returned by MoveNextAsync()
        var enumerator = source.GetAsyncEnumerator(cancellationToken);
        Task<bool> moveNext = null;

        // Combine MoveNextAsync() with another Task that can be awaited indefinitely,
        // until it throws OperationCanceledException
        var untilCanceled = UntilCanceled(cancellationToken);
        try
        {
            while (
                await (
                    await Task.WhenAny(
                        (
                            moveNext = enumerator.MoveNextAsync().AsTask()
                        ),
                        untilCanceled
                    ).ConfigureAwait(false)
                )
            )
            {
                yield return enumerator.Current;
            }
        }
        finally
        {
            if (moveNext != null && !moveNext.IsCompleted)
            {
                // Disable warning CS4014 "Because this call is not awaited, execution of the current method continues before the call is completed"
#pragma warning disable 4014 // This is the behavior we want!

                moveNext.ContinueWith(async _ =>
                {
                    await enumerator.DisposeAsync();
                }, TaskScheduler.Default);
#pragma warning restore 4014
            }
            else if (enumerator != null)
            {
                await enumerator.DisposeAsync();
            }
        }
    }

    private static Task<bool> UntilCanceled(CancellationToken cancellationToken)
    {
        // This is just one possible implementation... feel free to swap out for something else
        return new Task<bool>(() => true, cancellationToken);
    }
}

public class Program
{
    public static async Task Main()
    {
        var cts = new CancellationTokenSource(500);
        var stopwatch = Stopwatch.StartNew();
        try
        {
            await foreach (var i in GetSequence().WithEnforcedCancellation(cts.Token))
            {
                Console.WriteLine($"{stopwatch.Elapsed:m':'ss'.'fff} > {i}");
            }
        }
        catch (OperationCanceledException)
        {
            Console.WriteLine($"{stopwatch.Elapsed:m':'ss'.'fff} > Canceled");
        }
    }

    static async IAsyncEnumerable<int> GetSequence()
    {
        for (int i = 1; i <= 10; i++)
        {
            await Task.Delay(200);
            yield return i;
        }
    }
}

注意事项

枚举器 returns 一个 ValueTask 以提高性能(使用比常规任务更少的分配),但 ValueTask 不能与 Task.WhenAny() 一起使用,因此使用 AsTask() 会降低性能通过引入分配开销。

只有在最近的 MoveNextAsync() 完成后才能释放枚举器。当请求取消时,任务更有可能仍在 运行ning 中。这就是为什么我在延续任务中添加了对 DisposeAsync 的另一个调用。

在这种情况下,当 WithEnforcedCancellation() 方法退出时,枚举数尚未被释放。在枚举被放弃后,它将被处理一段不确定的时间。如果 DisposeAsync() 抛出异常,则异常将丢失。它无法使调用堆栈冒泡,因为没有调用堆栈。