任务免分配超时
Allocation-free timeout on Task
我想为 ChannelReader.ReadyAsync 添加超时。这是我找到的两个解决方案:
var cts = new CancellationTokenSource();
cts.CancelAfter(2000);
try {
var data = chan.ReadAsync(cts.Token);
} catch (OperationCanceledException) {
// timeout
}
var tasks = new Task[] { Task.Delay(2000), chan.ReadAsync(CancellationToken.None) };
var completedTask = await Task.WhenAny(tasks);
if (completedTask == tasks[0])
// timeout
else
var data = ((T)completedTask).Result;
但是,这两种解决方案都不是免分配的。第一个在 Task.Delay 中分配一个 CancellationTokenSource,第二个分配一个 Timer。有没有办法不用任何分配就可以制作类似的代码?
编辑 1:使用第一个解决方案时的 dotTrace 输出
感谢您的回答,它们让我重新思考了我一直在寻找的东西:重用 CancellationTokenSource
。一旦 CancellationTokenSource
被取消,您就不能再使用它。但就我而言,ChannelReader.ReadAsync
大部分时间会在超时触发之前 return 所以我使用 CancelAfter
doesn't recreate a timer the second time you call it 来避免在 [=13] 之后取消 CancellationTokenSource
=] returns.
var timeoutCancellation = new CancellationTokenSource();
while (true)
{
if (timeoutCancellation.IsCancellationRequested)
{
timeoutCancellation.Dispose();
timeoutCancellation = new CancellationTokenSource();
}
T data;
try
{
timeoutCancellation.CancelAfter(2000);
data = await _queue.Reader.ReadAsync(timeoutCancellation.Token);
// make sure it doesn't get cancelled so it can be reused in the next iteration
// Timeout.Infinite won't work because it would delete the underlying timer
timeoutCancellation.CancelAfter(int.MaxValue);
}
catch (OperationCanceledException) // timeout reached
{
// handle timeout
continue;
}
// process data
}
这不是免分配的,但它大大减少了分配对象的数量。
ChannelReader<T>
class has a ReadAllAsync
method that exposes the data of the reader as an IAsyncEnumerable<T>
。下面是此方法的重载,它还接受 timeout
参数。此参数的效果是,如果 reader 在指定的时间跨度内未能发出任何项目,则会抛出 TimeoutException
。
为了减少分配,它使用了与 相同的巧妙技术,其中单个 CancellationTokenSource
在每次迭代后重新安排取消。经过一番思考后,我删除了 CancelAfter(int.MaxValue)
行,因为在一般情况下它可能有害而不是有用,但我可能是错的。
public static async IAsyncEnumerable<TSource> ReadAllAsync<TSource>(
this ChannelReader<TSource> source, TimeSpan timeout,
[EnumeratorCancellation] CancellationToken cancellationToken = default)
{
while (true)
{
using var cts = CancellationTokenSource
.CreateLinkedTokenSource(cancellationToken);
cts.CancelAfter(timeout);
while (true)
{
try
{
if (!await source.WaitToReadAsync(cts.Token).ConfigureAwait(false))
yield break;
}
catch (OperationCanceledException)
{
cancellationToken.ThrowIfCancellationRequested();
throw new TimeoutException();
}
while (source.TryRead(out var item))
{
yield return item;
cancellationToken.ThrowIfCancellationRequested();
}
cts.CancelAfter(timeout);
// It is possible that the CTS timed-out during the yielding
if (cts.IsCancellationRequested) break; // Start a new loop with a new CTS
}
}
}
作为旁注,System.Interactive.Async package includes a Timeout
运算符具有如下所示的签名,可以与内置的 ReadAllAsync
、 结合使用并提供相同的功能有了上面的实现。不过,该方法并未针对低分配进行优化。
public static IAsyncEnumerable<TSource> Timeout<TSource>(
this IAsyncEnumerable<TSource> source, TimeSpan timeout);
注意:回想起来ReadAllAsync().Timeout()
的想法是危险的,因为ReadAllAsync
是一个耗时的方法。换句话说,枚举它具有从频道中删除项目的副作用。 Timeout
运算符不知道源序列内部发生了什么,因此在不幸的时刻发生的超时可能会导致项目丢失。这使得实现成为问题的唯一可靠解决方案(在本答案的范围内)。
我想为 ChannelReader.ReadyAsync 添加超时。这是我找到的两个解决方案:
var cts = new CancellationTokenSource();
cts.CancelAfter(2000);
try {
var data = chan.ReadAsync(cts.Token);
} catch (OperationCanceledException) {
// timeout
}
var tasks = new Task[] { Task.Delay(2000), chan.ReadAsync(CancellationToken.None) };
var completedTask = await Task.WhenAny(tasks);
if (completedTask == tasks[0])
// timeout
else
var data = ((T)completedTask).Result;
但是,这两种解决方案都不是免分配的。第一个在 Task.Delay 中分配一个 CancellationTokenSource,第二个分配一个 Timer。有没有办法不用任何分配就可以制作类似的代码?
编辑 1:使用第一个解决方案时的 dotTrace 输出
感谢您的回答,它们让我重新思考了我一直在寻找的东西:重用 CancellationTokenSource
。一旦 CancellationTokenSource
被取消,您就不能再使用它。但就我而言,ChannelReader.ReadAsync
大部分时间会在超时触发之前 return 所以我使用 CancelAfter
doesn't recreate a timer the second time you call it 来避免在 [=13] 之后取消 CancellationTokenSource
=] returns.
var timeoutCancellation = new CancellationTokenSource();
while (true)
{
if (timeoutCancellation.IsCancellationRequested)
{
timeoutCancellation.Dispose();
timeoutCancellation = new CancellationTokenSource();
}
T data;
try
{
timeoutCancellation.CancelAfter(2000);
data = await _queue.Reader.ReadAsync(timeoutCancellation.Token);
// make sure it doesn't get cancelled so it can be reused in the next iteration
// Timeout.Infinite won't work because it would delete the underlying timer
timeoutCancellation.CancelAfter(int.MaxValue);
}
catch (OperationCanceledException) // timeout reached
{
// handle timeout
continue;
}
// process data
}
这不是免分配的,但它大大减少了分配对象的数量。
ChannelReader<T>
class has a ReadAllAsync
method that exposes the data of the reader as an IAsyncEnumerable<T>
。下面是此方法的重载,它还接受 timeout
参数。此参数的效果是,如果 reader 在指定的时间跨度内未能发出任何项目,则会抛出 TimeoutException
。
为了减少分配,它使用了与 CancellationTokenSource
在每次迭代后重新安排取消。经过一番思考后,我删除了 CancelAfter(int.MaxValue)
行,因为在一般情况下它可能有害而不是有用,但我可能是错的。
public static async IAsyncEnumerable<TSource> ReadAllAsync<TSource>(
this ChannelReader<TSource> source, TimeSpan timeout,
[EnumeratorCancellation] CancellationToken cancellationToken = default)
{
while (true)
{
using var cts = CancellationTokenSource
.CreateLinkedTokenSource(cancellationToken);
cts.CancelAfter(timeout);
while (true)
{
try
{
if (!await source.WaitToReadAsync(cts.Token).ConfigureAwait(false))
yield break;
}
catch (OperationCanceledException)
{
cancellationToken.ThrowIfCancellationRequested();
throw new TimeoutException();
}
while (source.TryRead(out var item))
{
yield return item;
cancellationToken.ThrowIfCancellationRequested();
}
cts.CancelAfter(timeout);
// It is possible that the CTS timed-out during the yielding
if (cts.IsCancellationRequested) break; // Start a new loop with a new CTS
}
}
}
作为旁注,System.Interactive.Async package includes a Timeout
运算符具有如下所示的签名,可以与内置的 ReadAllAsync
、 结合使用并提供相同的功能有了上面的实现。不过,该方法并未针对低分配进行优化。
public static IAsyncEnumerable<TSource> Timeout<TSource>(
this IAsyncEnumerable<TSource> source, TimeSpan timeout);
注意:回想起来ReadAllAsync().Timeout()
的想法是危险的,因为ReadAllAsync
是一个耗时的方法。换句话说,枚举它具有从频道中删除项目的副作用。 Timeout
运算符不知道源序列内部发生了什么,因此在不幸的时刻发生的超时可能会导致项目丢失。这使得实现成为问题的唯一可靠解决方案(在本答案的范围内)。