使用乐观并发方法处理多个生产者插入唯一 "immutable" 实体的有效方法是什么?

What is an efficient way to handle inserts of unique "immutable" entities by multiple producers with optimistic concurrency approach?

假设一个系统有多个并发生产者,每个生产者都努力保存一些 objects 的图表,其中包含以下可通过其名称唯一标识的公共实体:

CREATE TABLE CommonEntityGroup(
    Id INT NOT NULL IDENTITY(1, 1) PRIMARY KEY,
    Name NVARCHAR(100) NOT NULL
);
GO

CREATE UNIQUE INDEX IX_CommonEntityGroup_Name 
    ON CommonEntityGroup(Name)
GO


CREATE TABLE CommonEntity(
    Id INT NOT NULL IDENTITY(1, 1) PRIMARY KEY,
    Name NVARCHAR(100) NOT NULL,
    CommonEntityGroupId INT NOT NULL,
    CONSTRAINT FK_CommonEntity_CommonEntityGroup FOREIGN KEY(CommonEntityGroupId) 
        REFERENCES CommonEntityGroup(Id)
);
GO

CREATE UNIQUE INDEX IX_CommonEntity_CommonEntityGroupId_Name 
    ON CommonEntity(CommonEntityGroupId, Name)
GO

例如,生产者A节省了一些CommonEntityMeeting,而生产者B节省了CommonEntitySet。他们中的任何一个都必须坚持 CommonEntity 与他们的特定项目相关。

基本上,重点是:

因此,Entity Framework(数据库优先,虽然它可能并不重要)作为 DAL 和 SQL 服务器作为存储是一种有效且可靠的方法来确保所有这些生产者将成功地同时保留他们相交的 object 图表?

考虑到UNIQUE INDEX已经确保不会重复CommonEntities(Name,GroupName对是唯一的)我可以看到以下解决方案:

  1. 确保每个 CommonEntity/CommonGroupEntity 是 found/created+SaveChanged(),然后再构建 object 的图形的其余部分。

在这种情况下,当为相关实体调用 SaveChanges 时,不会有任何索引违规,因为其他生产者之前创建了相同的实体。

为了实现它我会有一些

public class CommonEntityGroupRepository // sort of
{
    public CommonEntityGroupRepository(EntitiesDbContext db) ...

    // CommonEntityRepository will use this class/method internally to create parent CommonEntityGroup.
    public CommonEntityGroup FindOrCreateAndSave(String groupName)
    {
        return
            this.TryFind(groupName) ?? // db.FirstOrDefault(...)
            this.CreateAndSave(groupName);
    }

    private CommonEntityGroup CreateAndSave(String groupName)
    {
        var group = this.Db.CommonEntityGroups.Create();
        group.Name = groupName;
        this.Db.CommonGroups.Add(group)

        try
        {
            this.Db.SaveChanges();
            return group;
        }
        catch (DbUpdateException dbExc)
        {
            // Check that it was Name Index violation (perhaps make indices IGNORE_DUP_KEY)
            return this.Find(groupName); // TryFind that throws exception.
        }
    }
}

使用这种方法,将多次调用 SaveChanges,并且每个 CommonEntity 都有自己的存储库,尽管这似乎是最可靠的解决方案。

  1. 如果发生索引违规,只需创建整个图并从头开始重建它

有点丑陋和低效(10 CommonEntities 我们可能需要重试 10 次),但简单且或多或少可靠。

  1. 只需创建整个图并在发生索引违规时替换重复的条目

不确定是否有一种简单可靠的方法来替换或多或少复杂的 object 图中的重复条目,尽管可以实施特定案例和更通用的 reflection-based 解决方案。

不过,与之前的解决方案一样,它可能需要多次重试。

  1. 尝试将此逻辑移动到数据库 (SP)

怀疑在存储过程中处理起来会更容易。它将与刚刚在数据库端实施的乐观或悲观方法相同。

虽然它可以提供更好的性能(在这种情况下不是问题)并将插入逻辑放在一个公共位置。

  1. Using SERIALIZABLE isolation level/TABLOCKX+SERIALIZABLE table hint in Stored Procedure - 它肯定可以工作,但我不想锁定 tables 完全超出了实际需要,因为真正的比赛非常罕见。正如标题中已经提到的,我想找到一些乐观的并发方法。

我可能会尝试第一个解决方案,但也许有更好的选择或一些潜在的陷阱。

选择方法当然取决于两个过程将使用的功能类型和数据量。

如果我们采用第一种方法,那么肯定对于每个 SaveChanges() 调用,Entity Framework 都会放置一个事务。如果有大量记录,这可能会降低性能。

