是否有并发排序字典或类似的东西?
Is there a concurrent sorted dictionary or something similar?
对于我们一直在做的一个项目,我们使用了一个并发字典,这很好,直到出现了一个要求对字典进行排序的新规范(它应该保持它添加的顺序,有点像先进先出)。
这就是我们目前所做的,我们从字典中取出 x 数量(在本例中为 5)的项目:
private Dictionary<PriorityOfMessage, ConcurrentDictionary<Guid, PriorityMessage>> mQueuedMessages = new Dictionary<PriorityOfMessage, ConcurrentDictionary<Guid, PriorityMessage>>();
var messages = new List<KeyValuePair<Guid, PriorityMessage>>();
messages.AddRange(mQueuedMessages[priority].Take(5));
然后我们用它做一些事情,如果一切顺利,我们最终将它们删除。
mQueuedMessages[priority].TryRemove(messageOfPriority.Key);
但是,如果事情失败了,我们不会删除它们,稍后再试。所以不幸的是没有并发排序的字典,但是有没有办法确保消息保持它们添加的顺序?
非常重要的是我们可以从 list/dictionary 中取出多个对象而不删除它们(或者我们需要能够稍后将它们添加到前面)。
How often will you take per second?
.
it could be a thousand times a second
每秒1000次锁操作绝对不算什么。这几乎不会消耗任何时间。
my colleague has already tried using locks and lists and he deemed it too slow
很可能这意味着锁定的区域太大了。我猜它是这样的:
lock (...) {
var item = TakeFromQueue();
Process(item);
DeleteFromQueue(item);
}
这不起作用,因为 Process
太慢了。必须是:
lock (...)
var item = TakeFromQueue();
Process(item);
lock (...)
DeleteFromQueue(item);
你根本不会有任何性能问题。
您现在可以选择任何您喜欢的数据结构。您不再受内置并发数据结构功能的束缚。除了选择您喜欢的数据结构之外,您还可以对其执行任何您喜欢的操作,例如以原子方式获取多个项目。
我还没有完全理解您的需求,但听起来 SortedList
可能会朝着正确的方向发展。
您也可以寻求其他解决方案(尚未对其性能进行测试):
public class ConcurrentIndexableQueue<T> {
private long tailIndex;
private long headIndex;
private readonly ConcurrentDictionary<long, T> dictionary;
public ConcurrentIndexableQueue() {
tailIndex = -1;
headIndex = 0;
dictionary = new ConcurrentDictionary<long, T>();
}
public long Count { get { return tailIndex - headIndex + 1; } }
public bool IsEmpty { get { return Count == 0; } }
public void Enqueue(T item) {
var enqueuePosition = Interlocked.Increment(ref tailIndex);
dictionary.AddOrUpdate(enqueuePosition, k => item, (k, v) => item);
}
public T Peek(long index) {
T item;
return dictionary.TryGetValue(index, out item) ?
item :
default(T);
}
public long TryDequeue(out T item) {
if (headIndex > tailIndex) {
item = default(T);
return -1;
}
var dequeuePosition = Interlocked.Increment(ref headIndex) - 1;
dictionary.TryRemove(dequeuePosition, out item);
return dequeuePosition;
}
public List<T> GetSnapshot() {
List<T> snapshot = new List<T>();
long snapshotTail = tailIndex;
long snapshotHead = headIndex;
for (long i = snapshotHead; i < snapshotTail; i++) {
T item;
if (TryDequeue(out item) >= 0) {
snapshot.Add(item);
}
}
return snapshot;
}
}
对于我们一直在做的一个项目,我们使用了一个并发字典,这很好,直到出现了一个要求对字典进行排序的新规范(它应该保持它添加的顺序,有点像先进先出)。
这就是我们目前所做的,我们从字典中取出 x 数量(在本例中为 5)的项目:
private Dictionary<PriorityOfMessage, ConcurrentDictionary<Guid, PriorityMessage>> mQueuedMessages = new Dictionary<PriorityOfMessage, ConcurrentDictionary<Guid, PriorityMessage>>();
var messages = new List<KeyValuePair<Guid, PriorityMessage>>();
messages.AddRange(mQueuedMessages[priority].Take(5));
然后我们用它做一些事情,如果一切顺利,我们最终将它们删除。
mQueuedMessages[priority].TryRemove(messageOfPriority.Key);
但是,如果事情失败了,我们不会删除它们,稍后再试。所以不幸的是没有并发排序的字典,但是有没有办法确保消息保持它们添加的顺序?
非常重要的是我们可以从 list/dictionary 中取出多个对象而不删除它们(或者我们需要能够稍后将它们添加到前面)。
How often will you take per second?
.
it could be a thousand times a second
每秒1000次锁操作绝对不算什么。这几乎不会消耗任何时间。
my colleague has already tried using locks and lists and he deemed it too slow
很可能这意味着锁定的区域太大了。我猜它是这样的:
lock (...) {
var item = TakeFromQueue();
Process(item);
DeleteFromQueue(item);
}
这不起作用,因为 Process
太慢了。必须是:
lock (...)
var item = TakeFromQueue();
Process(item);
lock (...)
DeleteFromQueue(item);
你根本不会有任何性能问题。
您现在可以选择任何您喜欢的数据结构。您不再受内置并发数据结构功能的束缚。除了选择您喜欢的数据结构之外,您还可以对其执行任何您喜欢的操作,例如以原子方式获取多个项目。
我还没有完全理解您的需求,但听起来 SortedList
可能会朝着正确的方向发展。
您也可以寻求其他解决方案(尚未对其性能进行测试):
public class ConcurrentIndexableQueue<T> {
private long tailIndex;
private long headIndex;
private readonly ConcurrentDictionary<long, T> dictionary;
public ConcurrentIndexableQueue() {
tailIndex = -1;
headIndex = 0;
dictionary = new ConcurrentDictionary<long, T>();
}
public long Count { get { return tailIndex - headIndex + 1; } }
public bool IsEmpty { get { return Count == 0; } }
public void Enqueue(T item) {
var enqueuePosition = Interlocked.Increment(ref tailIndex);
dictionary.AddOrUpdate(enqueuePosition, k => item, (k, v) => item);
}
public T Peek(long index) {
T item;
return dictionary.TryGetValue(index, out item) ?
item :
default(T);
}
public long TryDequeue(out T item) {
if (headIndex > tailIndex) {
item = default(T);
return -1;
}
var dequeuePosition = Interlocked.Increment(ref headIndex) - 1;
dictionary.TryRemove(dequeuePosition, out item);
return dequeuePosition;
}
public List<T> GetSnapshot() {
List<T> snapshot = new List<T>();
long snapshotTail = tailIndex;
long snapshotHead = headIndex;
for (long i = snapshotHead; i < snapshotTail; i++) {
T item;
if (TryDequeue(out item) >= 0) {
snapshot.Add(item);
}
}
return snapshot;
}
}