使用 synchronizedMap 和 ReentrantLock 的用户级锁定方法

User level Locking Approach With synchronizedMap & ReentrantLock

我想实现基于用户的锁定,如下面的代码所示。

如果用户“1234”(java.lang.String.class 标识符)正在执行某个进程,或者某个进程正在使用用户,则其他进程应等待该进程完成。听起来很简单,我尝试使用 ReenterrentLock 让它工作,但我陷入了永久等待状态和死锁,虽然我正要使用 synchronised 块让它工作,但我想通过ReenterrentLock.

下面是我的代码和日志。

让我知道我做错了什么。

//Research.java file
  public static final int PERIOD = 10;
  public static final int END_INCLUSIVE = 23;
  public static final int N_THREADS = 8;

  public static void main(String[] args) {
    final ExecutorService executorService = Executors.newFixedThreadPool(N_THREADS);
    IntStream.rangeClosed(1, END_INCLUSIVE).forEach(value -> executorService.submit(() -> {
      final String user = value % 2 == 0 ? "1234" : "5678";
      process(value + "process", user);
    }));
    executorService.shutdown();
  }

  private static void process(String tag, String user) {
    System.out.println("waiting tag=" + tag + ",user=" + user + ",TH=" + Thread.currentThread().getName());
    AccountLock.getInstance().lock(user);
    System.out.println("in tag=" + tag + ",user=" + user + ",TH=" + Thread.currentThread().getName());
    sleep(tag, PERIOD);
    AccountLock.getInstance().unlock(user);
    System.out.println("out tag=" + tag + ",user=" + user + ",TH=" + Thread.currentThread().getName());
  }

  private static void sleep(String tag, long s) {
    boolean interrupt = false;
    try {
      TimeUnit.SECONDS.sleep(s);
    } catch (InterruptedException e) {
      interrupt = true;
      e.printStackTrace();
    } finally {
      if (interrupt) {
        Thread.currentThread().interrupt();
      }
    }
  }
/**
 * AccountLock
 */
final class AccountLock {
  private static final Map<String, Lock> LOCK_MAP = Collections.synchronizedMap(new HashMap<>());
  private static volatile AccountLock INSTANCE;

  private AccountLock() {
  }

  public static AccountLock getInstance() {
    if (INSTANCE == null) {
      synchronized (AccountLock.class) {
        if (INSTANCE == null) {
          INSTANCE = new AccountLock();
        }
      }
    }
    return INSTANCE;
  }

  public void lock(String user) {
    LOCK_MAP.computeIfPresent(user, (s, lock) -> {
      lock.lock();
      return lock;
    });
    LOCK_MAP.computeIfAbsent(user, s -> {
      final ReentrantLock lock = new ReentrantLock(true);
      lock.lock();
      return lock;
    });
  }

  public void unlock(String user) {
    LOCK_MAP.computeIfPresent(user, (s, lock) -> {
      lock.unlock();
      return null;
    });
  }
}
//logs
//waiting tag=2process,user=1234,TH=pool-1-thread-2
//waiting tag=3process,user=5678,TH=pool-1-thread-3
//waiting tag=1process,user=5678,TH=pool-1-thread-1
//waiting tag=4process,user=1234,TH=pool-1-thread-4
//waiting tag=5process,user=5678,TH=pool-1-thread-5
//in tag=3process,user=5678,TH=pool-1-thread-3
//in tag=4process,user=1234,TH=pool-1-thread-4
//in tag=5process,user=5678,TH=pool-1-thread-5
//waiting tag=6process,user=1234,TH=pool-1-thread-6
//waiting tag=7process,user=5678,TH=pool-1-thread-7
//waiting tag=8process,user=1234,TH=pool-1-thread-8

如果您需要相同的线程转储,请告诉我。

我在 mac

上使用 java8

死锁是由地图和锁的交互产生的。

更准确地说:您使用 Collections.syncrhonized 映射,这实际上对原始映射实例的每个(或左右)方法都设置了互斥。

