如何释放信号量并让任何线程继续?

How to release a semaphore and let any threads continue?

我想创建一个信号量来防止某个方法一次执行超过 1 次。

如果有任何其他线程请求访问,它应该等到信号量被释放:

private Map<String, Semaphore> map;

public void test() {
    String hash; //prevent to run the long running method with the same hash concurrently
    if (map.contains(hash)) {
        map.get(hash).aquire(); //wait for release of the lock
        callLongRunningMethod();
    } else {
        Semaphore s = new Semaphore(1);
        map.put(hash, s);
        callLongRunningMethod();
        s.release(); //any number of registered threads should continue
        map.remove(hash);
    }
}

问题:如何只用一个线程锁定信号量,然后释放它,以便任何个线程在释放后可以继续运行?

一些说明:

想象一下长运行方法是一个事务方法。查看数据库。如果未找到任何条目,则会发送一个繁重的 XML 请求并将其保存到数据库。也可能会触发进一步的异步处理,因为这应该是数据的 "initial fetch"。然后 return 来自 DB 的对象(在该方法内)。如果数据库条目已经存在,它将直接 return 实体。

现在如果多个线程同时访问 long 运行 方法,所有方法都会获取繁重的 XML(流量,性能),并且所有方法都会尝试持久化相同的方法对象到数据库中(因为 long 运行 方法是事务性的)。导致例如非唯一异常。再加上所有这些都触发​​了可选的异步线程。

当除一个线程外的所有线程都被锁定时,只有第一个线程负责持久化对象。然后,完成后,所有其他线程将检测到该条目已存在于 DB 中,并只为该对象提供服务。

我认为您可以使用非常高的 permit 数字(高于线程数,例如 2000000)来做到这一点。

然后在应该 运行 独占的函数中 acquire 完整的许可数 (acquire(2000000)) 而在其他线程中你 acquire 只有一个许可.

据我了解,这里不需要使用Semaphore。相反,您应该使用 ReentrantReadWriteLock。此外,test 方法不是线程安全的。

下面的示例是使用 RWL 实现您的逻辑

private ConcurrentMap<String, ReadWriteLock> map = null;

void test() {
    String hash = null;
    ReadWriteLock rwl = new ReentrantReadWriteLock(false);
    ReadWriteLock lock = map.putIfAbsent(hash,  rwl);

    if (lock == null) {
        lock = rwl;
    }

    if (lock.writeLock().tryLock()) {
        try {
            compute();
            map.remove(hash);
        } finally {
            lock.writeLock().unlock();
        }
    } else {
        lock.readLock().lock();
        try {
            compute();
        } finally {
            lock.readLock().unlock();
        }
    }
}

在此代码中,第一个成功的线程将获取 WriteLock 而其他 Thread 将等待写锁的释放。释放 WriteLock 后,所有等待释放的 Thread 将同时进行。

据我了解您的需求,您希望能够确保第一次由一个线程执行该任务,然后您希望允许多个线程执行它,如果是这样,您需要依赖一个CountDownLatch 如下:

下面是如何用 CountDownLatch 实现它:

private final ConcurrentMap<String, CountDownLatch> map = new ConcurrentHashMap<>(); 

public void test(String hash) {
    final CountDownLatch latch = new CountDownLatch(1);
    final CountDownLatch previous = map.putIfAbsent(hash, latch);
    if (previous == null) {
        try {
            callLongRunningMethod();
        } finally {
            map.remove(hash, latch);
            latch.countDown();
        }
    } else {
        try {
            previous.await();
            callLongRunningMethod();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

我使用CountDownLatch结束如下:

private final ConcurrentMap<String, CountDownLatch> map = new ConcurrentHashMap<>();

public void run() {
        boolean active = false;
        CountDownLatch count = null;

        try {
            if (map.containsKey(hash)) {
                count = map.get(hash);
                count.await(60, TimeUnit.SECONDS); //wait for release or timeout
            } else {
                count = new CountDownLatch(1);
                map.put(hash, count); //block any threads with same hash
                active = true;
            }

            return runLongRunningTask();
        } finally {
            if (active) {
                count.countDown(); //release
                map.remove(hash, count);
            }
        }
}

我认为最简单的方法是使用 ExecutorServiceFuture:

class ContainingClass {
  private final ConcurrentHashMap<String, Future<?>> pending =
    new ConcurrentHashMap<>();
  private final ExecutorService executor;

  ContainingClass(ExecutorService executor) {
    this.executor = executor;
  }

  void test(String hash) {
    Future<?> future = pending.computeIfAbsent(
        hash,
        () -> executor.submit(() -> longRunningMethod()));

    // Exception handling omitted for clarity.
    try {
      future.get();  // Block until LRM has finished.
    } finally {
      // Always remove: in case of exception, this allows
      // the value to be computed again.
      pending.values().remove(future);
    }
  }
}

Ideone Demo

从值中删除未来是线程安全的,因为 computeIfAbsentremove 是原子的:computeIfAbsentremove 之前是 运行,在这种情况下,现有的未来将被返回,并立即完成;或者在 运行 之后,添加了一个新的未来,导致对 longRunningMethod.

的新调用

请注意,它从 pending.values() 中删除了未来,而不是直接从 pending 中删除:考虑以下示例:

  • 线程 1 和线程 2 运行 同时
  • 线程 1 完成,并删除值。
  • 线程 3 运行,向地图添加新的未来
  • 线程 2 完成,并尝试删除该值。

如果通过键从映射中删除未来,线程 2 将删除线程 3 的未来,这与线程 2 的未来是不同的实例。

这也简化了 longRunningMethod,因为不再需要为阻塞线程执行 "check if I need to do anything":Future.get() 在阻塞线程中成功完成就足够了表示不需要额外的工作。