如何批处理 ChannelReader<T>,在消费和处理任何单个项目之间实施最大间隔策略?

How to batch a ChannelReader<T>, enforcing a maximum interval policy between consuming and processing any individual item?

我在生产者-消费者场景中使用 Channel<T>,我需要以每批 10 个项目的形式使用通道,并且不让任何已消费的项目在缓冲区中闲置超过 5 秒。此持续时间是从通道读取项目与处理包含该项目的批次之间允许的最大延迟。最大延迟策略优先于批量大小策略,因此即使少于 10 个项目也应处理一个批次,以满足最大延迟要求。

我能够以 ChannelReader<T> class:

ReadAllBatches 扩展方法的形式实现第一个要求
public static async IAsyncEnumerable<T[]> ReadAllBatches<T>(
    this ChannelReader<T> channelReader, int batchSize)
{
    List<T> buffer = new();
    while (true)
    {
        T item;
        try { item = await channelReader.ReadAsync(); }
        catch (ChannelClosedException) { break; }
        buffer.Add(item);
        if (buffer.Count == batchSize)
        {
            yield return buffer.ToArray();
            buffer.Clear();
        }
    }
    if (buffer.Count > 0) yield return buffer.ToArray();
}

我打算这样使用它:

await foreach (Item[] batch in myChannel.Reader.ReadAllBatches(10))
{
    Console.WriteLine($"Processing batch of {batch.Length} items");
}

我的问题是: 我如何使用额外的 TimeSpan timeout 参数增强我的 ReadAllBatches<T> 实现,该参数强制执行上述最大延迟策略,并且没有将第三方包安装到我的项目中?

重要提示: 所请求的实现不应受到已报告的内存泄漏问题的影响 here。因此,消耗通道的循环不应导致应用程序使用的内存稳定增加,以防在通道中写入项目的生产者长时间空闲。

注意: 我知道 exists regarding batching the IAsyncEnumerable<T> 界面,但我对此不感兴趣。出于性能原因,我对直接针对 ChannelReader<T> 类型的方法感兴趣。

下面是 on GitHub, by tkrafael.

发布的想法的实现

I had the same "leak" issue and resolved by:

  • First read uses main token (If I have no items to handle, just wait until one arrives)
  • All the remaining items must be read in x milliseconds

This way I will never get an empty read due to timeout cancellation token (ok, maybe one empty read when application is being shutdown) and I get correct behaviour when items arrives from channel's writer.

内部 CancellationTokenSource 安排了一个取消计时器,在消耗完批处理中的第一个元素后立即取消。

/// <summary>
/// Reads all of the data from the channel in batches, enforcing a maximum
/// interval policy between consuming an item and emitting it in a batch.
/// </summary>
public static IAsyncEnumerable<T[]> ReadAllBatches<T>(
    this ChannelReader<T> source, int batchSize, TimeSpan timeSpan)
{
    if (source == null) throw new ArgumentNullException(nameof(source));
    if (batchSize < 1) throw new ArgumentOutOfRangeException(nameof(batchSize));
    if (timeSpan < TimeSpan.Zero)
        throw new ArgumentOutOfRangeException(nameof(timeSpan));
    return Implementation();

    async IAsyncEnumerable<T[]> Implementation(
        [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        var timerCts = CancellationTokenSource
            .CreateLinkedTokenSource(cancellationToken);
        try
        {
            List<T> buffer = new();
            while (true)
            {
                var token = buffer.Count == 0 ? cancellationToken : timerCts.Token;
                (T Value, bool HasValue) item;
                try
                {
                    item = (await source.ReadAsync(token).ConfigureAwait(false), true);
                }
                catch (ChannelClosedException) { break; }
                catch (OperationCanceledException)
                {
                    if (cancellationToken.IsCancellationRequested) break;
                    // Timeout occurred.
                    Debug.Assert(timerCts.IsCancellationRequested);
                    Debug.Assert(buffer.Count > 0);
                    item = default; 
                }
                if (buffer.Count == 0) timerCts.CancelAfter(timeSpan);
                if (item.HasValue)
                {
                    buffer.Add(item.Value);
                    if (buffer.Count < batchSize) continue;
                }
                yield return buffer.ToArray();
                buffer.Clear();
                if (!timerCts.TryReset())
                {
                    timerCts.Dispose();
                    timerCts = CancellationTokenSource
                        .CreateLinkedTokenSource(cancellationToken);
                }
            }
            // Emit what's left before throwing exceptions.
            if (buffer.Count > 0) yield return buffer.ToArray();

            cancellationToken.ThrowIfCancellationRequested();

            // Propagate possible failure of the channel.
            if (source.Completion.IsCompleted)
                await source.Completion.ConfigureAwait(false);
        }
        finally { timerCts.Dispose(); }
    }
}

用法示例:

await foreach (Item[] batch in myChannel.Reader
    .ReadAllBatches(10, TimeSpan.FromSeconds(5)))
{
    Console.WriteLine($"Processing batch of {batch.Length} items");
}

此实现是non-destructive,这意味着从频道中消耗的任何物品都没有丢失的危险。如果枚举被取消或通道出现故障,任何消耗的项目将在错误传播之前在最后一批中发出。