Java concurrentHashMap 迭代
Java concurrentHashMap iteration
我正在使用一个线程(我们称它为 "MapChecker"),它在整个生命周期内都在 ConcurrentHashMap 上循环。
该地图由其他线程填充,并由 MapChecker 使用迭代器对其进行清除。
地图结构如下:
private volatile Map<MyObject, SynchronizedList<MyOtherObject>> map = new ConcurrentHashMap<>();
//SynchronizedList = Collections.syncrhonizedList.
MapChecker 必须更新其循环内每个键的值。通过从列表中删除元素或删除完整的地图条目来进行更新。
同步发生在两个步骤中:
- 在地图中添加数据时(同步)
- 检索地图的迭代器时(在 MapChecker 中同步)。
锁在地图本身 ( synchronized(map) ) 上。
我不关心在我的迭代器视图中总是有最后更新的值,但我需要确保在下一次迭代中检索所有缺失的值,这很重要,因为我不想跳过任何元素。此外,正确更新 SynchronizedList 也很重要。
我的问题是:
我可以确保通过这种架构插入/更新所有条目吗?有错过某些东西的风险吗?
如果 MapChecker 删除了一个条目,而另一个线程正在更新同一个条目,会发生什么情况? ConcurrentHashMap 应该阻止这些操作,所以我预计不会有麻烦。
这是 MapChecker 循环:
while (!isInterrupted()) {
executeClearingPhases();
Iterator<Map.Entry<PoolManager, List<PooledObject>>> it = null;
synchronized (idleInstancesMap) {
it = idleInstancesMap.entrySet().iterator();
}
while (it.hasNext()) {
Map.Entry<PoolManager, List<PooledObject>> entry = it.next();
PoolManager poolManager = entry.getKey();
boolean stop = false;
while (!stop) {
//this list is empty very often but it shouldn't, that's the problem I am facing. I need to assure updates visibility.
List<PooledObject> idlePooledObjects = entry.getValue();
if (idlePooledObjects.isEmpty()) {
stop = true;
} else {
PooledObject pooledObject = null;
try {
pooledObject = idlePooledObjects.get(0);
info(loggingId, " - REMOOOVINNGG: \"", pooledObject.getClientId(), "\".");
PoolingStatus destroyStatus = poolManager.destroyIfExpired(pooledObject);
switch (destroyStatus) {
case DESTROY:
info(loggingId, " - Removed pooled object \"", pooledObject.getClientId(), "\" from pool: \"", poolManager.getClientId(), "\".");
idlePooledObjects.remove(0);
break;
case IDLE:
stop = true;
break;
default:
idlePooledObjects.remove(0);
break;
}
} catch (@SuppressWarnings("unused") PoolDestroyedException e) {
warn(loggingId, " - WARNING: Pooled object \"", pooledObject.getClientId(), "\" skipped, pool: \"", poolManager.getClientId(), "\" has been destroyed.");
synchronized(idleInstancesMap) {
it.remove();
}
stop = true;
} catch (PoolManagementException e) {
error(e, loggingId, " - ERROR: Errors occurred during the operation.");
idlePooledObjects.remove(0);
}
}
}
}
Thread.yield();
}
这是被任何其他线程调用(多次)的方法:
public void addPooledObject(PoolManager poolManager, PooledObject pooledObject) {
synchronized (idleInstancesMap) {
List<PooledObject> idleInstances = idleInstancesMap.get(poolManager);
if (idleInstances == null) {
idleInstances = Collections.synchronizedList(new LinkedList<PooledObject>());
idleInstancesMap.put(poolManager, idleInstances);
}
idleInstances.add(pooledObject);
}
}
谢谢
but I need to be sure that all the missing values will be retrieved in the next iterations, this is important because I do not want to skip any element.
首先,我认为根据您的解决方案,我认为只要您继续执行 MapChecker loop
,您将获得地图中的所有项目。我建议你在你提供的 MapChecker
代码之外有一个额外的 while(true) 循环。
但是根据您的所有描述,我建议您应该使用 Queue 而不是 Map,显然,您的问题需要 push/pop 操作,也许 BlockingQueue
更适合这里。
感谢 PatrickChen 的建议,我将 PooledObject 实例的列表移到了每个 PoolManager 中(它已经拥有这个列表,因为它以完全同步的方式拥有池及其内部状态)。
这是结果:
//MapChecker lifecycle
public void run() {
try {
while (!isInterrupted()) {
executeClearingPhases();
ListIterator<PoolManager> it = null;
//This really helps. poolManagers is the list of PoolManager instances.
//It's unlikely that this list will have many elements (maybe not more than 20)
synchronized (poolManagers) {
Iterator<PoolManager> originalIt = poolManagers.iterator();
while (originalIt.hasNext()) {
if (originalIt.next().isDestroyed()) {
originalIt.remove();
}
}
//This iterator will contain the current view of the list.
//It will update on the next iteration.
it = new LinkedList<PoolManager>(poolManagers).listIterator();
}
while (it.hasNext()) {
PoolManager poolManager = it.next();
try {
//This method will lock on its internal synchronized pool in order to
//scan for expired objects.
poolManager.destroyExpired();
} catch (@SuppressWarnings("unused") PoolDestroyedException e) {
warn(loggingId, " - WARNING: Pool: \"", poolManager.getClientId(), "\" has been destroyed.");
it.remove();
}
}
Thread.yield();
}
throw new InterruptedException();
} catch (@SuppressWarnings("unused") InterruptedException e) {
started = false;
Thread.currentThread().interrupt();
debug(loggingId, " - Pool checker interrupted.");
}
}
//Method invoked by multiple threads
public void addPooledObject(PoolManager poolManager) {
synchronized (poolManagers) {
poolManagers.add(poolManager);
}
}
我正在使用一个线程(我们称它为 "MapChecker"),它在整个生命周期内都在 ConcurrentHashMap 上循环。
该地图由其他线程填充,并由 MapChecker 使用迭代器对其进行清除。
地图结构如下:
private volatile Map<MyObject, SynchronizedList<MyOtherObject>> map = new ConcurrentHashMap<>();
//SynchronizedList = Collections.syncrhonizedList.
MapChecker 必须更新其循环内每个键的值。通过从列表中删除元素或删除完整的地图条目来进行更新。
同步发生在两个步骤中:
- 在地图中添加数据时(同步)
- 检索地图的迭代器时(在 MapChecker 中同步)。
锁在地图本身 ( synchronized(map) ) 上。
我不关心在我的迭代器视图中总是有最后更新的值,但我需要确保在下一次迭代中检索所有缺失的值,这很重要,因为我不想跳过任何元素。此外,正确更新 SynchronizedList 也很重要。
我的问题是: 我可以确保通过这种架构插入/更新所有条目吗?有错过某些东西的风险吗? 如果 MapChecker 删除了一个条目,而另一个线程正在更新同一个条目,会发生什么情况? ConcurrentHashMap 应该阻止这些操作,所以我预计不会有麻烦。
这是 MapChecker 循环:
while (!isInterrupted()) {
executeClearingPhases();
Iterator<Map.Entry<PoolManager, List<PooledObject>>> it = null;
synchronized (idleInstancesMap) {
it = idleInstancesMap.entrySet().iterator();
}
while (it.hasNext()) {
Map.Entry<PoolManager, List<PooledObject>> entry = it.next();
PoolManager poolManager = entry.getKey();
boolean stop = false;
while (!stop) {
//this list is empty very often but it shouldn't, that's the problem I am facing. I need to assure updates visibility.
List<PooledObject> idlePooledObjects = entry.getValue();
if (idlePooledObjects.isEmpty()) {
stop = true;
} else {
PooledObject pooledObject = null;
try {
pooledObject = idlePooledObjects.get(0);
info(loggingId, " - REMOOOVINNGG: \"", pooledObject.getClientId(), "\".");
PoolingStatus destroyStatus = poolManager.destroyIfExpired(pooledObject);
switch (destroyStatus) {
case DESTROY:
info(loggingId, " - Removed pooled object \"", pooledObject.getClientId(), "\" from pool: \"", poolManager.getClientId(), "\".");
idlePooledObjects.remove(0);
break;
case IDLE:
stop = true;
break;
default:
idlePooledObjects.remove(0);
break;
}
} catch (@SuppressWarnings("unused") PoolDestroyedException e) {
warn(loggingId, " - WARNING: Pooled object \"", pooledObject.getClientId(), "\" skipped, pool: \"", poolManager.getClientId(), "\" has been destroyed.");
synchronized(idleInstancesMap) {
it.remove();
}
stop = true;
} catch (PoolManagementException e) {
error(e, loggingId, " - ERROR: Errors occurred during the operation.");
idlePooledObjects.remove(0);
}
}
}
}
Thread.yield();
}
这是被任何其他线程调用(多次)的方法:
public void addPooledObject(PoolManager poolManager, PooledObject pooledObject) {
synchronized (idleInstancesMap) {
List<PooledObject> idleInstances = idleInstancesMap.get(poolManager);
if (idleInstances == null) {
idleInstances = Collections.synchronizedList(new LinkedList<PooledObject>());
idleInstancesMap.put(poolManager, idleInstances);
}
idleInstances.add(pooledObject);
}
}
谢谢
but I need to be sure that all the missing values will be retrieved in the next iterations, this is important because I do not want to skip any element.
首先,我认为根据您的解决方案,我认为只要您继续执行 MapChecker loop
,您将获得地图中的所有项目。我建议你在你提供的 MapChecker
代码之外有一个额外的 while(true) 循环。
但是根据您的所有描述,我建议您应该使用 Queue 而不是 Map,显然,您的问题需要 push/pop 操作,也许 BlockingQueue
更适合这里。
感谢 PatrickChen 的建议,我将 PooledObject 实例的列表移到了每个 PoolManager 中(它已经拥有这个列表,因为它以完全同步的方式拥有池及其内部状态)。
这是结果:
//MapChecker lifecycle
public void run() {
try {
while (!isInterrupted()) {
executeClearingPhases();
ListIterator<PoolManager> it = null;
//This really helps. poolManagers is the list of PoolManager instances.
//It's unlikely that this list will have many elements (maybe not more than 20)
synchronized (poolManagers) {
Iterator<PoolManager> originalIt = poolManagers.iterator();
while (originalIt.hasNext()) {
if (originalIt.next().isDestroyed()) {
originalIt.remove();
}
}
//This iterator will contain the current view of the list.
//It will update on the next iteration.
it = new LinkedList<PoolManager>(poolManagers).listIterator();
}
while (it.hasNext()) {
PoolManager poolManager = it.next();
try {
//This method will lock on its internal synchronized pool in order to
//scan for expired objects.
poolManager.destroyExpired();
} catch (@SuppressWarnings("unused") PoolDestroyedException e) {
warn(loggingId, " - WARNING: Pool: \"", poolManager.getClientId(), "\" has been destroyed.");
it.remove();
}
}
Thread.yield();
}
throw new InterruptedException();
} catch (@SuppressWarnings("unused") InterruptedException e) {
started = false;
Thread.currentThread().interrupt();
debug(loggingId, " - Pool checker interrupted.");
}
}
//Method invoked by multiple threads
public void addPooledObject(PoolManager poolManager) {
synchronized (poolManagers) {
poolManagers.add(poolManager);
}
}