如果有相当数量的记录需要inserted/updated,那么我肯定会选择基于存储过程的方法。使用这种方法,您将完全控制数据库并查询记录以检查它是否存在将非常容易(尽管此处可能需要进行一些微调)。我看不出用存储过程实现相同的功能是否会有任何挑战。通过一些实现优化,例如将数据加载到临时表(不是 SQL 临时表,而是可用于临时存储数据的物理表),这可以进一步增强以包含存储过程已处理的完整信息日志。

根据您最后的关键点,另一个解决方案是将您的 "Creation" 逻辑移动到中央应用程序 server/service (参见更新 2)队列用户可以使用 "add" 条记录。

因为你的大部分记录已经存在,如果你使用某种缓存,你应该能够使这非常有效

现在,大约有一个记录。
您必须记住,EF 并非旨在支持 "bulk" 操作,因此,创建数千条记录将(真的)很慢。

我已经使用了 2 种解决方案来帮助您,并且非常快速地获得了大量记录 1)EntityFramework.BulkInsert
2)SqlBulkCopy

两者都非常容易使用

还有,希望你已经看过了Fastest Way of Inserting in Entity Framework

更新
下面是我最近使用过两次的另一个解决方案
不要在用户执行 "Save" 时保存您的记录,而是将其安排在 X 秒后发生。
如果与此同时其他人试图保存相同的记录,只需 "slide" 计划日期。

您可以在下面看到一个示例代码,它尝试将同一条记录保存 10 次(同时),但实际保存只发生一次。

实际结果可以在这里看到:

using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

namespace ConsoleApplicationScheduler
{
    class Program
    {
        static void Main(string[] args)
        {
            ConcurrentSaveService service = new ConcurrentSaveService();
            int entity = 1;
            for (int i = 0; i < 10; i++)
            {
                //Save the same record 10 times(this could be conrurrent)
                service.BeginSave(entity);
            }

            Console.ReadLine();
        }
    }

    public class ConcurrentSaveService
    {
        private static readonly ConcurrentDictionary<int, DateTime> _trackedSubjectsDictionary = new ConcurrentDictionary<int, DateTime>();

        private readonly int _delayInSeconds;

        public ConcurrentSaveService()
        {
            _delayInSeconds = 5;
        }

        public async void BeginSave(int key)
        {
            Console.WriteLine("Started Saving");
            DateTime existingTaskDate;
            _trackedSubjectsDictionary.TryGetValue(key, out existingTaskDate);

            DateTime scheduledDate = DateTime.Now.AddSeconds(_delayInSeconds);
            _trackedSubjectsDictionary.AddOrUpdate(key, scheduledDate, (i, d) => scheduledDate);

            if (existingTaskDate > DateTime.Now)
                return;

            do
            {
                await Task.Delay(TimeSpan.FromSeconds(_delayInSeconds));

                DateTime loadedScheduledDate;
                _trackedSubjectsDictionary.TryGetValue(key, out loadedScheduledDate);
                if (loadedScheduledDate > DateTime.Now)
                    continue;

                if (loadedScheduledDate == DateTime.MinValue)
                    break;

                _trackedSubjectsDictionary.TryRemove(key, out loadedScheduledDate);

                if (loadedScheduledDate > DateTime.MinValue)
                {
                    //DoWork
                    Console.WriteLine("Update/Insert record:" + key);
                }

                break;
            } while (true);

            Console.WriteLine("Finished Saving");
        }
    }
}

更新 2 由于您可以在 WebAPI 应用程序中控制 "creation" 进程,因此您应该能够使用某种缓存来避免重复,如以下伪代码

using System.Collections.Concurrent;
using System.Web.Http;

namespace WebApplication2.Controllers
{
    public class ValuesController : ApiController
    {
        static object _lock = new object();
        static ConcurrentDictionary<string, object> cache = new ConcurrentDictionary<string, object>();
        public object Post(InputModel value)
        {
            var existing = cache[value.Name];
            if (existing != null)
                return new object();//Your saved record

            lock (_lock)
            {
                existing = cache[value.Name];
                if (existing != null)
                    return new object();//Your saved record

                object newRecord = new object();//Save your Object

                cache.AddOrUpdate(value.Name, newRecord, (s, o) => newRecord);

                return newRecord;
            }
        }
    }

    public class InputModel
    {
        public string Name;
    }
}

Table 值参数

一种选择是使用 table valued parameters 而不是单独调用数据库。

使用 table 值参数的示例过程:

create type dbo.CommonEntity_udt as table (
    CommonEntityGroupId int not null
  , Name      nvarchar(100) not null
  , primary key (CommonEntityGroupId,Name)
    );
go

