在删除条目时迭代 ConcurrentHashMap
Iterate over ConcurrentHashMap while deleting entries
我想在删除条目时定期迭代 ConcurrentHashMap
,如下所示:
for (Iterator<Entry<Integer, Integer>> iter = map.entrySet().iterator(); iter.hasNext(); ) {
Entry<Integer, Integer> entry = iter.next();
// do something
iter.remove();
}
问题是在我迭代时另一个线程可能正在更新或修改值。如果发生这种情况,这些更新可能会永远丢失,因为我的线程在迭代时只会看到陈旧的值,但 remove()
将删除活动条目。
经过一番考虑,我想到了这个解决方法:
map.forEach((key, value) -> {
// delete if value is up to date, otherwise leave for next round
if (map.remove(key, value)) {
// do something
}
});
一个问题是它不会捕获对未实现 equals()
(例如 AtomicInteger
)的可变值的修改。有没有更好的方法来安全删除并发修改?
您的解决方法实际上非常好。还有其他设施,您可以在其上构建有点类似的解决方案(例如,使用 computeIfPresent()
和墓碑值),但它们有自己的注意事项,我在略有不同的用例中使用了它们。
至于使用不为映射值实现 equals()
的类型,您可以在相应类型之上使用您自己的包装器。这是将对象相等的自定义语义注入 ConcurrentMap
.
提供的原子 replace/remove 操作的最直接方法
更新
这是一个草图,展示了如何在 ConcurrentMap.remove(Object key, Object value)
API 之上构建:
- 在您用于值的可变类型之上定义一个包装器类型,同时在当前可变值之上定义您的自定义
equals()
方法。
- 在您的
BiConsumer
(您传递给 forEach
的 lambda)中,创建值的深层副本(属于您的新包装器类型)并执行您的逻辑来确定是否该值需要从副本中删除。
- 如果需要删除该值,请调用
remove(myKey, myValueCopy)
。
- 如果在计算值是否需要删除时发生了一些并发更改,
remove(myKey, myValueCopy)
将 return false
(ABA 问题除外,这是一个单独的主题).
这里有一些代码说明了这一点:
import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
public class Playground {
private static class AtomicIntegerWrapper {
private final AtomicInteger value;
AtomicIntegerWrapper(int value) {
this.value = new AtomicInteger(value);
}
public void set(int value) {
this.value.set(value);
}
public int get() {
return this.value.get();
}
@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (!(obj instanceof AtomicIntegerWrapper)) {
return false;
}
AtomicIntegerWrapper other = (AtomicIntegerWrapper) obj;
if (other.value.get() == this.value.get()) {
return true;
}
return false;
}
public static AtomicIntegerWrapper deepCopy(AtomicIntegerWrapper wrapper) {
int wrapped = wrapper.get();
return new AtomicIntegerWrapper(wrapped);
}
}
private static final ConcurrentMap<Integer, AtomicIntegerWrapper> MAP
= new ConcurrentHashMap<>();
private static final int NUM_THREADS = 3;
public static void main(String[] args) throws InterruptedException {
for (int i = 0; i < 10; ++i) {
MAP.put(i, new AtomicIntegerWrapper(1));
}
Thread.sleep(1);
for (int i = 0; i < NUM_THREADS; ++i) {
new Thread(() -> {
Random rnd = new Random();
while (!MAP.isEmpty()) {
MAP.forEach((key, value) -> {
AtomicIntegerWrapper elem = MAP.get(key);
if (elem == null) {
System.out.println("Oops...");
} else if (elem.get() == 1986) {
elem.set(1);
} else if ((rnd.nextInt() & 128) == 0) {
elem.set(1986);
}
});
}
}).start();
}
Thread.sleep(1);
new Thread(() -> {
Random rnd = new Random();
while (!MAP.isEmpty()) {
MAP.forEach((key, value) -> {
AtomicIntegerWrapper elem =
AtomicIntegerWrapper.deepCopy(MAP.get(key));
if (elem.get() == 1986) {
try {
Thread.sleep(10);
} catch (Exception e) {}
boolean replaced = MAP.remove(key, elem);
if (!replaced) {
System.out.println("Bailed out!");
} else {
System.out.println("Replaced!");
}
}
});
}
}).start();
}
}
您会看到 "Bailed out!" 的打印输出,与 "Replaced!" 混合在一起(删除成功,因为没有您关心的并发更新)并且计算将在某个时候停止。
- 如果删除自定义
equals()
方法并继续使用副本,您将看到无穷无尽的"Bailed out!",因为副本永远不会被认为等于映射中的值.
- 如果您不使用副本,您将看不到 "Bailed out!" 打印出来,并且您会遇到您正在解释的问题 - 无论并发更改如何,值都会被删除。
让我们考虑一下您有哪些选择。
使用 isUpdated()
操作创建您自己的容器-class 并使用您自己的解决方法。
如果您的地图只包含几个元素,并且与 put/delete 操作相比,您正在非常频繁地迭代地图。使用 CopyOnWriteArrayList
可能是个不错的选择
CopyOnWriteArrayList<Entry<Integer, Integer>> lookupArray = ...;
另一种选择是实现您自己的 CopyOnWriteMap
public class CopyOnWriteMap<K, V> implements Map<K, V>{
private volatile Map<K, V> currentMap;
public V put(K key, V value) {
synchronized (this) {
Map<K, V> newOne = new HashMap<K, V>(this.currentMap);
V val = newOne.put(key, value);
this.currentMap = newOne; // atomic operation
return val;
}
}
public V remove(Object key) {
synchronized (this) {
Map<K, V> newOne = new HashMap<K, V>(this.currentMap);
V val = newOne.remove(key);
this.currentMap = newOne; // atomic operation
return val;
}
}
[...]
}
有负面影响。如果您使用的是写时复制集合,您的更新将永远不会丢失,但您可以再次看到一些以前删除的条目。
最坏的情况:如果地图被复制,每次删除的条目都会被恢复。
您的解决方法可行,但存在一种可能的情况。如果某些条目不断更新 map.remove(key,value) 在更新结束之前可能永远不会 return 为真。
如果你使用JDK8,这里是我的解决方案
for (Iterator<Entry<Integer, Integer>> iter = map.entrySet().iterator(); iter.hasNext(); ) {
Entry<Integer, Integer> entry = iter.next();
Map.compute(entry.getKey(), (k, v) -> f(v));
//do something for prevValue
}
....
private Integer prevValue;
private Integer f(Integer v){
prevValue = v;
return null;
}
compute() 会将 f(v) 应用于值,在我们的例子中,将值分配给全局变量并删除条目。
根据 Javadoc,它是原子的。
Attempts to compute a mapping for the specified key and its current mapped value (or null if there is no current mapping). The entire method invocation is performed atomically. Some attempted update operations on this map by other threads may be blocked while computation is in progress, so the computation should be short and simple, and must not attempt to update any other mappings of this Map.
我想在删除条目时定期迭代 ConcurrentHashMap
,如下所示:
for (Iterator<Entry<Integer, Integer>> iter = map.entrySet().iterator(); iter.hasNext(); ) {
Entry<Integer, Integer> entry = iter.next();
// do something
iter.remove();
}
问题是在我迭代时另一个线程可能正在更新或修改值。如果发生这种情况,这些更新可能会永远丢失,因为我的线程在迭代时只会看到陈旧的值,但 remove()
将删除活动条目。
经过一番考虑,我想到了这个解决方法:
map.forEach((key, value) -> {
// delete if value is up to date, otherwise leave for next round
if (map.remove(key, value)) {
// do something
}
});
一个问题是它不会捕获对未实现 equals()
(例如 AtomicInteger
)的可变值的修改。有没有更好的方法来安全删除并发修改?
您的解决方法实际上非常好。还有其他设施,您可以在其上构建有点类似的解决方案(例如,使用 computeIfPresent()
和墓碑值),但它们有自己的注意事项,我在略有不同的用例中使用了它们。
至于使用不为映射值实现 equals()
的类型,您可以在相应类型之上使用您自己的包装器。这是将对象相等的自定义语义注入 ConcurrentMap
.
更新
这是一个草图,展示了如何在 ConcurrentMap.remove(Object key, Object value)
API 之上构建:
- 在您用于值的可变类型之上定义一个包装器类型,同时在当前可变值之上定义您的自定义
equals()
方法。 - 在您的
BiConsumer
(您传递给forEach
的 lambda)中,创建值的深层副本(属于您的新包装器类型)并执行您的逻辑来确定是否该值需要从副本中删除。 - 如果需要删除该值,请调用
remove(myKey, myValueCopy)
。- 如果在计算值是否需要删除时发生了一些并发更改,
remove(myKey, myValueCopy)
将 returnfalse
(ABA 问题除外,这是一个单独的主题).
- 如果在计算值是否需要删除时发生了一些并发更改,
这里有一些代码说明了这一点:
import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
public class Playground {
private static class AtomicIntegerWrapper {
private final AtomicInteger value;
AtomicIntegerWrapper(int value) {
this.value = new AtomicInteger(value);
}
public void set(int value) {
this.value.set(value);
}
public int get() {
return this.value.get();
}
@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (!(obj instanceof AtomicIntegerWrapper)) {
return false;
}
AtomicIntegerWrapper other = (AtomicIntegerWrapper) obj;
if (other.value.get() == this.value.get()) {
return true;
}
return false;
}
public static AtomicIntegerWrapper deepCopy(AtomicIntegerWrapper wrapper) {
int wrapped = wrapper.get();
return new AtomicIntegerWrapper(wrapped);
}
}
private static final ConcurrentMap<Integer, AtomicIntegerWrapper> MAP
= new ConcurrentHashMap<>();
private static final int NUM_THREADS = 3;
public static void main(String[] args) throws InterruptedException {
for (int i = 0; i < 10; ++i) {
MAP.put(i, new AtomicIntegerWrapper(1));
}
Thread.sleep(1);
for (int i = 0; i < NUM_THREADS; ++i) {
new Thread(() -> {
Random rnd = new Random();
while (!MAP.isEmpty()) {
MAP.forEach((key, value) -> {
AtomicIntegerWrapper elem = MAP.get(key);
if (elem == null) {
System.out.println("Oops...");
} else if (elem.get() == 1986) {
elem.set(1);
} else if ((rnd.nextInt() & 128) == 0) {
elem.set(1986);
}
});
}
}).start();
}
Thread.sleep(1);
new Thread(() -> {
Random rnd = new Random();
while (!MAP.isEmpty()) {
MAP.forEach((key, value) -> {
AtomicIntegerWrapper elem =
AtomicIntegerWrapper.deepCopy(MAP.get(key));
if (elem.get() == 1986) {
try {
Thread.sleep(10);
} catch (Exception e) {}
boolean replaced = MAP.remove(key, elem);
if (!replaced) {
System.out.println("Bailed out!");
} else {
System.out.println("Replaced!");
}
}
});
}
}).start();
}
}
您会看到 "Bailed out!" 的打印输出,与 "Replaced!" 混合在一起(删除成功,因为没有您关心的并发更新)并且计算将在某个时候停止。
- 如果删除自定义
equals()
方法并继续使用副本,您将看到无穷无尽的"Bailed out!",因为副本永远不会被认为等于映射中的值. - 如果您不使用副本,您将看不到 "Bailed out!" 打印出来,并且您会遇到您正在解释的问题 - 无论并发更改如何,值都会被删除。
让我们考虑一下您有哪些选择。
使用
isUpdated()
操作创建您自己的容器-class 并使用您自己的解决方法。如果您的地图只包含几个元素,并且与 put/delete 操作相比,您正在非常频繁地迭代地图。使用
CopyOnWriteArrayList
可能是个不错的选择CopyOnWriteArrayList<Entry<Integer, Integer>> lookupArray = ...;
另一种选择是实现您自己的
CopyOnWriteMap
public class CopyOnWriteMap<K, V> implements Map<K, V>{ private volatile Map<K, V> currentMap; public V put(K key, V value) { synchronized (this) { Map<K, V> newOne = new HashMap<K, V>(this.currentMap); V val = newOne.put(key, value); this.currentMap = newOne; // atomic operation return val; } } public V remove(Object key) { synchronized (this) { Map<K, V> newOne = new HashMap<K, V>(this.currentMap); V val = newOne.remove(key); this.currentMap = newOne; // atomic operation return val; } } [...] }
有负面影响。如果您使用的是写时复制集合,您的更新将永远不会丢失,但您可以再次看到一些以前删除的条目。
最坏的情况:如果地图被复制,每次删除的条目都会被恢复。
您的解决方法可行,但存在一种可能的情况。如果某些条目不断更新 map.remove(key,value) 在更新结束之前可能永远不会 return 为真。
如果你使用JDK8,这里是我的解决方案
for (Iterator<Entry<Integer, Integer>> iter = map.entrySet().iterator(); iter.hasNext(); ) {
Entry<Integer, Integer> entry = iter.next();
Map.compute(entry.getKey(), (k, v) -> f(v));
//do something for prevValue
}
....
private Integer prevValue;
private Integer f(Integer v){
prevValue = v;
return null;
}
compute() 会将 f(v) 应用于值,在我们的例子中,将值分配给全局变量并删除条目。
根据 Javadoc,它是原子的。
Attempts to compute a mapping for the specified key and its current mapped value (or null if there is no current mapping). The entire method invocation is performed atomically. Some attempted update operations on this map by other threads may be blocked while computation is in progress, so the computation should be short and simple, and must not attempt to update any other mappings of this Map.