如何释放信号量并让任何线程继续?
How to release a semaphore and let any threads continue?
我想创建一个信号量来防止某个方法一次执行超过 1 次。
如果有任何其他线程请求访问,它应该等到信号量被释放:
private Map<String, Semaphore> map;
public void test() {
String hash; //prevent to run the long running method with the same hash concurrently
if (map.contains(hash)) {
map.get(hash).aquire(); //wait for release of the lock
callLongRunningMethod();
} else {
Semaphore s = new Semaphore(1);
map.put(hash, s);
callLongRunningMethod();
s.release(); //any number of registered threads should continue
map.remove(hash);
}
}
问题:如何只用一个线程锁定信号量,然后释放它,以便任何个线程在释放后可以继续运行?
一些说明:
想象一下长运行方法是一个事务方法。查看数据库。如果未找到任何条目,则会发送一个繁重的 XML 请求并将其保存到数据库。也可能会触发进一步的异步处理,因为这应该是数据的 "initial fetch"。然后 return 来自 DB 的对象(在该方法内)。如果数据库条目已经存在,它将直接 return 实体。
现在如果多个线程同时访问 long 运行 方法,所有方法都会获取繁重的 XML(流量,性能),并且所有方法都会尝试持久化相同的方法对象到数据库中(因为 long 运行 方法是事务性的)。导致例如非唯一异常。再加上所有这些都触发了可选的异步线程。
当除一个线程外的所有线程都被锁定时,只有第一个线程负责持久化对象。然后,完成后,所有其他线程将检测到该条目已存在于 DB 中,并只为该对象提供服务。
我认为您可以使用非常高的 permit
数字(高于线程数,例如 2000000)来做到这一点。
然后在应该 运行 独占的函数中 acquire
完整的许可数 (acquire(2000000)
) 而在其他线程中你 acquire
只有一个许可.
据我了解,这里不需要使用Semaphore
。相反,您应该使用 ReentrantReadWriteLock
。此外,test
方法不是线程安全的。
下面的示例是使用 RWL 实现您的逻辑
private ConcurrentMap<String, ReadWriteLock> map = null;
void test() {
String hash = null;
ReadWriteLock rwl = new ReentrantReadWriteLock(false);
ReadWriteLock lock = map.putIfAbsent(hash, rwl);
if (lock == null) {
lock = rwl;
}
if (lock.writeLock().tryLock()) {
try {
compute();
map.remove(hash);
} finally {
lock.writeLock().unlock();
}
} else {
lock.readLock().lock();
try {
compute();
} finally {
lock.readLock().unlock();
}
}
}
在此代码中,第一个成功的线程将获取 WriteLock
而其他 Thread
将等待写锁的释放。释放 WriteLock
后,所有等待释放的 Thread
将同时进行。
据我了解您的需求,您希望能够确保第一次由一个线程执行该任务,然后您希望允许多个线程执行它,如果是这样,您需要依赖一个CountDownLatch
如下:
下面是如何用 CountDownLatch
实现它:
private final ConcurrentMap<String, CountDownLatch> map = new ConcurrentHashMap<>();
public void test(String hash) {
final CountDownLatch latch = new CountDownLatch(1);
final CountDownLatch previous = map.putIfAbsent(hash, latch);
if (previous == null) {
try {
callLongRunningMethod();
} finally {
map.remove(hash, latch);
latch.countDown();
}
} else {
try {
previous.await();
callLongRunningMethod();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
我使用CountDownLatch
结束如下:
private final ConcurrentMap<String, CountDownLatch> map = new ConcurrentHashMap<>();
public void run() {
boolean active = false;
CountDownLatch count = null;
try {
if (map.containsKey(hash)) {
count = map.get(hash);
count.await(60, TimeUnit.SECONDS); //wait for release or timeout
} else {
count = new CountDownLatch(1);
map.put(hash, count); //block any threads with same hash
active = true;
}
return runLongRunningTask();
} finally {
if (active) {
count.countDown(); //release
map.remove(hash, count);
}
}
}
我认为最简单的方法是使用 ExecutorService
和 Future
:
class ContainingClass {
private final ConcurrentHashMap<String, Future<?>> pending =
new ConcurrentHashMap<>();
private final ExecutorService executor;
ContainingClass(ExecutorService executor) {
this.executor = executor;
}
void test(String hash) {
Future<?> future = pending.computeIfAbsent(
hash,
() -> executor.submit(() -> longRunningMethod()));
// Exception handling omitted for clarity.
try {
future.get(); // Block until LRM has finished.
} finally {
// Always remove: in case of exception, this allows
// the value to be computed again.
pending.values().remove(future);
}
}
}
从值中删除未来是线程安全的,因为 computeIfAbsent
和 remove
是原子的:computeIfAbsent
在 remove
之前是 运行,在这种情况下,现有的未来将被返回,并立即完成;或者在 运行 之后,添加了一个新的未来,导致对 longRunningMethod
.
的新调用
请注意,它从 pending.values()
中删除了未来,而不是直接从 pending
中删除:考虑以下示例:
- 线程 1 和线程 2 运行 同时
- 线程 1 完成,并删除值。
- 线程 3 运行,向地图添加新的未来
- 线程 2 完成,并尝试删除该值。
如果通过键从映射中删除未来,线程 2 将删除线程 3 的未来,这与线程 2 的未来是不同的实例。
这也简化了 longRunningMethod
,因为不再需要为阻塞线程执行 "check if I need to do anything":Future.get()
在阻塞线程中成功完成就足够了表示不需要额外的工作。
我想创建一个信号量来防止某个方法一次执行超过 1 次。
如果有任何其他线程请求访问,它应该等到信号量被释放:
private Map<String, Semaphore> map;
public void test() {
String hash; //prevent to run the long running method with the same hash concurrently
if (map.contains(hash)) {
map.get(hash).aquire(); //wait for release of the lock
callLongRunningMethod();
} else {
Semaphore s = new Semaphore(1);
map.put(hash, s);
callLongRunningMethod();
s.release(); //any number of registered threads should continue
map.remove(hash);
}
}
问题:如何只用一个线程锁定信号量,然后释放它,以便任何个线程在释放后可以继续运行?
一些说明:
想象一下长运行方法是一个事务方法。查看数据库。如果未找到任何条目,则会发送一个繁重的 XML 请求并将其保存到数据库。也可能会触发进一步的异步处理,因为这应该是数据的 "initial fetch"。然后 return 来自 DB 的对象(在该方法内)。如果数据库条目已经存在,它将直接 return 实体。
现在如果多个线程同时访问 long 运行 方法,所有方法都会获取繁重的 XML(流量,性能),并且所有方法都会尝试持久化相同的方法对象到数据库中(因为 long 运行 方法是事务性的)。导致例如非唯一异常。再加上所有这些都触发了可选的异步线程。
当除一个线程外的所有线程都被锁定时,只有第一个线程负责持久化对象。然后,完成后,所有其他线程将检测到该条目已存在于 DB 中,并只为该对象提供服务。
我认为您可以使用非常高的 permit
数字(高于线程数,例如 2000000)来做到这一点。
然后在应该 运行 独占的函数中 acquire
完整的许可数 (acquire(2000000)
) 而在其他线程中你 acquire
只有一个许可.
据我了解,这里不需要使用Semaphore
。相反,您应该使用 ReentrantReadWriteLock
。此外,test
方法不是线程安全的。
下面的示例是使用 RWL 实现您的逻辑
private ConcurrentMap<String, ReadWriteLock> map = null;
void test() {
String hash = null;
ReadWriteLock rwl = new ReentrantReadWriteLock(false);
ReadWriteLock lock = map.putIfAbsent(hash, rwl);
if (lock == null) {
lock = rwl;
}
if (lock.writeLock().tryLock()) {
try {
compute();
map.remove(hash);
} finally {
lock.writeLock().unlock();
}
} else {
lock.readLock().lock();
try {
compute();
} finally {
lock.readLock().unlock();
}
}
}
在此代码中,第一个成功的线程将获取 WriteLock
而其他 Thread
将等待写锁的释放。释放 WriteLock
后,所有等待释放的 Thread
将同时进行。
据我了解您的需求,您希望能够确保第一次由一个线程执行该任务,然后您希望允许多个线程执行它,如果是这样,您需要依赖一个CountDownLatch
如下:
下面是如何用 CountDownLatch
实现它:
private final ConcurrentMap<String, CountDownLatch> map = new ConcurrentHashMap<>();
public void test(String hash) {
final CountDownLatch latch = new CountDownLatch(1);
final CountDownLatch previous = map.putIfAbsent(hash, latch);
if (previous == null) {
try {
callLongRunningMethod();
} finally {
map.remove(hash, latch);
latch.countDown();
}
} else {
try {
previous.await();
callLongRunningMethod();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
我使用CountDownLatch
结束如下:
private final ConcurrentMap<String, CountDownLatch> map = new ConcurrentHashMap<>();
public void run() {
boolean active = false;
CountDownLatch count = null;
try {
if (map.containsKey(hash)) {
count = map.get(hash);
count.await(60, TimeUnit.SECONDS); //wait for release or timeout
} else {
count = new CountDownLatch(1);
map.put(hash, count); //block any threads with same hash
active = true;
}
return runLongRunningTask();
} finally {
if (active) {
count.countDown(); //release
map.remove(hash, count);
}
}
}
我认为最简单的方法是使用 ExecutorService
和 Future
:
class ContainingClass {
private final ConcurrentHashMap<String, Future<?>> pending =
new ConcurrentHashMap<>();
private final ExecutorService executor;
ContainingClass(ExecutorService executor) {
this.executor = executor;
}
void test(String hash) {
Future<?> future = pending.computeIfAbsent(
hash,
() -> executor.submit(() -> longRunningMethod()));
// Exception handling omitted for clarity.
try {
future.get(); // Block until LRM has finished.
} finally {
// Always remove: in case of exception, this allows
// the value to be computed again.
pending.values().remove(future);
}
}
}
从值中删除未来是线程安全的,因为 computeIfAbsent
和 remove
是原子的:computeIfAbsent
在 remove
之前是 运行,在这种情况下,现有的未来将被返回,并立即完成;或者在 运行 之后,添加了一个新的未来,导致对 longRunningMethod
.
请注意,它从 pending.values()
中删除了未来,而不是直接从 pending
中删除:考虑以下示例:
- 线程 1 和线程 2 运行 同时
- 线程 1 完成,并删除值。
- 线程 3 运行,向地图添加新的未来
- 线程 2 完成,并尝试删除该值。
如果通过键从映射中删除未来,线程 2 将删除线程 3 的未来,这与线程 2 的未来是不同的实例。
这也简化了 longRunningMethod
,因为不再需要为阻塞线程执行 "check if I need to do anything":Future.get()
在阻塞线程中成功完成就足够了表示不需要额外的工作。