一个枚举器包装器,预先缓冲来自底层枚举器的多个项目

An enumerator wrapper that pre-buffers a number of items from underlying enumerator in advance

假设我有一些 IEnumerator<T>MoveNext() 方法中进行大量处理。

从该枚举器消耗的代码不仅消耗数据可用的速度,而且偶尔会等待(具体细节与我的问题无关)以便在需要恢复消耗时同步时间。但是当它下次调用 MoveNext() 时,它需要尽可能快的数据。

一种方法是将整个流预先消耗到某个列表或数组结构中以进行即时枚举。然而,这会浪费内存,因为在任何一个时间点,只有一个项目在使用中,并且在整个数据不适合内存的情况下,这将是禁止的。

所以 .net 中是否有一些通用的东西包装枚举器/可枚举的方式,它异步地预先迭代底层枚举器的几个项目并缓冲结果 这样它的缓冲区中总是有许多可用的项目,并且调用 MoveNext 永远不必等待?显然,消耗的项目,即由调用者的后续 MoveNext 迭代的项目,将从缓冲区中删除。

N.B. 我正在尝试做的部分工作也称为 Backpressure,并且在 Rx 中世界,已在 RxJava and is under discussion in Rx.NET 中实施。 Rx(推送数据的可观察对象)可以被认为是枚举器的相反方法(枚举器允许提取数据)。背压在拉动方法中相对容易,正如我的回答所示:暂停消费即可。推的时候比较难,需要额外的反馈机制。

有多种方法可以自己实现,我决定使用

  • 每个枚举器一个专用线程执行异步预缓冲
  • 预缓冲固定数量的元素

这非常适合我手头的情况(只有少数非常长的 运行 枚举器),但是例如如果您使用大量的枚举器,创建一个线程可能太繁重,如果您需要更动态的东西,固定数量的元素可能太不灵活,可能基于项目的实际内容。

到目前为止我只测试了它的主要功能,可能还存在一些粗糙的边缘。可以这样使用:

int bufferSize = 5;
IEnumerable<int> en = ...;
foreach (var item in new PreBufferingEnumerable<int>(en, bufferSize))
{
    ...

这是枚举器的要点:

class PreBufferingEnumerator<TItem> : IEnumerator<TItem>
{
    private readonly IEnumerator<TItem> _underlying;
    private readonly int _bufferSize;
    private readonly Queue<TItem> _buffer;
    private bool _done;
    private bool _disposed;

    public PreBufferingEnumerator(IEnumerator<TItem> underlying, int bufferSize)
    {
        _underlying = underlying;
        _bufferSize = bufferSize;
        _buffer = new Queue<TItem>();
        Thread preBufferingThread = new Thread(PreBufferer) { Name = "PreBufferingEnumerator.PreBufferer", IsBackground = true };
        preBufferingThread.Start();
    }

    private void PreBufferer()
    {
        while (true)
        {
            lock (_buffer)
            {
                while (_buffer.Count == _bufferSize && !_disposed)
                    Monitor.Wait(_buffer);
                if (_disposed)
                    return;
            }
            if (!_underlying.MoveNext())
            {
                lock (_buffer)
                    _done = true;
                return;
            }
            var current = _underlying.Current; // do outside lock, in case underlying enumerator does something inside get_Current()
            lock (_buffer)
            {
                _buffer.Enqueue(current);
                Monitor.Pulse(_buffer);
            }
        }
    }

    public bool MoveNext()
    {
        lock (_buffer)
        {
            while (_buffer.Count == 0 && !_done && !_disposed)
                Monitor.Wait(_buffer);
            if (_buffer.Count > 0)
            {
                Current = _buffer.Dequeue();
                Monitor.Pulse(_buffer); // so PreBufferer thread can fetch more
                return true;
            }
            return false; // _done || _disposed
        }
    }

    public TItem Current { get; private set; }

    public void Dispose()
    {
        lock (_buffer)
        {
            if (_disposed)
                return;
            _disposed = true;
            _buffer.Clear();
            Current = default(TItem);
            Monitor.PulseAll(_buffer);
        }
    }

自定义可枚举 class 的更简洁替代方法是:

public static IEnumerable<T> Buffer<T>(this IEnumerable<T> source, int bufferSize)
{
    var queue = new BlockingCollection<T>(bufferSize);

    Task.Run(() => {
        foreach(var i in source) queue.Add(i);
        queue.CompleteAdding();
    });

    return queue.GetConsumingEnumerable();
}

这可以用作:

var slowEnumerable = GetMySlowEnumerable();
var buffered = slowEnumerable.Buffer(10); // Populates up to 10 items on a background thread