多线程EF Core死锁+ BeginTransaction + Commit
EF Core deadlock with multi threads + BeginTransaction + Commit
我对 SaveChangesAsync()
和 BeginTransaction()
+ transaction.Commit()
的工作原理有一些疑问。
我的团队有一个 .NET Core worker,它从 Microsoft EventHub 接收事件并通过 EF Core 3 将数据保存到 SQL 服务器。
其中一种事件类型的数据量很大,所以我们创建了几个table,将数据分开,然后将其保存到这些table中。子 table 引用父 table 的 id
列 (FK_Key)。
在某些情况下,在保存新数据之前必须删除数据库中的某些数据,因此我们删除 -> 插入数据。
要将数据保存到数据库中,我们调用dbContext.Database.BeginTransaction()
和transaction.Commit()
。当我们 运行 worker 时,我们会得到像 Transaction (Process ID 71) was deadlocked on lock resources with another process and has been chosen as the deadlock victim. Rerun the transaction.
这样的死锁异常
我发现 PurgeDataInChildTables()
中的 .BatchDeleteAsync()
之一或 Upsert()
中的 BulkInsertOrUpdateAsync()
之一抛出死锁异常(每次我 运行 它都会改变工人)。
代码如下:
public async Task DeleteAndUpsert(List<MyEntity> entitiesToDelete, List<MyEntity> entitiesToUpsert)
{
if (entitiesToDelete.Any())
await myRepository.Delete(entitiesToDelete);
if (entitiesToUpsert.Any())
await myRepository.Upsert(entitiesToUpsert);
}
public override async Task Upsert(IList<MyEntity> entities)
{
using (var dbContext = new MyDbContext(DbContextOptions, DbOptions))
{
using (var transaction = dbContext.Database.BeginTransaction())
{
await PurgeDataInChildTables(entities, dbContext);
await dbContext.BulkInsertOrUpdateAsync(entities);
// tables that depends on the parent table (FK_Key)
await dbContext.BulkInsertOrUpdateAsync(entities.SelectMany<Child1>(x => x.Id).ToList());
await dbContext.BulkInsertOrUpdateAsync(entities.SelectMany<Child2>(x => x.Id).ToList());
await dbContext.BulkInsertOrUpdateAsync(entities.SelectMany<Child3>(x => x.Id).ToList());
transaction.Commit();
}
}
}
public override async Task Delete(IList<MyEntity> entities)
{
using (var dbContext = new MyDbContext(DbContextOptions, DbOptions))
{
using (var transaction = dbContext.Database.BeginTransaction())
{
await PurgeDataInChildTables(entities, dbContext);
await dbContext.BulkDeleteAsync(entities);
transaction.Commit();
}
}
}
private async Task PurgeDataInChildTables(IList<MyEntity> entities, MyDbContext dbContext)
{
var ids = entities.Select(x => x.Id).ToList();
await dbContext.Child1.Where(x => ids.Contains(x.Id)).BatchDeleteAsync();
await dbContext.Child2.Where(x => ids.Contains(x.Id)).BatchDeleteAsync();
await dbContext.Child3.Where(x => ids.Contains(x.Id)).BatchDeleteAsync();
}
当 worker 启动时,它会创建四个线程,它们都更新到同一个 table(也删除)。因此,我假设当一个线程启动一个事务而另一个线程启动另一个事务(或类似的东西..)然后尝试更新插入(或删除)子 tables.
时会发生死锁。
我尝试了一些方法来解决这个问题,并注意到当我删除 BeginTransaction()
并改用 SaveChangesAsync()
时,死锁似乎得到了解决。
修改后的代码如下:
public override async Task Upsert(IList<MyEntity> entities)
{
using (var dbContext = new MyDbContext(DbContextOptions, DbOptions))
{
await PurgeDataInChildTables(entities, dbContext);
await dbContext.BulkInsertOrUpdateAsync(entities);
// tables that depends on the parent table (FK_Key)
await dbContext.BulkInsertOrUpdateAsync(entities.SelectMany(x => x.Child1).ToList());
await dbContext.BulkInsertOrUpdateAsync(entities.SelectMany(x => x.Child2).ToList());
await dbContext.BulkInsertOrUpdateAsync(entities.SelectMany(x => x.Child3).ToList());
await dbContext.SaveChangesAsync();
}
}
public override async Task Delete(IList<MyEntity> entities)
{
using (var dbContext = new MyDbContext(DbContextOptions, DbOptions))
{
await PurgeDataInChildTables(entities, dbContext);
await dbContext.BulkDeleteAsync(entities);
await dbContext.SaveChangesAsync();
}
}
worker 启动后约 30 秒发生死锁,但在我修改代码后 2 ~ 3 分钟内没有发生,所以我认为问题已解决,认为如果我 运行工人更长。
最后,这是我的问题:
- 当我使用
BeginTransaction()
+ .Commit()
时会发生死锁,但当我使用SaveChangesAsync()
时不会发生。为什么会这样?
- 这两种方式在交易方面有什么区别?
- 如果修改后的代码仍然会导致死锁或者不是很好的解决方案,如何解决?
如果不查看数据库的分析会话,很难准确地说。需要查找的是采用何种锁(它在哪里 shared
以及它在哪里 exclusive
或 update
)以及交易何时 居然开了。我将描述一个需要用实际数据库分析来证明的理论行为。
当你用 Database.BeginTransaction() 包装所有东西时:
隔离级别不是由 EF 设置的,它使用数据库默认隔离级别。在 Microsoft SQL Server
的情况下,它将是 Read committed
。这种隔离级别表示并发事务可以读取数据,但如果正在进行修改,其他事务将等待它完成,即使它们只想读取。交易将在调用 Commit()
之前举行。
当您没有明确指定交易时:
Select 语句和 SaveChangesAsync
将导致具有相同隔离级别的单独事务默认为数据库。事务的保留时间不会超过它需要的时间:例如,在 SaveChangesAsync
的情况下,当所有更改都被写入时,从调用该方法的那一刻开始,它就会存在。
Transaction (Process ID 71) was deadlocked on lock resources with another process and has been chosen as the deadlock victim. Rerun the transaction.
当有多个事务试图访问某个资源时出现此消息,其中一个正在尝试读取数据而另一个正在尝试修改。在这种情况下,为避免死锁,数据库将尝试终止需要较少资源量才能回滚的事务。在您的情况下 — 这是一个尝试读取的事务。就回滚的重量而言,读取是轻量级的。
总结:
当你有一个巨大的锁持有一个资源很长一段时间时,它会阻止其他工作人员访问该资源,因为当其他工作人员可能在 var ids = entities.Select(x => x.Id).ToList();
点尝试读取时,数据库只会杀死其他工作人员的事务。当你重写你的代码时,你摆脱了长锁。更重要的是,正如我从 BulkInsertOrUpdateAsync 的文档中看到的那样,此扩展在每次调用时都使用内部事务,不影响也不涉及 EF 上下文。如果是这样,那么这意味着当数据不是通过扩展更改,而是以通常的 EF 方式更改时,实际事务甚至比对 SaveChangesAsync
的调用还少。
我对 SaveChangesAsync()
和 BeginTransaction()
+ transaction.Commit()
的工作原理有一些疑问。
我的团队有一个 .NET Core worker,它从 Microsoft EventHub 接收事件并通过 EF Core 3 将数据保存到 SQL 服务器。
其中一种事件类型的数据量很大,所以我们创建了几个table,将数据分开,然后将其保存到这些table中。子 table 引用父 table 的 id
列 (FK_Key)。
在某些情况下,在保存新数据之前必须删除数据库中的某些数据,因此我们删除 -> 插入数据。
要将数据保存到数据库中,我们调用dbContext.Database.BeginTransaction()
和transaction.Commit()
。当我们 运行 worker 时,我们会得到像 Transaction (Process ID 71) was deadlocked on lock resources with another process and has been chosen as the deadlock victim. Rerun the transaction.
我发现 PurgeDataInChildTables()
中的 .BatchDeleteAsync()
之一或 Upsert()
中的 BulkInsertOrUpdateAsync()
之一抛出死锁异常(每次我 运行 它都会改变工人)。
代码如下:
public async Task DeleteAndUpsert(List<MyEntity> entitiesToDelete, List<MyEntity> entitiesToUpsert)
{
if (entitiesToDelete.Any())
await myRepository.Delete(entitiesToDelete);
if (entitiesToUpsert.Any())
await myRepository.Upsert(entitiesToUpsert);
}
public override async Task Upsert(IList<MyEntity> entities)
{
using (var dbContext = new MyDbContext(DbContextOptions, DbOptions))
{
using (var transaction = dbContext.Database.BeginTransaction())
{
await PurgeDataInChildTables(entities, dbContext);
await dbContext.BulkInsertOrUpdateAsync(entities);
// tables that depends on the parent table (FK_Key)
await dbContext.BulkInsertOrUpdateAsync(entities.SelectMany<Child1>(x => x.Id).ToList());
await dbContext.BulkInsertOrUpdateAsync(entities.SelectMany<Child2>(x => x.Id).ToList());
await dbContext.BulkInsertOrUpdateAsync(entities.SelectMany<Child3>(x => x.Id).ToList());
transaction.Commit();
}
}
}
public override async Task Delete(IList<MyEntity> entities)
{
using (var dbContext = new MyDbContext(DbContextOptions, DbOptions))
{
using (var transaction = dbContext.Database.BeginTransaction())
{
await PurgeDataInChildTables(entities, dbContext);
await dbContext.BulkDeleteAsync(entities);
transaction.Commit();
}
}
}
private async Task PurgeDataInChildTables(IList<MyEntity> entities, MyDbContext dbContext)
{
var ids = entities.Select(x => x.Id).ToList();
await dbContext.Child1.Where(x => ids.Contains(x.Id)).BatchDeleteAsync();
await dbContext.Child2.Where(x => ids.Contains(x.Id)).BatchDeleteAsync();
await dbContext.Child3.Where(x => ids.Contains(x.Id)).BatchDeleteAsync();
}
当 worker 启动时,它会创建四个线程,它们都更新到同一个 table(也删除)。因此,我假设当一个线程启动一个事务而另一个线程启动另一个事务(或类似的东西..)然后尝试更新插入(或删除)子 tables.
时会发生死锁。
我尝试了一些方法来解决这个问题,并注意到当我删除 BeginTransaction()
并改用 SaveChangesAsync()
时,死锁似乎得到了解决。
修改后的代码如下:
public override async Task Upsert(IList<MyEntity> entities)
{
using (var dbContext = new MyDbContext(DbContextOptions, DbOptions))
{
await PurgeDataInChildTables(entities, dbContext);
await dbContext.BulkInsertOrUpdateAsync(entities);
// tables that depends on the parent table (FK_Key)
await dbContext.BulkInsertOrUpdateAsync(entities.SelectMany(x => x.Child1).ToList());
await dbContext.BulkInsertOrUpdateAsync(entities.SelectMany(x => x.Child2).ToList());
await dbContext.BulkInsertOrUpdateAsync(entities.SelectMany(x => x.Child3).ToList());
await dbContext.SaveChangesAsync();
}
}
public override async Task Delete(IList<MyEntity> entities)
{
using (var dbContext = new MyDbContext(DbContextOptions, DbOptions))
{
await PurgeDataInChildTables(entities, dbContext);
await dbContext.BulkDeleteAsync(entities);
await dbContext.SaveChangesAsync();
}
}
worker 启动后约 30 秒发生死锁,但在我修改代码后 2 ~ 3 分钟内没有发生,所以我认为问题已解决,认为如果我 运行工人更长。
最后,这是我的问题:
- 当我使用
BeginTransaction()
+.Commit()
时会发生死锁,但当我使用SaveChangesAsync()
时不会发生。为什么会这样? - 这两种方式在交易方面有什么区别?
- 如果修改后的代码仍然会导致死锁或者不是很好的解决方案,如何解决?
如果不查看数据库的分析会话,很难准确地说。需要查找的是采用何种锁(它在哪里 shared
以及它在哪里 exclusive
或 update
)以及交易何时 居然开了。我将描述一个需要用实际数据库分析来证明的理论行为。
当你用 Database.BeginTransaction() 包装所有东西时:
隔离级别不是由 EF 设置的,它使用数据库默认隔离级别。在 Microsoft SQL Server
的情况下,它将是 Read committed
。这种隔离级别表示并发事务可以读取数据,但如果正在进行修改,其他事务将等待它完成,即使它们只想读取。交易将在调用 Commit()
之前举行。
当您没有明确指定交易时:
Select 语句和 SaveChangesAsync
将导致具有相同隔离级别的单独事务默认为数据库。事务的保留时间不会超过它需要的时间:例如,在 SaveChangesAsync
的情况下,当所有更改都被写入时,从调用该方法的那一刻开始,它就会存在。
Transaction (Process ID 71) was deadlocked on lock resources with another process and has been chosen as the deadlock victim. Rerun the transaction.
当有多个事务试图访问某个资源时出现此消息,其中一个正在尝试读取数据而另一个正在尝试修改。在这种情况下,为避免死锁,数据库将尝试终止需要较少资源量才能回滚的事务。在您的情况下 — 这是一个尝试读取的事务。就回滚的重量而言,读取是轻量级的。
总结:
当你有一个巨大的锁持有一个资源很长一段时间时,它会阻止其他工作人员访问该资源,因为当其他工作人员可能在 var ids = entities.Select(x => x.Id).ToList();
点尝试读取时,数据库只会杀死其他工作人员的事务。当你重写你的代码时,你摆脱了长锁。更重要的是,正如我从 BulkInsertOrUpdateAsync 的文档中看到的那样,此扩展在每次调用时都使用内部事务,不影响也不涉及 EF 上下文。如果是这样,那么这意味着当数据不是通过扩展更改,而是以通常的 EF 方式更改时,实际事务甚至比对 SaveChangesAsync
的调用还少。