任务免分配超时

Allocation-free timeout on Task

我想为 ChannelReader.ReadyAsync 添加超时。这是我找到的两个解决方案:

var cts = new CancellationTokenSource();
cts.CancelAfter(2000);
try {
  var data = chan.ReadAsync(cts.Token);
} catch (OperationCanceledException) {
  // timeout
}
var tasks = new Task[] { Task.Delay(2000), chan.ReadAsync(CancellationToken.None) };
var completedTask = await Task.WhenAny(tasks);
if (completedTask == tasks[0])
  // timeout
else
  var data = ((T)completedTask).Result;

但是,这两种解决方案都不是免分配的。第一个在 Task.Delay 中分配一个 CancellationTokenSource,第二个分配一个 Timer。有没有办法不用任何分配就可以制作类似的代码?

编辑 1:使用第一个解决方案时的 dotTrace 输出

感谢您的回答,它们让我重新思考了我一直在寻找的东西:重用 CancellationTokenSource。一旦 CancellationTokenSource 被取消,您就不能再使用它。但就我而言,ChannelReader.ReadAsync 大部分时间会在超时触发之前 return 所以我使用 CancelAfter doesn't recreate a timer the second time you call it 来避免在 [=13] 之后取消 CancellationTokenSource =] returns.

var timeoutCancellation = new CancellationTokenSource();

while (true)
{
    if (timeoutCancellation.IsCancellationRequested)
    {
        timeoutCancellation.Dispose();
        timeoutCancellation = new CancellationTokenSource();
    }

    T data;
    try
    {
        timeoutCancellation.CancelAfter(2000);
        data = await _queue.Reader.ReadAsync(timeoutCancellation.Token);
        // make sure it doesn't get cancelled so it can be reused in the next iteration
        // Timeout.Infinite won't work because it would delete the underlying timer
        timeoutCancellation.CancelAfter(int.MaxValue);
    }
    catch (OperationCanceledException) // timeout reached
    {
        // handle timeout
        continue;
    }

    // process data
}

这不是免分配的,但它大大减少了分配对象的数量。

ChannelReader<T> class has a ReadAllAsync method that exposes the data of the reader as an IAsyncEnumerable<T>。下面是此方法的重载,它还接受 timeout 参数。此参数的效果是,如果 reader 在指定的时间跨度内未能发出任何项目,则会抛出 TimeoutException

为了减少分配,它使用了与 相同的巧妙技术,其中单个 CancellationTokenSource 在每次迭代后重新安排取消。经过一番思考后,我删除了 CancelAfter(int.MaxValue) 行,因为在一般情况下它可能有害而不是有用,但我可能是错的。

public static async IAsyncEnumerable<TSource> ReadAllAsync<TSource>(
    this ChannelReader<TSource> source, TimeSpan timeout,
    [EnumeratorCancellation] CancellationToken cancellationToken = default)
{
    while (true)
    {
        using var cts = CancellationTokenSource
            .CreateLinkedTokenSource(cancellationToken);
        cts.CancelAfter(timeout);
        while (true)
        {
            try
            {
                if (!await source.WaitToReadAsync(cts.Token).ConfigureAwait(false))
                    yield break;
            }
            catch (OperationCanceledException)
            {
                cancellationToken.ThrowIfCancellationRequested();
                throw new TimeoutException();
            }
            while (source.TryRead(out var item))
            {
                yield return item;
                cancellationToken.ThrowIfCancellationRequested();
            }

            cts.CancelAfter(timeout);
            // It is possible that the CTS timed-out during the yielding
            if (cts.IsCancellationRequested) break; // Start a new loop with a new CTS
        }
    }
}

作为旁注,System.Interactive.Async package includes a Timeout 运算符具有如下所示的签名,可以与内置的 ReadAllAsync 结合使用并提供相同的功能有了上面的实现。不过,该方法并未针对低分配进行优化。

public static IAsyncEnumerable<TSource> Timeout<TSource>(
    this IAsyncEnumerable<TSource> source, TimeSpan timeout);

注意:回想起来ReadAllAsync().Timeout()的想法是危险的,因为ReadAllAsync是一个耗时的方法。换句话说,枚举它具有从频道中删除项目的副作用。 Timeout 运算符不知道源序列内部发生了什么,因此在不幸的时刻发生的超时可能会导致项目丢失。这使得实现成为问题的唯一可靠解决方案(在本答案的范围内)。