多线程更新实体
Update entity in multithreading
我使用 Hibernate 和 Spring Boot 更改数据库中的用户平衡。例如,我有 1000 笔付款给不同的用户,有些付款可能会超过 1 次给同一用户。
我使用执行程序服务在不同的线程中执行此操作。
支付服务:
@Override
public void pay(BigDecimal amount) {
List<Users> users = userRepository.findBySomeCriteria();
ExecutorService executorService = Executors.newFixedThreadPool(10);
users.forEach(u -> {
executorService.execute(() -> walletService.add(u, amount, BalanceType.CASH));
executorService.execute(() -> walletService.add(u, amount, BalanceType.TRADING));
});
executorService.shutDown();
}
钱包服务:
@Override
public void add(User user, BigDecimal amount, Wallet.BalanceType balanceType) {
ReentrantLock lock = locks.computeIfAbsent(LOCK_KEY + user.getId(), (key) -> new ReentrantLock());
boolean isLock = lock.tryLock();
while (!isLock) {
isLock = lock.tryLock();
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
try {
Wallet wallet = walletRepository.findByUser(user).orElseThrow(
() -> new ResponseStatusException(HttpStatus.NOT_FOUND, "Wallet not found")
);
log.debug("[1] Cash balance: {}, trading balance: {}, add amount: {}", wallet.getCashBalance(), wallet.getTradingBalance(), amount);
switch (balanceType) {
case CASH -> wallet.setCashBalance(wallet.getCashBalance().add(amount));
case TRADING -> wallet.setTradingBalance(wallet.getTradingBalance().add(amount));
default -> throw new ResponseStatusException(HttpStatus.BAD_REQUEST, "Wrong balance type");
}
log.debug("[2] Cash balance: {}, trading balance: {}, add amount: {}", wallet.getCashBalance(), wallet.getTradingBalance(), amount);
Wallet test = walletRepository.saveAndFlush(wallet);
log.debug("[3] Cash balance: {}, trading balance: {}, add amount: {}", test.getCashBalance(), test.getTradingBalance(), amount);
} finally {
lock.unlock();
}
}
这里的问题是有时平衡没有改变。我尝试了 public synchronized void add
而不是 ReentrantLock
,但与 ReentrantLock
.
相比,不正确的余额发生的频率更高
对于 ReentrantLock,日志是:
10:00:00.925 [1] Cash balance: 0, trading balance: 0, add amount: 10, CASH
10:00:00.925 [2] Cash balance: 10, trading balance: 0, add amount: 10, CASH
10:00:00.984 [3] Cash balance: 10, trading balance: 0, add amount: 10, CASH
10:00:00.996 [1] Cash balance: 10, trading balance: 0, add amount: 10, TRADING
10:00:00.996 [2] Cash balance: 10, trading balance: 10, add amount: 10, TRADING
10:00:01.044 [3] Cash balance: 10, trading balance: 10, add amount: 10, TRADING
- 预期:现金余额:10,交易余额:10
- 给定:现金余额:0,交易余额:0
在日志中,一切似乎都是正确的,但未为某些用户保存余额。
使用同步方法,每个用户在 1 个线程中使用 2 种类型的余额
executorService.execute(() -> {
walletService.add(u, amount, BalanceType.TRADING));
walletService.add(u, amount, BalanceType.TRADING));
}
});
日志是
10:00:00.100 [1] Cash balance: 0, trading balance: 0, add amount: 10, CASH
10:00:00.100 [2] Cash balance: 10, trading balance: 0, add amount: 10, CASH
10:00:00.164 [3] Cash balance: 10, trading balance: 0, add amount: 10, CASH
10:00:00.166 [1] Cash balance: 0, trading balance: 0, add amount: 10, TRADING
10:00:00.166 [2] Cash balance: 0, trading balance: 10, add amount: 10, TRADING
10:00:00.230 [3] Cash balance: 0, trading balance: 10, add amount: 10, TRADING
- 预期:现金余额:10,交易余额:10
- 给定:现金余额:0,交易余额:10
在我第二次调用时看到的日志中,我从数据库中得到未更新的余额。
为什么会出现这个问题,如何保证在多线程中改变它的正确平衡?
至于我,你试图使用错误的方法,你的决定不应该按设计行事。您不应该在应用程序级别处理并发 - 让数据库为您做。另外,如果按顺序加锁写数据库的话,在不同的线程中执行walletService.add是没有用的
因此,只需为您的 钱包 实体设置适当的 optimistic or pessimistic 锁,即可在不同的线程中使用它而无需担心。
此外,请记住一个有趣的事实:如果您调用 saveAndFlush 方法,数据库中的更改并不会固定。方法只有在@Transactional注解的情况下才会提交事务,所以应该实现存储中间层classes,将存储层class注解为@Transactional,并通过此方式调用钱包仓库中间 class.
我使用 Hibernate 和 Spring Boot 更改数据库中的用户平衡。例如,我有 1000 笔付款给不同的用户,有些付款可能会超过 1 次给同一用户。 我使用执行程序服务在不同的线程中执行此操作。
支付服务:
@Override
public void pay(BigDecimal amount) {
List<Users> users = userRepository.findBySomeCriteria();
ExecutorService executorService = Executors.newFixedThreadPool(10);
users.forEach(u -> {
executorService.execute(() -> walletService.add(u, amount, BalanceType.CASH));
executorService.execute(() -> walletService.add(u, amount, BalanceType.TRADING));
});
executorService.shutDown();
}
钱包服务:
@Override
public void add(User user, BigDecimal amount, Wallet.BalanceType balanceType) {
ReentrantLock lock = locks.computeIfAbsent(LOCK_KEY + user.getId(), (key) -> new ReentrantLock());
boolean isLock = lock.tryLock();
while (!isLock) {
isLock = lock.tryLock();
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
try {
Wallet wallet = walletRepository.findByUser(user).orElseThrow(
() -> new ResponseStatusException(HttpStatus.NOT_FOUND, "Wallet not found")
);
log.debug("[1] Cash balance: {}, trading balance: {}, add amount: {}", wallet.getCashBalance(), wallet.getTradingBalance(), amount);
switch (balanceType) {
case CASH -> wallet.setCashBalance(wallet.getCashBalance().add(amount));
case TRADING -> wallet.setTradingBalance(wallet.getTradingBalance().add(amount));
default -> throw new ResponseStatusException(HttpStatus.BAD_REQUEST, "Wrong balance type");
}
log.debug("[2] Cash balance: {}, trading balance: {}, add amount: {}", wallet.getCashBalance(), wallet.getTradingBalance(), amount);
Wallet test = walletRepository.saveAndFlush(wallet);
log.debug("[3] Cash balance: {}, trading balance: {}, add amount: {}", test.getCashBalance(), test.getTradingBalance(), amount);
} finally {
lock.unlock();
}
}
这里的问题是有时平衡没有改变。我尝试了 public synchronized void add
而不是 ReentrantLock
,但与 ReentrantLock
.
对于 ReentrantLock,日志是:
10:00:00.925 [1] Cash balance: 0, trading balance: 0, add amount: 10, CASH
10:00:00.925 [2] Cash balance: 10, trading balance: 0, add amount: 10, CASH
10:00:00.984 [3] Cash balance: 10, trading balance: 0, add amount: 10, CASH
10:00:00.996 [1] Cash balance: 10, trading balance: 0, add amount: 10, TRADING
10:00:00.996 [2] Cash balance: 10, trading balance: 10, add amount: 10, TRADING
10:00:01.044 [3] Cash balance: 10, trading balance: 10, add amount: 10, TRADING
- 预期:现金余额:10,交易余额:10
- 给定:现金余额:0,交易余额:0
在日志中,一切似乎都是正确的,但未为某些用户保存余额。
使用同步方法,每个用户在 1 个线程中使用 2 种类型的余额
executorService.execute(() -> {
walletService.add(u, amount, BalanceType.TRADING));
walletService.add(u, amount, BalanceType.TRADING));
}
});
日志是
10:00:00.100 [1] Cash balance: 0, trading balance: 0, add amount: 10, CASH
10:00:00.100 [2] Cash balance: 10, trading balance: 0, add amount: 10, CASH
10:00:00.164 [3] Cash balance: 10, trading balance: 0, add amount: 10, CASH
10:00:00.166 [1] Cash balance: 0, trading balance: 0, add amount: 10, TRADING
10:00:00.166 [2] Cash balance: 0, trading balance: 10, add amount: 10, TRADING
10:00:00.230 [3] Cash balance: 0, trading balance: 10, add amount: 10, TRADING
- 预期:现金余额:10,交易余额:10
- 给定:现金余额:0,交易余额:10
在我第二次调用时看到的日志中,我从数据库中得到未更新的余额。
为什么会出现这个问题,如何保证在多线程中改变它的正确平衡?
至于我,你试图使用错误的方法,你的决定不应该按设计行事。您不应该在应用程序级别处理并发 - 让数据库为您做。另外,如果按顺序加锁写数据库的话,在不同的线程中执行walletService.add是没有用的
因此,只需为您的 钱包 实体设置适当的 optimistic or pessimistic 锁,即可在不同的线程中使用它而无需担心。
此外,请记住一个有趣的事实:如果您调用 saveAndFlush 方法,数据库中的更改并不会固定。方法只有在@Transactional注解的情况下才会提交事务,所以应该实现存储中间层classes,将存储层class注解为@Transactional,并通过此方式调用钱包仓库中间 class.