以下代码是否线程安全
Is following code Thread safe
我有一个场景,我必须维护一个可以由多个线程填充的映射,每个线程修改相应的列表(唯一的 identifier/key 是线程名称)并且当线程的列表大小超过固定批量大小,我们必须将记录保存在数据库中。
示例代码如下:
private volatile ConcurrentHashMap<String, List<T>> instrumentMap = new ConcurrentHashMap<String, List<T>>();
private ReadWriteLock lock ;
public void addAll(List<T> entityList, String threadName) {
try {
lock.readLock().lock();
List<T> instrumentList = instrumentMap.get(threadName);
if(instrumentList == null) {
instrumentList = new ArrayList<T>(batchSize);
instrumentMap.put(threadName, instrumentList);
}
if(instrumentList.size() >= batchSize -1){
instrumentList.addAll(entityList);
recordSaver.persist(instrumentList);
instrumentList.clear();
} else {
instrumentList.addAll(entityList);
}
} finally {
lock.readLock().unlock();
}
}
每 2 分钟后还有一个单独的线程 运行 来保存 Map 中的所有记录(以确保我们在每 2 分钟后保存一些内容并且地图大小不会变得太大)以及何时它启动它阻塞所有其他线程(检查 readLock 和 writeLock uswhere writeLock 具有更高的优先级)
if(//Some condition) {
Thread.sleep(//2 minutes);
aggregator.getLock().writeLock().lock();
List<T> instrumentList = instrumentMap .values().stream().flatMap(x->x.stream()).collect(Collectors.toList());
if(instrumentList.size() > 0) {
saver.persist(instrumentList);
instrumentMap .values().parallelStream().forEach(x -> x.clear());
aggregator.getLock().writeLock().unlock();
}
这个解决方案几乎适用于我们测试的每个场景,除了有时我们看到一些记录丢失,即根本没有持久化,尽管它们在 Map
中添加得很好
我的问题是这段代码有什么问题?
ConcurrentHashMap 不是最好的解决方案吗?
read/write 锁的使用在这里有问题吗?
我应该进行顺序处理吗?
不,它不是线程安全的。
问题是你使用的是ReadWriteLock的read锁。这不保证进行更新的独占访问权限。为此,您需要使用 write 锁。
但您实际上根本不需要使用单独的锁。您可以简单地使用 ConcurrentHashMap.compute
方法:
instrumentMap.compute(threadName, (tn, instrumentList) -> {
if (instrumentList == null) {
instrumentList = new ArrayList<>();
}
if(instrumentList.size() >= batchSize -1) {
instrumentList.addAll(entityList);
recordSaver.persist(instrumentList);
instrumentList.clear();
} else {
instrumentList.addAll(entityList);
}
return instrumentList;
});
这允许您更新列表中的项目,同时还保证对给定键的列表的独占访问。
我怀疑您可以将 compute
调用拆分为 computeIfAbsent
(如果不存在则添加列表),然后是 computeIfPresent
(到 update/persist the list): 这两个操作的原子性在这里不是必需的。但是将它们分开没有任何实际意义。
此外,instrumentMap
几乎可以肯定不应该是易变的。除非你真的想重新分配它的值(鉴于这段代码,我对此表示怀疑),删除 volatile 并使其成为最终的。
同样,非最终锁也有问题。如果您坚持使用锁,那么也将其设置为 final。
我有一个场景,我必须维护一个可以由多个线程填充的映射,每个线程修改相应的列表(唯一的 identifier/key 是线程名称)并且当线程的列表大小超过固定批量大小,我们必须将记录保存在数据库中。
示例代码如下:
private volatile ConcurrentHashMap<String, List<T>> instrumentMap = new ConcurrentHashMap<String, List<T>>();
private ReadWriteLock lock ;
public void addAll(List<T> entityList, String threadName) {
try {
lock.readLock().lock();
List<T> instrumentList = instrumentMap.get(threadName);
if(instrumentList == null) {
instrumentList = new ArrayList<T>(batchSize);
instrumentMap.put(threadName, instrumentList);
}
if(instrumentList.size() >= batchSize -1){
instrumentList.addAll(entityList);
recordSaver.persist(instrumentList);
instrumentList.clear();
} else {
instrumentList.addAll(entityList);
}
} finally {
lock.readLock().unlock();
}
}
每 2 分钟后还有一个单独的线程 运行 来保存 Map 中的所有记录(以确保我们在每 2 分钟后保存一些内容并且地图大小不会变得太大)以及何时它启动它阻塞所有其他线程(检查 readLock 和 writeLock uswhere writeLock 具有更高的优先级)
if(//Some condition) {
Thread.sleep(//2 minutes);
aggregator.getLock().writeLock().lock();
List<T> instrumentList = instrumentMap .values().stream().flatMap(x->x.stream()).collect(Collectors.toList());
if(instrumentList.size() > 0) {
saver.persist(instrumentList);
instrumentMap .values().parallelStream().forEach(x -> x.clear());
aggregator.getLock().writeLock().unlock();
}
这个解决方案几乎适用于我们测试的每个场景,除了有时我们看到一些记录丢失,即根本没有持久化,尽管它们在 Map
中添加得很好我的问题是这段代码有什么问题? ConcurrentHashMap 不是最好的解决方案吗? read/write 锁的使用在这里有问题吗? 我应该进行顺序处理吗?
不,它不是线程安全的。
问题是你使用的是ReadWriteLock的read锁。这不保证进行更新的独占访问权限。为此,您需要使用 write 锁。
但您实际上根本不需要使用单独的锁。您可以简单地使用 ConcurrentHashMap.compute
方法:
instrumentMap.compute(threadName, (tn, instrumentList) -> {
if (instrumentList == null) {
instrumentList = new ArrayList<>();
}
if(instrumentList.size() >= batchSize -1) {
instrumentList.addAll(entityList);
recordSaver.persist(instrumentList);
instrumentList.clear();
} else {
instrumentList.addAll(entityList);
}
return instrumentList;
});
这允许您更新列表中的项目,同时还保证对给定键的列表的独占访问。
我怀疑您可以将 compute
调用拆分为 computeIfAbsent
(如果不存在则添加列表),然后是 computeIfPresent
(到 update/persist the list): 这两个操作的原子性在这里不是必需的。但是将它们分开没有任何实际意义。
此外,instrumentMap
几乎可以肯定不应该是易变的。除非你真的想重新分配它的值(鉴于这段代码,我对此表示怀疑),删除 volatile 并使其成为最终的。
同样,非最终锁也有问题。如果您坚持使用锁,那么也将其设置为 final。