让多个线程对数据集进行操作,而一个线程将其汇总

Letting multiple Threads operate on a data set while one Thread sums it up

我正在尝试实施一个银行系统,我有一组账户。有多个线程试图在账户之间转移资金,而一个线程连续(或者更确切地说,在随机时间)试图总结银行中的总资金(所有账户余额的总和)。

解决这个问题的方法一开始听起来很明显;对执行事务的线程使用 ReentrantReadWriteLocksreadLock,对执行求和的线程使用 writeLock。然而,在以这种方式实现之后(参见下面的代码),我发现性能大幅下降 / "transaction-throughput" 甚至与仅使用一个线程进行事务相比也是如此。

请注意,这甚至还没有使用求和方法,因此从未获取过 writeLock。如果我只删除标有 // !! 的行,也不调用求和方法,突然 "transfer throughput" 使用多线程时比使用单线程时高很多,这就是目标。

我现在的问题是,如果我从不尝试获取 writeLock,为什么简单地引入 readWriteLock 会使整个过程变慢那么多,而我在这里做错了什么,因为我找不到问题。

锁定很昂贵,但在您的情况下,我假设当您 运行 测试时可能会有某种 "almost deadlock":如果某个线程在 synchronized(from){} 块,而另一个线程想要解锁它的 synchronized(to){} 块中的 from 实例,那么它将无法:第一个 synchronized 将阻止线程 #2进入 synchronized(to){} 块,因此锁不会很快释放。

这可能会导致很多线程挂在锁的队列中,这使得 get/release 锁变慢。

更多注意事项:当第二部分 (to.setBalance(to.getBalance() + amount);) 由于某种原因(异常、死锁)未执行时,您的代码将导致问题。您需要找到一种方法来围绕这两个操作创建一个事务,以确保它们要么都被执行,要么 none 被执行。

执行此操作的一个好方法是创建一个 Balance 值对象。在您的代码中,您可以创建两个新的,更新两个余额,然后只调用两个设置器 - 因为设置器不会失败,所以两个余额都将被更新,或者代码将在调用任何设置器之前失败。

首先,将更新放入它自己的 synchronized 块中是正确的,即使 getter 和 setter 本身是 synchronized,所以你避免了 check-then-act 反模式。

但是,从性能的角度来看,这并不是最佳选择,因为您获得了同一个锁三次(from 帐户获得了四次)。 JVM 或 HotSpot 优化器知道同步原语并能够优化这种嵌套同步模式,但是(现在我们不得不猜测一下)如果您在中间获得另一个锁,它可能会阻止这些优化。

正如在另一个问题中已经建议的那样,您可以转向无锁更新,但当然您必须完全理解它。无锁更新以一个特殊操作为中心,compareAndSet 仅当变量具有预期的旧值时才执行更新,换句话说,其间没有执行并发更新,而执行检查和更新作为一个原子操作。并且该操作不是使用 synchronized 而是直接使用专用的 CPU 指令实现的。

使用模式总是这样

  1. 读取当前值
  2. 计算新值(或拒绝更新)
  3. 尝试执行更新,如果当前值仍然相同,则更新将成功

缺点是更新可能会失败,这需要重复这三个步骤,但如果计算量不是太大,这是可以接受的,因为更新失败表明另一个线程必须在其间更新成功,所以会有永远是进步。

这导致了帐户的示例代码:

static void safeWithdraw(AtomicInteger account, int amount) {
    for(;;) { // a loop as we might have to repeat the steps
        int current=account.get(); // 1. read the current value
        if(amount>current) throw new IllegalStateException();// 2. possibly reject
        int newValue=current-amount; // 2. calculate new value
        // 3. update if current value didn’t change
        if(account.compareAndSet(current, newValue))
            return; // exit on success
    }
}

因此,为了支持无锁访问,提供 getBalancesetBalance 操作是远远不够的,因为每次尝试从 getset 中组合一个操作没有锁定的操作将失败。 您有三个选择:

  1. 将每个支持的更新操作作为专用方法提供,如 safeWithdraw 方法
  2. 提供 compareAndSet 方法以允许调用者使用该方法编写自己的更新操作
  3. 提供一个以更新函数为参数的更新方法,如AtomicInteger does in Java 8; 当然,这在使用 Java 8 时特别方便,您可以在其中使用 lambda 表达式来实现实际的更新功能。

请注意 AtomicInteger 本身使用了所有选项。有针对 increment 等常见操作的专用更新方法,还有 compareAndSet 方法允许组合任意更新操作。

您通常会使用 锁或 synchronized,同时使用两者是不常见的。

要管理您的方案,您通常会在每个帐户上使用细粒度的锁,而不是像现在这样使用粗粒度的锁。您还可以使用侦听器实现总计机制。

public interface Listener {

    public void changed(int oldValue, int newValue);
}

public class Account {

    private int id;
    private int balance;
    protected ReadWriteLock lock = new ReentrantReadWriteLock();
    List<Listener> accountListeners = new ArrayList<>();

    public Account(int id) {
        this.id = id;
        this.balance = 0;
    }

    public int getBalance() {
        int localBalance;
        lock.readLock().lock();
        try {
            localBalance = this.balance;
        } finally {
            lock.readLock().unlock();
        }
        return localBalance;
    }

    public void setBalance(int balance) {
        if (balance < 0) {
            throw new IllegalArgumentException("Negative balance");
        }
        // Keep track of the old balance for the listener.
        int oldValue = this.balance;
        lock.writeLock().lock();
        try {
            this.balance = balance;
        } finally {
            lock.writeLock().unlock();
        }
        if (this.balance != oldValue) {
            // Inform all listeners of any change.
            accountListeners.stream().forEach((l) -> {
                l.changed(oldValue, this.balance);
            });
        }
    }

    public boolean lock() throws InterruptedException {
        return lock.writeLock().tryLock(1, TimeUnit.SECONDS);
    }

    public void unlock() {
        lock.writeLock().unlock();
    }

    public void addListener(Listener l) {
        accountListeners.add(l);
    }

    public int getId() {
        return this.id;
    }

}

public class BankingSystem {

    protected List<Account> accounts;

    public boolean transfer(Account from, Account to, int amount) throws InterruptedException {
        if (from.getId() != to.getId()) {
            if (from.lock()) {
                try {
                    if (from.getBalance() < amount) {
                        return false;
                    }
                    if (to.lock()) {
                        try {
                            // We have write locks on both accounts.
                            from.setBalance(from.getBalance() - amount);
                            to.setBalance(to.getBalance() + amount);
                        } finally {
                            to.unlock();
                        }

                    } else {
                        // Not sure what to do - failed to lock the account.
                    }
                } finally {
                    from.unlock();
                }

            } else {
                // Not sure what to do - failed to lock the account.
            }
        }
        return true;
    }

    // Rest of class..
}

请注意,您可以在同一个线程中两次获取写锁 - 第二次也是允许的。锁仅排除来自 other 个线程的访问。