T-SQL 使用锁定选择不同的行 table
T-SQL Selecting distinct rows using locking table
我的支付系统有问题。我将尝试提供一个最小的例子来重现它。访问数据库是使用Entity Framework6实现的,但在这个问题中似乎并不重要。
系统中有一个 table Payments (ID IDENTITY (1,1) bigint, Status int)
,包括待处理和已完成的所有付款。
一个 ASP.NET 应用程序在此 table 中插入许多付款 Status = 1 (Pending)
。
我有一个名为 "Pending processor" 的 Windows-service 应用程序,它运行多个(从 10 到 30 个)线程(worker)。
每个工作人员都在无限循环中执行相同的代码:它获取下一个待处理付款,对其进行处理并将其保存回数据库。
while (!_cancellationTokenSource.IsCancellationRequested)
{
var payment = _paymentsRepository.GetNextPending();
payment = _paymentProcessor.Process(payment);
_paymentsRepository.Update(payment);
}
现在,我遇到了一个问题:多个线程拾取同一个付款并处理多次。结果,我们有一个不满意的用户,损失了钱。
我决定使用锁定 table PaymentLocks (ID bigint UNIQUE)
来解决这个问题。现在,我将支付 ID 插入锁定 table,处理此付款并从锁定 table 中删除此 ID。
如果此付款已由另一个线程处理,则由于唯一约束和方法 returns null,插入失败。然后,这个线程什么都不处理,继续循环.
while (!_cancellationTokenSource.IsCancellationRequested)
{
var payment = _paymentsRepository.GetNextPendingAndAcquireLock();
if (payment != null) _paymentProcessor.Process(payment);
_paymentsRepository.UpdateAndReleaseLock(payment);
}
// ...
public Payment GetNextPendingAndAcquireLock()
{
try
{
string query = @"
BEGIN TRY
BEGIN TRAN
DECLARE @paymentId bigint;
SET @paymentId = (SELECT TOP 1 p.Id
FROM [dbo].[Payments] p
LEFT JOIN [dbo].[PaymentLocks] pl
ON p.Id = pl.PaymentId
WHERE pl.PaymentId IS NULL);
INSERT INTO [dbo].[PaymentLocks] ([PaymentId]) VALUES (@paymentId);
SELECT * FROM [dbo].[Payments] WHERE [Id] = @paymentId;
COMMIT TRAN
END TRY
BEGIN CATCH
ROLLBACK TRANSACTION
END CATCH";
return _dbContext.Payments.SqlQuery(query).Single();
}
catch (EntityCommandExecutionException eex)
{
LoggingFactory.GetLogger().Error("PaymentsRepository.GetPendingAndAcquireLock INNER TRANSACTION caught an exception {0}", eex);
return null;
}
}
我的问题是,如果我有 10 个以上的线程工作者,那么这种插入失败就会经常发生(75% 的查询失败)。这意味着我失去了 75% 的工作效率,因为我的所有线程都是 "fighting" 支付相同的费用,而实际上只有一个线程得到它。
如何以这种方式编写查询,使其始终需要真正可用的 付款。也许,我应该在某些行上使用 SQL 锁;或更改我的查询,以便在获取下一个 ID 时立即锁定。
如有任何关于此问题的说明,请随时提出。
我相信可以通过重新设计减少竞争线程对 table 的争用:
- 有一个工作分配线程(或 TPL
Task
),它以不连续的时间间隔(例如通过 System.Threading.Timer
)轮询新工作的付款 table
- 然后在 N 个线程之间分配新工作以进行实际处理。
调度线程将轮询新工作(即等待未决/未处理的付款),并将它们分配给适当的处理线程。所有需要传递的似乎是要处理的@paymentId
。
可以使用每个任务专用的 ConcurrentQueue
处理线程/任务来对工作进行排队。
负载平衡可以像调度线程到适当队列的循环分配一样简单地完成。
Sql/DTC/XA 事务边界的职责在处理线程上,即原子锁定、执行工作并更新传入记录的状态。
出现竞争条件的机会很小,任务排队不止一次但尚未处理,因此在工作调度员的后续轮询中被选中。在这种情况下,处理任务 / Sql 需要通过添加检查来防止这种情况发生,以确保提供的交易的状态在继续其工作之前仍处于待处理状态。
如果并发队列开始备份,这种重复工作会加剧,因此如果队列深度变得太深,可能需要引入一种机制来限制调度线程。
这样,您应该能够取消自定义锁定机制,而是依赖于经过良好测试的事务管道,例如 SqlTransaction
、TransactionScopes
或 RDBMS 事务。
您可以利用交易和 table 提示(锁)来做到这一点。
while (!_cancellationTokenSource.IsCancellationRequested)
{
using (var dbContextTransaction = _dbContext.Database.BeginTransaction())
{
var payment = _paymentsRepository.GetNextPendingLock();
payment = _paymentProcessor.Process(payment);
_paymentsRepository.Update(payment);
}
}
public Payment GetNextPendingAndLock()
{
try
{
string query = @"
SELECT TOP 1 Id
FROM [dbo].[Payments] WITH (updlock, readpast)";
return _dbContext.Payments.SqlQuery(query).Single();
}
catch (EntityCommandExecutionException eex)
{
LoggingFactory.GetLogger().Error("PaymentsRepository.GetPendingAndLock caught an exception {0}", eex);
return null;
}
}
updlock 导致相关行被锁定并且没有进一步的读取(SELECT)将获取该行。
readpast 导致查询跳过锁定的行。
Here's 描述此方法的好文章。
我的支付系统有问题。我将尝试提供一个最小的例子来重现它。访问数据库是使用Entity Framework6实现的,但在这个问题中似乎并不重要。
系统中有一个 table Payments (ID IDENTITY (1,1) bigint, Status int)
,包括待处理和已完成的所有付款。
一个 ASP.NET 应用程序在此 table 中插入许多付款 Status = 1 (Pending)
。
我有一个名为 "Pending processor" 的 Windows-service 应用程序,它运行多个(从 10 到 30 个)线程(worker)。 每个工作人员都在无限循环中执行相同的代码:它获取下一个待处理付款,对其进行处理并将其保存回数据库。
while (!_cancellationTokenSource.IsCancellationRequested)
{
var payment = _paymentsRepository.GetNextPending();
payment = _paymentProcessor.Process(payment);
_paymentsRepository.Update(payment);
}
现在,我遇到了一个问题:多个线程拾取同一个付款并处理多次。结果,我们有一个不满意的用户,损失了钱。
我决定使用锁定 table PaymentLocks (ID bigint UNIQUE)
来解决这个问题。现在,我将支付 ID 插入锁定 table,处理此付款并从锁定 table 中删除此 ID。
如果此付款已由另一个线程处理,则由于唯一约束和方法 returns null,插入失败。然后,这个线程什么都不处理,继续循环.
while (!_cancellationTokenSource.IsCancellationRequested)
{
var payment = _paymentsRepository.GetNextPendingAndAcquireLock();
if (payment != null) _paymentProcessor.Process(payment);
_paymentsRepository.UpdateAndReleaseLock(payment);
}
// ...
public Payment GetNextPendingAndAcquireLock()
{
try
{
string query = @"
BEGIN TRY
BEGIN TRAN
DECLARE @paymentId bigint;
SET @paymentId = (SELECT TOP 1 p.Id
FROM [dbo].[Payments] p
LEFT JOIN [dbo].[PaymentLocks] pl
ON p.Id = pl.PaymentId
WHERE pl.PaymentId IS NULL);
INSERT INTO [dbo].[PaymentLocks] ([PaymentId]) VALUES (@paymentId);
SELECT * FROM [dbo].[Payments] WHERE [Id] = @paymentId;
COMMIT TRAN
END TRY
BEGIN CATCH
ROLLBACK TRANSACTION
END CATCH";
return _dbContext.Payments.SqlQuery(query).Single();
}
catch (EntityCommandExecutionException eex)
{
LoggingFactory.GetLogger().Error("PaymentsRepository.GetPendingAndAcquireLock INNER TRANSACTION caught an exception {0}", eex);
return null;
}
}
我的问题是,如果我有 10 个以上的线程工作者,那么这种插入失败就会经常发生(75% 的查询失败)。这意味着我失去了 75% 的工作效率,因为我的所有线程都是 "fighting" 支付相同的费用,而实际上只有一个线程得到它。
如何以这种方式编写查询,使其始终需要真正可用的 付款。也许,我应该在某些行上使用 SQL 锁;或更改我的查询,以便在获取下一个 ID 时立即锁定。
如有任何关于此问题的说明,请随时提出。
我相信可以通过重新设计减少竞争线程对 table 的争用:
- 有一个工作分配线程(或 TPL
Task
),它以不连续的时间间隔(例如通过System.Threading.Timer
)轮询新工作的付款 table - 然后在 N 个线程之间分配新工作以进行实际处理。
调度线程将轮询新工作(即等待未决/未处理的付款),并将它们分配给适当的处理线程。所有需要传递的似乎是要处理的@paymentId
。
可以使用每个任务专用的 ConcurrentQueue
处理线程/任务来对工作进行排队。
负载平衡可以像调度线程到适当队列的循环分配一样简单地完成。
Sql/DTC/XA 事务边界的职责在处理线程上,即原子锁定、执行工作并更新传入记录的状态。
出现竞争条件的机会很小,任务排队不止一次但尚未处理,因此在工作调度员的后续轮询中被选中。在这种情况下,处理任务 / Sql 需要通过添加检查来防止这种情况发生,以确保提供的交易的状态在继续其工作之前仍处于待处理状态。
如果并发队列开始备份,这种重复工作会加剧,因此如果队列深度变得太深,可能需要引入一种机制来限制调度线程。
这样,您应该能够取消自定义锁定机制,而是依赖于经过良好测试的事务管道,例如 SqlTransaction
、TransactionScopes
或 RDBMS 事务。
您可以利用交易和 table 提示(锁)来做到这一点。
while (!_cancellationTokenSource.IsCancellationRequested)
{
using (var dbContextTransaction = _dbContext.Database.BeginTransaction())
{
var payment = _paymentsRepository.GetNextPendingLock();
payment = _paymentProcessor.Process(payment);
_paymentsRepository.Update(payment);
}
}
public Payment GetNextPendingAndLock()
{
try
{
string query = @"
SELECT TOP 1 Id
FROM [dbo].[Payments] WITH (updlock, readpast)";
return _dbContext.Payments.SqlQuery(query).Single();
}
catch (EntityCommandExecutionException eex)
{
LoggingFactory.GetLogger().Error("PaymentsRepository.GetPendingAndLock caught an exception {0}", eex);
return null;
}
}
updlock 导致相关行被锁定并且没有进一步的读取(SELECT)将获取该行。
readpast 导致查询跳过锁定的行。
Here's 描述此方法的好文章。