Table-valued parameter insert performing poorly

I implemented a TVP + stored procedure insert strategy because I need to insert a large number of rows (possibly concurrently) while also getting some information back, such as the Ids and a few other things. Initially I used the EF code-first approach to generate the database structure. My entities:

FacilityGroup

public class FacilityGroup
{
    public int Id { get; set; }

    [Required]
    public string Name { get; set; }

    public string InternalNotes { get; set; }

    public virtual List<FacilityInstance> Facilities { get; set; } = new List<FacilityInstance>();
}

FacilityInstance

public class FacilityInstance
{
    public int Id { get; set; }

    [Required]
    [Index("IX_FacilityName")]
    [StringLength(450)]
    public string Name { get; set; }

    [Required]
    public string FacilityCode { get; set; }

    //[Required]
    public virtual FacilityGroup FacilityGroup { get; set; }

    [ForeignKey(nameof(FacilityGroup))]
    [Index("IX_FacilityGroupId")]
    public int FacilityGroupId { get; set; }

    public virtual List<DataBatch> RelatedBatches { get; set; } = new List<DataBatch>();

    public virtual HashSet<BatchRecord> BatchRecords { get; set; } = new HashSet<BatchRecord>();
}

BatchRecord

public class BatchRecord
{
    public long Id { get; set; }

    //todo index?
    public string ItemName { get; set; }

    [Index("IX_Supplier")]
    [StringLength(450)]
    public string Supplier { get; set; }

    public decimal Quantity { get; set; }

    public string ItemUnit { get; set; }

    public string EntityUnit { get; set; }

    public decimal ItemSize { get; set; }

    public decimal PackageSize { get; set; }

    [Index("IX_FamilyCode")]
    [Required]
    [StringLength(4)]
    public string FamilyCode { get; set; }

    [Required]
    public string Family { get; set; }

    [Index("IX_CategoryCode")]
    [Required]
    [StringLength(16)]
    public string CategoryCode { get; set; }

    [Required]
    public string Category { get; set; }

    [Index("IX_SubCategoryCode")]
    [Required]
    [StringLength(16)]
    public string SubCategoryCode { get; set; }

    [Required]
    public string SubCategory { get; set; }

    public string ItemGroupCode { get; set; }

    public string ItemGroup { get; set; }

    public decimal PurchaseValue { get; set; }

    public decimal UnitPurchaseValue { get; set; }

    public decimal PackagePurchaseValue { get; set; }

    [Required]
    public virtual DataBatch DataBatch { get; set; }

    [ForeignKey(nameof(DataBatch))]
    public int DataBatchId { get; set; }

    [Required]
    public virtual FacilityInstance FacilityInstance { get; set; }

    [ForeignKey(nameof(FacilityInstance))]
    [Index("IX_FacilityInstance")]
    public int FacilityInstanceId { get; set; }

    [Required]
    public virtual Currency Currency { get; set; }

    [ForeignKey(nameof(Currency))]
    public int CurrencyId { get; set; }
}

DataBatch

public class DataBatch
{
    public int Id { get; set; }

    [Required]
    public string Name { get; set; }

    public DateTime DateCreated { get; set; }

    public BatchStatus BatchStatus { get; set; }

    public virtual List<FacilityInstance> RelatedFacilities { get; set; } = new List<FacilityInstance>();

    public virtual HashSet<BatchRecord> BatchRecords { get; set; } = new HashSet<BatchRecord>();
}

Then my SQL Server-side code, the TVP structure:

CREATE TYPE dbo.RecordImportStructure 
AS TABLE (
ItemName VARCHAR(MAX),
Supplier VARCHAR(MAX),
Quantity DECIMAL(18, 2),
ItemUnit VARCHAR(MAX),
EntityUnit VARCHAR(MAX),
ItemSize DECIMAL(18, 2),
PackageSize DECIMAL(18, 2),
FamilyCode VARCHAR(4),
Family VARCHAR(MAX),
CategoryCode VARCHAR(MAX),
Category VARCHAR(MAX),
SubCategoryCode VARCHAR(MAX),
SubCategory VARCHAR(MAX),
ItemGroupCode VARCHAR(MAX),
ItemGroup VARCHAR(MAX),
PurchaseValue DECIMAL(18, 2),
UnitPurchaseValue DECIMAL(18, 2),
PackagePurchaseValue DECIMAL(18, 2),
FacilityCode VARCHAR(MAX),
CurrencyCode VARCHAR(MAX)
);

The insert stored procedure:

CREATE PROCEDURE dbo.ImportBatchRecords (
    @BatchId INT,
    @ImportTable dbo.RecordImportStructure READONLY
)
AS
SET NOCOUNT ON;

DECLARE     @ErrorCode  int  
DECLARE     @Step  varchar(200)

--Clear old stuff?
--TRUNCATE TABLE dbo.BatchRecords; 

INSERT INTO dbo.BatchRecords (
    ItemName,
    Supplier,
    Quantity,
    ItemUnit,
    EntityUnit,
    ItemSize,
    PackageSize,
    FamilyCode,
    Family,
    CategoryCode,
    Category,
    SubCategoryCode,
    SubCategory,
    ItemGroupCode,
    ItemGroup,
    PurchaseValue,
    UnitPurchaseValue,
    PackagePurchaseValue,
    DataBatchId,
    FacilityInstanceId,
    CurrencyId
)
    OUTPUT INSERTED.Id
    SELECT
    ItemName,
    Supplier,
    Quantity,
    ItemUnit,
    EntityUnit,
    ItemSize,
    PackageSize,
    FamilyCode,
    Family,
    CategoryCode,
    Category,
    SubCategoryCode,
    SubCategory,
    ItemGroupCode,
    ItemGroup,
    PurchaseValue,
    UnitPurchaseValue,
    PackagePurchaseValue,
    @BatchId,
    --FacilityInstanceId,
    --CurrencyId
    (SELECT TOP 1 f.Id from dbo.FacilityInstances f WHERE f.FacilityCode=FacilityCode),
    (SELECT TOP 1 c.Id from dbo.Currencies c WHERE c.CurrencyCode=CurrencyCode) 
    FROM    @ImportTable;

And finally my quick, test-only solution for executing all of this on the .NET side.

public class BatchRecordDataHandler : IBulkDataHandler<BatchRecordImportItem>
{
    public async Task<int> ImportAsync(SqlConnection conn, SqlTransaction transaction, IEnumerable<BatchRecordImportItem> src)
    {
        using (var cmd = new SqlCommand())
        {
            cmd.CommandText = "ImportBatchRecords";
            cmd.Connection = conn;
            cmd.Transaction = transaction;
            cmd.CommandType = CommandType.StoredProcedure;
            cmd.CommandTimeout = 600;

            var batchIdParam = new SqlParameter
            {
                ParameterName = "@BatchId",
                SqlDbType = SqlDbType.Int,
                Value = 1
            };

            var tableParam = new SqlParameter
            {
                ParameterName = "@ImportTable",
                TypeName = "dbo.RecordImportStructure",
                SqlDbType = SqlDbType.Structured,
                Value = DataToSqlRecords(src)
            };

            cmd.Parameters.Add(batchIdParam);
            cmd.Parameters.Add(tableParam);

            cmd.Transaction = transaction;

            using (var res = await cmd.ExecuteReaderAsync())
            {
                var resultTable = new DataTable();
                resultTable.Load(res);

                var cnt = resultTable.AsEnumerable().Count();

                return cnt;
            }
        }
    }

    private IEnumerable<SqlDataRecord> DataToSqlRecords(IEnumerable<BatchRecordImportItem> src)
    {
        var tvpSchema = new[] {
            new SqlMetaData("ItemName", SqlDbType.VarChar, SqlMetaData.Max),
            new SqlMetaData("Supplier", SqlDbType.VarChar, SqlMetaData.Max),
            new SqlMetaData("Quantity", SqlDbType.Decimal),
            new SqlMetaData("ItemUnit", SqlDbType.VarChar, SqlMetaData.Max),
            new SqlMetaData("EntityUnit", SqlDbType.VarChar, SqlMetaData.Max),
            new SqlMetaData("ItemSize", SqlDbType.Decimal),
            new SqlMetaData("PackageSize", SqlDbType.Decimal),
            new SqlMetaData("FamilyCode", SqlDbType.VarChar, SqlMetaData.Max),
            new SqlMetaData("Family", SqlDbType.VarChar, SqlMetaData.Max),
            new SqlMetaData("CategoryCode", SqlDbType.VarChar, SqlMetaData.Max),
            new SqlMetaData("Category", SqlDbType.VarChar, SqlMetaData.Max),
            new SqlMetaData("SubCategoryCode", SqlDbType.VarChar, SqlMetaData.Max),
            new SqlMetaData("SubCategory", SqlDbType.VarChar, SqlMetaData.Max),
            new SqlMetaData("ItemGroupCode", SqlDbType.VarChar, SqlMetaData.Max),
            new SqlMetaData("ItemGroup", SqlDbType.VarChar, SqlMetaData.Max),
            new SqlMetaData("PurchaseValue", SqlDbType.Decimal),
            new SqlMetaData("UnitPurchaseValue", SqlDbType.Decimal),
            new SqlMetaData("PackagePurchaseValue", SqlDbType.Decimal),
            new SqlMetaData("FacilityInstanceId", SqlDbType.VarChar, SqlMetaData.Max),
            new SqlMetaData("CurrencyId", SqlDbType.VarChar, SqlMetaData.Max),
        };

        var dataRecord = new SqlDataRecord(tvpSchema);

        foreach (var importItem in src)
        {
            dataRecord.SetValues(importItem.ItemName,
                importItem.Supplier,
                importItem.Quantity,
                importItem.ItemUnit,
                importItem.EntityUnit,
                importItem.ItemSize,
                importItem.PackageSize,
                importItem.FamilyCode,
                importItem.Family,
                importItem.CategoryCode,
                importItem.Category,
                importItem.SubCategoryCode,
                importItem.SubCategory,
                importItem.ItemGroupCode,
                importItem.ItemGroup,
                importItem.PurchaseValue,
                importItem.UnitPurchaseValue,
                importItem.PackagePurchaseValue,
                importItem.FacilityCode,
                importItem.CurrencyCode);

            yield return dataRecord;
        }
    }
}

The import entity structure:

public class BatchRecordImportItem
{
    public string ItemName { get; set; }

    public string Supplier { get; set; }

    public decimal Quantity { get; set; }

    public string ItemUnit { get; set; }

    public string EntityUnit { get; set; }

    public decimal ItemSize { get; set; }

    public decimal PackageSize { get; set; }

    public string FamilyCode { get; set; }

    public string Family { get; set; }

    public string CategoryCode { get; set; }

    public string Category { get; set; }

    public string SubCategoryCode { get; set; }

    public string SubCategory { get; set; }

    public string ItemGroupCode { get; set; }

    public string ItemGroup { get; set; }

    public decimal PurchaseValue { get; set; }

    public decimal UnitPurchaseValue { get; set; }

    public decimal PackagePurchaseValue { get; set; }

    public int DataBatchId { get; set; }

    public string FacilityCode { get; set; }

    public string CurrencyCode { get; set; }
}

Please don't mind the useless reader at the end, it doesn't really do anything. So, even without the reader, inserting 2.5kk (2.5 million) rows takes about 26 minutes, while SqlBulkCopy takes roughly 6 +- minutes. Am I doing something wrong? I'm using IsolationLevel.Snapshot, if that matters. SQL Server 2014; I'm free to change the database structure and indexes.

UPD 1


Made a few adjustment/improvement attempts described by @Xedni, specifically:

  1. Limited all string fields without a maximum length to some fixed length
  2. Changed all TVP members from VARCHAR(MAX) to VARCHAR(*SomeValue*)
  3. Added a unique index on FacilityInstance->FacilityCode
  4. Added a unique index on Currency->CurrencyCode
  5. Tried adding WITH RECOMPILE to my SP
  6. Tried using a DataTable instead of IEnumerable<SqlDataRecord>
  7. Tried batching the data into smaller buckets, 50k and 100k rows per SP execution, instead of 2.5kk (see the sketch after this list)
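
For reference, the chunking in item 7 boils down to something like this sketch (the helper name and the 50000 bucket size are illustrative only):

// Splits the import items into fixed-size buckets; each bucket then goes through
// one ImportAsync/SP execution instead of a single 2.5kk-row TVP.
private static IEnumerable<List<BatchRecordImportItem>> Chunk(
    IEnumerable<BatchRecordImportItem> src, int size)
{
    var bucket = new List<BatchRecordImportItem>(size);
    foreach (var item in src)
    {
        bucket.Add(item);
        if (bucket.Count == size)
        {
            yield return bucket;
            bucket = new List<BatchRecordImportItem>(size);
        }
    }

    if (bucket.Count > 0)
        yield return bucket;
}

// usage: foreach (var bucket in Chunk(allItems, 50000)) await handler.ImportAsync(conn, tx, bucket);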

My structure now looks like this:

CREATE TYPE dbo.RecordImportStructure 
AS TABLE (
ItemName VARCHAR(4096),
Supplier VARCHAR(450),
Quantity DECIMAL(18, 2),
ItemUnit VARCHAR(2048),
EntityUnit VARCHAR(2048),
ItemSize DECIMAL(18, 2),
PackageSize DECIMAL(18, 2),
FamilyCode VARCHAR(16),
Family VARCHAR(512),
CategoryCode VARCHAR(16),
Category VARCHAR(512),
SubCategoryCode VARCHAR(16),
SubCategory VARCHAR(512),
ItemGroupCode VARCHAR(16),
ItemGroup VARCHAR(512),
PurchaseValue DECIMAL(18, 2),
UnitPurchaseValue DECIMAL(18, 2),
PackagePurchaseValue DECIMAL(18, 2),
FacilityCode VARCHAR(450),
CurrencyCode VARCHAR(4)
);

Unfortunately, no noticeable performance gain so far; it's still 26-28 minutes, same as before.


UPD 2
Checked the execution plan: are the indexes my bane?


UPD 3
Added OPTION (RECOMPILE); at the end of my SP, got a slight boost, now at ~25 minutes for 2.5kk rows.

I think your procedure could use some love. It's hard to say for sure without seeing the execution plans, but here are some thoughts.

A table variable (and a table-valued parameter is essentially a table variable) is always assumed by SQL Server to contain exactly 1 row, even when it doesn't. In many cases this is irrelevant, but you have two correlated subqueries in your insert list, and that's where I would focus. Because of that cardinality estimate, the poor table variable is most likely getting hammered with a bunch of nested-loop joins. I would consider dumping the rows from your TVP into a temp table, updating the temp table with the IDs from FacilityInstances and Currencies, and then doing the final insert from that.

Well... why not just use SqlBulkCopy? There are plenty of solutions out there that help you convert a collection of entities into an IDataReader object that can be passed directly to SqlBulkCopy.

This is a good start...

https://github.com/matthewschrager/Repository/blob/master/Repository.EntityFramework/EntityDataReader.cs

Then it becomes as simple as...

SqlBulkCopy bulkCopy = new SqlBulkCopy(connection);
bulkCopy.DestinationTableName = "dbo.BatchRecords"; // the destination table must be set before WriteToServer
IDataReader dataReader = storeEntities.AsDataReader();
bulkCopy.WriteToServer(dataReader);

I have used this code, with the caveat that you need to be very careful about the definition of your entity. The order of the properties in the entity determines the order of the columns exposed by the IDataReader, and that needs to correlate with the column order of the table you are bulk copying into.
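
If you would rather not depend on property order at all, a hedged alternative is to map columns by name; a minimal sketch, assuming the AsDataReader() helper linked above and the question's dbo.BatchRecords table:

using (var bulkCopy = new SqlBulkCopy(connection)
{
    DestinationTableName = "dbo.BatchRecords",
    BatchSize = 50000,        // illustrative; tune for your workload
    BulkCopyTimeout = 600
})
using (IDataReader dataReader = storeEntities.AsDataReader())
{
    // Explicit name-based mappings decouple the entity's property order
    // from the physical column order of the destination table.
    bulkCopy.ColumnMappings.Add("ItemName", "ItemName");
    bulkCopy.ColumnMappings.Add("Supplier", "Supplier");
    bulkCopy.ColumnMappings.Add("Quantity", "Quantity");
    // ... one mapping per remaining BatchRecords column ...
    bulkCopy.WriteToServer(dataReader);
}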

Or there is other code here...

https://www.codeproject.com/Tips/1114089/Entity-Framework-Performance-Tuning-Using-SqlBulkC

You could set trace flag 2453:

FIX: Poor performance when you use table variables in SQL Server 2012 or SQL Server 2014

When you use a table variable in a batch or procedure, the query is compiled and optimized for the initial empty state of table variable. If this table variable is populated with many rows at runtime, the pre-compiled query plan may no longer be optimal. For example, the query may be joining a table variable with nested loop since it is usually more efficient for small number of rows. This query plan can be inefficient if the table variable has millions of rows. A hash join may be a better choice under such condition. To get a new query plan, it needs to be recompiled. Unlike other user or temporary tables, however, row count change in a table variable does not trigger a query recompile. Typically, you can work around this with OPTION (RECOMPILE), which has its own overhead cost. The trace flag 2453 allows the benefit of query recompile without OPTION (RECOMPILE). This trace flag differs from OPTION (RECOMPILE) in two main aspects. (1) It uses the same row count threshold as other tables. The query does not need to be compiled for every execution unlike OPTION (RECOMPILE). It would trigger recompile only when the row count change exceeds the predefined threshold. (2) OPTION (RECOMPILE) forces the query to peek parameters and optimize the query for them. This trace flag does not force parameter peeking.

You can turn on trace flag 2453 to allow a table variable to trigger recompile when enough number of rows are changed. This may allow the query optimizer to choose a more efficient plan
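
For completeness, a minimal sketch of turning the flag on from the .NET side; this enables it instance-wide and requires sysadmin rights, so whether that is acceptable is an assumption about your environment:

// DBCC TRACEON with -1 turns the flag on globally, not just for the current session.
using (var cmd = new SqlCommand("DBCC TRACEON (2453, -1);", connection))
{
    cmd.ExecuteNonQuery();
}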

Try the following stored procedure:

CREATE PROCEDURE dbo.ImportBatchRecords (
    @BatchId INT,
    @ImportTable dbo.RecordImportStructure READONLY
)
AS
    SET NOCOUNT ON;

    DECLARE     @ErrorCode  int  
    DECLARE     @Step  varchar(200)


    CREATE TABLE #FacilityInstances
    (
        Id int NOT NULL,
        FacilityCode varchar(512) NOT NULL UNIQUE WITH (IGNORE_DUP_KEY=ON)
    );

    CREATE TABLE #Currencies
    (
        Id int NOT NULL,
        CurrencyCode varchar(512) NOT NULL UNIQUE WITH (IGNORE_DUP_KEY = ON)
    )

    INSERT INTO #FacilityInstances(Id, FacilityCode)
    SELECT Id, FacilityCode FROM dbo.FacilityInstances
    WHERE FacilityCode IS NOT NULL AND Id IS NOT NULL;

    INSERT INTO #Currencies(Id, CurrencyCode)
    SELECT Id, CurrencyCode FROM dbo.Currencies
    WHERE CurrencyCode IS NOT NULL AND Id IS NOT NULL


    INSERT INTO dbo.BatchRecords (
        ItemName,
        Supplier,
        Quantity,
        ItemUnit,
        EntityUnit,
        ItemSize,
        PackageSize,
        FamilyCode,
        Family,
        CategoryCode,
        Category,
        SubCategoryCode,
        SubCategory,
        ItemGroupCode,
        ItemGroup,
        PurchaseValue,
        UnitPurchaseValue,
        PackagePurchaseValue,
        DataBatchId,
        FacilityInstanceId,
        CurrencyId
    )
    OUTPUT INSERTED.Id
    SELECT
        ItemName,
        Supplier,
        Quantity,
        ItemUnit,
        EntityUnit,
        ItemSize,
        PackageSize,
        FamilyCode,
        Family,
        CategoryCode,
        Category,
        SubCategoryCode,
        SubCategory,
        ItemGroupCode,
        ItemGroup,
        PurchaseValue,
        UnitPurchaseValue,
        PackagePurchaseValue,
        @BatchId,
        F.Id,
        C.Id
    FROM   
        #FacilityInstances F RIGHT OUTER HASH JOIN 
        (
            #Currencies C 
            RIGHT OUTER HASH JOIN @ImportTable IT 
                ON C.CurrencyCode = IT.CurrencyCode
        )
        ON F.FacilityCode = IT.FacilityCode

This forces the execution plan to use hash match joins instead of nested loops. I think the culprit of the poor performance is the first nested loop, which performs an index scan for each row of @ImportTable.

I don't know whether CurrencyCode is unique in the Currencies table, so I created the temp table #Currencies with unique currency codes.

I don't know whether FacilityCode is unique in the Facilities table, so I created the temp table #FacilityInstances with unique facility codes.

If they are unique, the temp tables are not needed and you can use the permanent tables directly.

Assuming CurrencyCode and FacilityCode are unique, the following stored procedure would be better because it doesn't create unnecessary temp tables:

CREATE PROCEDURE dbo.ImportBatchRecords (
    @BatchId INT,
    @ImportTable dbo.RecordImportStructure READONLY
)
AS
    SET NOCOUNT ON;

    DECLARE     @ErrorCode  int  
    DECLARE     @Step  varchar(200)



    INSERT INTO dbo.BatchRecords (
        ItemName,
        Supplier,
        Quantity,
        ItemUnit,
        EntityUnit,
        ItemSize,
        PackageSize,
        FamilyCode,
        Family,
        CategoryCode,
        Category,
        SubCategoryCode,
        SubCategory,
        ItemGroupCode,
        ItemGroup,
        PurchaseValue,
        UnitPurchaseValue,
        PackagePurchaseValue,
        DataBatchId,
        FacilityInstanceId,
        CurrencyId
    )
    OUTPUT INSERTED.Id
    SELECT
        ItemName,
        Supplier,
        Quantity,
        ItemUnit,
        EntityUnit,
        ItemSize,
        PackageSize,
        FamilyCode,
        Family,
        CategoryCode,
        Category,
        SubCategoryCode,
        SubCategory,
        ItemGroupCode,
        ItemGroup,
        PurchaseValue,
        UnitPurchaseValue,
        PackagePurchaseValue,
        @BatchId,
        F.Id,
        C.Id
    FROM   
        dbo.FacilityInstances F RIGHT OUTER HASH JOIN 
        (
            dbo.Currencies C 
            RIGHT OUTER HASH JOIN @ImportTable IT 
                ON C.CurrencyCode = IT.CurrencyCode
        )
        ON F.FacilityCode = IT.FacilityCode

I know there is an accepted answer, but I can't resist. I believe you can improve performance by 20-50% over the accepted answer.

The key is to SqlBulkCopy directly into the final table, dbo.BatchRecords.

To make this possible you need FacilityInstanceId and CurrencyId before you can SqlBulkCopy. To get them, load SELECT Id, FacilityCode FROM FacilityInstances and SELECT Id, CurrencyCode FROM Currencies into collections, and then build dictionaries:

var facilityIdByFacilityCode = facilitiesCollection.ToDictionary(x => x.FacilityCode, x => x.Id);
var currencyIdByCurrencyCode = currenciesCollection.ToDictionary(x => x.CurrencyCode, x => x.Id);

Once you have the dictionaries, getting the ids from code is a constant-time cost. This is equivalent and very similar to a HASH MATCH JOIN in SQL Server, but on the client side.
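
A sketch of that resolution step; the FacilityInstanceId and CurrencyId setters on the in-memory records are assumed here (the question's BatchRecordImportItem only carries the codes):

foreach (var record in batchRecords)
{
    // Constant-time dictionary lookups: the client-side equivalent of the hash join.
    record.FacilityInstanceId = facilityIdByFacilityCode[record.FacilityCode];
    record.CurrencyId = currencyIdByCurrencyCode[record.CurrencyCode];
}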

The other roadblock you need to remove is getting the Id column of the newly inserted rows in the dbo.BatchRecords table. Actually, can you get the Ids before inserting them?

Make the Id column "sequence driven":

CREATE SEQUENCE BatchRecords_Id_Seq START WITH 1;
CREATE TABLE BatchRecords
(
   Id int NOT NULL CONSTRAINT DF_BatchRecords_Id DEFAULT (NEXT VALUE FOR BatchRecords_Id_Seq), 

 .....

   CONSTRAINT PK_BatchRecords PRIMARY KEY (Id)

)

Once you have the BatchRecords collection, you know how many records are in it. You can then reserve a contiguous range of sequence values. Execute the following T-SQL:

DECLARE @BatchCollectionCount int = 2500 -- Replace with the actual value
DECLARE @range_first_value sql_variant
DECLARE @range_last_value sql_variant

EXEC sp_sequence_get_range
     @sequence_name =  N'BatchRecords_Id_Seq', 
     @range_size =  @BatchCollectionCount,
     @range_first_value = @range_first_value OUTPUT, 
     @range_last_value = @range_last_value OUTPUT

SELECT 
    CAST(@range_first_value AS INT) AS range_first_value, 
    CAST(@range_last_value AS int) as range_last_value

This returns range_first_value and range_last_value. You can now assign BatchRecord.Id to each record:

int id = range_first_value;
foreach (var record in batchRecords)
{
   record.Id = id++;
} 
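
The same range reservation can be done from .NET in a single call; a sketch, assuming batchRecords is a materialized list and using the sequence name from the DDL above (sp_sequence_get_range returns its boundaries as sql_variant OUTPUT parameters, hence SqlDbType.Variant):

int firstId;
using (var cmd = new SqlCommand("sys.sp_sequence_get_range", connection))
{
    cmd.CommandType = CommandType.StoredProcedure;
    cmd.Parameters.AddWithValue("@sequence_name", "BatchRecords_Id_Seq");
    cmd.Parameters.AddWithValue("@range_size", (long)batchRecords.Count);

    // sp_sequence_get_range hands the range boundaries back as sql_variant.
    var first = cmd.Parameters.Add("@range_first_value", SqlDbType.Variant);
    first.Direction = ParameterDirection.Output;

    cmd.ExecuteNonQuery();

    // Cast down to int to match the Id column, as the CAST in the T-SQL above does.
    firstId = Convert.ToInt32(first.Value);
}
// firstId plays the role of range_first_value in the assignment loop above.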

Next, you can SqlBulkCopy the batch record collection directly into the final table, dbo.BatchRecords.

To get a DataReader from an IEnumerable<T> to feed SqlBulkCopy.WriteToServer, you can use code like this, which is part of EntityLite, the micro ORM I developed.
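
Putting the last step together, a hedged sketch; it assumes an AsDataReader()-style helper like the ones linked above that exposes the entity properties, including the pre-assigned Id, as reader columns, and reuses the connection/transaction from the question's handler:

using (var bulkCopy = new SqlBulkCopy(connection, SqlBulkCopyOptions.Default, transaction)
{
    DestinationTableName = "dbo.BatchRecords",
    BulkCopyTimeout = 600
})
using (IDataReader reader = batchRecords.AsDataReader())
{
    // Id is defaulted from the sequence rather than being an IDENTITY column, so the
    // pre-assigned Id values are written like any other column; make sure the reader's
    // column order (or explicit ColumnMappings) lines up with dbo.BatchRecords.
    bulkCopy.WriteToServer(reader);
}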

You can make it even faster if you cache facilityIdByFacilityCode and currencyIdByCurrencyCode. To make sure these dictionaries are up to date, you can use SqlDependency or techniques like this one.