create procedure dbo.CommonEntity_set (
    @CommonEntity dbo.CommonEntity_udt readonly
) as
begin;
  set nocount on;
  set xact_abort on;
  if exists (
    select 1 
      from @CommonEntity as s
        where not exists (
          select 1 
            from dbo.CommonEntity as t
            where s.Name = t.Name
              and s.CommonEntityGroupId = t.CommonEntityGroupId
            ))
    begin;
      insert dbo.CommonEntity (Name)
        select s.Name
          from @CommonEntity as s
          where not exists (
            select 1 
              from dbo.CommonEntity as t with (updlock, holdlock)
              where s.Name = t.Name
                and s.CommonEntityGroupId = t.CommonEntityGroupId
              );
    end;
end;
go

table取值参数参考:


除非有令人信服的论据,否则我不推荐 merge。这种情况只是看插入,所以看起来有点大材小用。

带有 table 值参数的示例 merge 版本:

create procedure dbo.CommonEntity_merge (
    @CommonEntity dbo.CommonEntity_udt readonly
) as
begin;
  set nocount on;
  set xact_abort on;
  if exists (
    select 1 
      from @CommonEntity as s
        where not exists (
          select 1 
            from dbo.CommonEntity as t
            where s.Name = t.Name
              and s.CommonEntityGroupId = t.CommonEntityGroupId
            ))
    begin;
      merge dbo.CommonEntity with (holdlock) as t
      using (select CommonEntityGroupId, Name from @CommonEntity) as s
      on (t.Name = s.Name
        and s.CommonEntityGroupId = t.CommonEntityGroupId)
      when not matched by target
        then insert (CommonEntityGroupId, Name) 
        values (s.CommonEntityGroupId, s.Name);
    end;
end;
go

merge参考:


ignore_dup_key代码注释:

// Check that it was Name Index violation (perhaps make indices IGNORE_DUP_KEY)

ignore_dup_key is going to use serializable behind the the scenes; potentially costly overhead on non-clustered indexes; and even when the index is clustered, can have significant costs depending on the amount of duplicates.

这可以在存储过程中使用 Sam Saffron's upsert (update/insert) pattern, or one of the patterns shown here: Performance impact of different error handling techniques - Aaron Bertrand 来处理。


Producers do not know/care about IDs of those CommonEntities - they usually just pass DTOs with Names(unique) of those CommonEntities and related information. So any Common(Group)Entity has to be found/created by Name.

我假设 table 存储对象的 CommonEntity 通过它们的 ID,而不是 Name

我假设对象的 table 定义看起来像这样:

CREATE TABLE SomeObject(
    Id INT NOT NULL IDENTITY(1, 1) PRIMARY KEY,
    ObjectName NVARCHAR(100) NOT NULL,
    CommonEntityId INT NOT NULL,
    CONSTRAINT FK_SomeObject_CommonEntity FOREIGN KEY(CommonEntityId) 
        REFERENCES CommonEntity(Id)
);

同时,高级SaveSomeObject函数有CommonEntity.NameCommonEntityGroup.Name(不是ID)作为参数。这意味着函数必须在某处查找实体的 Name 并找到其对应的 ID.

因此,带有参数 (ObjectName, CommonEntityName, CommonEntityGroupName) 的高级 SaveSomeObject 函数可以分两步实现:

CommonEntityID = GetCommonEntityID(CommonEntityName, CommonEntityGroupName);
SaveSomeObject(ObjectName, CommonEntityID);

GetCommonEntityID 是一个帮助程序 function/stored 过程,它通过 Name 查找实体的 ID 并在需要时创建一个实体(生成一个 ID) .

这里我们明确地将这一步提取到一个单独的专用函数中。只有这个函数必须处理并发问题。它可以使用乐观并发方法或悲观并发方法来实现。这个函数的用户并不关心它使用了什么魔法来return有效ID,但是用户可以确定他可以安全地使用returned ID来持久化对象的其余部分。


悲观并发方式

悲观并发的做法很简单。确保只有一个实例 GetCommonEntityID 可以是 运行。我会为此使用 sp_getapplock(而不是 SERIALIZABLE 事务隔离级别或 table 提示)。 sp_getapplock 本质上是一个互斥锁,一旦获得锁,我们就可以确定此存储过程的其他实例不会 运行 并行。这使逻辑变得简单 - 如果未找到,请尝试读取 IDINSERT 新行。

CREATE PROCEDURE [dbo].[GetCommonEntityID]
    @ParamCommonEntityName NVARCHAR(100),
    @ParamCommonEntityGroupName NVARCHAR(100),
    @ParamCommonEntityID int OUTPUT
