以线程安全的方式填充映射并将该映射从后台线程传递给另一个方法?
Populate map in a thread safe way and pass that map to another method from a background thread?
我有一个下面的 class,其中 add
方法将被多个线程调用以填充 channelMessageHolder
CHM 以线程安全的方式。
在同一个 class 中,我有一个每 30 秒运行一次的后台线程,它通过从 channelMessageHolder
.
public class Processor {
private final ScheduledExecutorService executorService = Executors
.newSingleThreadScheduledExecutor();
private final AtomicReference<ConcurrentHashMap<Channel, ConcurrentLinkedQueue<Message>>> channelMessageHolder =
new AtomicReference<>(new ConcurrentHashMap<Channel, ConcurrentLinkedQueue<Message>>());
private Processor() {
executorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
send();
}
}, 0, 30, TimeUnit.SECONDS);
}
// this will be called by only single background thread
private void send(ConcurrentHashMap<Channel, ConcurrentLinkedQueue<Message>> messageByChannels) {
for(Entry<Channel, ConcurrentLinkedQueue<Message>> entry : messageByChannels.entrySet()) {
Channel channel = entry.getKey();
ConcurrentLinkedQueue<Message> messageHolder = entry.getValue();
while (!messageHolder.isEmpty()) {
Message message = messageHolder.poll();
....
// process this and send to database
}
}
}
// called by multiple threads
public void add(final Channel channel, final Message message) {
// populate channelMessageHolder in a thread safe way
}
}
问题
如您所见,channelMessageHolder
已经存在于我的 Processor
class 中,所以我是否需要每 30 秒显式地从该地图传递数据到 send 方法?或者我可以直接在我的发送方法中使用它?
令人困惑的是,如果我直接在我的 send 方法中使用它,那么它会同时被多个线程填充,所以这就是为什么我使用 AtomicReference 的 getAndSet
方法将它传递给 send
方法。
如果我做的不对,请告诉我,还有更好的方法吗?
Or I can directly use it in my send method without passing anything
您应该可以直接在 send
方法中使用它,只需在 send
方法的开头说 channelMessageHolder.getAndSet(new ConcurrentHashMap<Channel, ConcurrentLinkedQueue<Message>>())
即可,不会出现任何问题。
也就是说,Java 8 在 ConcurrentHashMap
class 中添加了一个名为 computeIfAbsent
的新方法,这意味着你并不真的需要你正在使用的AtomicReference
。
As you can see channelMessageHolder is already present in my Processor class so do I need to explicitly pass data from this map every 30 seconds to send method? Or I can directly use it in my send method?
您当然可以直接在 send()
方法中使用它,并且不需要 AtomicReference
包装器,因为 ConcurrentHashMap
已经同步。您需要担心的是地图中的键和值对象是否已正确同步。我假设 Channel
是不可变的并且 ConcurrentLinkedQueue
是并发的,所以你应该很好。
// no need for AtomicReference
private final ConcurrentHashMap<Channel, ConcurrentLinkedQueue<Message>> channelMessageHolder =
new ConcurrentHashMap<Channel, ConcurrentLinkedQueue<Message>>();
ConcurrentHashMap
会为您处理同步,因此生产者线程可以在您的发送者线程发送项目的同时向其中添加项目而不会发生冲突。仅当您尝试在多个线程之间共享未同步的 class 时才需要 AtomicReference
。
Confusion is, if I directly use it in my send method, then it will be populated by multiple threads at the same time so that's why I am using getAndSet method of AtomicReference to pass it to send method.
对,但这还可以。多个线程将向 ConcurrentLinkedQueue
添加消息。每隔 30 秒,您的后台线程启动一次,获取 Channel
,出队,然后发送当时队列中的消息。 ConcurrentLinkedQueue
防止生产者和消费者的竞争条件。
您的代码中存在的问题是这不是可重入的,因为它依赖于对队列的多次调用:
while (!messageHolder.isEmpty()) {
Message message = messageHolder.poll();
它适用于你的情况,因为看起来只有一个线程 dequeue-ing 但下面的代码更好:
while (true) {
// only one call to the concurrent queue
Message message = messageHolder.poll();
if (message == null) {
break;
}
...
}
我有一个下面的 class,其中 add
方法将被多个线程调用以填充 channelMessageHolder
CHM 以线程安全的方式。
在同一个 class 中,我有一个每 30 秒运行一次的后台线程,它通过从 channelMessageHolder
.
public class Processor {
private final ScheduledExecutorService executorService = Executors
.newSingleThreadScheduledExecutor();
private final AtomicReference<ConcurrentHashMap<Channel, ConcurrentLinkedQueue<Message>>> channelMessageHolder =
new AtomicReference<>(new ConcurrentHashMap<Channel, ConcurrentLinkedQueue<Message>>());
private Processor() {
executorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
send();
}
}, 0, 30, TimeUnit.SECONDS);
}
// this will be called by only single background thread
private void send(ConcurrentHashMap<Channel, ConcurrentLinkedQueue<Message>> messageByChannels) {
for(Entry<Channel, ConcurrentLinkedQueue<Message>> entry : messageByChannels.entrySet()) {
Channel channel = entry.getKey();
ConcurrentLinkedQueue<Message> messageHolder = entry.getValue();
while (!messageHolder.isEmpty()) {
Message message = messageHolder.poll();
....
// process this and send to database
}
}
}
// called by multiple threads
public void add(final Channel channel, final Message message) {
// populate channelMessageHolder in a thread safe way
}
}
问题
如您所见,channelMessageHolder
已经存在于我的 Processor
class 中,所以我是否需要每 30 秒显式地从该地图传递数据到 send 方法?或者我可以直接在我的发送方法中使用它?
令人困惑的是,如果我直接在我的 send 方法中使用它,那么它会同时被多个线程填充,所以这就是为什么我使用 AtomicReference 的 getAndSet
方法将它传递给 send
方法。
如果我做的不对,请告诉我,还有更好的方法吗?
Or I can directly use it in my send method without passing anything
您应该可以直接在 send
方法中使用它,只需在 send
方法的开头说 channelMessageHolder.getAndSet(new ConcurrentHashMap<Channel, ConcurrentLinkedQueue<Message>>())
即可,不会出现任何问题。
也就是说,Java 8 在 ConcurrentHashMap
class 中添加了一个名为 computeIfAbsent
的新方法,这意味着你并不真的需要你正在使用的AtomicReference
。
As you can see channelMessageHolder is already present in my Processor class so do I need to explicitly pass data from this map every 30 seconds to send method? Or I can directly use it in my send method?
您当然可以直接在 send()
方法中使用它,并且不需要 AtomicReference
包装器,因为 ConcurrentHashMap
已经同步。您需要担心的是地图中的键和值对象是否已正确同步。我假设 Channel
是不可变的并且 ConcurrentLinkedQueue
是并发的,所以你应该很好。
// no need for AtomicReference
private final ConcurrentHashMap<Channel, ConcurrentLinkedQueue<Message>> channelMessageHolder =
new ConcurrentHashMap<Channel, ConcurrentLinkedQueue<Message>>();
ConcurrentHashMap
会为您处理同步,因此生产者线程可以在您的发送者线程发送项目的同时向其中添加项目而不会发生冲突。仅当您尝试在多个线程之间共享未同步的 class 时才需要 AtomicReference
。
Confusion is, if I directly use it in my send method, then it will be populated by multiple threads at the same time so that's why I am using getAndSet method of AtomicReference to pass it to send method.
对,但这还可以。多个线程将向 ConcurrentLinkedQueue
添加消息。每隔 30 秒,您的后台线程启动一次,获取 Channel
,出队,然后发送当时队列中的消息。 ConcurrentLinkedQueue
防止生产者和消费者的竞争条件。
您的代码中存在的问题是这不是可重入的,因为它依赖于对队列的多次调用:
while (!messageHolder.isEmpty()) {
Message message = messageHolder.poll();
它适用于你的情况,因为看起来只有一个线程 dequeue-ing 但下面的代码更好:
while (true) {
// only one call to the concurrent queue
Message message = messageHolder.poll();
if (message == null) {
break;
}
...
}