使用乐观并发方法处理多个生产者插入唯一 "immutable" 实体的有效方法是什么?
What is an efficient way to handle inserts of unique "immutable" entities by multiple producers with optimistic concurrency approach?
假设一个系统有多个并发生产者,每个生产者都努力保存一些 objects 的图表,其中包含以下可通过其名称唯一标识的公共实体:
CREATE TABLE CommonEntityGroup(
Id INT NOT NULL IDENTITY(1, 1) PRIMARY KEY,
Name NVARCHAR(100) NOT NULL
);
GO
CREATE UNIQUE INDEX IX_CommonEntityGroup_Name
ON CommonEntityGroup(Name)
GO
CREATE TABLE CommonEntity(
Id INT NOT NULL IDENTITY(1, 1) PRIMARY KEY,
Name NVARCHAR(100) NOT NULL,
CommonEntityGroupId INT NOT NULL,
CONSTRAINT FK_CommonEntity_CommonEntityGroup FOREIGN KEY(CommonEntityGroupId)
REFERENCES CommonEntityGroup(Id)
);
GO
CREATE UNIQUE INDEX IX_CommonEntity_CommonEntityGroupId_Name
ON CommonEntity(CommonEntityGroupId, Name)
GO
例如,生产者A节省了一些CommonEntityMeeting
,而生产者B节省了CommonEntitySet
。他们中的任何一个都必须坚持 CommonEntity
与他们的特定项目相关。
基本上,重点是:
- 有独立制作人。
- 它们同时运作。
- 理论上(尽管 可能 会改变,现在还不完全正确)它们将通过相同的 Web 服务运行 (ASP.Net Web API) , 只是它们各自的端点/"resources"。所以理想的解决方案不应依赖于此。
- 他们努力保存 object 的不同图形,其中可能包含尚未存在的 CommonEntity/CommonEntityGroup object。
- CommonEntity/CommonEntityGroup 是 immutable,一旦创建,将永远不会被修改或删除。
- CommonEntity/CommonEntityGroup 根据它们的某些属性(
Name
和相关的公共实体(如果有的话)是唯一的(例如 CommonEntity
是 CommonEntity.Name
+[=19= 唯一的])).
- 生产者不会 know/care 关于那些
CommonEntities
的 ID - 他们通常只传递带有 Names
(唯一)那些 CommonEntities
和相关信息的 DTO。所以任何 Common(Group)Entity
必须是 found/created by Name
.
- 有一个
生产者可能会尝试创造相同的东西
CommonEntity/CommonEntityGroup同时
- 虽然很多
这样的 CommonEntity/CommonEntityGroup object 更有可能已经
存在于数据库中。
因此,Entity Framework(数据库优先,虽然它可能并不重要)作为 DAL 和 SQL 服务器作为存储是一种有效且可靠的方法来确保所有这些生产者将成功地同时保留他们相交的 object 图表?
考虑到UNIQUE INDEX
已经确保不会重复CommonEntities
(Name,GroupName对是唯一的)我可以看到以下解决方案:
- 确保每个 CommonEntity/CommonGroupEntity 是 found/created+SaveChanged(),然后再构建 object 的图形的其余部分。
在这种情况下,当为相关实体调用 SaveChanges
时,不会有任何索引违规,因为其他生产者之前创建了相同的实体。
为了实现它我会有一些
public class CommonEntityGroupRepository // sort of
{
public CommonEntityGroupRepository(EntitiesDbContext db) ...
// CommonEntityRepository will use this class/method internally to create parent CommonEntityGroup.
public CommonEntityGroup FindOrCreateAndSave(String groupName)
{
return
this.TryFind(groupName) ?? // db.FirstOrDefault(...)
this.CreateAndSave(groupName);
}
private CommonEntityGroup CreateAndSave(String groupName)
{
var group = this.Db.CommonEntityGroups.Create();
group.Name = groupName;
this.Db.CommonGroups.Add(group)
try
{
this.Db.SaveChanges();
return group;
}
catch (DbUpdateException dbExc)
{
// Check that it was Name Index violation (perhaps make indices IGNORE_DUP_KEY)
return this.Find(groupName); // TryFind that throws exception.
}
}
}
使用这种方法,将多次调用 SaveChanges,并且每个 CommonEntity 都有自己的存储库,尽管这似乎是最可靠的解决方案。
- 如果发生索引违规,只需创建整个图并从头开始重建它
有点丑陋和低效(10 CommonEntities
我们可能需要重试 10 次),但简单且或多或少可靠。
- 只需创建整个图并在发生索引违规时替换重复的条目
不确定是否有一种简单可靠的方法来替换或多或少复杂的 object 图中的重复条目,尽管可以实施特定案例和更通用的 reflection-based 解决方案。
不过,与之前的解决方案一样,它可能需要多次重试。
- 尝试将此逻辑移动到数据库 (SP)
怀疑在存储过程中处理起来会更容易。它将与刚刚在数据库端实施的乐观或悲观方法相同。
虽然它可以提供更好的性能(在这种情况下不是问题)并将插入逻辑放在一个公共位置。
- Using SERIALIZABLE isolation level/TABLOCKX+SERIALIZABLE table hint in Stored Procedure - 它肯定可以工作,但我不想锁定 tables 完全超出了实际需要,因为真正的比赛非常罕见。正如标题中已经提到的,我想找到一些乐观的并发方法。
我可能会尝试第一个解决方案,但也许有更好的选择或一些潜在的陷阱。
选择方法当然取决于两个过程将使用的功能类型和数据量。
如果我们采用第一种方法,那么肯定对于每个 SaveChanges() 调用,Entity Framework 都会放置一个事务。如果有大量记录,这可能会降低性能。
如果有相当数量的记录需要inserted/updated,那么我肯定会选择基于存储过程的方法。使用这种方法,您将完全控制数据库并查询记录以检查它是否存在将非常容易(尽管此处可能需要进行一些微调)。我看不出用存储过程实现相同的功能是否会有任何挑战。通过一些实现优化,例如将数据加载到临时表(不是 SQL 临时表,而是可用于临时存储数据的物理表),这可以进一步增强以包含存储过程已处理的完整信息日志。
根据您最后的关键点,另一个解决方案是将您的 "Creation" 逻辑移动到中央应用程序 server/service (参见更新 2)队列用户可以使用 "add" 条记录。
因为你的大部分记录已经存在,如果你使用某种缓存,你应该能够使这非常有效
现在,大约有一个记录。
您必须记住,EF 并非旨在支持 "bulk" 操作,因此,创建数千条记录将(真的)很慢。
我已经使用了 2 种解决方案来帮助您,并且非常快速地获得了大量记录
1)EntityFramework.BulkInsert
2)SqlBulkCopy
两者都非常容易使用
还有,希望你已经看过了Fastest Way of Inserting in Entity Framework
更新
下面是我最近使用过两次的另一个解决方案
不要在用户执行 "Save" 时保存您的记录,而是将其安排在 X 秒后发生。
如果与此同时其他人试图保存相同的记录,只需 "slide" 计划日期。
您可以在下面看到一个示例代码,它尝试将同一条记录保存 10 次(同时),但实际保存只发生一次。
实际结果可以在这里看到:
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;
namespace ConsoleApplicationScheduler
{
class Program
{
static void Main(string[] args)
{
ConcurrentSaveService service = new ConcurrentSaveService();
int entity = 1;
for (int i = 0; i < 10; i++)
{
//Save the same record 10 times(this could be conrurrent)
service.BeginSave(entity);
}
Console.ReadLine();
}
}
public class ConcurrentSaveService
{
private static readonly ConcurrentDictionary<int, DateTime> _trackedSubjectsDictionary = new ConcurrentDictionary<int, DateTime>();
private readonly int _delayInSeconds;
public ConcurrentSaveService()
{
_delayInSeconds = 5;
}
public async void BeginSave(int key)
{
Console.WriteLine("Started Saving");
DateTime existingTaskDate;
_trackedSubjectsDictionary.TryGetValue(key, out existingTaskDate);
DateTime scheduledDate = DateTime.Now.AddSeconds(_delayInSeconds);
_trackedSubjectsDictionary.AddOrUpdate(key, scheduledDate, (i, d) => scheduledDate);
if (existingTaskDate > DateTime.Now)
return;
do
{
await Task.Delay(TimeSpan.FromSeconds(_delayInSeconds));
DateTime loadedScheduledDate;
_trackedSubjectsDictionary.TryGetValue(key, out loadedScheduledDate);
if (loadedScheduledDate > DateTime.Now)
continue;
if (loadedScheduledDate == DateTime.MinValue)
break;
_trackedSubjectsDictionary.TryRemove(key, out loadedScheduledDate);
if (loadedScheduledDate > DateTime.MinValue)
{
//DoWork
Console.WriteLine("Update/Insert record:" + key);
}
break;
} while (true);
Console.WriteLine("Finished Saving");
}
}
}
更新 2
由于您可以在 WebAPI 应用程序中控制 "creation" 进程,因此您应该能够使用某种缓存来避免重复,如以下伪代码
using System.Collections.Concurrent;
using System.Web.Http;
namespace WebApplication2.Controllers
{
public class ValuesController : ApiController
{
static object _lock = new object();
static ConcurrentDictionary<string, object> cache = new ConcurrentDictionary<string, object>();
public object Post(InputModel value)
{
var existing = cache[value.Name];
if (existing != null)
return new object();//Your saved record
lock (_lock)
{
existing = cache[value.Name];
if (existing != null)
return new object();//Your saved record
object newRecord = new object();//Save your Object
cache.AddOrUpdate(value.Name, newRecord, (s, o) => newRecord);
return newRecord;
}
}
}
public class InputModel
{
public string Name;
}
}
Table 值参数
一种选择是使用 table valued parameters
而不是单独调用数据库。
使用 table 值参数的示例过程:
create type dbo.CommonEntity_udt as table (
CommonEntityGroupId int not null
, Name nvarchar(100) not null
, primary key (CommonEntityGroupId,Name)
);
go
create procedure dbo.CommonEntity_set (
@CommonEntity dbo.CommonEntity_udt readonly
) as
begin;
set nocount on;
set xact_abort on;
if exists (
select 1
from @CommonEntity as s
where not exists (
select 1
from dbo.CommonEntity as t
where s.Name = t.Name
and s.CommonEntityGroupId = t.CommonEntityGroupId
))
begin;
insert dbo.CommonEntity (Name)
select s.Name
from @CommonEntity as s
where not exists (
select 1
from dbo.CommonEntity as t with (updlock, holdlock)
where s.Name = t.Name
and s.CommonEntityGroupId = t.CommonEntityGroupId
);
end;
end;
go
table取值参数参考:
- SQL Server 2008 Table-Valued Parameters and C# Custom Iterators: A Match Made In Heaven! - Leonard Lobel
- Using Table Valued Parameters in Entity Framework - Ritesh Sharma
- Using Table-Valued Parameters in SQL Server and .NET - Erland Sommarskog
- how to use TVPs with Entity Framework 4.1 and CodeFirst
- Maximizing Performance with Table-Valued Parameters - Dan Guzman
除非有令人信服的论据,否则我不推荐 merge
。这种情况只是看插入,所以看起来有点大材小用。
带有 table 值参数的示例 merge
版本:
create procedure dbo.CommonEntity_merge (
@CommonEntity dbo.CommonEntity_udt readonly
) as
begin;
set nocount on;
set xact_abort on;
if exists (
select 1
from @CommonEntity as s
where not exists (
select 1
from dbo.CommonEntity as t
where s.Name = t.Name
and s.CommonEntityGroupId = t.CommonEntityGroupId
))
begin;
merge dbo.CommonEntity with (holdlock) as t
using (select CommonEntityGroupId, Name from @CommonEntity) as s
on (t.Name = s.Name
and s.CommonEntityGroupId = t.CommonEntityGroupId)
when not matched by target
then insert (CommonEntityGroupId, Name)
values (s.CommonEntityGroupId, s.Name);
end;
end;
go
merge
参考:
- Use Caution with SQL Server's
MERGE
Statement - Aaron Bertrand
- The Case of the Blocking Merge Statement (LCK_M_RS_U locks) - Kendra Little
- UPSERT Race Condition With Merge - sqlteam
- An Interesting MERGE Bug - Paul White
- Can I optimize this merge statement - Aaron Bertrand
- If you are using indexed views and MERGE, please read this! - Aaron Bertrand
ignore_dup_key
代码注释:
// Check that it was Name Index violation (perhaps make indices IGNORE_DUP_KEY)
ignore_dup_key
is going to use serializable
behind the the scenes; potentially costly overhead on non-clustered indexes; and even when the index is clustered, can have significant costs depending on the amount of duplicates.
这可以在存储过程中使用 Sam Saffron's upsert (update/insert) pattern, or one of the patterns shown here: Performance impact of different error handling techniques - Aaron Bertrand 来处理。
Producers do not know/care about IDs of those CommonEntities - they
usually just pass DTOs with Names(unique) of those CommonEntities and
related information. So any Common(Group)Entity has to be
found/created by Name.
我假设 table 存储对象的 CommonEntity
通过它们的 ID
,而不是 Name
。
我假设对象的 table 定义看起来像这样:
CREATE TABLE SomeObject(
Id INT NOT NULL IDENTITY(1, 1) PRIMARY KEY,
ObjectName NVARCHAR(100) NOT NULL,
CommonEntityId INT NOT NULL,
CONSTRAINT FK_SomeObject_CommonEntity FOREIGN KEY(CommonEntityId)
REFERENCES CommonEntity(Id)
);
同时,高级SaveSomeObject
函数有CommonEntity.Name
和CommonEntityGroup.Name
(不是ID
)作为参数。这意味着函数必须在某处查找实体的 Name
并找到其对应的 ID
.
因此,带有参数 (ObjectName, CommonEntityName, CommonEntityGroupName)
的高级 SaveSomeObject
函数可以分两步实现:
CommonEntityID = GetCommonEntityID(CommonEntityName, CommonEntityGroupName);
SaveSomeObject(ObjectName, CommonEntityID);
GetCommonEntityID
是一个帮助程序 function/stored 过程,它通过 Name
查找实体的 ID
并在需要时创建一个实体(生成一个 ID
) .
这里我们明确地将这一步提取到一个单独的专用函数中。只有这个函数必须处理并发问题。它可以使用乐观并发方法或悲观并发方法来实现。这个函数的用户并不关心它使用了什么魔法来return有效ID
,但是用户可以确定他可以安全地使用returned ID
来持久化对象的其余部分。
悲观并发方式
悲观并发的做法很简单。确保只有一个实例
GetCommonEntityID
可以是 运行。我会为此使用 sp_getapplock
(而不是 SERIALIZABLE 事务隔离级别或 table 提示)。 sp_getapplock
本质上是一个互斥锁,一旦获得锁,我们就可以确定此存储过程的其他实例不会 运行 并行。这使逻辑变得简单 - 如果未找到,请尝试读取 ID
和 INSERT
新行。
CREATE PROCEDURE [dbo].[GetCommonEntityID]
@ParamCommonEntityName NVARCHAR(100),
@ParamCommonEntityGroupName NVARCHAR(100),
@ParamCommonEntityID int OUTPUT
AS
BEGIN
SET NOCOUNT ON;
SET XACT_ABORT ON;
BEGIN TRANSACTION;
BEGIN TRY
SET @ParamCommonEntityID = NULL;
DECLARE @VarCommonEntityGroupID int = NULL;
DECLARE @VarLockResult int;
EXEC @VarLockResult = sp_getapplock
@Resource = 'GetCommonEntityID_app_lock',
@LockMode = 'Exclusive',
@LockOwner = 'Transaction',
@LockTimeout = 60000,
@DbPrincipal = 'public';
IF @VarLockResult >= 0
BEGIN
-- Acquired the lock
SELECT @VarCommonEntityGroupID = ID
FROM CommonEntityGroup
WHERE Name = @ParamCommonEntityGroupName;
IF @VarCommonEntityGroupID IS NULL
BEGIN
-- Such name doesn't exist, create it.
INSERT INTO CommonEntityGroup (Name)
VALUES (@ParamCommonEntityGroupName);
SET @VarCommonEntityGroupID = SCOPE_IDENTITY();
END;
SELECT @ParamCommonEntityID = ID
FROM CommonEntity
WHERE
Name = @ParamCommonEntityName
AND CommonEntityGroupId = @VarCommonEntityGroupID
;
IF @ParamCommonEntityID IS NULL
BEGIN
-- Such name doesn't exist, create it.
INSERT INTO CommonEntity
(Name
,CommonEntityGroupId)
VALUES
(@ParamCommonEntityName
,@VarCommonEntityGroupID);
SET @ParamCommonEntityID = SCOPE_IDENTITY();
END;
END ELSE BEGIN
-- TODO: process the error. Retry
END;
COMMIT TRANSACTION;
END TRY
BEGIN CATCH
ROLLBACK TRANSACTION;
-- TODO: process the error. Retry?
END CATCH;
END
乐观并发方式
不要试图锁定任何东西。乐观行动并查看ID
。如果未找到,请尝试 INSERT
新值并在存在唯一索引违规时重试。
CREATE PROCEDURE [dbo].[GetCommonEntityID]
@ParamCommonEntityName NVARCHAR(100),
@ParamCommonEntityGroupName NVARCHAR(100),
@ParamCommonEntityID int OUTPUT
AS
BEGIN
SET NOCOUNT ON;
SET XACT_ABORT ON;
SET @ParamCommonEntityID = NULL;
DECLARE @VarCommonEntityGroupID int = NULL;
SELECT @VarCommonEntityGroupID = ID
FROM CommonEntityGroup
WHERE Name = @ParamCommonEntityGroupName;
WHILE @VarCommonEntityGroupID IS NULL
BEGIN
-- Such name doesn't exist, create it.
BEGIN TRANSACTION;
BEGIN TRY
INSERT INTO CommonEntityGroup (Name)
VALUES (@ParamCommonEntityGroupName);
SET @VarCommonEntityGroupID = SCOPE_IDENTITY();
COMMIT TRANSACTION;
END TRY
BEGIN CATCH
ROLLBACK TRANSACTION;
-- TODO: Use ERROR_NUMBER() and ERROR_STATE() to check that
-- error is indeed due to unique index violation and retry
END CATCH;
SELECT @VarCommonEntityGroupID = ID
FROM CommonEntityGroup
WHERE Name = @ParamCommonEntityGroupName;
END;
SELECT @ParamCommonEntityID = ID
FROM CommonEntity
WHERE
Name = @ParamCommonEntityName
AND CommonEntityGroupId = @VarCommonEntityGroupID
;
WHILE @ParamCommonEntityID IS NULL
BEGIN
-- Such name doesn't exist, create it.
BEGIN TRANSACTION;
BEGIN TRY
INSERT INTO CommonEntity
(Name
,CommonEntityGroupId)
VALUES
(@ParamCommonEntityName
,@VarCommonEntityGroupID);
SET @ParamCommonEntityID = SCOPE_IDENTITY();
COMMIT TRANSACTION;
END TRY
BEGIN CATCH
ROLLBACK TRANSACTION;
-- TODO: Use ERROR_NUMBER() and ERROR_STATE() to check that
-- error is indeed due to unique index violation and retry
END CATCH;
SELECT @ParamCommonEntityID = ID
FROM CommonEntity
WHERE
Name = @ParamCommonEntityName
AND CommonEntityGroupId = @VarCommonEntityGroupID
;
END;
END
在这两种方法中,您都应该有重试逻辑。当您希望名称已经在实体 table 中并且重试的可能性很低(如问题中描述的情况)时,乐观的方法通常会更好。当您预计会有很多竞争进程尝试插入相同的名称时,悲观的方法通常会更好。如果您序列化插入,您可能会过得更好。
假设一个系统有多个并发生产者,每个生产者都努力保存一些 objects 的图表,其中包含以下可通过其名称唯一标识的公共实体:
CREATE TABLE CommonEntityGroup(
Id INT NOT NULL IDENTITY(1, 1) PRIMARY KEY,
Name NVARCHAR(100) NOT NULL
);
GO
CREATE UNIQUE INDEX IX_CommonEntityGroup_Name
ON CommonEntityGroup(Name)
GO
CREATE TABLE CommonEntity(
Id INT NOT NULL IDENTITY(1, 1) PRIMARY KEY,
Name NVARCHAR(100) NOT NULL,
CommonEntityGroupId INT NOT NULL,
CONSTRAINT FK_CommonEntity_CommonEntityGroup FOREIGN KEY(CommonEntityGroupId)
REFERENCES CommonEntityGroup(Id)
);
GO
CREATE UNIQUE INDEX IX_CommonEntity_CommonEntityGroupId_Name
ON CommonEntity(CommonEntityGroupId, Name)
GO
例如,生产者A节省了一些CommonEntityMeeting
,而生产者B节省了CommonEntitySet
。他们中的任何一个都必须坚持 CommonEntity
与他们的特定项目相关。
基本上,重点是:
- 有独立制作人。
- 它们同时运作。
- 理论上(尽管 可能 会改变,现在还不完全正确)它们将通过相同的 Web 服务运行 (ASP.Net Web API) , 只是它们各自的端点/"resources"。所以理想的解决方案不应依赖于此。
- 他们努力保存 object 的不同图形,其中可能包含尚未存在的 CommonEntity/CommonEntityGroup object。
- CommonEntity/CommonEntityGroup 是 immutable,一旦创建,将永远不会被修改或删除。
- CommonEntity/CommonEntityGroup 根据它们的某些属性(
Name
和相关的公共实体(如果有的话)是唯一的(例如CommonEntity
是CommonEntity.Name
+[=19= 唯一的])). - 生产者不会 know/care 关于那些
CommonEntities
的 ID - 他们通常只传递带有Names
(唯一)那些CommonEntities
和相关信息的 DTO。所以任何Common(Group)Entity
必须是 found/created byName
. - 有一个 生产者可能会尝试创造相同的东西 CommonEntity/CommonEntityGroup同时
- 虽然很多 这样的 CommonEntity/CommonEntityGroup object 更有可能已经 存在于数据库中。
因此,Entity Framework(数据库优先,虽然它可能并不重要)作为 DAL 和 SQL 服务器作为存储是一种有效且可靠的方法来确保所有这些生产者将成功地同时保留他们相交的 object 图表?
考虑到UNIQUE INDEX
已经确保不会重复CommonEntities
(Name,GroupName对是唯一的)我可以看到以下解决方案:
- 确保每个 CommonEntity/CommonGroupEntity 是 found/created+SaveChanged(),然后再构建 object 的图形的其余部分。
在这种情况下,当为相关实体调用 SaveChanges
时,不会有任何索引违规,因为其他生产者之前创建了相同的实体。
为了实现它我会有一些
public class CommonEntityGroupRepository // sort of
{
public CommonEntityGroupRepository(EntitiesDbContext db) ...
// CommonEntityRepository will use this class/method internally to create parent CommonEntityGroup.
public CommonEntityGroup FindOrCreateAndSave(String groupName)
{
return
this.TryFind(groupName) ?? // db.FirstOrDefault(...)
this.CreateAndSave(groupName);
}
private CommonEntityGroup CreateAndSave(String groupName)
{
var group = this.Db.CommonEntityGroups.Create();
group.Name = groupName;
this.Db.CommonGroups.Add(group)
try
{
this.Db.SaveChanges();
return group;
}
catch (DbUpdateException dbExc)
{
// Check that it was Name Index violation (perhaps make indices IGNORE_DUP_KEY)
return this.Find(groupName); // TryFind that throws exception.
}
}
}
使用这种方法,将多次调用 SaveChanges,并且每个 CommonEntity 都有自己的存储库,尽管这似乎是最可靠的解决方案。
- 如果发生索引违规,只需创建整个图并从头开始重建它
有点丑陋和低效(10 CommonEntities
我们可能需要重试 10 次),但简单且或多或少可靠。
- 只需创建整个图并在发生索引违规时替换重复的条目
不确定是否有一种简单可靠的方法来替换或多或少复杂的 object 图中的重复条目,尽管可以实施特定案例和更通用的 reflection-based 解决方案。
不过,与之前的解决方案一样,它可能需要多次重试。
- 尝试将此逻辑移动到数据库 (SP)
怀疑在存储过程中处理起来会更容易。它将与刚刚在数据库端实施的乐观或悲观方法相同。
虽然它可以提供更好的性能(在这种情况下不是问题)并将插入逻辑放在一个公共位置。
- Using SERIALIZABLE isolation level/TABLOCKX+SERIALIZABLE table hint in Stored Procedure - 它肯定可以工作,但我不想锁定 tables 完全超出了实际需要,因为真正的比赛非常罕见。正如标题中已经提到的,我想找到一些乐观的并发方法。
我可能会尝试第一个解决方案,但也许有更好的选择或一些潜在的陷阱。
选择方法当然取决于两个过程将使用的功能类型和数据量。
如果我们采用第一种方法,那么肯定对于每个 SaveChanges() 调用,Entity Framework 都会放置一个事务。如果有大量记录,这可能会降低性能。
如果有相当数量的记录需要inserted/updated,那么我肯定会选择基于存储过程的方法。使用这种方法,您将完全控制数据库并查询记录以检查它是否存在将非常容易(尽管此处可能需要进行一些微调)。我看不出用存储过程实现相同的功能是否会有任何挑战。通过一些实现优化,例如将数据加载到临时表(不是 SQL 临时表,而是可用于临时存储数据的物理表),这可以进一步增强以包含存储过程已处理的完整信息日志。
根据您最后的关键点,另一个解决方案是将您的 "Creation" 逻辑移动到中央应用程序 server/service (参见更新 2)队列用户可以使用 "add" 条记录。
因为你的大部分记录已经存在,如果你使用某种缓存,你应该能够使这非常有效
现在,大约有一个记录。
您必须记住,EF 并非旨在支持 "bulk" 操作,因此,创建数千条记录将(真的)很慢。
我已经使用了 2 种解决方案来帮助您,并且非常快速地获得了大量记录
1)EntityFramework.BulkInsert
2)SqlBulkCopy
两者都非常容易使用
还有,希望你已经看过了Fastest Way of Inserting in Entity Framework
更新
下面是我最近使用过两次的另一个解决方案
不要在用户执行 "Save" 时保存您的记录,而是将其安排在 X 秒后发生。
如果与此同时其他人试图保存相同的记录,只需 "slide" 计划日期。
您可以在下面看到一个示例代码,它尝试将同一条记录保存 10 次(同时),但实际保存只发生一次。
实际结果可以在这里看到:
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;
namespace ConsoleApplicationScheduler
{
class Program
{
static void Main(string[] args)
{
ConcurrentSaveService service = new ConcurrentSaveService();
int entity = 1;
for (int i = 0; i < 10; i++)
{
//Save the same record 10 times(this could be conrurrent)
service.BeginSave(entity);
}
Console.ReadLine();
}
}
public class ConcurrentSaveService
{
private static readonly ConcurrentDictionary<int, DateTime> _trackedSubjectsDictionary = new ConcurrentDictionary<int, DateTime>();
private readonly int _delayInSeconds;
public ConcurrentSaveService()
{
_delayInSeconds = 5;
}
public async void BeginSave(int key)
{
Console.WriteLine("Started Saving");
DateTime existingTaskDate;
_trackedSubjectsDictionary.TryGetValue(key, out existingTaskDate);
DateTime scheduledDate = DateTime.Now.AddSeconds(_delayInSeconds);
_trackedSubjectsDictionary.AddOrUpdate(key, scheduledDate, (i, d) => scheduledDate);
if (existingTaskDate > DateTime.Now)
return;
do
{
await Task.Delay(TimeSpan.FromSeconds(_delayInSeconds));
DateTime loadedScheduledDate;
_trackedSubjectsDictionary.TryGetValue(key, out loadedScheduledDate);
if (loadedScheduledDate > DateTime.Now)
continue;
if (loadedScheduledDate == DateTime.MinValue)
break;
_trackedSubjectsDictionary.TryRemove(key, out loadedScheduledDate);
if (loadedScheduledDate > DateTime.MinValue)
{
//DoWork
Console.WriteLine("Update/Insert record:" + key);
}
break;
} while (true);
Console.WriteLine("Finished Saving");
}
}
}
更新 2 由于您可以在 WebAPI 应用程序中控制 "creation" 进程,因此您应该能够使用某种缓存来避免重复,如以下伪代码
using System.Collections.Concurrent;
using System.Web.Http;
namespace WebApplication2.Controllers
{
public class ValuesController : ApiController
{
static object _lock = new object();
static ConcurrentDictionary<string, object> cache = new ConcurrentDictionary<string, object>();
public object Post(InputModel value)
{
var existing = cache[value.Name];
if (existing != null)
return new object();//Your saved record
lock (_lock)
{
existing = cache[value.Name];
if (existing != null)
return new object();//Your saved record
object newRecord = new object();//Save your Object
cache.AddOrUpdate(value.Name, newRecord, (s, o) => newRecord);
return newRecord;
}
}
}
public class InputModel
{
public string Name;
}
}
Table 值参数
一种选择是使用 table valued parameters
而不是单独调用数据库。
使用 table 值参数的示例过程:
create type dbo.CommonEntity_udt as table (
CommonEntityGroupId int not null
, Name nvarchar(100) not null
, primary key (CommonEntityGroupId,Name)
);
go
create procedure dbo.CommonEntity_set (
@CommonEntity dbo.CommonEntity_udt readonly
) as
begin;
set nocount on;
set xact_abort on;
if exists (
select 1
from @CommonEntity as s
where not exists (
select 1
from dbo.CommonEntity as t
where s.Name = t.Name
and s.CommonEntityGroupId = t.CommonEntityGroupId
))
begin;
insert dbo.CommonEntity (Name)
select s.Name
from @CommonEntity as s
where not exists (
select 1
from dbo.CommonEntity as t with (updlock, holdlock)
where s.Name = t.Name
and s.CommonEntityGroupId = t.CommonEntityGroupId
);
end;
end;
go
table取值参数参考:
- SQL Server 2008 Table-Valued Parameters and C# Custom Iterators: A Match Made In Heaven! - Leonard Lobel
- Using Table Valued Parameters in Entity Framework - Ritesh Sharma
- Using Table-Valued Parameters in SQL Server and .NET - Erland Sommarskog
- how to use TVPs with Entity Framework 4.1 and CodeFirst
- Maximizing Performance with Table-Valued Parameters - Dan Guzman
除非有令人信服的论据,否则我不推荐 merge
。这种情况只是看插入,所以看起来有点大材小用。
带有 table 值参数的示例 merge
版本:
create procedure dbo.CommonEntity_merge (
@CommonEntity dbo.CommonEntity_udt readonly
) as
begin;
set nocount on;
set xact_abort on;
if exists (
select 1
from @CommonEntity as s
where not exists (
select 1
from dbo.CommonEntity as t
where s.Name = t.Name
and s.CommonEntityGroupId = t.CommonEntityGroupId
))
begin;
merge dbo.CommonEntity with (holdlock) as t
using (select CommonEntityGroupId, Name from @CommonEntity) as s
on (t.Name = s.Name
and s.CommonEntityGroupId = t.CommonEntityGroupId)
when not matched by target
then insert (CommonEntityGroupId, Name)
values (s.CommonEntityGroupId, s.Name);
end;
end;
go
merge
参考:
- Use Caution with SQL Server's
MERGE
Statement - Aaron Bertrand - The Case of the Blocking Merge Statement (LCK_M_RS_U locks) - Kendra Little
- UPSERT Race Condition With Merge - sqlteam
- An Interesting MERGE Bug - Paul White
- Can I optimize this merge statement - Aaron Bertrand
- If you are using indexed views and MERGE, please read this! - Aaron Bertrand
ignore_dup_key
代码注释:
// Check that it was Name Index violation (perhaps make indices IGNORE_DUP_KEY)
ignore_dup_key
is going to use serializable
behind the the scenes; potentially costly overhead on non-clustered indexes; and even when the index is clustered, can have significant costs depending on the amount of duplicates.
这可以在存储过程中使用 Sam Saffron's upsert (update/insert) pattern, or one of the patterns shown here: Performance impact of different error handling techniques - Aaron Bertrand 来处理。
Producers do not know/care about IDs of those CommonEntities - they usually just pass DTOs with Names(unique) of those CommonEntities and related information. So any Common(Group)Entity has to be found/created by Name.
我假设 table 存储对象的 CommonEntity
通过它们的 ID
,而不是 Name
。
我假设对象的 table 定义看起来像这样:
CREATE TABLE SomeObject(
Id INT NOT NULL IDENTITY(1, 1) PRIMARY KEY,
ObjectName NVARCHAR(100) NOT NULL,
CommonEntityId INT NOT NULL,
CONSTRAINT FK_SomeObject_CommonEntity FOREIGN KEY(CommonEntityId)
REFERENCES CommonEntity(Id)
);
同时,高级SaveSomeObject
函数有CommonEntity.Name
和CommonEntityGroup.Name
(不是ID
)作为参数。这意味着函数必须在某处查找实体的 Name
并找到其对应的 ID
.
因此,带有参数 (ObjectName, CommonEntityName, CommonEntityGroupName)
的高级 SaveSomeObject
函数可以分两步实现:
CommonEntityID = GetCommonEntityID(CommonEntityName, CommonEntityGroupName);
SaveSomeObject(ObjectName, CommonEntityID);
GetCommonEntityID
是一个帮助程序 function/stored 过程,它通过 Name
查找实体的 ID
并在需要时创建一个实体(生成一个 ID
) .
这里我们明确地将这一步提取到一个单独的专用函数中。只有这个函数必须处理并发问题。它可以使用乐观并发方法或悲观并发方法来实现。这个函数的用户并不关心它使用了什么魔法来return有效ID
,但是用户可以确定他可以安全地使用returned ID
来持久化对象的其余部分。
悲观并发方式
悲观并发的做法很简单。确保只有一个实例
GetCommonEntityID
可以是 运行。我会为此使用 sp_getapplock
(而不是 SERIALIZABLE 事务隔离级别或 table 提示)。 sp_getapplock
本质上是一个互斥锁,一旦获得锁,我们就可以确定此存储过程的其他实例不会 运行 并行。这使逻辑变得简单 - 如果未找到,请尝试读取 ID
和 INSERT
新行。
CREATE PROCEDURE [dbo].[GetCommonEntityID]
@ParamCommonEntityName NVARCHAR(100),
@ParamCommonEntityGroupName NVARCHAR(100),
@ParamCommonEntityID int OUTPUT
AS
BEGIN
SET NOCOUNT ON;
SET XACT_ABORT ON;
BEGIN TRANSACTION;
BEGIN TRY
SET @ParamCommonEntityID = NULL;
DECLARE @VarCommonEntityGroupID int = NULL;
DECLARE @VarLockResult int;
EXEC @VarLockResult = sp_getapplock
@Resource = 'GetCommonEntityID_app_lock',
@LockMode = 'Exclusive',
@LockOwner = 'Transaction',
@LockTimeout = 60000,
@DbPrincipal = 'public';
IF @VarLockResult >= 0
BEGIN
-- Acquired the lock
SELECT @VarCommonEntityGroupID = ID
FROM CommonEntityGroup
WHERE Name = @ParamCommonEntityGroupName;
IF @VarCommonEntityGroupID IS NULL
BEGIN
-- Such name doesn't exist, create it.
INSERT INTO CommonEntityGroup (Name)
VALUES (@ParamCommonEntityGroupName);
SET @VarCommonEntityGroupID = SCOPE_IDENTITY();
END;
SELECT @ParamCommonEntityID = ID
FROM CommonEntity
WHERE
Name = @ParamCommonEntityName
AND CommonEntityGroupId = @VarCommonEntityGroupID
;
IF @ParamCommonEntityID IS NULL
BEGIN
-- Such name doesn't exist, create it.
INSERT INTO CommonEntity
(Name
,CommonEntityGroupId)
VALUES
(@ParamCommonEntityName
,@VarCommonEntityGroupID);
SET @ParamCommonEntityID = SCOPE_IDENTITY();
END;
END ELSE BEGIN
-- TODO: process the error. Retry
END;
COMMIT TRANSACTION;
END TRY
BEGIN CATCH
ROLLBACK TRANSACTION;
-- TODO: process the error. Retry?
END CATCH;
END
乐观并发方式
不要试图锁定任何东西。乐观行动并查看ID
。如果未找到,请尝试 INSERT
新值并在存在唯一索引违规时重试。
CREATE PROCEDURE [dbo].[GetCommonEntityID]
@ParamCommonEntityName NVARCHAR(100),
@ParamCommonEntityGroupName NVARCHAR(100),
@ParamCommonEntityID int OUTPUT
AS
BEGIN
SET NOCOUNT ON;
SET XACT_ABORT ON;
SET @ParamCommonEntityID = NULL;
DECLARE @VarCommonEntityGroupID int = NULL;
SELECT @VarCommonEntityGroupID = ID
FROM CommonEntityGroup
WHERE Name = @ParamCommonEntityGroupName;
WHILE @VarCommonEntityGroupID IS NULL
BEGIN
-- Such name doesn't exist, create it.
BEGIN TRANSACTION;
BEGIN TRY
INSERT INTO CommonEntityGroup (Name)
VALUES (@ParamCommonEntityGroupName);
SET @VarCommonEntityGroupID = SCOPE_IDENTITY();
COMMIT TRANSACTION;
END TRY
BEGIN CATCH
ROLLBACK TRANSACTION;
-- TODO: Use ERROR_NUMBER() and ERROR_STATE() to check that
-- error is indeed due to unique index violation and retry
END CATCH;
SELECT @VarCommonEntityGroupID = ID
FROM CommonEntityGroup
WHERE Name = @ParamCommonEntityGroupName;
END;
SELECT @ParamCommonEntityID = ID
FROM CommonEntity
WHERE
Name = @ParamCommonEntityName
AND CommonEntityGroupId = @VarCommonEntityGroupID
;
WHILE @ParamCommonEntityID IS NULL
BEGIN
-- Such name doesn't exist, create it.
BEGIN TRANSACTION;
BEGIN TRY
INSERT INTO CommonEntity
(Name
,CommonEntityGroupId)
VALUES
(@ParamCommonEntityName
,@VarCommonEntityGroupID);
SET @ParamCommonEntityID = SCOPE_IDENTITY();
COMMIT TRANSACTION;
END TRY
BEGIN CATCH
ROLLBACK TRANSACTION;
-- TODO: Use ERROR_NUMBER() and ERROR_STATE() to check that
-- error is indeed due to unique index violation and retry
END CATCH;
SELECT @ParamCommonEntityID = ID
FROM CommonEntity
WHERE
Name = @ParamCommonEntityName
AND CommonEntityGroupId = @VarCommonEntityGroupID
;
END;
END
在这两种方法中,您都应该有重试逻辑。当您希望名称已经在实体 table 中并且重试的可能性很低(如问题中描述的情况)时,乐观的方法通常会更好。当您预计会有很多竞争进程尝试插入相同的名称时,悲观的方法通常会更好。如果您序列化插入,您可能会过得更好。