跟踪地图中的重复插入(多线程环境)

Keeping a track of duplicate inserts in a Map (Multithreaded environment)

我正在寻找一种方法来跟踪在多线程环境中尝试将相同的密钥插入 Map 的次数,以便可以读取和更新 Map同时由多个线程。如果跟踪重复键插入尝试不容易实现,则替代解决方案是在重复键插入尝试的第一个迹象时终止应用程序。

以下用户定义的单例 Spring bean 显示了我的应用程序使用的全局缓存,它使用多个分区 spring 批处理作业加载(每个 DataType 加载一个作业). addResultForDataType方法可以被多个线程同时调用。

public class JobResults {

    private Map<DataType, Map<String, Object>> results;

    public JobResults() {
        results = new ConcurrentHashMap<DataType, Map<String, Object>>();
    }

    public void addResultForDataType(DataType dataType, String uniqueId, Object result) {
        Map<String, Object> dataTypeMap = results.get(dataType);
        if (dataTypeMap == null) {
            synchronized (dataType) {
                dataTypeMap = results.get(dataType);
                if (dataTypeMap == null) {
                    dataTypeMap = new ConcurrentHashMap<String, Object>();
                    results.put(dataType, dataTypeMap);
                }
            }
        }
        dataTypeMap.put(uniqueId, result);
    }

    public Map<String, Object> getResultForDataType(DataType dataType) {
        return results.get(dataType);
    }

}

此处:

我想创建另一个地图来跟踪重复的插入:

public class JobResults {

    private Map<DataType, Map<String, Object>> results;
    private Map<DataType, ConcurrentHashMap<String, Integer>> duplicates;

    public JobResults() {
        results = new ConcurrentHashMap<DataType, Map<String, Object>>();
        duplicates = new ConcurrentHashMap<DataType, ConcurrentHashMap<String, Integer>>();
    }

    public void addResultForDataType(DataType dataType, String uniqueId, Object result) {
        Map<String, Object> dataTypeMap = results.get(dataType);
        ConcurrentHashMap<String,Integer> duplicateCount = duplicates.get(dataType);
        if (dataTypeMap == null) {
            synchronized (dataType) {
                dataTypeMap = results.get(dataType);
                if (dataTypeMap == null) {
                    dataTypeMap = new ConcurrentHashMap<String, Object>();
                    duplicateCount = new ConcurrentHashMap<String, Integer>();
                    results.put(dataType, dataTypeMap);
                    duplicates.put(dataType, duplicateCount);
                }
            }
        }
        duplicateCount.putIfAbsent(uniqueId, 0);
        duplicateCount.put(uniqueId, duplicateCount.get(uniqueId)+1);//keep track of duplicate rows
        dataTypeMap.put(uniqueId, result);
    }

    public Map<String, Object> getResultForDataType(DataType dataType) {
        return results.get(dataType);
    }

}

我意识到 statemet duplicateCount.put(uniqueId, duplicateCount.get(uniqueId)+1); 不是隐式线程安全的。为了使其线程安全,我需要使用同步来减慢我的插入速度。如何在不影响应用程序性能的情况下跟踪重复插入。如果跟踪重复插入并不容易,我可以在尝试覆盖映射中现有条目的第一个迹象时抛出异常。

注意 我知道 Map 不允许重复键。我想要的是一种跟踪任何此类尝试并停止应用程序而不是覆盖 Map.

中的条目的方法

尝试这样的事情:

    ConcurrentHashMap<String, AtomicInteger> duplicateCount = new ConcurrentHashMap<String, AtomicInteger>();

然后,当您准备好增加计数时,请执行以下操作:

final AtomicInteger oldCount = duplicateCount.putIfAbsent(uniqueId, new AtomicInteger(1));
if (oldCount != null) {
    oldCount.incrementAndGet();
}

因此,如果您在地图中还没有计数,您将输入 1,如果有,您将获取当前值并自动递增它。这应该是线程安全的。

如果您想跟踪插入的数量,您可以将外部映射类型更改为类似 Map<String, Pair<Integer, Object>> 的类型(或者,如果您不使用 Apache Commons,只需 Map<DataType, Map.Entry<Integer, InnerType>> ,其中 Integer 值是更新次数:

DataType key = ...;
Map<Integer, Object> value = ...;
dataTypeMap.compute(key, (k, current) -> {
    if (current == null) {
        /* Initial count is 0 */
        return Pair.of(0, value);
    } else {
        /* Increment count */
        return Pair.of(current.getFirst(), value);
    }));

如果您只关心确保没有重复插入,您可以简单地使用 computeIfAbsent:

DataType key = ...;
Map<Integer, Object> value = ...;
if (dataTypeMap.computeIfAbsent(key, k -> value)) != null) {
    /* There was already a value */
    throw new IllegalStateException(...);
});