以线程安全的方式填充映射并将该映射从后台线程传递给另一个方法?

Populate map in a thread safe way and pass that map to another method from a background thread?

我有一个下面的 class,其中 add 方法将被多个线程调用以填充 channelMessageHolder CHM 以线程安全的方式。

在同一个 class 中,我有一个每 30 秒运行一次的后台线程,它通过从 channelMessageHolder.

public class Processor {
  private final ScheduledExecutorService executorService = Executors
      .newSingleThreadScheduledExecutor();
  private final AtomicReference<ConcurrentHashMap<Channel, ConcurrentLinkedQueue<Message>>> channelMessageHolder =
                new AtomicReference<>(new ConcurrentHashMap<Channel, ConcurrentLinkedQueue<Message>>());

  private Processor() {
    executorService.scheduleAtFixedRate(new Runnable() {
      @Override
      public void run() {
        send();
      }
    }, 0, 30, TimeUnit.SECONDS);
  }

  // this will be called by only single background thread
  private void send(ConcurrentHashMap<Channel, ConcurrentLinkedQueue<Message>> messageByChannels) {
    for(Entry<Channel, ConcurrentLinkedQueue<Message>> entry : messageByChannels.entrySet()) {
      Channel channel = entry.getKey();
      ConcurrentLinkedQueue<Message> messageHolder = entry.getValue();

      while (!messageHolder.isEmpty()) {
        Message message = messageHolder.poll();
        ....
        // process this and send to database
      }      
    }
  }

  // called by multiple threads
  public void add(final Channel channel, final Message message) {
    // populate channelMessageHolder in a thread safe way
  }
}

问题

如您所见,channelMessageHolder 已经存在于我的 Processor class 中,所以我是否需要每 30 秒显式地从该地图传递数据到 send 方法?或者我可以直接在我的发送方法中使用它?

令人困惑的是,如果我直接在我的 send 方法中使用它,那么它会同时被多个线程填充,所以这就是为什么我使用 AtomicReference 的 getAndSet 方法将它传递给 send方法。

如果我做的不对,请告诉我,还有更好的方法吗?

Or I can directly use it in my send method without passing anything

您应该可以直接在 send 方法中使用它,只需在 send 方法的开头说 channelMessageHolder.getAndSet(new ConcurrentHashMap<Channel, ConcurrentLinkedQueue<Message>>()) 即可,不会出现任何问题。

也就是说,Java 8ConcurrentHashMap class 中添加了一个名为 computeIfAbsent 的新方法,这意味着你并不真的需要你正在使用的AtomicReference

As you can see channelMessageHolder is already present in my Processor class so do I need to explicitly pass data from this map every 30 seconds to send method? Or I can directly use it in my send method?

您当然可以直接在 send() 方法中使用它,并且不需要 AtomicReference 包装器,因为 ConcurrentHashMap 已经同步。您需要担心的是地图中的键和值对象是否已正确同步。我假设 Channel 是不可变的并且 ConcurrentLinkedQueue 是并发的,所以你应该很好。

// no need for AtomicReference
private final ConcurrentHashMap<Channel, ConcurrentLinkedQueue<Message>> channelMessageHolder =
     new ConcurrentHashMap<Channel, ConcurrentLinkedQueue<Message>>();

ConcurrentHashMap 会为您处理同步,因此生产者线程可以在您的发送者线程发送项目的同时向其中添加项目而不会发生冲突。仅当您尝试在多个线程之间共享未同步的 class 时才需要 AtomicReference

Confusion is, if I directly use it in my send method, then it will be populated by multiple threads at the same time so that's why I am using getAndSet method of AtomicReference to pass it to send method.

对,但这还可以。多个线程将向 ConcurrentLinkedQueue 添加消息。每隔 30 秒,您的后台线程启动一次,获取 Channel,出队,然后发送当时队列中的消息。 ConcurrentLinkedQueue 防止生产者和消费者的竞争条件。

您的代码中存在的问题是这不是可重入的,因为它依赖于对队列的多次调用:

while (!messageHolder.isEmpty()) {
    Message message = messageHolder.poll();

它适用于你的情况,因为看起来只有一个线程 dequeue-ing 但下面的代码更好:

while (true) {
    // only one call to the concurrent queue
    Message message = messageHolder.poll();
    if (message == null) {
        break;
    }
    ...
}