多线程检查映射大小和并发性
Multiple threads checking map size and conccurency
我有一个方法应该从队列中提供地图,并且只有在地图大小不超过特定数量时才会这样做。这提示了并发问题,因为我从每个线程获得的大小在全局上是不一致的。我通过这段代码复制了这个问题
import java.sql.Timestamp;
import java.util.Date;
import java.util.concurrent.ConcurrentHashMap;
public class ConcurrenthashMapTest {
private ConcurrentHashMap<Integer, Integer> map = new ConcurrentHashMap<Integer, Integer>();
private ThreadUx[] tArray = new ThreadUx[999];
public void parallelMapFilling() {
for ( int i = 0; i < 999; i++ ) {
tArray[i] = new ThreadUx( i );
}
for ( int i = 0; i < 999; i++ ) {
tArray[i].start();
}
}
public class ThreadUx extends Thread {
private int seq = 0;
public ThreadUx( int i ) {
seq = i;
}
@Override
public void run() {
while ( map.size() < 2 ) {
map.put( seq, seq );
System.out.println( Thread.currentThread().getName() + " || The size is: " + map.size() + " || " + new Timestamp( new Date().getTime() ) );
}
}
}
public static void main( String[] args ) {
new ConcurrenthashMapTest().parallelMapFilling();
}
}
正常情况下我应该只有一行输出,大小不超过1,但我确实有这样的东西
Thread-1 || The size is: 2 || 2016-06-07 18:32:55.157
Thread-0 || The size is: 2 || 2016-06-07 18:32:55.157
我尝试将整个 运行 方法标记为同步,但没有成功,只有当我这样做时
@Override
public void run() {
synchronized ( map ) {
if ( map.size() < 1 ) {
map.put( seq, seq );
System.out.println( Thread.currentThread().getName() + " || The size is: " + map.size() + " || " + new Timestamp( new Date().getTime() ) );
}
}
}
成功了,为什么只有同步块和同步方法有效?此外,我不想使用像同步块一样古老的东西,因为我正在开发 Java EE 应用程序,是否有 Spring 或 Java EE 任务执行器或注释可以有帮助吗?
您正在使用 ConcurrentHashMap
,并且根据 API 文档:
Bear in mind that the results of aggregate status methods including
size, isEmpty, and containsValue are typically useful only when a map
is not undergoing concurrent updates in other threads. Otherwise the
results of these methods reflect transient states that may be adequate
for monitoring or estimation purposes, but not for program control.
这意味着除非您显式同步对 size()
的访问,否则您无法获得准确的结果。
将 synchronized
添加到 run
方法不起作用,因为线程没有在同一个锁对象上同步 -- 每个线程都在自己身上获得锁。
在地图上同步本身确实有效,但恕我直言,这不是一个好的选择,因为那样你就失去了 ConcurrentHashMap
可以提供的性能优势。
总之你需要重新考虑设计。
来自 Java Concurrency in Practice:
The semantics of methods of ConcurrentHashMap
that operate on the entire Map, such as size
and isEmpty
, have been slightly weakened to reflect the concurrent nature of the collection. Since the result of size could be out of date by the time it is computed, it is really only an estimate, so size is allowed to return an approximation instead of an exact count. While at first this may seem disturbing, in reality methods like size
and isEmpty
are far less useful in concurrent environments because these quantities are moving targets. So the requirements for these operations were weakened to enable performance optimizations for the most important operations, primarily get
, put
, containsKey
, and remove
.
The one feature offered by the synchronized
Map
implementations but not by ConcurrentHashMap
is the ability to lock the map for exclusive access. With Hashtable
and synchronizedMap
, acquiring the Map lock prevents any other thread from accessing it. This might be necessary in unusual cases such as adding several mappings atomically, or iterating the Map several times and needing to see the same elements in the same order. On the whole, though, this is a reasonable tradeoff: concurrent collections should be expected to change their contents continuously.
解决方案:
重构设计,不使用并发访问的size
方法。
要使用 size
和 isEmpty
方法,您可以使用同步集合 Collections.synchronizedMap
。同步集合通过序列化对集合状态的所有访问来实现线程安全。这种方法的代价是并发性差;当多个线程争用集合范围的锁时,吞吐量会受到影响。此外,您还需要将其检查和放置的块与地图实例同步,因为它是一个复合操作。
第三。使用第三方实现或自己编写。
public class BoundConcurrentHashMap <K,V> {
private final Map<K, V> m;
private final Semaphore semaphore;
public BoundConcurrentHashMap(int size) {
m = new ConcurrentHashMap<K, V>();
semaphore = new Semaphore(size);
}
public V get(V key) {
return m.get(key);
}
public boolean put(K key, V value) {
boolean hasSpace = semaphore.tryAcquire();
if(hasSpace) {
m.put(key, value);
}
return hasSpace;
}
public void remove(Object key) {
m.remove(key);
semaphore.release();
}
// approximation, do not trust this method
public int size(){
return m.size();
}
}
Class BoundConcurrentHashMap
与 ConcurrentHashMap
一样有效并且几乎是线程安全的。因为在 remove
方法中删除元素和释放信号量并不是应该同时进行的。但在这种情况下是可以容忍的。 size
方法仍然是 returns 近似值,但是 put
方法将不允许超过地图大小。
我有一个方法应该从队列中提供地图,并且只有在地图大小不超过特定数量时才会这样做。这提示了并发问题,因为我从每个线程获得的大小在全局上是不一致的。我通过这段代码复制了这个问题
import java.sql.Timestamp;
import java.util.Date;
import java.util.concurrent.ConcurrentHashMap;
public class ConcurrenthashMapTest {
private ConcurrentHashMap<Integer, Integer> map = new ConcurrentHashMap<Integer, Integer>();
private ThreadUx[] tArray = new ThreadUx[999];
public void parallelMapFilling() {
for ( int i = 0; i < 999; i++ ) {
tArray[i] = new ThreadUx( i );
}
for ( int i = 0; i < 999; i++ ) {
tArray[i].start();
}
}
public class ThreadUx extends Thread {
private int seq = 0;
public ThreadUx( int i ) {
seq = i;
}
@Override
public void run() {
while ( map.size() < 2 ) {
map.put( seq, seq );
System.out.println( Thread.currentThread().getName() + " || The size is: " + map.size() + " || " + new Timestamp( new Date().getTime() ) );
}
}
}
public static void main( String[] args ) {
new ConcurrenthashMapTest().parallelMapFilling();
}
}
正常情况下我应该只有一行输出,大小不超过1,但我确实有这样的东西
Thread-1 || The size is: 2 || 2016-06-07 18:32:55.157
Thread-0 || The size is: 2 || 2016-06-07 18:32:55.157
我尝试将整个 运行 方法标记为同步,但没有成功,只有当我这样做时
@Override
public void run() {
synchronized ( map ) {
if ( map.size() < 1 ) {
map.put( seq, seq );
System.out.println( Thread.currentThread().getName() + " || The size is: " + map.size() + " || " + new Timestamp( new Date().getTime() ) );
}
}
}
成功了,为什么只有同步块和同步方法有效?此外,我不想使用像同步块一样古老的东西,因为我正在开发 Java EE 应用程序,是否有 Spring 或 Java EE 任务执行器或注释可以有帮助吗?
您正在使用 ConcurrentHashMap
,并且根据 API 文档:
Bear in mind that the results of aggregate status methods including size, isEmpty, and containsValue are typically useful only when a map is not undergoing concurrent updates in other threads. Otherwise the results of these methods reflect transient states that may be adequate for monitoring or estimation purposes, but not for program control.
这意味着除非您显式同步对 size()
的访问,否则您无法获得准确的结果。
将 synchronized
添加到 run
方法不起作用,因为线程没有在同一个锁对象上同步 -- 每个线程都在自己身上获得锁。
在地图上同步本身确实有效,但恕我直言,这不是一个好的选择,因为那样你就失去了 ConcurrentHashMap
可以提供的性能优势。
总之你需要重新考虑设计。
来自 Java Concurrency in Practice:
The semantics of methods of
ConcurrentHashMap
that operate on the entire Map, such assize
andisEmpty
, have been slightly weakened to reflect the concurrent nature of the collection. Since the result of size could be out of date by the time it is computed, it is really only an estimate, so size is allowed to return an approximation instead of an exact count. While at first this may seem disturbing, in reality methods likesize
andisEmpty
are far less useful in concurrent environments because these quantities are moving targets. So the requirements for these operations were weakened to enable performance optimizations for the most important operations, primarilyget
,put
,containsKey
, andremove
.The one feature offered by the
synchronized
Map
implementations but not byConcurrentHashMap
is the ability to lock the map for exclusive access. WithHashtable
andsynchronizedMap
, acquiring the Map lock prevents any other thread from accessing it. This might be necessary in unusual cases such as adding several mappings atomically, or iterating the Map several times and needing to see the same elements in the same order. On the whole, though, this is a reasonable tradeoff: concurrent collections should be expected to change their contents continuously.
解决方案:
重构设计,不使用并发访问的
size
方法。要使用
size
和isEmpty
方法,您可以使用同步集合Collections.synchronizedMap
。同步集合通过序列化对集合状态的所有访问来实现线程安全。这种方法的代价是并发性差;当多个线程争用集合范围的锁时,吞吐量会受到影响。此外,您还需要将其检查和放置的块与地图实例同步,因为它是一个复合操作。
第三。使用第三方实现或自己编写。
public class BoundConcurrentHashMap <K,V> {
private final Map<K, V> m;
private final Semaphore semaphore;
public BoundConcurrentHashMap(int size) {
m = new ConcurrentHashMap<K, V>();
semaphore = new Semaphore(size);
}
public V get(V key) {
return m.get(key);
}
public boolean put(K key, V value) {
boolean hasSpace = semaphore.tryAcquire();
if(hasSpace) {
m.put(key, value);
}
return hasSpace;
}
public void remove(Object key) {
m.remove(key);
semaphore.release();
}
// approximation, do not trust this method
public int size(){
return m.size();
}
}
Class BoundConcurrentHashMap
与 ConcurrentHashMap
一样有效并且几乎是线程安全的。因为在 remove
方法中删除元素和释放信号量并不是应该同时进行的。但在这种情况下是可以容忍的。 size
方法仍然是 returns 近似值,但是 put
方法将不允许超过地图大小。