实现消费者队列的最佳方式,您可以按顺序从中删除项目(.net 6)
Best way to implement consumer queue that you can remove items from sequentially (.net 6)
这里是新海报所以我希望这是有道理的...
我需要创建一个集合,我可以按顺序从中删除项目(主要是股市时间序列数据)。
数据生产者是多线程的,不保证数据按顺序来。
我四处寻找解决方案,但我唯一能想到的就是创建我自己的自定义字典,使用 ConcurrentDictionary 并实现 IProducerConsumer 接口,以便它可以与 BlockingCollection 一起使用。
我下面的代码确实有效,但会产生错误
System.InvalidOperationException: The underlying collection was
modified from outside of the BlockingCollection
当使用 GetConsumingEnumerable() for 循环时,序列中的下一个键不在字典中。在这种情况下,我想等待指定的时间
然后再次尝试从队列中取出项目。
我的问题是:
- 没有密钥时处理错误的最佳方法是什么。目前看来处理错误需要退出循环。也许使用 GetConsumingEnumerable() 不是正确的消费方式,while 循环会更好?
代码如下 - 非常感谢help/ideas。
IProducerConsumer 实现:
public abstract class BlockingDictionary<TKey, TValue> : IProducerConsumerCollection<KeyValuePair<TKey, TValue>> where TKey : notnull
{
protected ConcurrentDictionary<TKey, TValue> _dictionary = new ConcurrentDictionary<TKey, TValue>();
int ICollection.Count => _dictionary.Count;
bool ICollection.IsSynchronized => false;
object ICollection.SyncRoot => throw new NotSupportedException();
public void CopyTo(KeyValuePair<TKey, TValue>[] array, int index)
{
if (array == null)
{
throw new ArgumentNullException("array");
}
_dictionary.ToList().CopyTo(array, index);
}
void ICollection.CopyTo(Array array, int index)
{
if (array == null)
{
throw new ArgumentNullException("array");
}
((ICollection)_dictionary.ToList()).CopyTo(array, index);
}
public IEnumerator<KeyValuePair<TKey, TValue>> GetEnumerator()
{
return ((IEnumerable<KeyValuePair<TKey, TValue>>)_dictionary).GetEnumerator();
}
IEnumerator IEnumerable.GetEnumerator()
{
return ((IEnumerable<KeyValuePair<TKey, TValue>>)this).GetEnumerator();
}
public KeyValuePair<TKey, TValue>[] ToArray()
{
return _dictionary.ToList().ToArray();
}
bool IProducerConsumerCollection<KeyValuePair<TKey, TValue>>.TryAdd(KeyValuePair<TKey, TValue> item)
{
return _dictionary.TryAdd(item.Key, item.Value);
}
public virtual bool TryTake(out KeyValuePair<TKey, TValue> item)
{
item = this.FirstOrDefault();
TValue? value;
return _dictionary.TryRemove(item.Key, out value);
}
}
时序队列实现(继承以上)
public class TimeSequenceQueue<T> : BlockingDictionary<DateTime, T>
{
private DateTime _previousTime;
private DateTime _nextTime;
private readonly int _intervalSeconds;
public TimeSequenceQueue(DateTime startTime, int intervalSeconds)
{
_intervalSeconds = intervalSeconds;
_previousTime = startTime;
_nextTime = startTime;
}
public override bool TryTake([MaybeNullWhen(false)] out KeyValuePair<DateTime, T> item)
{
item = _dictionary.SingleOrDefault(x => x.Key == _nextTime);
T? value = default(T);
if (item.Value == null)
return false;
bool result = _dictionary.TryRemove(item.Key, out value);
if (result)
{
_previousTime = _nextTime;
_nextTime = _nextTime.AddSeconds(_intervalSeconds);
}
return result;
}
}
用法:
BlockingCollection<KeyValuePair<DateTime, object>> _queue = new BlockingCollection<KeyValuePair<DateTime, object>>(new TimeSequenceQueue<object>());
消耗循环 - 在新线程中开始:
foreach (var item in _queue.GetConsumingEnumerable())
{
// feed downstream
}
When using the GetConsumingEnumerable()
for loop, and the next key in the sequence is not present in the dictionary [...] I would like to wait for a specified amount of time and then attempt to take the item from the queue again.
我会尝试笼统地回答这个问题,而不会过多关注您问题的具体细节。所以假设你正在消费
像这样的 BlockingCollection<T>
:
foreach (var item in collection.GetConsumingEnumerable())
{
// Do something with the consumed item.
}
...并且您想避免无限期地等待物品到达。您想每 5 秒醒来一次并在 waiting/sleeping 再次之前做一些事情。
以下是您的操作方法:
while (!collection.IsCompleted)
{
bool consumed = collection.TryTake(out var item, TimeSpan.FromSeconds(5));
if (consumed)
{
// Do something with the consumed item.
}
else
{
// Do something before trying again to take an item.
}
}
以上模式模仿actual source code of the BlockingCollection<T>.GetConsumingEnumerable
方法。
如果你想变得更有趣,你可以将此功能合并到 BlockingCollection<T>
class 的自定义扩展方法中,如下所示:
public static IEnumerable<(bool Consumed, T Item)> GetConsumingEnumerable<T>(
this BlockingCollection<T> source, TimeSpan timeout)
{
while (!source.IsCompleted)
{
bool consumed = source.TryTake(out var item, timeout);
yield return (consumed, item);
}
}
用法示例:
foreach (var (consumed, item) in collection.GetConsumingEnumerable(
TimeSpan.FromSeconds(5)))
{
// Do something depending on whether an item was consumed or not.
}
这里是新海报所以我希望这是有道理的...
我需要创建一个集合,我可以按顺序从中删除项目(主要是股市时间序列数据)。 数据生产者是多线程的,不保证数据按顺序来。
我四处寻找解决方案,但我唯一能想到的就是创建我自己的自定义字典,使用 ConcurrentDictionary 并实现 IProducerConsumer 接口,以便它可以与 BlockingCollection 一起使用。
我下面的代码确实有效,但会产生错误
System.InvalidOperationException: The underlying collection was modified from outside of the BlockingCollection
当使用 GetConsumingEnumerable() for 循环时,序列中的下一个键不在字典中。在这种情况下,我想等待指定的时间 然后再次尝试从队列中取出项目。
我的问题是:
- 没有密钥时处理错误的最佳方法是什么。目前看来处理错误需要退出循环。也许使用 GetConsumingEnumerable() 不是正确的消费方式,while 循环会更好?
代码如下 - 非常感谢help/ideas。
IProducerConsumer 实现:
public abstract class BlockingDictionary<TKey, TValue> : IProducerConsumerCollection<KeyValuePair<TKey, TValue>> where TKey : notnull
{
protected ConcurrentDictionary<TKey, TValue> _dictionary = new ConcurrentDictionary<TKey, TValue>();
int ICollection.Count => _dictionary.Count;
bool ICollection.IsSynchronized => false;
object ICollection.SyncRoot => throw new NotSupportedException();
public void CopyTo(KeyValuePair<TKey, TValue>[] array, int index)
{
if (array == null)
{
throw new ArgumentNullException("array");
}
_dictionary.ToList().CopyTo(array, index);
}
void ICollection.CopyTo(Array array, int index)
{
if (array == null)
{
throw new ArgumentNullException("array");
}
((ICollection)_dictionary.ToList()).CopyTo(array, index);
}
public IEnumerator<KeyValuePair<TKey, TValue>> GetEnumerator()
{
return ((IEnumerable<KeyValuePair<TKey, TValue>>)_dictionary).GetEnumerator();
}
IEnumerator IEnumerable.GetEnumerator()
{
return ((IEnumerable<KeyValuePair<TKey, TValue>>)this).GetEnumerator();
}
public KeyValuePair<TKey, TValue>[] ToArray()
{
return _dictionary.ToList().ToArray();
}
bool IProducerConsumerCollection<KeyValuePair<TKey, TValue>>.TryAdd(KeyValuePair<TKey, TValue> item)
{
return _dictionary.TryAdd(item.Key, item.Value);
}
public virtual bool TryTake(out KeyValuePair<TKey, TValue> item)
{
item = this.FirstOrDefault();
TValue? value;
return _dictionary.TryRemove(item.Key, out value);
}
}
时序队列实现(继承以上)
public class TimeSequenceQueue<T> : BlockingDictionary<DateTime, T>
{
private DateTime _previousTime;
private DateTime _nextTime;
private readonly int _intervalSeconds;
public TimeSequenceQueue(DateTime startTime, int intervalSeconds)
{
_intervalSeconds = intervalSeconds;
_previousTime = startTime;
_nextTime = startTime;
}
public override bool TryTake([MaybeNullWhen(false)] out KeyValuePair<DateTime, T> item)
{
item = _dictionary.SingleOrDefault(x => x.Key == _nextTime);
T? value = default(T);
if (item.Value == null)
return false;
bool result = _dictionary.TryRemove(item.Key, out value);
if (result)
{
_previousTime = _nextTime;
_nextTime = _nextTime.AddSeconds(_intervalSeconds);
}
return result;
}
}
用法:
BlockingCollection<KeyValuePair<DateTime, object>> _queue = new BlockingCollection<KeyValuePair<DateTime, object>>(new TimeSequenceQueue<object>());
消耗循环 - 在新线程中开始:
foreach (var item in _queue.GetConsumingEnumerable())
{
// feed downstream
}
When using the
GetConsumingEnumerable()
for loop, and the next key in the sequence is not present in the dictionary [...] I would like to wait for a specified amount of time and then attempt to take the item from the queue again.
我会尝试笼统地回答这个问题,而不会过多关注您问题的具体细节。所以假设你正在消费
像这样的 BlockingCollection<T>
:
foreach (var item in collection.GetConsumingEnumerable())
{
// Do something with the consumed item.
}
...并且您想避免无限期地等待物品到达。您想每 5 秒醒来一次并在 waiting/sleeping 再次之前做一些事情。 以下是您的操作方法:
while (!collection.IsCompleted)
{
bool consumed = collection.TryTake(out var item, TimeSpan.FromSeconds(5));
if (consumed)
{
// Do something with the consumed item.
}
else
{
// Do something before trying again to take an item.
}
}
以上模式模仿actual source code of the BlockingCollection<T>.GetConsumingEnumerable
方法。
如果你想变得更有趣,你可以将此功能合并到 BlockingCollection<T>
class 的自定义扩展方法中,如下所示:
public static IEnumerable<(bool Consumed, T Item)> GetConsumingEnumerable<T>(
this BlockingCollection<T> source, TimeSpan timeout)
{
while (!source.IsCompleted)
{
bool consumed = source.TryTake(out var item, timeout);
yield return (consumed, item);
}
}
用法示例:
foreach (var (consumed, item) in collection.GetConsumingEnumerable(
TimeSpan.FromSeconds(5)))
{
// Do something depending on whether an item was consumed or not.
}