How to batch a ChannelReader<T>, enforcing a maximum interval policy between consuming and processing any individual item?

public static async IAsyncEnumerable<T[]> ReadAllBatches<T>(
    this ChannelReader<T> channelReader, int batchSize)
    List<T> buffer = new();
    while (true)
        T item;
        try { item = await channelReader.ReadAsync(); }
        catch (ChannelClosedException) { break; }
        if (buffer.Count == batchSize)
            yield return buffer.ToArray();
    if (buffer.Count > 0) yield return buffer.ToArray();


await foreach (Item[] batch in myChannel.Reader.ReadAllBatches(10))
    Console.WriteLine($"Processing batch of {batch.Length} items");

I had the same "leak" issue and resolved by:

  • First read uses main token (If I have no items to handle, just wait until one arrives)
  • All the remaining items must be read in x milliseconds

This way I will never get an empty read due to timeout cancellation token (ok, maybe one empty read when application is being shutdown) and I get correct behaviour when items arrives from channel's writer.

/// <summary>
/// Reads all of the data from the channel in batches, enforcing a maximum
/// interval policy between consuming an item and emitting it in a batch.
/// </summary>
public static IAsyncEnumerable<T[]> ReadAllBatches<T>(
    this ChannelReader<T> source, int batchSize, TimeSpan timeSpan)
    if (source == null) throw new ArgumentNullException(nameof(source));
    if (batchSize < 1) throw new ArgumentOutOfRangeException(nameof(batchSize));
    if (timeSpan < TimeSpan.Zero)
        throw new ArgumentOutOfRangeException(nameof(timeSpan));
    return Implementation();

    async IAsyncEnumerable<T[]> Implementation(
        [EnumeratorCancellation] CancellationToken cancellationToken = default)
        var timerCts = CancellationTokenSource
            List<T> buffer = new();
            while (true)
                var token = buffer.Count == 0 ? cancellationToken : timerCts.Token;
                (T Value, bool HasValue) item;
                    item = (await source.ReadAsync(token).ConfigureAwait(false), true);
                catch (ChannelClosedException) { break; }
                catch (OperationCanceledException)
                    if (cancellationToken.IsCancellationRequested) break;
                    // Timeout occurred.
                    Debug.Assert(buffer.Count > 0);
                    item = default; 
                if (buffer.Count == 0) timerCts.CancelAfter(timeSpan);
                if (item.HasValue)
                    if (buffer.Count < batchSize) continue;
                yield return buffer.ToArray();
                if (!timerCts.TryReset())
                    timerCts = CancellationTokenSource
            // Emit what's left before throwing exceptions.
            if (buffer.Count > 0) yield return buffer.ToArray();


            // Propagate possible failure of the channel.
            if (source.Completion.IsCompleted)
                await source.Completion.ConfigureAwait(false);
        finally { timerCts.Dispose(); }


await foreach (Item[] batch in myChannel.Reader
    .ReadAllBatches(10, TimeSpan.FromSeconds(5)))
    Console.WriteLine($"Processing batch of {batch.Length} items");
