Best way to implement a consumer queue that you can remove items from sequentially (.NET 6)

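New poster here, so I hope this makes sense...

I need to create a collection that I can remove items from sequentially (mainly stock market time series data). The data producers are multi-threaded and do not guarantee that the data arrives in order.

I have looked around for a solution, but the only thing I could come up with is to create my own custom dictionary that uses a ConcurrentDictionary and implements the IProducerConsumerCollection interface, so that it can be used with a BlockingCollection.

The code I have below does work, but it produces the error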

System.InvalidOperationException: The underlying collection was modified from outside of the BlockingCollection

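when using the GetConsumingEnumerable() for loop, and the next key in the sequence is not present in the dictionary. In this instance I would like to wait for a specified amount of time and then attempt to take the item from the queue again.

My questions are:

The code is below - any help/ideas are much appreciated.

IProducerConsumerCollection implementation: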

public abstract class BlockingDictionary<TKey, TValue> : IProducerConsumerCollection<KeyValuePair<TKey, TValue>> where TKey : notnull
{
    protected ConcurrentDictionary<TKey, TValue> _dictionary = new ConcurrentDictionary<TKey, TValue>();

    int ICollection.Count => _dictionary.Count;

    bool ICollection.IsSynchronized => false;

    object ICollection.SyncRoot => throw new NotSupportedException();

    public void CopyTo(KeyValuePair<TKey, TValue>[] array, int index)
    {
        if (array == null)
        {
            throw new ArgumentNullException("array");
        }
        _dictionary.ToList().CopyTo(array, index);
    }

    void ICollection.CopyTo(Array array, int index)
    {

        if (array == null)
        {
            throw new ArgumentNullException("array");
        }
        ((ICollection)_dictionary.ToList()).CopyTo(array, index);
    }

    public IEnumerator<KeyValuePair<TKey, TValue>> GetEnumerator()
    {
        return ((IEnumerable<KeyValuePair<TKey, TValue>>)_dictionary).GetEnumerator();
    }

    IEnumerator IEnumerable.GetEnumerator()
    {
        return ((IEnumerable<KeyValuePair<TKey, TValue>>)this).GetEnumerator();
    }

    public KeyValuePair<TKey, TValue>[] ToArray()
    {
        return _dictionary.ToList().ToArray();
    }

    bool IProducerConsumerCollection<KeyValuePair<TKey, TValue>>.TryAdd(KeyValuePair<TKey, TValue> item)
    {
        return _dictionary.TryAdd(item.Key, item.Value);
    }

    public virtual bool TryTake(out KeyValuePair<TKey, TValue> item)
    {
        item = this.FirstOrDefault();
        TValue? value;

        return _dictionary.TryRemove(item.Key, out value);
    }
}

Time sequence queue implementation (inherits from the above):

public class TimeSequenceQueue<T> : BlockingDictionary<DateTime, T>
{
    private DateTime _previousTime;
    private DateTime _nextTime;
    private readonly int _intervalSeconds;

    public TimeSequenceQueue(DateTime startTime, int intervalSeconds)
    {
        _intervalSeconds = intervalSeconds;
        _previousTime = startTime;
        _nextTime = startTime;
    }

    public override bool TryTake([MaybeNullWhen(false)] out KeyValuePair<DateTime, T> item)
    {
        item = _dictionary.SingleOrDefault(x => x.Key == _nextTime);
        T? value = default(T);

        if (item.Value == null)
            return false;

        bool result = _dictionary.TryRemove(item.Key, out value);

        if (result)
        {
            _previousTime = _nextTime;
            _nextTime = _nextTime.AddSeconds(_intervalSeconds);
        }

        return result;
    }
}

Usage:

BlockingCollection<KeyValuePair<DateTime, object>> _queue = new BlockingCollection<KeyValuePair<DateTime, object>>(new TimeSequenceQueue<object>());

Consuming loop - started in a new thread:

foreach (var item in _queue.GetConsumingEnumerable())
{
    // feed downstream
}

When using the GetConsumingEnumerable() for loop, and the next key in the sequence is not present in the dictionary [...] I would like to wait for a specified amount of time and then attempt to take the item from the queue again.

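I'll try to answer this question generically, without paying too much attention to the specifics of your problem. So let's say that you are consuming a BlockingCollection<T> like this: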

foreach (var item in collection.GetConsumingEnumerable())
{
    // Do something with the consumed item.
}

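...and you want to avoid waiting indefinitely for an item to arrive. You want to wake up every 5 seconds and do something before waiting/sleeping again. Here is how you can do it: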

while (!collection.IsCompleted)
{
    bool consumed = collection.TryTake(out var item, TimeSpan.FromSeconds(5));
    if (consumed)
    {
        // Do something with the consumed item.
    }
    else
    {
        // Do something before trying again to take an item.
    }
}

The above pattern imitates the actual source code of the BlockingCollection<T>.GetConsumingEnumerable method.
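To try the timed loop end to end, here is a minimal sketch with a producer attached. The class name TimedConsumerDemo, the Run method, the three integer items and the delays are all assumptions made up for illustration; only BlockingCollection<T>, TryTake, IsCompleted and CompleteAdding come from the framework.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical demo class; names and delays are illustrative assumptions.
public static class TimedConsumerDemo
{
    // Consumes until the collection is completed. Returns the items taken
    // and the number of times TryTake came back empty-handed.
    public static (List<int> Items, int Timeouts) Run(int timeoutMs, int producerDelayMs)
    {
        using var collection = new BlockingCollection<int>();

        // Producer: adds three items with a pause before each one, then
        // signals that no more items will arrive.
        var producer = Task.Run(() =>
        {
            for (int i = 1; i <= 3; i++)
            {
                Thread.Sleep(producerDelayMs);
                collection.Add(i);
            }
            collection.CompleteAdding();
        });

        var items = new List<int>();
        int timeouts = 0;
        while (!collection.IsCompleted)
        {
            if (collection.TryTake(out int item, timeoutMs))
            {
                items.Add(item);   // An item arrived within the timeout.
            }
            else
            {
                // The timeout elapsed, or CompleteAdding was called while
                // waiting. This is the place for periodic work.
                timeouts++;
            }
        }

        producer.Wait();
        return (items, timeouts);
    }
}
```

Calling TimedConsumerDemo.Run(50, 150) should hand back the items 1, 2, 3 in order, together with a timeout count that depends on thread timing. The loop ends only because the producer calls CompleteAdding; without that call IsCompleted never becomes true.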

If you want to get fancy, you could incorporate this functionality in a custom extension method for the BlockingCollection<T> class, like this:

public static IEnumerable<(bool Consumed, T Item)> GetConsumingEnumerable<T>(
    this BlockingCollection<T> source, TimeSpan timeout)
{
    while (!source.IsCompleted)
    {
        bool consumed = source.TryTake(out var item, timeout);
        yield return (consumed, item);
    }
}

Usage example:

foreach (var (consumed, item) in collection.GetConsumingEnumerable(
    TimeSpan.FromSeconds(5)))
{
    // Do something depending on whether an item was consumed or not.
}
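One caveat when applying this to the question's TimeSequenceQueue: as far as I can tell, BlockingCollection<T> tracks the item count itself and throws the InvalidOperationException from the question whenever the underlying collection's TryTake returns false while that count is non-zero, so a timeout alone will probably not make the error go away. A possible alternative, sketched below under assumptions, is to keep a plain BlockingCollection (backed by its default queue) and do the reordering on the consumer side. The SequentialReader class and its Read method are hypothetical names, not part of the question's code or of the framework. The sketch assumes a single consumer and timestamps that fall exactly on startTime plus a multiple of the interval.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;

// Hypothetical helper: a consumer-side reorder buffer (single consumer only).
public sealed class SequentialReader<T>
{
    private readonly BlockingCollection<KeyValuePair<DateTime, T>> _source;
    private readonly Dictionary<DateTime, T> _pending = new Dictionary<DateTime, T>();
    private readonly TimeSpan _interval;
    private DateTime _nextTime;

    public SequentialReader(BlockingCollection<KeyValuePair<DateTime, T>> source,
        DateTime startTime, TimeSpan interval)
    {
        _source = source;
        _nextTime = startTime;
        _interval = interval;
    }

    // Yields items in timestamp order. Consumed is false when the timeout
    // elapsed without any new item arriving from the producers.
    public IEnumerable<(bool Consumed, KeyValuePair<DateTime, T> Item)> Read(TimeSpan timeout)
    {
        while (true)
        {
            // Emit every buffered item that is next in the sequence.
            while (_pending.Remove(_nextTime, out var value))
            {
                yield return (true, new KeyValuePair<DateTime, T>(_nextTime, value));
                _nextTime += _interval;
            }

            // Adding is finished and the source is drained. Items still in
            // _pending are stuck behind a gap and are not delivered.
            if (_source.IsCompleted) yield break;

            if (_source.TryTake(out var arrived, timeout))
            {
                // Park the item until its turn comes (may be immediately).
                _pending[arrived.Key] = arrived.Value;
            }
            else if (!_source.IsCompleted)
            {
                // Timed out: let the caller do something before retrying.
                yield return (false, default(KeyValuePair<DateTime, T>));
            }
        }
    }
}
```

The producers then simply call Add on the BlockingCollection in whatever order they finish, and the consumer iterates over new SequentialReader<T>(queue, startTime, TimeSpan.FromSeconds(intervalSeconds)).Read(TimeSpan.FromSeconds(5)). What to do when a timestamp never shows up (skip it, or keep waiting) is a policy decision that this sketch leaves to the caller.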