在 System.Threading.Channels.Channel 中消费所有消息

Consume all messages in a System.Threading.Channels.Channel

假设我有一个许多生产者,1 个未绑定的消费者通道,有一个消费者:

await foreach (var message in channel.Reader.ReadAllAsync(cts.Token))
{
    await consume(message);
}

问题是 consume 函数进行了一些 IO 访问,也可能进行了一些网络访问,因此在使用 1 条消息之前可能会产生更多消息。但是由于不能并发访问IO资源,我不能有很多消费者,也不能把consume函数丢到一个Task里算了。

consume 函数可以很容易地修改为获取多条消息并批量处理它们。所以我的问题是,是否有一种方法可以让消费者在尝试访问通道队列时获取通道队列中的 all 消息,如下所示:

while (true) {
    Message[] messages = await channel.Reader.TakeAll();
    await consumeAll(messages);
}

编辑:我能想到的 1 个选项是:

List<Message> messages = new();
await foreach (var message in channel.Reader.ReadAllAsync(cts.Token))
{
    await consume(message);
    Message msg;
    while (channel.Reader.TryRead(out msg))
        messages.Add(msg);
    if (messages.Count > 0)
    {
        await consumeAll(messages);
        messages.Clear();
    }
}

但我觉得这应该是更好的方法。

阅读 Stephen Toub's primer on channels 后,我尝试编写一个扩展方法来满足您的需求(我已经有一段时间没有使用 C# 了,所以这很有趣)。

public static class ChannelReaderEx
{
    public static async IAsyncEnumerable<IEnumerable<T>> ReadBatchesAsync<T>(
        this ChannelReader<T> reader, 
        [EnumeratorCancellation] CancellationToken cancellationToken = default
    )
    {
        while (await reader.WaitToReadAsync(cancellationToken).ConfigureAwait(false))
        {
            yield return reader.Flush().ToList();
        }
    }

    public static IEnumerable<T> Flush<T>(this ChannelReader<T> reader)
    {
        while (reader.TryRead(out T item))
        {
            yield return item;
        }
    }
}

可以这样使用:

await foreach (var batch in channel.Reader.ReadBatchesAsync())
{
    await ConsumeBatch(batch);
}

ChannelReader<T> 级别上解决这个问题,就像在优秀的 中一样,是实用且足够的,但在 IAsyncEnumerable<T> 级别上解决它可能是一个更广泛的解决方案应用范围。下面是异步序列的扩展方法BufferImmediate,它产生非空缓冲区,其中包含在拉取序列时立即可用的所有元素:

/// <summary>
/// Splits the elements of a sequence into chunks that contain all the elements
/// that are immediately available.
/// </summary>
public static IAsyncEnumerable<IList<TSource>> BufferImmediate<TSource>(
    this IAsyncEnumerable<TSource> source, int maxSize = Int32.MaxValue)
{
    if (source == null) throw new ArgumentNullException(nameof(source));
    if (maxSize < 1) throw new ArgumentOutOfRangeException(nameof(maxSize));
    return Implementation();

    async IAsyncEnumerable<IList<TSource>> Implementation(
        [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        ValueTask<bool> moveNext = default;
        var enumerator = source.GetAsyncEnumerator(cancellationToken);
        try
        {
            moveNext = enumerator.MoveNextAsync();
            var buffer = new List<TSource>();
            ExceptionDispatchInfo error = null;
            while (true)
            {
                if ((!moveNext.IsCompleted && buffer.Count > 0)
                    || buffer.Count >= maxSize)
                {
                    yield return buffer.ToArray();
                    buffer.Clear();
                }
                else
                {
                    // Await a copy, to prevent a second await on finally.
                    var moveNextCopy = moveNext;
                    moveNext = default;
                    bool moved;
                    try { moved = await moveNextCopy.ConfigureAwait(false); }
                    catch (Exception ex)
                    {
                        error = ExceptionDispatchInfo.Capture(ex); break;
                    }
                    if (!moved) break;
                    buffer.Add(enumerator.Current);
                    try { moveNext = enumerator.MoveNextAsync(); }
                    catch (Exception ex)
                    {
                        error = ExceptionDispatchInfo.Capture(ex); break;
                    }
                }
            }
            if (buffer.Count > 0) yield return buffer.ToArray();
            error?.Throw();
        }
        finally
        {
            // The finally runs when an enumerator created by this method is disposed.
            // Prevent fire-and-forget, otherwise the DisposeAsync() might throw.
            // Swallow MoveNextAsync errors, but propagate DisposeAsync errors.
            try { await moveNext.ConfigureAwait(false); } catch { }
            await enumerator.DisposeAsync().ConfigureAwait(false);
        }
    }
}

用法示例:

await foreach (var batch in channel.Reader.ReadAllAsync().BufferImmediate())
{
    await ConsumeBatch(batch);
}

以上实现是非破坏性的,这意味着已经从源序列中消耗掉的元素不会丢失。如果源序列失败或枚举被取消,任何缓冲的元素都将在错误传播之前发出。