如何对 IAsyncEnumerable<T> 进行批处理,在连续批次之间强制执行最大间隔策略?

How to batch an IAsyncEnumerable<T>, enforcing a maximum interval policy between consecutive batches?

我有一个异步消息序列(流),有时大量到达,有时零星到达,我想以每批 10 条消息为一组来处理它们。我还想对接收消息和处理消息之间的延迟施加上限,因此如果在收到该批消息的第一条消息后 5 秒过去了,则还应该处理少于 10 条消息的批次。我发现我可以使用 Buffer operator from the System.Interactive.Async 包解决问题的第一部分:

IAsyncEnumerable<Message> source = GetStreamOfMessages();
IAsyncEnumerable<IList<Message>> batches = source.Buffer(10);
await foreach (IList<Message> batch in batches)
{
    // Process batch
}

Buffer操作员的签名:

public static IAsyncEnumerable<IList<TSource>> Buffer<TSource>(
    this IAsyncEnumerable<TSource> source, int count);

不幸的是,Buffer 运算符没有带有 TimeSpan 参数的重载,所以我无法轻易解决问题的第二部分。我将不得不自己以某种方式实现一个带有计时器的批处理操作符。我的问题是:如何实现具有以下签名的 Buffer 运算符的变体?

public static IAsyncEnumerable<IList<TSource>> Buffer<TSource>(
    this IAsyncEnumerable<TSource> source, TimeSpan timeSpan, int count);

timeSpan 参数应该像这样影响 Buffer 运算符的行为:

  1. 在发出前一批之后 timeSpan 过去时(或最初在调用 Buffer 方法之后),必须发出一批。
  2. 如果在发出上一个批次后 timeSpan 已经过去,并且在此期间没有收到任何消息,则必须发出一个空批次。
  3. 比每个 timeSpan 更频繁地发出批次意味着批次已满。在 timeSpan 过去之前发出少于 count 消息的批处理是不可取的。

我可以根据需要向我的项目添加外部依赖项,例如 System.Interactive.Async or the System.Linq.Async 包。

P.S。这个问题的灵感来自 a recent question 与通道和内存泄漏相关的问题。

这里有两种方法可以解决这个问题。第一个有缺陷,但由于其极其简单,我还是将其发布。一个 Buffer operator with a TimeSpan parameter already exists in the System.Reactive package, and converters between asynchronous and observable sequences exist in the System.Linq.Async 包。所以这只是将三个已经可用的运算符链接在一起的问题:

public static IAsyncEnumerable<IList<TSource>> Buffer<TSource>(
    this IAsyncEnumerable<TSource> source, TimeSpan timeSpan, int count)
{
    return source.ToObservable().Buffer(timeSpan, count).ToAsyncEnumerable();
}

不幸的是,这种巧妙的方法是有缺陷的,因为从拉动模型转变为推动模型并返回拉动模型会产生副作用。发生的情况是中间可观察序列在订阅时开始积极地拉取源 IAsyncEnumerable,而不管结果 IAsyncEnumerable 是如何拉取的。因此,不是结果序列的消费者成为枚举的驱动程序,而是枚举以源序列允许的最大速度在后台静默发生,并且生成的消息缓冲在内部队列中。因此,不仅有可能对消息的处理施加隐藏的延迟,而且内存消耗也有可能失控地飙升。

第二种是实践方法,它使用Task.Delay method as a timer, and the Task.WhenAny 方法来协调计时器和枚举任务。这种方法的行为类似于基于 Rx 的方法,除了源序列的枚举是由结果序列的消费者驱动的,正如人们所期望的那样。

