[ConcurrentHashMap]: 映射 update/get 操作 returns 单线程操作中来自映射的陈旧值
[ConcurrentHashMap]: Map update/get operation returns stale value from the map in single thread operation
对于某些 key
地图仍然具有陈旧的价值。一些键没有反映新的更新。此错误属于 not reproducible
类别。
代码:
class DemoCache{
private ConcurrentHashMap<String,Demo> demoByName = new ConcurrentHashMap<String,Demo>();
private ConcurrentHashMap<String,Demo> demoByID = new ConcurrentHashMap<String,Demo>();
public initializeFromDB(){
log.info(me + "Initializing/refresh instrument from database.");
DemoDbDynamo demoDbDynamo = new DemoDbDynamo();
final AtomicInteger progressCounter = new AtomicInteger();
try
{
demoDbDynamo.listAll()
.stream()
.peek(i -> progressCounter.incrementAndGet())
.forEach(this::updateCache);
}
catch (Exception e)
{
log.error(me + "Exception fetching demo cache from table. " + e );
}
log.info(me + "count: " + progressCounter.get());
}
}
public void updateCache(Demo demo){
Demo existing = demoByID.get(demo.getID());
demoByID.put( dmeo.getID(), demo );
// this is an updated existing demo
if(existing != null) {
//if name have changed
demoByName.remove(existing.getName());
}
demoByName.put ( demo.getName(), demo );
//logging new value -- demo.getName and demo.getValue
//logging old value - demo.getName and demo.getValue
}
}
地图大小:6k
整个操作是单线程的。 JMS 主题点击此 class 以从数据库初始化映射。
我们有 4 个不同的服务器,每个服务器都有这个本地缓存,它由 JMS 主题消息刷新。在 4 台服务器中,有 3 台服务器更新了所有正确的值,而 1 台服务器仍然保留少数键的陈旧值。
此问题的根本原因可能是什么?
评论结果更新:
日志:
JMS msg
ip-10-0-33-185 20:15:40.374 [ThreadName=ActiveMQ Session Task-72] DEBUG cache.DemoCache {} -- DemoCache.onMessage() : [msg=<response mt='5099'/>]
ip-10-0-33-185 20:15:40.375 [ThreadName=ActiveMQ Session Task-72] INFO cache.DemoCache {} -- DemoCache.initializeFromDB(): Initializing/refresh cache from database.
ip-10-0-33-185 20:15:45.897 [ThreadName=ActiveMQ Session Task-72] DEBUG cache.DemoCache {} --
[newObject=[NewID=06926627-e950-48f3-9c53-b679f61120ec newName=foo,newValue=2640.98]]
[OldObject=[oldName=foo,oldValue=2641.05]]
ip-10-0-33-185 20:15:45.913 [ActiveMQ Session Task-72] INFO cache.DemoCache {} -- DemoCache.initializeFromDB(): count: 5362
这里的_collector = ip-10-0-33-185
即服务器节点。
此服务器 returns 旧值 = 2641.05 而不是新值。
只有一个线程是 运行 ActiveMQ Session Task-72
。我没有看到任何其他线程。
尽管使用了 ConcurrentHashMap,代码不是线程安全的。
这是一种可能发生不一致的情况。假设缓存包含 ID = "X" 的值 demo0
。假设有 2 个线程,A 和 B。线程 A 具有具有相同 ID 的下一个版本的 ibject,比如 demo1
。线程 B 知道这个 demo1
并且同时获得了它的更新版本 demo2
.
现在两个线程都想更新缓存。
线程 A 调用 updateCache()
,值为 demo1
。在调用 Demo existing = ...
之后和调用 demoByID.put(...)
之前,该线程被挂起并执行线程 B。线程 B 使用值 demo2
调用 updateCache()
,未被中断并成功将值 demo2
放入缓存。
现在继续线程A。它执行 demoByID.put(...)
等。但它适用于 demo1
。因此它将 demo1
放入缓存,从而替换 demoByID
和 demoByName
.
中的较新版本 demo2
你能做什么?
检查是否存在和修改缓存之间的所有操作都应该作为一个单线程块来完成。例如,对方法 updateCache()
的全部内容使用 lock 或使用 declare 方法 updateCache()
synchronized.
对于某些 key
地图仍然具有陈旧的价值。一些键没有反映新的更新。此错误属于 not reproducible
类别。
代码:
class DemoCache{
private ConcurrentHashMap<String,Demo> demoByName = new ConcurrentHashMap<String,Demo>();
private ConcurrentHashMap<String,Demo> demoByID = new ConcurrentHashMap<String,Demo>();
public initializeFromDB(){
log.info(me + "Initializing/refresh instrument from database.");
DemoDbDynamo demoDbDynamo = new DemoDbDynamo();
final AtomicInteger progressCounter = new AtomicInteger();
try
{
demoDbDynamo.listAll()
.stream()
.peek(i -> progressCounter.incrementAndGet())
.forEach(this::updateCache);
}
catch (Exception e)
{
log.error(me + "Exception fetching demo cache from table. " + e );
}
log.info(me + "count: " + progressCounter.get());
}
}
public void updateCache(Demo demo){
Demo existing = demoByID.get(demo.getID());
demoByID.put( dmeo.getID(), demo );
// this is an updated existing demo
if(existing != null) {
//if name have changed
demoByName.remove(existing.getName());
}
demoByName.put ( demo.getName(), demo );
//logging new value -- demo.getName and demo.getValue
//logging old value - demo.getName and demo.getValue
}
}
地图大小:6k
整个操作是单线程的。 JMS 主题点击此 class 以从数据库初始化映射。 我们有 4 个不同的服务器,每个服务器都有这个本地缓存,它由 JMS 主题消息刷新。在 4 台服务器中,有 3 台服务器更新了所有正确的值,而 1 台服务器仍然保留少数键的陈旧值。
此问题的根本原因可能是什么?
评论结果更新:
日志:
JMS msg
ip-10-0-33-185 20:15:40.374 [ThreadName=ActiveMQ Session Task-72] DEBUG cache.DemoCache {} -- DemoCache.onMessage() : [msg=<response mt='5099'/>]
ip-10-0-33-185 20:15:40.375 [ThreadName=ActiveMQ Session Task-72] INFO cache.DemoCache {} -- DemoCache.initializeFromDB(): Initializing/refresh cache from database.
ip-10-0-33-185 20:15:45.897 [ThreadName=ActiveMQ Session Task-72] DEBUG cache.DemoCache {} --
[newObject=[NewID=06926627-e950-48f3-9c53-b679f61120ec newName=foo,newValue=2640.98]]
[OldObject=[oldName=foo,oldValue=2641.05]]
ip-10-0-33-185 20:15:45.913 [ActiveMQ Session Task-72] INFO cache.DemoCache {} -- DemoCache.initializeFromDB(): count: 5362
这里的_collector = ip-10-0-33-185
即服务器节点。
此服务器 returns 旧值 = 2641.05 而不是新值。
只有一个线程是 运行 ActiveMQ Session Task-72
。我没有看到任何其他线程。
尽管使用了 ConcurrentHashMap,代码不是线程安全的。
这是一种可能发生不一致的情况。假设缓存包含 ID = "X" 的值 demo0
。假设有 2 个线程,A 和 B。线程 A 具有具有相同 ID 的下一个版本的 ibject,比如 demo1
。线程 B 知道这个 demo1
并且同时获得了它的更新版本 demo2
.
现在两个线程都想更新缓存。
线程 A 调用 updateCache()
,值为 demo1
。在调用 Demo existing = ...
之后和调用 demoByID.put(...)
之前,该线程被挂起并执行线程 B。线程 B 使用值 demo2
调用 updateCache()
,未被中断并成功将值 demo2
放入缓存。
现在继续线程A。它执行 demoByID.put(...)
等。但它适用于 demo1
。因此它将 demo1
放入缓存,从而替换 demoByID
和 demoByName
.
demo2
你能做什么?
检查是否存在和修改缓存之间的所有操作都应该作为一个单线程块来完成。例如,对方法 updateCache()
的全部内容使用 lock 或使用 declare 方法 updateCache()
synchronized.