如何在 put() 期间阻止 ConcurrentHashMap get() 操作

How can I block ConcurrentHashMap get() operations during a put()

ConcurrentHashMap<String, Config> configStore = new ConcurrentHashMap<>();
...
void updateStore() {
  Config newConfig = generateNewConfig();
  Config oldConfig = configStore.get(configName);
  if (newConfig.replaces(oldConfig)) {
   configStore.put(configName, newConfig);
  }
}

ConcurrentHashMap可以被多线程读取,但只能被单线程更新。当 put() 操作正在进行时,我想阻止 get() 操作。这里的基本原理是,如果 put() 操作正在进行,这意味着映射中的当前条目是陈旧的,所有 get() 操作都应该阻塞,直到 put() 完成。我怎样才能在不同步整个地图的情况下在 Java 中实现这一点?

看来您确实可以将其推迟到 compute,它会为您处理:

Config newConfig = generateNewConfig();
configStore.compute(
    newConfig,
    (oldConfig, value) -> {
       if (newConfig.replaces(oldConfig)) {
            return key;
       }
       return oldConfig;
    }
);

使用此方法可获得两个保证:

Some attempted update operations on this map by other threads may be blocked while computation is in progress, so the computation should be short and simple

The entire method invocation is performed atomically

根据其文档。

这根本行不通。想一想:当代码意识到信息过时时,一段时间过去了,然后完成了 .put 调用。即使 .put 调用以某种方式阻塞,时间线如下:

  • 宇宙中发生了一些事件,使您的配置陈旧。
  • 一段时间过去了。 [A]
  • 您的 运行 一些实现这种情况的代码。
  • 一段时间过去了。 [B]
  • 您的代码开始 .put 调用。
  • 时间流逝的时间极短。 [C]
  • 您的代码完成了 .put 调用。

您要的是一种消除 [C] 的策略,同时完全不采取任何措施来防止读取点 [A][B] 处的陈旧数据,这两个点似乎相当多问题更多。

随便,给我答案

ConcurrentHashMap 如果你想要这个就错了,它是为多个 concurrent(因此得名)访问而设计的。您想要的是一个普通的旧 HashMap,其中 每个 对它的访问都通过一个锁。或者,您可以扭转逻辑:做您想做的事情的唯一方法是对所有内容(读和写)都加锁;此时 ConcurrentHashMap 的 'Concurrent' 部分变得毫无意义:

private final Object lock = new Object[0];

public void updateConfig() {
    synchronized (lock) {
       // do the stuff
    }
}

public Config getConfig(String key) {
    synchronized (lock) {
        return configStore.get(key);
    }
}

注意:使用私人锁; public 锁就像 public 字段。如果你控制之外的代码可以引用一个对象,并且你锁定了它,你需要描述你的代码关于该锁的行为,然后注册以永远维护该行为,或者指示很明显,当您更改行为时,您的 API 刚刚经历了重大更改,因此您也应该修改主版本号。

出于同样的原因,鉴于您想要 API 控制,public 字段几乎总是一个坏主意,您希望您锁定的 refs 不能被任何人访问,除了您直接控制的代码。因此,为什么上面的代码不在方法本身上使用 synchronized 关键字(因为 this 通常是一个到处泄漏的引用)。

好吧,也许我想要不同的答案

答案是 'it does not matter' 或 'use locks'。如果[C]真的是你所关心的,那时间太短了,与[A][B]的时间相比就相形见绌了,如果A/B是可以接受的,当然C也是,既然如此,那就顺其自然吧。

或者,您可以使用锁,但甚至在数据变得陈旧之前就锁定。此时间线保证不会发生过时的数据读取:

  • 宇宙永远不会让您的数据过时。
  • 您的代码本身是过时日期的唯一原因。
  • 每当代码 运行 将或可能最终使数据陈旧时:
  • 甚至在开始之前就获取锁。
  • 做一些(可能)使某些配置陈旧的事情。
  • 继续抓紧锁;修复配置。
  • 解除锁定。

建议用compute(...)代替put().

但如果你想

to block the get() operations when a put() operation is in progress

那么你也应该使用 compute(...) 而不是 get()

那是因为 ConcurrentHashMap get()compute() 进行时不会阻塞。


这里有一个单元测试来证明它:

  @Test
  public void myTest() throws Exception {
    var map = new ConcurrentHashMap<>(Map.of("key", "v1"));
    var insideComputeLatch = new CountDownLatch(1);

    var threadGet = new Thread(() -> {
      try {
        insideComputeLatch.await();
        System.out.println("threadGet: before get()");
        var v = map.get("key");
        System.out.println("threadGet: after get() (v='" + v + "')");
      } catch (InterruptedException e) {
        throw new Error(e);
      }
    });

    var threadCompute = new Thread(() -> {
      System.out.println("threadCompute: before compute()");
      map.compute("key", (k, v) -> {
        try {
          System.out.println("threadCompute: inside compute(): start");
          insideComputeLatch.countDown();
          threadGet.join();
          System.out.println("threadCompute: inside compute(): end");
          return "v2";
        } catch (InterruptedException e) {
          throw new Error(e);
        }
      });
      System.out.println("threadCompute: after compute()");
    });

    threadGet.start();
    threadCompute.start();

    threadGet.join();
    threadCompute.join();
  }

输出:

threadCompute: before compute()
threadCompute: inside compute(): start
threadGet: before get()
threadGet: after get() (v='v1')
threadCompute: inside compute(): end
threadCompute: after compute()

How can I go about achieving this in Java without synchronizing the whole map?

这里有一些很好的答案,但使用 ConcurrentMap.replace(key, oldValue, newValue) method which is atomic.

有一个更简单的答案
while (true) {
    Config newConfig = generateNewConfig();
    Config oldConfig = configStore.get(configName);
    if (!newConfig.replaces(oldConfig)) {
        // nothing to do
        break;
    }
    // this is atomic and will only replace the config if the old hasn't changed
    if (configStore.replace(configName, oldConfig, newConfig)) {
        // if we replaced it then we are done
        break;
    }
    // otherwise, loop around and create a new config
}