多线程检查映射大小和并发性

Multiple threads checking map size and conccurency

我有一个方法应该从队列中提供地图,并且只有在地图大小不超过特定数量时才会这样做。这提示了并发问题,因为我从每个线程获得的大小在全局上是不一致的。我通过这段代码复制了这个问题

import java.sql.Timestamp;
import java.util.Date;
import java.util.concurrent.ConcurrentHashMap;

public class ConcurrenthashMapTest {
private ConcurrentHashMap<Integer, Integer> map = new ConcurrentHashMap<Integer, Integer>();
private ThreadUx[] tArray = new ThreadUx[999];

public void parallelMapFilling() {
for ( int i = 0; i < 999; i++ ) {
    tArray[i] = new ThreadUx( i );
}
for ( int i = 0; i < 999; i++ ) {
    tArray[i].start();
}
}

public class ThreadUx extends Thread {
private int seq = 0;

public ThreadUx( int i ) {
    seq = i;
}

@Override
public void run() {
    while ( map.size() < 2 ) {
    map.put( seq, seq );
    System.out.println( Thread.currentThread().getName() + " || The size is: " + map.size() + " || " + new Timestamp( new Date().getTime() ) );
    }
}
}

public static void main( String[] args ) {
new ConcurrenthashMapTest().parallelMapFilling();
}
}

正常情况下我应该只有一行输出,大小不超过1,但我确实有这样的东西

Thread-1 || The size is: 2 || 2016-06-07 18:32:55.157
Thread-0 || The size is: 2 || 2016-06-07 18:32:55.157

我尝试将整个 运行 方法标记为同步,但没有成功,只有当我这样做时

@Override
    public void run() {
        synchronized ( map ) {
        if ( map.size() < 1 ) {
            map.put( seq, seq );
            System.out.println( Thread.currentThread().getName() + " || The size is: " + map.size() + " || " + new Timestamp( new Date().getTime() ) );
        }
        }
    }

成功了,为什么只有同步块和同步方法有效?此外,我不想使用像同步块一样古老的东西,因为我正在开发 Java EE 应用程序,是否有 Spring 或 Java EE 任务执行器或注释​​可以有帮助吗?

您正在使用 ConcurrentHashMap,并且根据 API 文档:

Bear in mind that the results of aggregate status methods including size, isEmpty, and containsValue are typically useful only when a map is not undergoing concurrent updates in other threads. Otherwise the results of these methods reflect transient states that may be adequate for monitoring or estimation purposes, but not for program control.

这意味着除非您显式同步对 size() 的访问,否则您无法获得准确的结果。

synchronized 添加到 run 方法不起作用,因为线程没有在同一个锁对象上同步 -- 每个线程都在自己身上获得锁。

在地图上同步本身确实有效,但恕我直言,这不是一个好的选择,因为那样你就失去了 ConcurrentHashMap 可以提供的性能优势。

总之你需要重新考虑设计。

来自 Java Concurrency in Practice:

The semantics of methods of ConcurrentHashMap that operate on the entire Map, such as size and isEmpty, have been slightly weakened to reflect the concurrent nature of the collection. Since the result of size could be out of date by the time it is computed, it is really only an estimate, so size is allowed to return an approximation instead of an exact count. While at first this may seem disturbing, in reality methods like size and isEmpty are far less useful in concurrent environments because these quantities are moving targets. So the requirements for these operations were weakened to enable performance optimizations for the most important operations, primarily get, put, containsKey, and remove.

The one feature offered by the synchronized Map implementations but not by ConcurrentHashMap is the ability to lock the map for exclusive access. With Hashtable and synchronizedMap, acquiring the Map lock prevents any other thread from accessing it. This might be necessary in unusual cases such as adding several mappings atomically, or iterating the Map several times and needing to see the same elements in the same order. On the whole, though, this is a reasonable tradeoff: concurrent collections should be expected to change their contents continuously.

解决方案:

  1. 重构设计,不使用并发访问的size方法。

  2. 要使用 sizeisEmpty 方法,您可以使用同步集合 Collections.synchronizedMap。同步集合通过序列化对集合状态的所有访问来实现线程安全。这种方法的代价是并发性差;当多个线程争用集合范围的锁时,吞吐量会受到影响。此外,您还需要将其检查和放置的块与地图实例同步,因为它是一个复合操作。

第三。使用第三方实现或自己编写。

public class BoundConcurrentHashMap <K,V> {
    private final Map<K, V> m;
    private final Semaphore semaphore;
    public BoundConcurrentHashMap(int size) {
        m = new ConcurrentHashMap<K, V>();
        semaphore = new Semaphore(size);
    }

    public V get(V key) {
        return m.get(key);
    }

    public boolean put(K key, V value) {
        boolean hasSpace = semaphore.tryAcquire();
        if(hasSpace) {
            m.put(key, value);
        }
        return hasSpace;
    }

    public void remove(Object key) {
        m.remove(key);
        semaphore.release();
    }

    // approximation, do not trust this method
    public int size(){
        return m.size();
    }
}

Class BoundConcurrentHashMapConcurrentHashMap 一样有效并且几乎是线程安全的。因为在 remove 方法中删除元素和释放信号量并不是应该同时进行的。但在这种情况下是可以容忍的。 size 方法仍然是 returns 近似值,但是 put 方法将不允许超过地图大小。