/// <summary>
/// Splits the elements of a sequence into chunks that are sent out when either
/// they're full, or a given amount of time has elapsed.
/// </summary>
public static IAsyncEnumerable<IList<TSource>> Buffer<TSource>(
    this IAsyncEnumerable<TSource> source, TimeSpan timeSpan, int count)
{
    if (source == null) throw new ArgumentNullException(nameof(source));
    if (timeSpan < TimeSpan.Zero) throw new ArgumentNullException(nameof(timeSpan));
    if (count < 1) throw new ArgumentOutOfRangeException(nameof(count));
    return Implementation();

    async IAsyncEnumerable<IList<TSource>> Implementation(
        [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        var timerCts = new CancellationTokenSource();
        var delayTask = Task.Delay(timeSpan, timerCts.Token);
        (ValueTask<bool> ValueTask, Task<bool> Task) moveNext = default;
        using var linkedCts = CancellationTokenSource
            .CreateLinkedTokenSource(cancellationToken);
        var enumerator = source.GetAsyncEnumerator(linkedCts.Token);
        try
        {
            moveNext = (enumerator.MoveNextAsync(), null);
            var buffer = new List<TSource>(count);
            ExceptionDispatchInfo error = null;
            while (true)
            {
                Task completedTask = null;
                if (!moveNext.ValueTask.IsCompleted)
                {
                    // Preserve the ValueTask, if it's not preserved already.
                    if (moveNext.Task == null)
                    {
                        var preserved = moveNext.ValueTask.AsTask();
                        moveNext = (new ValueTask<bool>(preserved), preserved);
                    }
                    completedTask = await Task.WhenAny(moveNext.Task, delayTask)
                        .ConfigureAwait(false);
                }
                if (completedTask == delayTask)
                {
                    Debug.Assert(delayTask.IsCompleted);
                    yield return buffer.ToArray(); // It's OK if the buffer is empty.
                    buffer.Clear();
                    delayTask = Task.Delay(timeSpan, timerCts.Token);
                }
                else
                {
                    Debug.Assert(moveNext.ValueTask.IsCompleted);
                    // Await a copy, to prevent a second await on finally.
                    var moveNextCopy = moveNext.ValueTask;
                    moveNext = default;
                    bool moved;
                    try { moved = await moveNextCopy.ConfigureAwait(false); }
                    catch (Exception ex)
                    {
                        error = ExceptionDispatchInfo.Capture(ex); break;
                    }
                    if (!moved) break;
                    buffer.Add(enumerator.Current);
                    if (buffer.Count == count)
                    {
                        timerCts.Cancel(); timerCts.Dispose();
                        timerCts = new CancellationTokenSource();
                        yield return buffer.ToArray();
                        buffer.Clear();
                        delayTask = Task.Delay(timeSpan, timerCts.Token);
                    }
                    try { moveNext = (enumerator.MoveNextAsync(), null); }
                    catch (Exception ex)
                    {
                        error = ExceptionDispatchInfo.Capture(ex); break;
                    }
                }
            }
            if (buffer.Count > 0) yield return buffer.ToArray();
            error?.Throw();
        }
        finally
        {
            // The finally runs when an enumerator created by this method is disposed.
            timerCts.Cancel(); timerCts.Dispose();
            // Prevent fire-and-forget, otherwise the DisposeAsync() might throw.
            // Cancel the async-enumerator, for more responsive completion.
            // Swallow MoveNextAsync errors, but propagate DisposeAsync errors.
            linkedCts.Cancel();
            try { await moveNext.ValueTask.ConfigureAwait(false); } catch { }
            await enumerator.DisposeAsync().ConfigureAwait(false);
        }
    }
}

已采取措施避免泄漏即发即弃 MoveNextAsync 操作或计时器。

仅当 MoveNextAsync 调用 returns 未完成的 ValueTask<bool> 时才会分配 Task 包装器。

此实现是非破坏性的,这意味着从源序列中消耗的任何元素都不会丢失。如果源序列失败或枚举被取消,任何缓冲的元素都将在错误传播之前发出。

如何使用 Channel 来实现所需的功能?如果使用类似这种扩展方法从队列中读取直到超时到期,是否存在任何缺陷?

public static async Task<List<T>> ReadWithTimeoutAsync<T>(this ChannelReader<T> reader, TimeSpan readTOut, CancellationToken cancellationToken)
{
    var timeoutTokenSrc = new CancellationTokenSource();
    timeoutTokenSrc.CancelAfter(readTOut);

    var messages = new List<T>();

    using (CancellationTokenSource linkedCts =
        CancellationTokenSource.CreateLinkedTokenSource(timeoutTokenSrc.Token, cancellationToken))
    {
        try
        {
            await foreach (var item in reader.ReadAllAsync(linkedCts.Token))
            {
                messages.Add(item);
                linkedCts.Token.ThrowIfCancellationRequested();
            }

            Console.WriteLine("All messages read.");
        }
        catch (OperationCanceledException)
        {
            if (timeoutTokenSrc.Token.IsCancellationRequested)
            {
                Console.WriteLine($"Delay ({readTOut.Milliseconds} msec) for reading items from message channel has expired.");
            }
            else if (cancellationToken.IsCancellationRequested)
            {
                Console.WriteLine("Cancelling per user request.");
                cancellationToken.ThrowIfCancellationRequested();
            }
        }
    }
    timeoutTokenSrc.Dispose();

    return messages;
}

将超时与最大值相结合。批量大小,可以添加一个令牌源:

public static async Task<List<T>> ReadBatchWithTimeoutAsync<T>(this ChannelReader<T> reader, int maxBatchSize, TimeSpan readTOut, CancellationToken cancellationToken)
{
    var timeoutTokenSrc = new CancellationTokenSource();
    timeoutTokenSrc.CancelAfter(readTOut);
    var maxSizeTokenSrc = new CancellationTokenSource();

    var messages = new List<T>();

    using (CancellationTokenSource linkedCts =
        CancellationTokenSource.CreateLinkedTokenSource(timeoutTokenSrc.Token, maxSizeTokenSrc.Token, cancellationToken))
    {
        try
        {
            await foreach (var item in reader.ReadAllAsync(linkedCts.Token))
            {
                messages.Add(item);
                if (messages.Count >= maxBatchSize)
                {
                    maxSizeTokenSrc.Cancel();
                }
                linkedCts.Token.ThrowIfCancellationRequested();
            }....