多线程更新实体

Update entity in multithreading

我使用 Hibernate 和 Spring Boot 更改数据库中的用户平衡。例如,我有 1000 笔付款给不同的用户,有些付款可能会超过 1 次给同一用户。 我使用执行程序服务在不同的线程中执行此操作。

支付服务:

@Override
public void pay(BigDecimal amount) {
    List<Users> users = userRepository.findBySomeCriteria();
    ExecutorService executorService = Executors.newFixedThreadPool(10);
    
    users.forEach(u -> {
       executorService.execute(() -> walletService.add(u, amount, BalanceType.CASH));
       executorService.execute(() -> walletService.add(u, amount, BalanceType.TRADING));
    });
    executorService.shutDown();
}

钱包服务:

@Override
public void add(User user, BigDecimal amount, Wallet.BalanceType balanceType) {
    ReentrantLock lock = locks.computeIfAbsent(LOCK_KEY + user.getId(), (key) -> new ReentrantLock());
    boolean isLock = lock.tryLock();
    while (!isLock) {
        isLock = lock.tryLock();
        try {
            Thread.sleep(10);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
    try {
        Wallet wallet = walletRepository.findByUser(user).orElseThrow(
                () -> new ResponseStatusException(HttpStatus.NOT_FOUND, "Wallet not found")
        );

        log.debug("[1] Cash balance: {}, trading balance: {}, add amount: {}", wallet.getCashBalance(), wallet.getTradingBalance(), amount);

        switch (balanceType) {
            case CASH -> wallet.setCashBalance(wallet.getCashBalance().add(amount));
            case TRADING -> wallet.setTradingBalance(wallet.getTradingBalance().add(amount));
            default -> throw new ResponseStatusException(HttpStatus.BAD_REQUEST, "Wrong balance type");

        }
        log.debug("[2] Cash balance: {}, trading balance: {}, add amount: {}", wallet.getCashBalance(), wallet.getTradingBalance(), amount);

        Wallet test = walletRepository.saveAndFlush(wallet);

        log.debug("[3] Cash balance: {}, trading balance: {}, add amount: {}", test.getCashBalance(), test.getTradingBalance(), amount);
    } finally {
        lock.unlock();
    }
}

这里的问题是有时平衡没有改变。我尝试了 public synchronized void add 而不是 ReentrantLock,但与 ReentrantLock.

相比,不正确的余额发生的频率更高

对于 ReentrantLock,日志是:

10:00:00.925 [1] Cash balance: 0, trading balance: 0, add amount: 10, CASH
10:00:00.925 [2] Cash balance: 10, trading balance: 0, add amount: 10, CASH
10:00:00.984 [3] Cash balance: 10, trading balance: 0, add amount: 10, CASH
10:00:00.996 [1] Cash balance: 10, trading balance: 0, add amount: 10, TRADING
10:00:00.996 [2] Cash balance: 10, trading balance: 10, add amount: 10, TRADING
10:00:01.044 [3] Cash balance: 10, trading balance: 10, add amount: 10, TRADING

在日志中,一切似乎都是正确的,但未为某些用户保存余额。

使用同步方法,每个用户在 1 个线程中使用 2 种类型的余额

executorService.execute(() -> {
       walletService.add(u, amount, BalanceType.TRADING));
       walletService.add(u, amount, BalanceType.TRADING));
    }
});

日志是

10:00:00.100 [1] Cash balance: 0, trading balance: 0, add amount: 10, CASH
10:00:00.100 [2] Cash balance: 10, trading balance: 0, add amount: 10, CASH
10:00:00.164 [3] Cash balance: 10, trading balance: 0, add amount: 10, CASH
10:00:00.166 [1] Cash balance: 0, trading balance: 0, add amount: 10, TRADING
10:00:00.166 [2] Cash balance: 0, trading balance: 10, add amount: 10, TRADING
10:00:00.230 [3] Cash balance: 0, trading balance: 10, add amount: 10, TRADING

在我第二次调用时看到的日志中,我从数据库中得到未更新的余额。

为什么会出现这个问题,如何保证在多线程中改变它的正确平衡?

至于我,你试图使用错误的方法,你的决定不应该按设计行事。您不应该在应用程序级别处理并发 - 让数据库为您做。另外,如果按顺序加锁写数据库的话,在不同的线程中执行walletService.add是没有用的

因此,只需为您的 钱包 实体设置适当的 optimistic or pessimistic 锁,即可在不同的线程中使用它而无需担心。

此外,请记住一个有趣的事实:如果您调用 saveAndFlush 方法,数据库中的更改并不会固定。方法只有在@Transactional注解的情况下才会提交事务,所以应该实现存储中间层classes,将存储层class注解为@Transactional,并通过此方式调用钱包仓库中间 class.