如何强制 IAsyncEnumerable 遵守 CancellationToken
How to force an IAsyncEnumerable to respect a CancellationToken
编辑:这道题的要求变了。请参阅下面的 更新 部分。
我有一个异步迭代器方法,它生成一个 IAsyncEnumerable<int>
(数字流),每 200 毫秒一个数字。此方法的调用者使用流,但希望在 1000 毫秒后停止枚举。所以使用了 CancellationTokenSource
,令牌被传递为
WithCancellation
扩展方法的参数。但是令牌不受尊重。枚举一直持续到消耗完所有数字:
static async IAsyncEnumerable<int> GetSequence()
{
for (int i = 1; i <= 10; i++)
{
await Task.Delay(200);
yield return i;
}
}
var cts = new CancellationTokenSource(1000);
await foreach (var i in GetSequence().WithCancellation(cts.Token))
{
Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff} > {i}");
}
输出:
12:55:17.506 > 1
12:55:17.739 > 2
12:55:17.941 > 3
12:55:18.155 > 4
12:55:18.367 > 5
12:55:18.570 > 6
12:55:18.772 > 7
12:55:18.973 > 8
12:55:19.174 > 9
12:55:19.376 > 10
预期的输出是在数字 5 之后出现的 TaskCanceledException
。看来我误解了 WithCancellation
实际在做什么。该方法只是将提供的标记传递给迭代器方法,如果该方法接受一个标记。否则,就像我示例中的方法 GetSequence()
一样,令牌将被忽略。我想我的解决方案是手动询问枚举的 body 内的令牌:
var cts = new CancellationTokenSource(1000);
await foreach (var i in GetSequence())
{
cts.Token.ThrowIfCancellationRequested();
Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff} > {i}");
}
这很简单而且效果很好。但无论如何,我想知道是否有可能创建一个扩展方法来执行我期望 WithCancellation
执行的操作,以在随后的枚举中烘焙令牌。这是所需方法的签名:
public static IAsyncEnumerable<T> WithEnforcedCancellation<T>(
this IAsyncEnumerable<T> source, CancellationToken cancellationToken)
{
// Is it possible?
}
更新:看来我问这个问题的时候对整个取消概念的目的理解不正确。我的印象是取消是为了在MoveNextAsync
的等待之后打破循环,而真正的目的是取消等待本身。在我的简单示例中,等待仅持续 200 毫秒,但在现实世界的示例中,等待时间可能更长,甚至无限。意识到这一点后,我当前形式的问题几乎没有价值,我必须删除它并打开一个具有相同标题的新问题,或者更改现有问题的要求。这两种选择都以某种方式不好。
我决定选择第二个选项。所以我是 un-accepting 当前接受的答案,我正在寻求一个新的解决方案,以解决以立即生效的方式执行取消的更困难的问题。换句话说,取消令牌应该导致在几毫秒内完成异步枚举。让我们举一个实际的例子来区分可取的和不可取的行为:
var cts = new CancellationTokenSource(500);
var stopwatch = Stopwatch.StartNew();
try
{
await foreach (var i in GetSequence().WithEnforcedCancellation(cts.Token))
{
Console.WriteLine($"{stopwatch.Elapsed:m':'ss'.'fff} > {i}");
}
}
catch (OperationCanceledException)
{
Console.WriteLine($"{stopwatch.Elapsed:m':'ss'.'fff} > Canceled");
}
输出(理想):
0:00.242 > 1
0:00.467 > 2
0:00.500 > Canceled
输出(不理想):
0:00.242 > 1
0:00.467 > 2
0:00.707 > Canceled
GetSequence
与初始示例中的方法相同,每 200 毫秒传输一个数字。这个方法不支持取消,前提是我们不能改变它。 WithEnforcedCancellation
是解决此问题所需的扩展方法。
您可以像这样将逻辑提取到扩展方法中:
public static async IAsyncEnumerable<T> WithEnforcedCancellation<T>(
this IAsyncEnumerable<T> source, CancellationToken cancellationToken)
{
if (source == null)
throw new ArgumentNullException(nameof(source));
cancellationToken.ThrowIfCancellationRequested();
await foreach (var item in source)
{
cancellationToken.ThrowIfCancellationRequested();
yield return item;
}
}
IAsyncEnumerable
使用 EnumeratorCancellation
属性明确提供此机制:
static async IAsyncEnumerable<int> GetSequence([EnumeratorCancellation] CancellationToken ct = default) {
for (int i = 1; i <= 10; i++) {
ct.ThrowIfCancellationRequested();
await Task.Delay(200); // or `Task.Delay(200, ct)` if this wasn't an example
yield return i;
}
}
事实上,如果您给方法一个 CancellationToken
参数,但不添加属性,编译器会发出警告。
请注意,传递给 .WithCancellation
的令牌将覆盖传递给该方法的任何本地令牌。 specs 有这方面的详细信息。
当然,这仍然只有在枚举实际接受 CancellationToken
时才有效——但取消只有在合作完成时才真正有效这一事实对任何 async
工作都是如此。 适用于 "forcing" 对不支持它的枚举进行某些取消措施,但首选解决方案应该是修改枚举以支持自行取消——编译器会尽一切努力提供帮助你出来了。
我认为重申您不应该这样做很重要。让异步方法支持取消标记总是更好,这样取消就会像您期望的那样立即进行。如果那不可能,我仍然建议在尝试这个答案之前尝试其他答案之一。
话虽如此,如果您不能向异步方法添加取消支持,并且您绝对需要立即 终止 foreach,然后你就可以绕过它了。
一个技巧是使用带有两个参数的Task.WhenAny
:
- 你从
IAsyncEnumerator.MoveNextAsync()
得到的任务
- 另一个支持取消的任务
这是简短的版本
// Start the 'await foreach' without the new syntax
// because we need access to the ValueTask returned by MoveNextAsync()
var enumerator = source.GetAsyncEnumerator(cancellationToken);
// Combine MoveNextAsync() with another Task that can be awaited indefinitely,
// until it throws OperationCanceledException
var untilCanceled = UntilCanceled(cancellationToken);
while (await await Task.WhenAny(enumerator.MoveNextAsync().AsTask(), untilCanceled))
{
yield return enumerator.Current;
}
带有 ConfigureAwait(false)
和 DisposeAsync()
的完整版本,如果您在本地 运行 它应该可以工作。
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
public static class AsyncStreamHelper
{
public static async IAsyncEnumerable<T> WithEnforcedCancellation<T>(this IAsyncEnumerable<T> source, [EnumeratorCancellation] CancellationToken cancellationToken)
{
if (source == null)
throw new ArgumentNullException(nameof(source));
cancellationToken.ThrowIfCancellationRequested();
// Start the 'await foreach' without the new syntax
// because we need access to the ValueTask returned by MoveNextAsync()
var enumerator = source.GetAsyncEnumerator(cancellationToken);
Task<bool> moveNext = null;
// Combine MoveNextAsync() with another Task that can be awaited indefinitely,
// until it throws OperationCanceledException
var untilCanceled = UntilCanceled(cancellationToken);
try
{
while (
await (
await Task.WhenAny(
(
moveNext = enumerator.MoveNextAsync().AsTask()
),
untilCanceled
).ConfigureAwait(false)
)
)
{
yield return enumerator.Current;
}
}
finally
{
if (moveNext != null && !moveNext.IsCompleted)
{
// Disable warning CS4014 "Because this call is not awaited, execution of the current method continues before the call is completed"
#pragma warning disable 4014 // This is the behavior we want!
moveNext.ContinueWith(async _ =>
{
await enumerator.DisposeAsync();
}, TaskScheduler.Default);
#pragma warning restore 4014
}
else if (enumerator != null)
{
await enumerator.DisposeAsync();
}
}
}
private static Task<bool> UntilCanceled(CancellationToken cancellationToken)
{
// This is just one possible implementation... feel free to swap out for something else
return new Task<bool>(() => true, cancellationToken);
}
}
public class Program
{
public static async Task Main()
{
var cts = new CancellationTokenSource(500);
var stopwatch = Stopwatch.StartNew();
try
{
await foreach (var i in GetSequence().WithEnforcedCancellation(cts.Token))
{
Console.WriteLine($"{stopwatch.Elapsed:m':'ss'.'fff} > {i}");
}
}
catch (OperationCanceledException)
{
Console.WriteLine($"{stopwatch.Elapsed:m':'ss'.'fff} > Canceled");
}
}
static async IAsyncEnumerable<int> GetSequence()
{
for (int i = 1; i <= 10; i++)
{
await Task.Delay(200);
yield return i;
}
}
}
注意事项
枚举器 returns 一个 ValueTask 以提高性能(使用比常规任务更少的分配),但 ValueTask 不能与 Task.WhenAny()
一起使用,因此使用 AsTask()
会降低性能通过引入分配开销。
只有在最近的 MoveNextAsync()
完成后才能释放枚举器。当请求取消时,任务更有可能仍在 运行ning 中。这就是为什么我在延续任务中添加了对 DisposeAsync
的另一个调用。
在这种情况下,当 WithEnforcedCancellation()
方法退出时,枚举数尚未被释放。在枚举被放弃后,它将被处理一段不确定的时间。如果 DisposeAsync()
抛出异常,则异常将丢失。它无法使调用堆栈冒泡,因为没有调用堆栈。
编辑:这道题的要求变了。请参阅下面的 更新 部分。
我有一个异步迭代器方法,它生成一个 IAsyncEnumerable<int>
(数字流),每 200 毫秒一个数字。此方法的调用者使用流,但希望在 1000 毫秒后停止枚举。所以使用了 CancellationTokenSource
,令牌被传递为
WithCancellation
扩展方法的参数。但是令牌不受尊重。枚举一直持续到消耗完所有数字:
static async IAsyncEnumerable<int> GetSequence()
{
for (int i = 1; i <= 10; i++)
{
await Task.Delay(200);
yield return i;
}
}
var cts = new CancellationTokenSource(1000);
await foreach (var i in GetSequence().WithCancellation(cts.Token))
{
Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff} > {i}");
}
输出:
12:55:17.506 > 1
12:55:17.739 > 2
12:55:17.941 > 3
12:55:18.155 > 4
12:55:18.367 > 5
12:55:18.570 > 6
12:55:18.772 > 7
12:55:18.973 > 8
12:55:19.174 > 9
12:55:19.376 > 10
预期的输出是在数字 5 之后出现的 TaskCanceledException
。看来我误解了 WithCancellation
实际在做什么。该方法只是将提供的标记传递给迭代器方法,如果该方法接受一个标记。否则,就像我示例中的方法 GetSequence()
一样,令牌将被忽略。我想我的解决方案是手动询问枚举的 body 内的令牌:
var cts = new CancellationTokenSource(1000);
await foreach (var i in GetSequence())
{
cts.Token.ThrowIfCancellationRequested();
Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff} > {i}");
}
这很简单而且效果很好。但无论如何,我想知道是否有可能创建一个扩展方法来执行我期望 WithCancellation
执行的操作,以在随后的枚举中烘焙令牌。这是所需方法的签名:
public static IAsyncEnumerable<T> WithEnforcedCancellation<T>(
this IAsyncEnumerable<T> source, CancellationToken cancellationToken)
{
// Is it possible?
}
更新:看来我问这个问题的时候对整个取消概念的目的理解不正确。我的印象是取消是为了在MoveNextAsync
的等待之后打破循环,而真正的目的是取消等待本身。在我的简单示例中,等待仅持续 200 毫秒,但在现实世界的示例中,等待时间可能更长,甚至无限。意识到这一点后,我当前形式的问题几乎没有价值,我必须删除它并打开一个具有相同标题的新问题,或者更改现有问题的要求。这两种选择都以某种方式不好。
我决定选择第二个选项。所以我是 un-accepting 当前接受的答案,我正在寻求一个新的解决方案,以解决以立即生效的方式执行取消的更困难的问题。换句话说,取消令牌应该导致在几毫秒内完成异步枚举。让我们举一个实际的例子来区分可取的和不可取的行为:
var cts = new CancellationTokenSource(500);
var stopwatch = Stopwatch.StartNew();
try
{
await foreach (var i in GetSequence().WithEnforcedCancellation(cts.Token))
{
Console.WriteLine($"{stopwatch.Elapsed:m':'ss'.'fff} > {i}");
}
}
catch (OperationCanceledException)
{
Console.WriteLine($"{stopwatch.Elapsed:m':'ss'.'fff} > Canceled");
}
输出(理想):
0:00.242 > 1
0:00.467 > 2
0:00.500 > Canceled
输出(不理想):
0:00.242 > 1
0:00.467 > 2
0:00.707 > Canceled
GetSequence
与初始示例中的方法相同,每 200 毫秒传输一个数字。这个方法不支持取消,前提是我们不能改变它。 WithEnforcedCancellation
是解决此问题所需的扩展方法。
您可以像这样将逻辑提取到扩展方法中:
public static async IAsyncEnumerable<T> WithEnforcedCancellation<T>(
this IAsyncEnumerable<T> source, CancellationToken cancellationToken)
{
if (source == null)
throw new ArgumentNullException(nameof(source));
cancellationToken.ThrowIfCancellationRequested();
await foreach (var item in source)
{
cancellationToken.ThrowIfCancellationRequested();
yield return item;
}
}
IAsyncEnumerable
使用 EnumeratorCancellation
属性明确提供此机制:
static async IAsyncEnumerable<int> GetSequence([EnumeratorCancellation] CancellationToken ct = default) {
for (int i = 1; i <= 10; i++) {
ct.ThrowIfCancellationRequested();
await Task.Delay(200); // or `Task.Delay(200, ct)` if this wasn't an example
yield return i;
}
}
事实上,如果您给方法一个 CancellationToken
参数,但不添加属性,编译器会发出警告。
请注意,传递给 .WithCancellation
的令牌将覆盖传递给该方法的任何本地令牌。 specs 有这方面的详细信息。
当然,这仍然只有在枚举实际接受 CancellationToken
时才有效——但取消只有在合作完成时才真正有效这一事实对任何 async
工作都是如此。
我认为重申您不应该这样做很重要。让异步方法支持取消标记总是更好,这样取消就会像您期望的那样立即进行。如果那不可能,我仍然建议在尝试这个答案之前尝试其他答案之一。
话虽如此,如果您不能向异步方法添加取消支持,并且您绝对需要立即 终止 foreach,然后你就可以绕过它了。
一个技巧是使用带有两个参数的Task.WhenAny
:
- 你从
IAsyncEnumerator.MoveNextAsync()
得到的任务
- 另一个支持取消的任务
这是简短的版本
// Start the 'await foreach' without the new syntax
// because we need access to the ValueTask returned by MoveNextAsync()
var enumerator = source.GetAsyncEnumerator(cancellationToken);
// Combine MoveNextAsync() with another Task that can be awaited indefinitely,
// until it throws OperationCanceledException
var untilCanceled = UntilCanceled(cancellationToken);
while (await await Task.WhenAny(enumerator.MoveNextAsync().AsTask(), untilCanceled))
{
yield return enumerator.Current;
}
带有 ConfigureAwait(false)
和 DisposeAsync()
的完整版本,如果您在本地 运行 它应该可以工作。
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
public static class AsyncStreamHelper
{
public static async IAsyncEnumerable<T> WithEnforcedCancellation<T>(this IAsyncEnumerable<T> source, [EnumeratorCancellation] CancellationToken cancellationToken)
{
if (source == null)
throw new ArgumentNullException(nameof(source));
cancellationToken.ThrowIfCancellationRequested();
// Start the 'await foreach' without the new syntax
// because we need access to the ValueTask returned by MoveNextAsync()
var enumerator = source.GetAsyncEnumerator(cancellationToken);
Task<bool> moveNext = null;
// Combine MoveNextAsync() with another Task that can be awaited indefinitely,
// until it throws OperationCanceledException
var untilCanceled = UntilCanceled(cancellationToken);
try
{
while (
await (
await Task.WhenAny(
(
moveNext = enumerator.MoveNextAsync().AsTask()
),
untilCanceled
).ConfigureAwait(false)
)
)
{
yield return enumerator.Current;
}
}
finally
{
if (moveNext != null && !moveNext.IsCompleted)
{
// Disable warning CS4014 "Because this call is not awaited, execution of the current method continues before the call is completed"
#pragma warning disable 4014 // This is the behavior we want!
moveNext.ContinueWith(async _ =>
{
await enumerator.DisposeAsync();
}, TaskScheduler.Default);
#pragma warning restore 4014
}
else if (enumerator != null)
{
await enumerator.DisposeAsync();
}
}
}
private static Task<bool> UntilCanceled(CancellationToken cancellationToken)
{
// This is just one possible implementation... feel free to swap out for something else
return new Task<bool>(() => true, cancellationToken);
}
}
public class Program
{
public static async Task Main()
{
var cts = new CancellationTokenSource(500);
var stopwatch = Stopwatch.StartNew();
try
{
await foreach (var i in GetSequence().WithEnforcedCancellation(cts.Token))
{
Console.WriteLine($"{stopwatch.Elapsed:m':'ss'.'fff} > {i}");
}
}
catch (OperationCanceledException)
{
Console.WriteLine($"{stopwatch.Elapsed:m':'ss'.'fff} > Canceled");
}
}
static async IAsyncEnumerable<int> GetSequence()
{
for (int i = 1; i <= 10; i++)
{
await Task.Delay(200);
yield return i;
}
}
}
注意事项
枚举器 returns 一个 ValueTask 以提高性能(使用比常规任务更少的分配),但 ValueTask 不能与 Task.WhenAny()
一起使用,因此使用 AsTask()
会降低性能通过引入分配开销。
只有在最近的 MoveNextAsync()
完成后才能释放枚举器。当请求取消时,任务更有可能仍在 运行ning 中。这就是为什么我在延续任务中添加了对 DisposeAsync
的另一个调用。
在这种情况下,当 WithEnforcedCancellation()
方法退出时,枚举数尚未被释放。在枚举被放弃后,它将被处理一段不确定的时间。如果 DisposeAsync()
抛出异常,则异常将丢失。它无法使调用堆栈冒泡,因为没有调用堆栈。