AS
BEGIN
    SET NOCOUNT ON;
    SET XACT_ABORT ON;

    BEGIN TRANSACTION;
    BEGIN TRY

        SET @ParamCommonEntityID = NULL;
        DECLARE @VarCommonEntityGroupID int = NULL;

        DECLARE @VarLockResult int;
        EXEC @VarLockResult = sp_getapplock
            @Resource = 'GetCommonEntityID_app_lock',
            @LockMode = 'Exclusive',
            @LockOwner = 'Transaction',
            @LockTimeout = 60000,
            @DbPrincipal = 'public';

        IF @VarLockResult >= 0
        BEGIN
            -- Acquired the lock

            SELECT @VarCommonEntityGroupID = ID
            FROM CommonEntityGroup
            WHERE Name = @ParamCommonEntityGroupName;

            IF @VarCommonEntityGroupID IS NULL
            BEGIN
                -- Such name doesn't exist, create it.
                INSERT INTO CommonEntityGroup (Name)
                VALUES (@ParamCommonEntityGroupName);

                SET @VarCommonEntityGroupID = SCOPE_IDENTITY();
            END;

            SELECT @ParamCommonEntityID = ID
            FROM CommonEntity
            WHERE
                Name = @ParamCommonEntityName
                AND CommonEntityGroupId = @VarCommonEntityGroupID
            ;

            IF @ParamCommonEntityID IS NULL
            BEGIN
                -- Such name doesn't exist, create it.
                INSERT INTO CommonEntity
                    (Name
                    ,CommonEntityGroupId)
                VALUES
                    (@ParamCommonEntityName
                    ,@VarCommonEntityGroupID);

                SET @ParamCommonEntityID = SCOPE_IDENTITY();
            END;

        END ELSE BEGIN
            -- TODO: process the error. Retry
        END;

        COMMIT TRANSACTION;
    END TRY
    BEGIN CATCH
        ROLLBACK TRANSACTION;
            -- TODO: process the error. Retry?
    END CATCH;

END

乐观并发方式

不要试图锁定任何东西。乐观行动并查看ID。如果未找到,请尝试 INSERT 新值并在存在唯一索引违规时重试。

CREATE PROCEDURE [dbo].[GetCommonEntityID]
    @ParamCommonEntityName NVARCHAR(100),
    @ParamCommonEntityGroupName NVARCHAR(100),
    @ParamCommonEntityID int OUTPUT
AS
BEGIN
    SET NOCOUNT ON;
    SET XACT_ABORT ON;

    SET @ParamCommonEntityID = NULL;
    DECLARE @VarCommonEntityGroupID int = NULL;

    SELECT @VarCommonEntityGroupID = ID
    FROM CommonEntityGroup
    WHERE Name = @ParamCommonEntityGroupName;

    WHILE @VarCommonEntityGroupID IS NULL
    BEGIN
        -- Such name doesn't exist, create it.
        BEGIN TRANSACTION;
        BEGIN TRY

            INSERT INTO CommonEntityGroup (Name)
            VALUES (@ParamCommonEntityGroupName);

            SET @VarCommonEntityGroupID = SCOPE_IDENTITY();

            COMMIT TRANSACTION;
        END TRY
        BEGIN CATCH
            ROLLBACK TRANSACTION;
            -- TODO: Use ERROR_NUMBER() and ERROR_STATE() to check that
            -- error is indeed due to unique index violation and retry
        END CATCH;

        SELECT @VarCommonEntityGroupID = ID
        FROM CommonEntityGroup
        WHERE Name = @ParamCommonEntityGroupName;

    END;


    SELECT @ParamCommonEntityID = ID
    FROM CommonEntity
    WHERE
        Name = @ParamCommonEntityName
        AND CommonEntityGroupId = @VarCommonEntityGroupID
    ;

    WHILE @ParamCommonEntityID IS NULL
    BEGIN
        -- Such name doesn't exist, create it.
        BEGIN TRANSACTION;
        BEGIN TRY

            INSERT INTO CommonEntity
                (Name
                ,CommonEntityGroupId)
            VALUES
                (@ParamCommonEntityName
                ,@VarCommonEntityGroupID);

            SET @ParamCommonEntityID = SCOPE_IDENTITY();

            COMMIT TRANSACTION;
        END TRY
        BEGIN CATCH
            ROLLBACK TRANSACTION;
            -- TODO: Use ERROR_NUMBER() and ERROR_STATE() to check that
            -- error is indeed due to unique index violation and retry
        END CATCH;

        SELECT @ParamCommonEntityID = ID
        FROM CommonEntity
        WHERE
            Name = @ParamCommonEntityName
            AND CommonEntityGroupId = @VarCommonEntityGroupID
        ;

    END;

END

在这两种方法中,您都应该有重试逻辑。当您希望名称已经在实体 table 中并且重试的可能性很低(如问题中描述的情况)时,乐观的方法通常会更好。当您预计会有很多竞争进程尝试插入相同的名称时,悲观的方法通常会更好。如果您序列化插入,您可能会过得更好。