跟踪地图中的重复插入(多线程环境)
Keeping a track of duplicate inserts in a Map (Multithreaded environment)
我正在寻找一种方法来跟踪在多线程环境中尝试将相同的密钥插入 Map
的次数,以便可以读取和更新 Map
同时由多个线程。如果跟踪重复键插入尝试不容易实现,则替代解决方案是在重复键插入尝试的第一个迹象时终止应用程序。
以下用户定义的单例 Spring bean 显示了我的应用程序使用的全局缓存,它使用多个分区 spring 批处理作业加载(每个 DataType
加载一个作业). addResultForDataType
方法可以被多个线程同时调用。
public class JobResults {
private Map<DataType, Map<String, Object>> results;
public JobResults() {
results = new ConcurrentHashMap<DataType, Map<String, Object>>();
}
public void addResultForDataType(DataType dataType, String uniqueId, Object result) {
Map<String, Object> dataTypeMap = results.get(dataType);
if (dataTypeMap == null) {
synchronized (dataType) {
dataTypeMap = results.get(dataType);
if (dataTypeMap == null) {
dataTypeMap = new ConcurrentHashMap<String, Object>();
results.put(dataType, dataTypeMap);
}
}
}
dataTypeMap.put(uniqueId, result);
}
public Map<String, Object> getResultForDataType(DataType dataType) {
return results.get(dataType);
}
}
此处:
DataType
可以认为是来自 table 的名称或文件名
加载数据的位置。每个DataType表示一个table或文件
uniqueId
表示 table 或文件中每条记录的主键。
result
是表示整行的对象。
- 以上方法每条记录调用一次。在任何给定时间,多个线程可以为相同的
DataType
或不同的 DataType
. 插入记录
我想创建另一个地图来跟踪重复的插入:
public class JobResults {
private Map<DataType, Map<String, Object>> results;
private Map<DataType, ConcurrentHashMap<String, Integer>> duplicates;
public JobResults() {
results = new ConcurrentHashMap<DataType, Map<String, Object>>();
duplicates = new ConcurrentHashMap<DataType, ConcurrentHashMap<String, Integer>>();
}
public void addResultForDataType(DataType dataType, String uniqueId, Object result) {
Map<String, Object> dataTypeMap = results.get(dataType);
ConcurrentHashMap<String,Integer> duplicateCount = duplicates.get(dataType);
if (dataTypeMap == null) {
synchronized (dataType) {
dataTypeMap = results.get(dataType);
if (dataTypeMap == null) {
dataTypeMap = new ConcurrentHashMap<String, Object>();
duplicateCount = new ConcurrentHashMap<String, Integer>();
results.put(dataType, dataTypeMap);
duplicates.put(dataType, duplicateCount);
}
}
}
duplicateCount.putIfAbsent(uniqueId, 0);
duplicateCount.put(uniqueId, duplicateCount.get(uniqueId)+1);//keep track of duplicate rows
dataTypeMap.put(uniqueId, result);
}
public Map<String, Object> getResultForDataType(DataType dataType) {
return results.get(dataType);
}
}
我意识到 statemet duplicateCount.put(uniqueId, duplicateCount.get(uniqueId)+1);
不是隐式线程安全的。为了使其线程安全,我需要使用同步来减慢我的插入速度。如何在不影响应用程序性能的情况下跟踪重复插入。如果跟踪重复插入并不容易,我可以在尝试覆盖映射中现有条目的第一个迹象时抛出异常。
注意 我知道 Map
不允许重复键。我想要的是一种跟踪任何此类尝试并停止应用程序而不是覆盖 Map
.
中的条目的方法
尝试这样的事情:
ConcurrentHashMap<String, AtomicInteger> duplicateCount = new ConcurrentHashMap<String, AtomicInteger>();
然后,当您准备好增加计数时,请执行以下操作:
final AtomicInteger oldCount = duplicateCount.putIfAbsent(uniqueId, new AtomicInteger(1));
if (oldCount != null) {
oldCount.incrementAndGet();
}
因此,如果您在地图中还没有计数,您将输入 1,如果有,您将获取当前值并自动递增它。这应该是线程安全的。
如果您想跟踪插入的数量,您可以将外部映射类型更改为类似 Map<String, Pair<Integer, Object>>
的类型(或者,如果您不使用 Apache Commons,只需 Map<DataType, Map.Entry<Integer, InnerType>>
,其中 Integer
值是更新次数:
DataType key = ...;
Map<Integer, Object> value = ...;
dataTypeMap.compute(key, (k, current) -> {
if (current == null) {
/* Initial count is 0 */
return Pair.of(0, value);
} else {
/* Increment count */
return Pair.of(current.getFirst(), value);
}));
如果您只关心确保没有重复插入,您可以简单地使用 computeIfAbsent
:
DataType key = ...;
Map<Integer, Object> value = ...;
if (dataTypeMap.computeIfAbsent(key, k -> value)) != null) {
/* There was already a value */
throw new IllegalStateException(...);
});
我正在寻找一种方法来跟踪在多线程环境中尝试将相同的密钥插入 Map
的次数,以便可以读取和更新 Map
同时由多个线程。如果跟踪重复键插入尝试不容易实现,则替代解决方案是在重复键插入尝试的第一个迹象时终止应用程序。
以下用户定义的单例 Spring bean 显示了我的应用程序使用的全局缓存,它使用多个分区 spring 批处理作业加载(每个 DataType
加载一个作业). addResultForDataType
方法可以被多个线程同时调用。
public class JobResults {
private Map<DataType, Map<String, Object>> results;
public JobResults() {
results = new ConcurrentHashMap<DataType, Map<String, Object>>();
}
public void addResultForDataType(DataType dataType, String uniqueId, Object result) {
Map<String, Object> dataTypeMap = results.get(dataType);
if (dataTypeMap == null) {
synchronized (dataType) {
dataTypeMap = results.get(dataType);
if (dataTypeMap == null) {
dataTypeMap = new ConcurrentHashMap<String, Object>();
results.put(dataType, dataTypeMap);
}
}
}
dataTypeMap.put(uniqueId, result);
}
public Map<String, Object> getResultForDataType(DataType dataType) {
return results.get(dataType);
}
}
此处:
DataType
可以认为是来自 table 的名称或文件名 加载数据的位置。每个DataType表示一个table或文件uniqueId
表示 table 或文件中每条记录的主键。result
是表示整行的对象。- 以上方法每条记录调用一次。在任何给定时间,多个线程可以为相同的
DataType
或不同的DataType
. 插入记录
我想创建另一个地图来跟踪重复的插入:
public class JobResults {
private Map<DataType, Map<String, Object>> results;
private Map<DataType, ConcurrentHashMap<String, Integer>> duplicates;
public JobResults() {
results = new ConcurrentHashMap<DataType, Map<String, Object>>();
duplicates = new ConcurrentHashMap<DataType, ConcurrentHashMap<String, Integer>>();
}
public void addResultForDataType(DataType dataType, String uniqueId, Object result) {
Map<String, Object> dataTypeMap = results.get(dataType);
ConcurrentHashMap<String,Integer> duplicateCount = duplicates.get(dataType);
if (dataTypeMap == null) {
synchronized (dataType) {
dataTypeMap = results.get(dataType);
if (dataTypeMap == null) {
dataTypeMap = new ConcurrentHashMap<String, Object>();
duplicateCount = new ConcurrentHashMap<String, Integer>();
results.put(dataType, dataTypeMap);
duplicates.put(dataType, duplicateCount);
}
}
}
duplicateCount.putIfAbsent(uniqueId, 0);
duplicateCount.put(uniqueId, duplicateCount.get(uniqueId)+1);//keep track of duplicate rows
dataTypeMap.put(uniqueId, result);
}
public Map<String, Object> getResultForDataType(DataType dataType) {
return results.get(dataType);
}
}
我意识到 statemet duplicateCount.put(uniqueId, duplicateCount.get(uniqueId)+1);
不是隐式线程安全的。为了使其线程安全,我需要使用同步来减慢我的插入速度。如何在不影响应用程序性能的情况下跟踪重复插入。如果跟踪重复插入并不容易,我可以在尝试覆盖映射中现有条目的第一个迹象时抛出异常。
注意 我知道 Map
不允许重复键。我想要的是一种跟踪任何此类尝试并停止应用程序而不是覆盖 Map
.
尝试这样的事情:
ConcurrentHashMap<String, AtomicInteger> duplicateCount = new ConcurrentHashMap<String, AtomicInteger>();
然后,当您准备好增加计数时,请执行以下操作:
final AtomicInteger oldCount = duplicateCount.putIfAbsent(uniqueId, new AtomicInteger(1));
if (oldCount != null) {
oldCount.incrementAndGet();
}
因此,如果您在地图中还没有计数,您将输入 1,如果有,您将获取当前值并自动递增它。这应该是线程安全的。
如果您想跟踪插入的数量,您可以将外部映射类型更改为类似 Map<String, Pair<Integer, Object>>
的类型(或者,如果您不使用 Apache Commons,只需 Map<DataType, Map.Entry<Integer, InnerType>>
,其中 Integer
值是更新次数:
DataType key = ...;
Map<Integer, Object> value = ...;
dataTypeMap.compute(key, (k, current) -> {
if (current == null) {
/* Initial count is 0 */
return Pair.of(0, value);
} else {
/* Increment count */
return Pair.of(current.getFirst(), value);
}));
如果您只关心确保没有重复插入,您可以简单地使用 computeIfAbsent
:
DataType key = ...;
Map<Integer, Object> value = ...;
if (dataTypeMap.computeIfAbsent(key, k -> value)) != null) {
/* There was already a value */
throw new IllegalStateException(...);
});