ReentrantReadWriteLock 多个读取线程

ReentrantReadWriteLock multiple reading threads

美好的一天

我有一个关于 ReentrantReadWriteLocks 的问题。我正在尝试解决一个问题,其中多个 reader 线程应该能够在数据结构上并行操作,而一个编写器线程只能单独操作(而没有 reader 线程处于活动状态)。我正在使用 Java 中的 ReentrantReadWriteLocks 实现这一点,但是从时间测量来看,reader 线程似乎也在相互锁定。我认为这不应该发生,所以我想知道我是否实施错误。我的实现方式如下:

readingMethod(){
    lock.readLock().lock();
    do reading ...
    lock.readLock().unlock();
}

writingMethod(){
    lock.writeLock().lock();
    do writing ...
    lock.writeLock().unlock();
}

其中读取方法被许多不同的线程调用。从时间上看,读取方法是顺序执行的,即使写入方法从未被调用过!关于这里出了什么问题的任何想法?提前谢谢你-干杯

编辑:我试图想出一个 SSCCE,我希望这是清楚的:

public class Bank {
private Int[] accounts;
public ReadWriteLock lock = new ReentrantReadWriteLock();

// Multiple Threads are doing transactions.
public void transfer(int from, int to, int amount){
    lock.readLock().lock(); // Locking read.

    // Consider this the do-reading.
    synchronized(accounts[from]){
        accounts[from] -= amount;
    }
    synchronized(accounts[to]){
        accounts[to] += amount;
    }

    lock.readLock().unlock(); // Unlocking read.
}

// Only one thread does summation.
public int totalMoney(){
    lock.writeLock().lock; // Locking write.

    // Consider this the do-writing.
    int sum = 0;
    for(int i = 0; i < accounts.length; i++){
        synchronized(accounts[i]){
            sum += accounts[i];
        }
    }

    lock.writeLock().unlock; // Unlocking write.

    return sum;
}}

我知道read-Lock里面的部分实际上不是读而是写。我这样做是因为有多个线程在执行写操作,而只有一个线程在执行读操作,但是在读操作的时候,不能对数组做任何改动。这在我的理解中有效。同样,只要不添加写方法和读锁,读锁内的代码就可以在多线程下正常工作。

您可以 运行 2 个线程进行此测试

static ReadWriteLock l = new ReentrantReadWriteLock();

static void readMehod() {
    l.readLock().lock();
    System.out.println(Thread.currentThread() + " entered");
    try {
        Thread.sleep(1000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    l.readLock().unlock();
    System.out.println(Thread.currentThread() + " exited");
}

并查看两个线程是否都进入了读锁。

您的代码非常糟糕,您不必担心任何性能影响。您的代码不是线程安全的。 从不在可变变量上同步

synchronized(accounts[from]){
    accounts[from] -= amount;
}

此代码执行以下操作:

  • 在没有任何同步的情况下读取数组 accounts 位置 from 的内容,因此可能读取到无可救药的过时值,或者由仍在其 [=16 中的线程写入的值=]块
  • 锁定它读取的任何对象(请记住 Integer 个对象 created by auto-boxing 的身份未指定 [-128 到 +127 范围除外])
  • 再次读取数组accounts中位置from
  • 的内容
  • 从其 int 值中减去 amount,自动装箱结果(在大多数情况下产生 不同的 对象)
  • 将新对象存储在数组 accounts 中的位置 from

这意味着不同的线程可以同时写入相同的数组位置,同时锁定在其第一次(非同步)读取时发现的不同 Integer 实例,从而打开数据竞争的可能性。

这也意味着如果这些位置恰好具有相同的值且碰巧由相同的实例表示,则线程可能会在不同的数组位置上相互阻塞。例如。用零值预初始化数组(或全部为 -128 到 +127 范围内的相同值)是接近单线程性能的好方法,因为零(或这些其他小值)是少数Integer 保证由同一实例表示的值。由于你没有经历过 NullPointerExceptions,你显然已经用一些东西预先初始化了数组。


总而言之,synchronized 适用于对象实例,而不适用于变量。这就是为什么在 int 变量上尝试编译时无法编译的原因。由于在不同对象上进行同步就像根本没有任何同步一样,因此您永远不应该在可变变量上进行同步。

如果您想要对不同帐户进行线程安全的并发访问,您可以使用 AtomicIntegers。这样的解决方案将为每个帐户使用一个 AtomicInteger 实例,该实例永远不会改变。使用其线程安全方法仅更新其余额值。

public class Bank {
    private final AtomicInteger[] accounts;
    public final ReadWriteLock lock = new ReentrantReadWriteLock();
    Bank(int numAccounts) {
        // initialize, keep in mind that this array MUST NOT change
        accounts=new AtomicInteger[numAccounts];
        for(int i=0; i<numAccounts; i++) accounts[i]=new AtomicInteger();
    }

    // Multiple Threads are doing transactions.
    public void transfer(int from, int to, int amount){
        final Lock sharedLock = lock.readLock();
        sharedLock.lock();
        try {
            accounts[from].addAndGet(-amount);
            accounts[to  ].addAndGet(+amount);
        }
        finally {
            sharedLock.unlock();
        }
    }

    // Only one thread does summation.
    public int totalMoney(){
        int sum = 0;
        final Lock exclusiveLock = lock.writeLock();
        exclusiveLock.lock();
        try {
            for(AtomicInteger account: accounts)
                sum += account.get();
        }
        finally {
            exclusiveLock.unlock();
        }
        return sum;
    }
}

为了完整起见,我猜这个问题会出现,这里是禁止拿走比可用资金更多的提款流程的样子:

static void safeWithdraw(AtomicInteger account, int amount) {
    for(;;) {
        int current=account.get();
        if(amount>current) throw new IllegalStateException();
        if(account.compareAndSet(current, current-amount)) return;
    }
}

可以通过将行 accounts[from].addAndGet(-amount); 替换为 safeWithdraw(accounts[from], amount); 来包含它。


写完上面的例子,我想起来还有一个class AtomicIntegerArray更适合这种任务...

private final AtomicIntegerArray accounts;
public final ReadWriteLock lock = new ReentrantReadWriteLock();

Bank(int numAccounts) {
    accounts=new AtomicIntegerArray(numAccounts);
}

// Multiple Threads are doing transactions.
public void transfer(int from, int to, int amount){
    final Lock sharedLock = lock.readLock();
    sharedLock.lock();
    try {
        accounts.addAndGet(from, -amount);
        accounts.addAndGet(to,   +amount);
    }
    finally {
        sharedLock.unlock();
    }
}

// Only one thread does summation.
public int totalMoney(){
    int sum = 0;
    final Lock exclusiveLock = lock.writeLock();
    exclusiveLock.lock();
    try {
        for(int ix=0, num=accounts.length(); ix<num; ix++)
            sum += accounts.get(ix);
    }
    finally {
        exclusiveLock.unlock();
    }
    return sum;
}