单个数据存储的多个消息侦听器。高效设计

Multiple message listeners to single data store. Efficient design

我有一个由多个消息侦听器写入的数据存储。这些消息侦听器中的每一个也可以在数百个单独的线程中。

数据存储是 PriorityBlockingQueue,因为它需要按时间戳对插入的对象进行排序。为了有效地检查项目队列而不是循环遍历队列,使用 concurrent hashmap 作为索引的一种形式。

private Map<String, SLAData> SLADataIndex = new ConcurrentHashMap<String, SLAData>();;
private BlockingQueue<SLAData> SLADataQueue;

问题 1 这是一个可以接受的设计还是我应该只使用单个 PriorityBlockingQueue。

每个消息侦听器执行一个操作,这些侦听器被扩展到多个线程。

Insert Method 因此它会插入到两者中。

this.SLADataIndex.put(dataToWrite.getMessageId(), dataToWrite);
this.SLADataQueue.add(dataToWrite);

更新方法

this.SLADataIndex.get(messageId).setNodeId(
            updatedNodeId);

删除方法

SLATupleData data = this.SLADataIndex.get(messageId);
//remove is O(log n)
this.SLADataQueue.remove(data);
// remove from index
this.SLADataIndex.remove(messageId);

问题二使用这些方法是最有效的方法吗?它们通过另一个对象对它们进行包装以进行错误处理。

问题三使用并发的HashMap和BlockingQueue是否意味着这些操作是线程安全的?我不需要使用锁定对象?

问题四当这些方法被多个线程和监听器调用时没有任何同步块,它们可以同时被不同的线程或监听器调用吗?

Question 1 is this a acceptable design or should I just use the single PriorityBlockingQueue.

当然你应该尝试使用单个Queue。保持两个集合同步将需要更多的同步复杂性和代码中的担忧。

为什么需要 Map?如果只是调用 setNodeId(...) 那么我会让处理线程在从 Queue.

中拉出时自己执行此操作
// processing thread
while (!Thread.currentThread().isInterrupted()) {
   dataToWrite = queue.take();
   dataToWrite.setNodeId(myNodeId);
   // process data
   ...
}

Question Two Using these methods is this the most efficient way? They have wrappers around them via another object for error handling.

当然,这看起来不错,但同样,您将需要进行一些同步锁定,否则您将无法race conditions保持 2 个集合同步。

Question Three Using a concurrent HashMap and BlockingQueue does this mean these operations are thread safe? I dont need to use a lock object?

这两个 类(ConcurrentHashMapBlockingQueue 实现)都是线程安全的,是的。 BUT 由于有两个集合,您可能会出现竞争条件,其中一个集合已更新,而另一个集合尚未更新。很可能,您必须使用锁定对象来确保两个集合正确保持同步。

Question Four When these methods are called by multiple threads and listeners without any sort of synchronized block, can they be called at the same time by different threads or listeners?

在没有看到相关代码的情况下,这是一个很难回答的问题。例如。有人可能正在调用 Insert(...) 并将其添加到 Map 而不是 queue,当另一个线程调用 Delete(...) 并且该项目将在 Map 并删除,但 queue.remove() 不会在队列中找到它,因为 Insert(...) 尚未在另一个线程中完成。