如何在 put() 期间阻止 ConcurrentHashMap get() 操作
How can I block ConcurrentHashMap get() operations during a put()
ConcurrentHashMap<String, Config> configStore = new ConcurrentHashMap<>();
...
void updateStore() {
Config newConfig = generateNewConfig();
Config oldConfig = configStore.get(configName);
if (newConfig.replaces(oldConfig)) {
configStore.put(configName, newConfig);
}
}
ConcurrentHashMap
可以被多线程读取,但只能被单线程更新。当 put()
操作正在进行时,我想阻止 get()
操作。这里的基本原理是,如果 put()
操作正在进行,这意味着映射中的当前条目是陈旧的,所有 get()
操作都应该阻塞,直到 put()
完成。我怎样才能在不同步整个地图的情况下在 Java 中实现这一点?
看来您确实可以将其推迟到 compute
,它会为您处理:
Config newConfig = generateNewConfig();
configStore.compute(
newConfig,
(oldConfig, value) -> {
if (newConfig.replaces(oldConfig)) {
return key;
}
return oldConfig;
}
);
使用此方法可获得两个保证:
Some attempted update operations on this map by other threads may be blocked while computation is in progress, so the computation should be short and simple
和
The entire method invocation is performed atomically
根据其文档。
这根本行不通。想一想:当代码意识到信息过时时,一段时间过去了,然后完成了 .put
调用。即使 .put
调用以某种方式阻塞,时间线如下:
- 宇宙中发生了一些事件,使您的配置陈旧。
- 一段时间过去了。 [A]
- 您的 运行 一些实现这种情况的代码。
- 一段时间过去了。 [B]
- 您的代码开始
.put
调用。
- 时间流逝的时间极短。 [C]
- 您的代码完成了
.put
调用。
您要的是一种消除 [C]
的策略,同时完全不采取任何措施来防止读取点 [A]
和 [B]
处的陈旧数据,这两个点似乎相当多问题更多。
随便,给我答案
ConcurrentHashMap 如果你想要这个就错了,它是为多个 concurrent(因此得名)访问而设计的。您想要的是一个普通的旧 HashMap
,其中 每个 对它的访问都通过一个锁。或者,您可以扭转逻辑:做您想做的事情的唯一方法是对所有内容(读和写)都加锁;此时 ConcurrentHashMap
的 'Concurrent' 部分变得毫无意义:
private final Object lock = new Object[0];
public void updateConfig() {
synchronized (lock) {
// do the stuff
}
}
public Config getConfig(String key) {
synchronized (lock) {
return configStore.get(key);
}
}
注意:使用私人锁; public 锁就像 public 字段。如果你控制之外的代码可以引用一个对象,并且你锁定了它,你需要描述你的代码关于该锁的行为,然后注册以永远维护该行为,或者指示很明显,当您更改行为时,您的 API 刚刚经历了重大更改,因此您也应该修改主版本号。
出于同样的原因,鉴于您想要 API 控制,public 字段几乎总是一个坏主意,您希望您锁定的 refs 不能被任何人访问,除了您直接控制的代码。因此,为什么上面的代码不在方法本身上使用 synchronized
关键字(因为 this
通常是一个到处泄漏的引用)。
好吧,也许我想要不同的答案
答案是 'it does not matter' 或 'use locks'。如果[C]
真的是你所关心的,那时间太短了,与[A]
和[B]
的时间相比就相形见绌了,如果A/B是可以接受的,当然C也是,既然如此,那就顺其自然吧。
或者,您可以使用锁,但甚至在数据变得陈旧之前就锁定。此时间线保证不会发生过时的数据读取:
- 宇宙永远不会让您的数据过时。
- 您的代码本身是过时日期的唯一原因。
- 每当代码 运行 将或可能最终使数据陈旧时:
- 甚至在开始之前就获取锁。
- 做一些(可能)使某些配置陈旧的事情。
- 继续抓紧锁;修复配置。
- 解除锁定。
建议用compute(...)
代替put()
.
但如果你想
to block the get() operations when a put() operation is in progress
那么你也应该使用 compute(...)
而不是 get()
。
那是因为 ConcurrentHashMap
get()
在 compute()
进行时不会阻塞。
这里有一个单元测试来证明它:
@Test
public void myTest() throws Exception {
var map = new ConcurrentHashMap<>(Map.of("key", "v1"));
var insideComputeLatch = new CountDownLatch(1);
var threadGet = new Thread(() -> {
try {
insideComputeLatch.await();
System.out.println("threadGet: before get()");
var v = map.get("key");
System.out.println("threadGet: after get() (v='" + v + "')");
} catch (InterruptedException e) {
throw new Error(e);
}
});
var threadCompute = new Thread(() -> {
System.out.println("threadCompute: before compute()");
map.compute("key", (k, v) -> {
try {
System.out.println("threadCompute: inside compute(): start");
insideComputeLatch.countDown();
threadGet.join();
System.out.println("threadCompute: inside compute(): end");
return "v2";
} catch (InterruptedException e) {
throw new Error(e);
}
});
System.out.println("threadCompute: after compute()");
});
threadGet.start();
threadCompute.start();
threadGet.join();
threadCompute.join();
}
输出:
threadCompute: before compute()
threadCompute: inside compute(): start
threadGet: before get()
threadGet: after get() (v='v1')
threadCompute: inside compute(): end
threadCompute: after compute()
How can I go about achieving this in Java without synchronizing the whole map?
这里有一些很好的答案,但使用 ConcurrentMap.replace(key, oldValue, newValue)
method which is atomic.
有一个更简单的答案
while (true) {
Config newConfig = generateNewConfig();
Config oldConfig = configStore.get(configName);
if (!newConfig.replaces(oldConfig)) {
// nothing to do
break;
}
// this is atomic and will only replace the config if the old hasn't changed
if (configStore.replace(configName, oldConfig, newConfig)) {
// if we replaced it then we are done
break;
}
// otherwise, loop around and create a new config
}
ConcurrentHashMap<String, Config> configStore = new ConcurrentHashMap<>();
...
void updateStore() {
Config newConfig = generateNewConfig();
Config oldConfig = configStore.get(configName);
if (newConfig.replaces(oldConfig)) {
configStore.put(configName, newConfig);
}
}
ConcurrentHashMap
可以被多线程读取,但只能被单线程更新。当 put()
操作正在进行时,我想阻止 get()
操作。这里的基本原理是,如果 put()
操作正在进行,这意味着映射中的当前条目是陈旧的,所有 get()
操作都应该阻塞,直到 put()
完成。我怎样才能在不同步整个地图的情况下在 Java 中实现这一点?
看来您确实可以将其推迟到 compute
,它会为您处理:
Config newConfig = generateNewConfig();
configStore.compute(
newConfig,
(oldConfig, value) -> {
if (newConfig.replaces(oldConfig)) {
return key;
}
return oldConfig;
}
);
使用此方法可获得两个保证:
Some attempted update operations on this map by other threads may be blocked while computation is in progress, so the computation should be short and simple
和
The entire method invocation is performed atomically
根据其文档。
这根本行不通。想一想:当代码意识到信息过时时,一段时间过去了,然后完成了 .put
调用。即使 .put
调用以某种方式阻塞,时间线如下:
- 宇宙中发生了一些事件,使您的配置陈旧。
- 一段时间过去了。 [A]
- 您的 运行 一些实现这种情况的代码。
- 一段时间过去了。 [B]
- 您的代码开始
.put
调用。 - 时间流逝的时间极短。 [C]
- 您的代码完成了
.put
调用。
您要的是一种消除 [C]
的策略,同时完全不采取任何措施来防止读取点 [A]
和 [B]
处的陈旧数据,这两个点似乎相当多问题更多。
随便,给我答案
ConcurrentHashMap 如果你想要这个就错了,它是为多个 concurrent(因此得名)访问而设计的。您想要的是一个普通的旧 HashMap
,其中 每个 对它的访问都通过一个锁。或者,您可以扭转逻辑:做您想做的事情的唯一方法是对所有内容(读和写)都加锁;此时 ConcurrentHashMap
的 'Concurrent' 部分变得毫无意义:
private final Object lock = new Object[0];
public void updateConfig() {
synchronized (lock) {
// do the stuff
}
}
public Config getConfig(String key) {
synchronized (lock) {
return configStore.get(key);
}
}
注意:使用私人锁; public 锁就像 public 字段。如果你控制之外的代码可以引用一个对象,并且你锁定了它,你需要描述你的代码关于该锁的行为,然后注册以永远维护该行为,或者指示很明显,当您更改行为时,您的 API 刚刚经历了重大更改,因此您也应该修改主版本号。
出于同样的原因,鉴于您想要 API 控制,public 字段几乎总是一个坏主意,您希望您锁定的 refs 不能被任何人访问,除了您直接控制的代码。因此,为什么上面的代码不在方法本身上使用 synchronized
关键字(因为 this
通常是一个到处泄漏的引用)。
好吧,也许我想要不同的答案
答案是 'it does not matter' 或 'use locks'。如果[C]
真的是你所关心的,那时间太短了,与[A]
和[B]
的时间相比就相形见绌了,如果A/B是可以接受的,当然C也是,既然如此,那就顺其自然吧。
或者,您可以使用锁,但甚至在数据变得陈旧之前就锁定。此时间线保证不会发生过时的数据读取:
- 宇宙永远不会让您的数据过时。
- 您的代码本身是过时日期的唯一原因。
- 每当代码 运行 将或可能最终使数据陈旧时:
- 甚至在开始之前就获取锁。
- 做一些(可能)使某些配置陈旧的事情。
- 继续抓紧锁;修复配置。
- 解除锁定。
compute(...)
代替put()
.
但如果你想
to block the get() operations when a put() operation is in progress
那么你也应该使用 compute(...)
而不是 get()
。
那是因为 ConcurrentHashMap
get()
在 compute()
进行时不会阻塞。
这里有一个单元测试来证明它:
@Test
public void myTest() throws Exception {
var map = new ConcurrentHashMap<>(Map.of("key", "v1"));
var insideComputeLatch = new CountDownLatch(1);
var threadGet = new Thread(() -> {
try {
insideComputeLatch.await();
System.out.println("threadGet: before get()");
var v = map.get("key");
System.out.println("threadGet: after get() (v='" + v + "')");
} catch (InterruptedException e) {
throw new Error(e);
}
});
var threadCompute = new Thread(() -> {
System.out.println("threadCompute: before compute()");
map.compute("key", (k, v) -> {
try {
System.out.println("threadCompute: inside compute(): start");
insideComputeLatch.countDown();
threadGet.join();
System.out.println("threadCompute: inside compute(): end");
return "v2";
} catch (InterruptedException e) {
throw new Error(e);
}
});
System.out.println("threadCompute: after compute()");
});
threadGet.start();
threadCompute.start();
threadGet.join();
threadCompute.join();
}
输出:
threadCompute: before compute()
threadCompute: inside compute(): start
threadGet: before get()
threadGet: after get() (v='v1')
threadCompute: inside compute(): end
threadCompute: after compute()
How can I go about achieving this in Java without synchronizing the whole map?
这里有一些很好的答案,但使用 ConcurrentMap.replace(key, oldValue, newValue)
method which is atomic.
while (true) {
Config newConfig = generateNewConfig();
Config oldConfig = configStore.get(configName);
if (!newConfig.replaces(oldConfig)) {
// nothing to do
break;
}
// this is atomic and will only replace the config if the old hasn't changed
if (configStore.replace(configName, oldConfig, newConfig)) {
// if we replaced it then we are done
break;
}
// otherwise, loop around and create a new config
}