如何同步多个线程访问一些公共数据
How to synchronize multiple threads from accessing some common data
我有三个不同的线程,它们为 read/manipulate 一些所有线程共有的数据创建了三个不同的对象。现在,我需要确保我们一次只允许访问一个线程。
这个例子是这样的。
public interface CommonData {
public void addData(); // adds data to the cache
public void getDataAccessKey(); // Key that will be common across different threads for each data type
}
/*
* Singleton class
*/
public class CommonDataCache() {
private final Map dataMap = new HashMap(); // this takes keys and values as custom objects
}
接口的实现 class 看起来像这样
class CommonDataImpl implements CommonData {
private String key;
public CommonDataImpl1(String key) {
this.key = key;
}
public void addData() {
// access the singleton cache class and add
}
public void getDataAccessKey() {
return key;
}
}
每个线程将被调用如下:
CommonData data = new CommonDataImpl("Key1");
new Thread(() -> data.addData()).start();
CommonData data1 = new CommonDataImpl("Key1");
new Thread(() -> data1.addData()).start();
CommonData data2 = new CommonDataImpl("Key1");
new Thread(() -> data2.addData()).start();
现在,当且仅当数据对象(传递给线程)的键相同时,我才需要同步这些线程。
到目前为止我的思考过程:
我尝试过 class 为给定的密钥提供动态锁定,看起来像这样。
/*
* Singleton class
*/
public class DataAccessKeyToLockProvider {
private volatile Map<String, ReentrantLock> accessKeyToLockHolder = new ConcurrentHashMap<>();
private DataAccessKeyToLockProvider() {
}
public ReentrantLock getLock(String key) {
return accessKeyToLockHolder.putIfAbsent(key, new ReentrantLock());
}
public void removeLock(BSSKey key) {
ReentrantLock removedLock = accessKeyToLockHolder.remove(key);
}
}
所以每个线程都会调用这个 class 并获取锁并使用它,并在处理完成后将其删除。但这可能会导致第二个线程可以获得第一个线程插入的锁对象并等待第一个线程释放锁的情况。一旦第一个线程移除锁,现在第三个线程将获得完全不同的锁,因此第二个线程和第三个线程不再同步。
像这样:
new Thread(() -> {
ReentrantLock lock = DataAccessKeyToLockProvider.get(data.getDataAccessKey());
lock.lock();
data.addData();
lock.unlock();
DataAccessKeyToLockProvider.remove(data.getDataAccessKey());
).start();
如果您需要任何其他详细信息来帮助我解决问题,请告诉我
P.S:从锁提供者那里删除密钥是强制性的,因为我将处理数百万个密钥(不一定是字符串),所以我不希望锁提供者吃光我的记忆
受@rzwitserloot 提供的解决方案的启发,我尝试放置一些通用代码,等待另一个线程完成其处理,然后再授予对下一个线程的访问权限。
public class GenericKeyToLockProvider<K> {
private volatile Map<K, ReentrantLock> keyToLockHolder = new ConcurrentHashMap<>();
public synchronized ReentrantLock getLock(K key) {
ReentrantLock existingLock = keyToLockHolder.get(key);
try {
if (existingLock != null && existingLock.isLocked()) {
existingLock.lock(); // Waits for the thread that acquired the lock previously to release it
}
return keyToLockHolder.put(key, new ReentrantLock()); // Override with the new lock
} finally {
if (existingLock != null) {
existingLock.unlock();
}
}
}
}
但看起来上一个线程创建的条目不会被删除。无论如何要解决这个问题?
首先,澄清一下:您或者使用ReentrantLock
,或者您使用synchronized
。你不在 ReentrantLock 实例上同步(你在任何你想要的对象上同步)——或者,如果你想走锁定路线,你可以在你的锁定对象上调用 lock 锁定方法,使用 try/finally 守卫始终确保您稍后调用 unlock
(并且根本不要使用 synchronized
)。
synchronized
为低级API。 Lock
和 java.util.concurrent
包中的所有其他 类 级别更高,提供了更多的抽象。不时细读 j.u.c 包中所有 类 的 javadoc 通常是个好主意,里面的东西非常有用。
关键问题是删除所有对锁定对象的引用(从而确保它可以被垃圾收集),但直到您确定有零个活动线程锁定在它上面。您当前的方法不知道有多少 类 在等待。这需要解决。一旦你 return 一个 Lock 对象的实例,它就是 'out of your hands' 并且不可能跟踪调用者是否会在它上面调用 lock
。因此,你不能那样做。相反,调用 lock 作为工作的一部分; getLock
方法实际上应该将锁定作为操作的一部分。这样,您 就可以控制流程。然而,让我们先退一步:
你说你会有数百万把钥匙。好的;但是您不太可能拥有数百万个线程。毕竟,一个线程需要一个堆栈,即使使用 -Xss
参数将堆栈大小减少到最小 128k 左右,一百万个线程意味着您正在使用 128GB 的 RAM 用于堆栈;似乎不太可能。
因此,虽然您可能拥有数百万个密钥,但 'locked' 个密钥的数量要少得多。让我们关注那些。
您可以制作一个 ConcurrentHashMap 将您的字符串键映射到锁定对象。那么:
获取锁:
创建一个新的锁对象(字面意思:Object o = new Object();
- 我们将使用 synchronized
)并使用 putIfAbsent
将其添加到地图中。如果您设法创建了 key/value 对(将使用 == 的 returned 对象与您创建的对象进行比较;如果它们相同,则您是添加它的人),您明白了,去吧,运行 代码。完成后,获取对象的同步锁,发送通知,释放并删除:
public void doWithLocking(String key, Runnable op) {
Object locker = new Object();
Object o = concurrentMap.putIfAbsent(key, locker);
if (o == locker) {
op.run();
synchronized (locker) {
locker.notifyAll(); // wake up everybody waiting.
concurrentMap.remove(key); // this has to be inside!
}
} else {
...
}
}
要等到锁可用,首先获取锁对象上的锁,然后检查 concurrentMap 是否仍然包含它。如果没有,您现在可以重试此操作。如果它还在,那么我们现在等待通知。无论如何,我们总是从头开始重试。因此:
public void performWithLocking(String key, Runnable op) throws InterruptedException {
while (true) {
Object locker = new Object();
Object o = concurrentMap.putIfAbsent(key, locker);
if (o == locker) {
try {
op.run();
} finally {
// We want to lock even if the operation throws!
synchronized (locker) {
locker.notifyAll(); // wake up everybody waiting.
concurrentMap.remove(key); // this has to be inside!
}
}
return;
} else {
synchronized (o) {
if (concurrentMap.containsKey(key)) o.wait();
}
}
}
}
}
您可以使用串联 'lock' 和 'unlock' 方法,而不是将操作与锁定密钥一起执行的设置,但现在您 运行 有编写的风险忘记调用解锁的代码。因此我不建议这样做!
您可以这样调用它,例如:
keyedLockSupportThingie.doWithLocking("mykey", () -> {
System.out.println("Hello, from safety!");
});
我有三个不同的线程,它们为 read/manipulate 一些所有线程共有的数据创建了三个不同的对象。现在,我需要确保我们一次只允许访问一个线程。
这个例子是这样的。
public interface CommonData {
public void addData(); // adds data to the cache
public void getDataAccessKey(); // Key that will be common across different threads for each data type
}
/*
* Singleton class
*/
public class CommonDataCache() {
private final Map dataMap = new HashMap(); // this takes keys and values as custom objects
}
接口的实现 class 看起来像这样
class CommonDataImpl implements CommonData {
private String key;
public CommonDataImpl1(String key) {
this.key = key;
}
public void addData() {
// access the singleton cache class and add
}
public void getDataAccessKey() {
return key;
}
}
每个线程将被调用如下:
CommonData data = new CommonDataImpl("Key1");
new Thread(() -> data.addData()).start();
CommonData data1 = new CommonDataImpl("Key1");
new Thread(() -> data1.addData()).start();
CommonData data2 = new CommonDataImpl("Key1");
new Thread(() -> data2.addData()).start();
现在,当且仅当数据对象(传递给线程)的键相同时,我才需要同步这些线程。
到目前为止我的思考过程:
我尝试过 class 为给定的密钥提供动态锁定,看起来像这样。
/*
* Singleton class
*/
public class DataAccessKeyToLockProvider {
private volatile Map<String, ReentrantLock> accessKeyToLockHolder = new ConcurrentHashMap<>();
private DataAccessKeyToLockProvider() {
}
public ReentrantLock getLock(String key) {
return accessKeyToLockHolder.putIfAbsent(key, new ReentrantLock());
}
public void removeLock(BSSKey key) {
ReentrantLock removedLock = accessKeyToLockHolder.remove(key);
}
}
所以每个线程都会调用这个 class 并获取锁并使用它,并在处理完成后将其删除。但这可能会导致第二个线程可以获得第一个线程插入的锁对象并等待第一个线程释放锁的情况。一旦第一个线程移除锁,现在第三个线程将获得完全不同的锁,因此第二个线程和第三个线程不再同步。
像这样:
new Thread(() -> {
ReentrantLock lock = DataAccessKeyToLockProvider.get(data.getDataAccessKey());
lock.lock();
data.addData();
lock.unlock();
DataAccessKeyToLockProvider.remove(data.getDataAccessKey());
).start();
如果您需要任何其他详细信息来帮助我解决问题,请告诉我
P.S:从锁提供者那里删除密钥是强制性的,因为我将处理数百万个密钥(不一定是字符串),所以我不希望锁提供者吃光我的记忆
受@rzwitserloot 提供的解决方案的启发,我尝试放置一些通用代码,等待另一个线程完成其处理,然后再授予对下一个线程的访问权限。
public class GenericKeyToLockProvider<K> {
private volatile Map<K, ReentrantLock> keyToLockHolder = new ConcurrentHashMap<>();
public synchronized ReentrantLock getLock(K key) {
ReentrantLock existingLock = keyToLockHolder.get(key);
try {
if (existingLock != null && existingLock.isLocked()) {
existingLock.lock(); // Waits for the thread that acquired the lock previously to release it
}
return keyToLockHolder.put(key, new ReentrantLock()); // Override with the new lock
} finally {
if (existingLock != null) {
existingLock.unlock();
}
}
}
}
但看起来上一个线程创建的条目不会被删除。无论如何要解决这个问题?
首先,澄清一下:您或者使用ReentrantLock
,或者您使用synchronized
。你不在 ReentrantLock 实例上同步(你在任何你想要的对象上同步)——或者,如果你想走锁定路线,你可以在你的锁定对象上调用 lock 锁定方法,使用 try/finally 守卫始终确保您稍后调用 unlock
(并且根本不要使用 synchronized
)。
synchronized
为低级API。 Lock
和 java.util.concurrent
包中的所有其他 类 级别更高,提供了更多的抽象。不时细读 j.u.c 包中所有 类 的 javadoc 通常是个好主意,里面的东西非常有用。
关键问题是删除所有对锁定对象的引用(从而确保它可以被垃圾收集),但直到您确定有零个活动线程锁定在它上面。您当前的方法不知道有多少 类 在等待。这需要解决。一旦你 return 一个 Lock 对象的实例,它就是 'out of your hands' 并且不可能跟踪调用者是否会在它上面调用 lock
。因此,你不能那样做。相反,调用 lock 作为工作的一部分; getLock
方法实际上应该将锁定作为操作的一部分。这样,您 就可以控制流程。然而,让我们先退一步:
你说你会有数百万把钥匙。好的;但是您不太可能拥有数百万个线程。毕竟,一个线程需要一个堆栈,即使使用 -Xss
参数将堆栈大小减少到最小 128k 左右,一百万个线程意味着您正在使用 128GB 的 RAM 用于堆栈;似乎不太可能。
因此,虽然您可能拥有数百万个密钥,但 'locked' 个密钥的数量要少得多。让我们关注那些。
您可以制作一个 ConcurrentHashMap 将您的字符串键映射到锁定对象。那么:
获取锁:
创建一个新的锁对象(字面意思:Object o = new Object();
- 我们将使用 synchronized
)并使用 putIfAbsent
将其添加到地图中。如果您设法创建了 key/value 对(将使用 == 的 returned 对象与您创建的对象进行比较;如果它们相同,则您是添加它的人),您明白了,去吧,运行 代码。完成后,获取对象的同步锁,发送通知,释放并删除:
public void doWithLocking(String key, Runnable op) {
Object locker = new Object();
Object o = concurrentMap.putIfAbsent(key, locker);
if (o == locker) {
op.run();
synchronized (locker) {
locker.notifyAll(); // wake up everybody waiting.
concurrentMap.remove(key); // this has to be inside!
}
} else {
...
}
}
要等到锁可用,首先获取锁对象上的锁,然后检查 concurrentMap 是否仍然包含它。如果没有,您现在可以重试此操作。如果它还在,那么我们现在等待通知。无论如何,我们总是从头开始重试。因此:
public void performWithLocking(String key, Runnable op) throws InterruptedException {
while (true) {
Object locker = new Object();
Object o = concurrentMap.putIfAbsent(key, locker);
if (o == locker) {
try {
op.run();
} finally {
// We want to lock even if the operation throws!
synchronized (locker) {
locker.notifyAll(); // wake up everybody waiting.
concurrentMap.remove(key); // this has to be inside!
}
}
return;
} else {
synchronized (o) {
if (concurrentMap.containsKey(key)) o.wait();
}
}
}
}
}
您可以使用串联 'lock' 和 'unlock' 方法,而不是将操作与锁定密钥一起执行的设置,但现在您 运行 有编写的风险忘记调用解锁的代码。因此我不建议这样做!
您可以这样调用它,例如:
keyedLockSupportThingie.doWithLocking("mykey", () -> {
System.out.println("Hello, from safety!");
});