使用 volatile 更新和交换 HashMap

Updating and swapping HashMaps with volatile

背景

我有一个大型数据映射 (HashMap),保存在内存中,由后台线程增量更新(基于传入的消息):

<KEY> => <VALUE>
...

最终用户将通过 REST API:

查询它
GET /lookup?key=<KEY>

更新不会立即应用,而是在收到特殊控制消息后分批应用,即

MESSAGE: "Add A" 

A=<VALUE>   //Not visible yet

MESSAGE: "Add B"

B=<VALUE>   //Not visible yet

MESSAGE: "Commit"

//Updates are now visible to the end-users
A=<VALUE>
B=<VALUE

我设计的架构如下:

volatile Map passiveCopy = new HashMap();
volatile Map activeCopy = new HashMap();

Map<String,Object> pendingUpdates; 

//Interactive requests (REST API)
Object lookup(String key) {
     activeCopy.get(key);
}

//Background thread processing the incoming messages.
//Messages are processed strictly sequentially
//i.e. no other message will be processed, until
//current handleMessage() invocation is completed
//(that is guaranteed by the message processing framework itself)
void handleMessage(Message msg) {

   //New updates go to the pending updates temporary map
   if(msg.type() == ADD) {
      pendingUpdates.put(msg.getKey(),msg.getValue()); 
   }


   if(msg.type() == COMMIT) {     
      //Apply updates to the passive copy of the map
      passiveCopy.addAll(pendingUpdates);

      //Swap active and passive map copies
      Map old = activeCopy; 
      activeCopy = passiveCopy;
      passiveCopy = old;

      //Grace period, wait for on-the-air requests to complete
      //REST API has a hard timeout of 100ms, so no client
      //will wait for the response longer than that 
      Thread.sleep(1000);

      //Re-apply updates to the now-passive (ex-active) copy of the map
      passiveCopy.addAll(pendingUpdates);

      //Reset the pendingUpdates map
      pendingUpdates.clear();
   }

}

问题

对 volatile 字段进行写入-> 读取会产生先行边沿:

A write to a volatile field (§8.3.1.4) happens-before every subsequent read of that field.

https://docs.oracle.com/javase/specs/jls/se7/html/jls-17.html#jls-17.4.5

并且宽限期选择正确,我希望应用到 passiveCopy 的任何更新(通过 putAll())将变得可见 交换后的最终用户请求(一次全部)。

这确实是一个案例,或者有任何极端案例会使这种方法失败?

注意

我知道创建 Map 的副本(这样每次都会将一个新的 Map 实例分配给 activeCopy),这样做是安全的,但我不想这样做(因为它确实大)。

如果您需要以原子方式添加新条目,则 volatile Map 将成为一个问题,这样用户将永远不会看到不是所有条目都已添加但仅添加其中一些的状态。

问题是在 java volatile for references 中仅确保以下内容:

  • 保证引用始终是最新的并且所有更改都可以从任何线程中看到
  • 不保证所引用对象的内容始终是最新的

(在 this book 中找到)

我还检查了 class HashMap 的实现(假设您使用的是 HashMap),您可以在其中看到方法 putAll(Map) 只是调用方法 putMapEntries(Map, boolean) 这是像这样实现:

/**
 * Implements Map.putAll and Map constructor
 *
 * @param m the map
 * @param evict false when initially constructing this map, else
 * true (relayed to method afterNodeInsertion).
 */
final void putMapEntries(Map<? extends K, ? extends V> m, boolean evict) {
    int s = m.size();
    if (s > 0) {
        if (table == null) { // pre-size
            float ft = ((float)s / loadFactor) + 1.0F;
            int t = ((ft < (float)MAXIMUM_CAPACITY) ?
                     (int)ft : MAXIMUM_CAPACITY);
            if (t > threshold)
                threshold = tableSizeFor(t);
        }
        else if (s > threshold)
            resize();
        for (Map.Entry<? extends K, ? extends V> e : m.entrySet()) {
            K key = e.getKey();
            V value = e.getValue();
            putVal(hash(key), key, value, false, evict);
        }
    }
}

因此您看到该方法只是在 for 循环中调用方法 putVal(int, K, V, boolean, boolean) (这不是原子更新)。这意味着使用 putAll(Map) 添加所有条目与使用 for 循环使用 put(K, V) 一个接一个地添加条目之间没有真正的区别。

结论: 如果您需要确保不存在用户可以读取仅添加了一些新元素而未添加一些新元素的地图的可能状态,此处不能使用 volatile。 所以(就像你已经提到的那样)创建地图副本并交换它会更好(并保存)。虽然它使用两倍的内存,但它会更快,因为 volatile 变量通常非常慢。

除了你对 activeMapactiveCopy 的不一致使用(只需删除 activeCopy 并且只在 activeMappassiveCopy 之间交换),你的方法是懂事。

引用 JLS:

If x and y are actions of the same thread and x comes before y in program order, then hb(x,y) [x "happens before" y].

this answer中也给出了示例。

据我所知,访问 volatile variable/field 基本上是序列点;你的情况,因为swap是在程序代码修改map后after,所以应该保证map的修改在before[=43]完成=] 真正执行了对volatile字段的访问。所以这里没有竞争条件。

但是,在大多数情况下,您应该使用synchronized 或显式锁来同步并发执行。围绕使用这些进行编码的唯一原因是如果您需要高性能,即大规模并行性,线程阻塞锁是不可接受的,或者所需的并行性太高以至于线程开始饿死。

就是说,我认为您真的应该 'invest' 进行适当的互斥,最好使用 ReadWriteLock。因为 synchronizedReadWriteLock 内部使用)意味着内存屏障,所以您不再需要 volatile

例如:

final ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
final Lock readLock = rwLock.getReadLock();
final Lock writeLock = rwLock.getWriteLock();

Map passiveCopy = new HashMap();
Map activeMap = new HashMap();

final Map<String,Object> pendingUpdates = new HashMap(); 

//Interactive requests (REST API)
Object lookup(String key) {

  readLock.lock();
  try {
     return activeMap.get(key);
  } finally {
    readLock.unlock();
  }
}

//Background thread processing the incoming messages.
//Messages are processed strictly sequentially
//i.e. no other message will be processed, until
//current handleMessage() invocation is completed
//(that is guaranteed by the message processing framework itself)
void handleMessage(Message msg) {

   //New updates go to the pending updates temporary map
   if(msg.type() == ADD) {
      pendingUpdates.put(msg.getKey(),msg.getValue()); 
   }


   if(msg.type() == COMMIT) {     
      //Apply updates to the passive copy of the map
      passiveCopy.addAll(pendingUpdates);

      final Map tempMap = passiveCopy;    

      writeLock.lock();

      try {
        passiveCopy = activeMap;
        activeMap = tempMap;
      } finally {
        writeLock.unlock();
      }

      // Update the now-passive copy to the same state as the active map:
      passiveCopy.addAll(pendingUpdates);
      pendingUpdates.clear();
   }

}

然而,从你的代码中,我读到 'readers' 在 'lifetime' 期间应该看到地图的一致版本,上面的代码不能保证这一点,即如果单个 'reader' 访问地图两次他可能会看到两张不同的地图。这可以通过让每个 reader 在第一次访问地图之前获取读锁本身,在最后一次访问地图后释放它来解决。这在您的情况下可能有效,也可能无效,因为如果 reader 长时间持有锁,或者有许多 reader 线程,它可能 block/starve 编写线程试图提交更新。