这意味着,例如,只要有人在 computeIfAbsent(或 computeIfPresent)调用中,就不能在地图实例上调用其他方法。

到目前为止没问题。

除此之外, computeIfxx 调用中,您通过获取锁实例执行另一个锁定操作。

拥有两个不同的锁并且不维护严格的锁获取顺序是导致死锁的完美方法,您在此处进行了演示。

年表示例:

  1. 线程T1进入Map(获取锁M),创建锁L1并获取。 T1持有M和L1。
  2. 线程 T1 存在 map.computeXX,释放 M。T1 持有 L1(仅)。
  3. 线程T2进入映射(获取M)。 T1持有L1,T2持有M.
  4. 线程T2试图获取L1(与T1相同),被T1阻塞。 T1持有L1,T2持有M等待L1。你看到它来了吗?
  5. 线程 T1 已完成。它要释放L1,所以试图进入地图,这意味着试图获得T2持有的M。 T1持有L1,等待M。T2持有M,等待L1。游戏结束。

解决方案:不尝试同时创建锁和获取锁是很诱人的,例如在锁定方法内时不获取锁。让我们试试(只是为了说明一点,你会看到它有多难)。

如果你打破这个嵌入式锁模式,当你的线程已经有另一个锁时,你将永远不会有 lock/unlock 调用,从而防止任何死锁模式(死锁不会发生在单个可重入锁上)。

这意味着:将您的 AccountLock class 从锁定实例更改为锁定工厂实例。或者,如果您想整合东西,请以不同的方式进行。

lock(String user) {
    LOCKS.computeIfAbsent(user, u -> new ReentrantLock()).lock();
}

通过将锁获取移到地图的方法调用之外,我已将其从地图的互斥体中移除,并消除了死锁的可能性。

更正后,我创建了另一个错误。

在解锁方面,您使用 computeIfPresent 其中 returns 为空。这意味着一旦完成锁定,您打算将其从地图中删除。这一切都很好,但它在解锁情况下造成了竞争条件。

  1. T1 为用户 1 创建锁 L1,将其放入映射并锁定。
  2. T2 想要相同的 L1,从地图上获取它,并等待它的可用性
  3. T1 完成,释放 L1(T2 还不知道),并将其从地图中移除
  4. T3 进来想为用户 1 加锁,但是 L1 从映射中消失了,所以它创建 L2 并获取它。
  5. T2 发现 L1 空闲,获取它并工作
  6. 此时T2和T3同时访问用户1,不好
  7. T2 首先完成,转到地图实例并释放用户 1 的锁,即 L2,它一开始从未获得过锁。 IllegalMonitorStateException.

没有简单的方法可以解决这个问题。您必须确保想要访问用户的并发线程具有一致的锁实例(这里的问题是 T1 释放锁,而 T2 持有锁,但尚未锁定)。这不会发生在您的原始代码中 - 因为映射的互斥锁,但该互斥锁创建了死锁。

那怎么办? 您永远无法从地图中删除键。

public void unlock(String user) {
    LOCK_MAP.computeIfPresent(user, (s, lock) -> {
        lock.unlock();
        return lock;
});

但是没有人从映射中释放密钥,它会无限增长 - 可以添加一个 thread-safe 计数器来检查在释放锁之前是否“正在获取”锁。这会反过来造成另一个错误吗?

话虽如此,这里还有几点需要加强:

  • 锁释放应该在finally块中执行
  • 您创建的锁实例数量没有限制。你可能想避免那个(这是真正困难的部分)
  • 在地图级别使用互斥体有点浪费。 ConcurrentHashMap 将提供更细粒度的锁定机制
  • 谁知道这里留下了什么错误。

结论

尝试看看 this other question 中提供的任何解决方案是否可以帮助您实现目标。使用受人尊敬的库的实现可能更安全。

并发编程很难。其他人已经为您完成了。