Dapper 批量插入返回序列号

Dapper Bulk Insert Returning Serial IDs

我正在尝试使用 Dapper 通过 Npgsql 执行批量插入,returns 新插入行的 ID。我的两个示例中都使用了以下插入语句:

var query = "INSERT INTO \"MyTable\" (\"Value\") VALUES (@Value) RETURNING \"ID\"";

首先,我尝试添加一个对象数组 "Value" 属性:

var values = new[] {
    new { Value = 0.0 },
    new { Value = 0.5 }
};
var ids = connection.Query<int>(query, values);

但是,NpgsqlException 失败:"ERROR: 42703: column "值“不存在”。看完this question,我想也许我必须传递一个DataTable对象而不是对象数组:

var dataTable = new DataTable();
dataTable.Columns.Add("Value", typeof(double));
dataTable.Rows.Add(0.0);
dataTable.Rows.Add(0.5);
var ids = connection.Query<int>(query, dataTable);

但是,此操作失败并出现完全相同的异常。我如何执行批量插入并通过 Npgsql 从 Dapper 中获取生成的序列号?

我确实注意到异常的大小写与列名不匹配,但我确定我在 table 和列名周围有引号,所以我不确定为什么它说 "value" 而不是异常中的 "Value" 。只是想我会提到它以防它与错误有关,因为它很容易忽略大小写。

-- 编辑--

澄清一下,这是 SQL 创建 table

CREATE TABLE "MyTable" (
    "ID" SERIAL PRIMARY KEY,
    "Value" DOUBLE PRECISION NOT NULL
);

并使用上面定义的变量 "query" 和 "values",这是在每行基础上工作的代码:

var ids = new List<int>();
foreach (var valueObj in values) {
    var queryParams = new DynamicParamaters();
    queryParams.Add("Value", valueObj.Value);
    ids.AddRange(connection.Query<int>(query, queryParams));
}

问题是我需要能够每秒向"MyTable"插入数百行(在不久的将来可能是数千行),所以等待这个循环迭代地将每个值发送到数据库是繁琐且(我假设,但尚未进行基准测试)耗时。此外,我对可能会或可能不会导致额外插入的值执行额外的计算,我需要对 "MyTable" 条目的外键引用。

由于这些问题,我正在寻找一种替代方法,将单个语句中的所有值发送到数据库,以减少网络流量和处理延迟。同样,我还没有对迭代方法进行基准测试……我正在寻找的是一种执行批量插入的替代方法,这样我就可以将这两种方法相互进行基准测试。

最终,我想出了四种不同的方法来解决这个问题。我生成了 500 个随机值以插入到 MyTable 中,并对四种方法中的每一种进行计时(包括启动和回滚它所在的事务 运行)。在我的测试中,数据库位于本地主机上。然而,具有最佳性能的解决方案也只需要往返数据库服务器一次,因此我发现的最佳解决方案在部署到与数据库不同的服务器时仍应优于其他方案。

注意变量connectiontransaction在下面的代码中被使用,并被假定为有效的Npgsql数据对象。另请注意,符号 Nx slower 表示操作花费的时间等于最优解乘以 N.

方法 #1(1,494 毫秒 = 慢 18.7 倍):将数组展开为单独的参数

public List<MyTable> InsertEntries(double[] entries)
{
    // Create a variable used to dynamically build the query
    var query = new StringBuilder(
        "INSERT INTO \"MyTable\" (\"Value\") VALUES ");

    // Create the dictionary used to store the query parameters
    var queryParams = new DynamicParameters();

    // Get the result set without auto-assigned ids
    var result = entries.Select(e => new MyTable { Value = e }).ToList();

    // Add a unique parameter for each id
    var paramIdx = 0;
    foreach (var entry in result)
    {
        var paramName = string.Format("value{1:D6}", paramIdx);
        if (0 < paramIdx++) query.Append(',');
        query.AppendFormat("(:{0})", paramName);
        queryParams.Add(paramName, entry.Value);
    }
    query.Append(" RETURNING \"ID\"");

    // Execute the query, and store the ids
    var ids = connection.Query<int>(query, queryParams, transaction);
    ids.ForEach((id, i) => result[i].ID = id);

    // Return the result
    return result;
}

我真的不确定为什么这是最慢的,因为它只需要一次数据库往返,但确实如此。

方法 #2(267 毫秒 = 慢 3.3 倍): 标准循环迭代

public List<MyTable> InsertEntries(double[] entries)
{
    const string query =
        "INSERT INTO \"MyTable\" (\"Value\") VALUES (:val) RETURNING \"ID\"";

    // Get the result set without auto-assigned ids
    var result = entries.Select(e => new MyTable { Value = e }).ToList();

    // Add each entry to the database
    foreach (var entry in result)
    {
        var queryParams = new DynamicParameters();
        queryParams.Add("val", entry.Value);
        entry.ID = connection.Query<int>(
            query, queryParams, transaction);
    }

    // Return the result
    return result;
}

令我震惊的是,这仅比最佳解决方案慢 3.3 倍,但我预计在真实环境中情况会变得更糟,因为此解决方案需要向服务器串行发送 500 条消息。然而,这也是最简单的解决方案。

方法 #3(223 毫秒 = 慢 2.8 倍): 异步循环迭代

public List<MyTable> InsertEntries(double[] entries)
{
    const string query =
        "INSERT INTO \"MyTable\" (\"Value\") VALUES (:val) RETURNING \"ID\"";

    // Get the result set without auto-assigned ids
    var result = entries.Select(e => new MyTable { Value = e }).ToList();

    // Add each entry to the database asynchronously
    var taskList = new List<Task<IEnumerable<int>>>();
    foreach (var entry in result)
    {
        var queryParams = new DynamicParameters();
        queryParams.Add("val", entry.Value);
        taskList.Add(connection.QueryAsync<int>(
            query, queryParams, transaction));
    }

    // Now that all queries have been sent, start reading the results
    for (var i = 0; i < result.Count; ++i)
    {
        result[i].ID = taskList[i].Result.First();
    }

    // Return the result
    return result;
}

这越来越好,但仍未达到最佳状态,因为我们只能将与线程池中可用线程一样多的插入排队。然而,这几乎和非线程方法一样简单,因此它是速度和可读性之间的一个很好的折衷。

方法 #4(134 毫秒 = 慢 1.7 倍): 批量插入

此方法需要在 运行 其下方的代码段之前定义以下 Postgres SQL:

CREATE TYPE "MyTableType" AS (
    "Value" DOUBLE PRECISION
);

CREATE FUNCTION "InsertIntoMyTable"(entries "MyTableType"[])
    RETURNS SETOF INT AS $$

    DECLARE
        insertCmd TEXT := 'INSERT INTO "MyTable" ("Value") '
            'VALUES () RETURNING "ID"';
        entry "MyTableType";
    BEGIN
        FOREACH entry IN ARRAY entries LOOP
            RETURN QUERY EXECUTE insertCmd USING entry."Value";
        END LOOP;
    END;
$$ LANGUAGE PLPGSQL;

以及关联代码:

public List<MyTable> InsertEntries(double[] entries)
{
    const string query =
        "SELECT * FROM \"InsertIntoMyTable\"(:entries::\"MyTableType\")";

    // Get the result set without auto-assigned ids
    var result = entries.Select(e => new MyTable { Value = e }).ToList();

    // Convert each entry into a Postgres string
    var entryStrings = result.Select(
        e => string.Format("({0:E16})", e.Value).ToArray();

    // Create a parameter for the array of MyTable entries
    var queryParam = new {entries = entryStrings};

    // Perform the insert
    var ids = connection.Query<int>(query, queryParam, transaction);

    // Assign each id to the result
    ids.ForEach((id, i) => result[i].ID = id);

    // Return the result
    return result;
}

我对这种方法有两个问题。首先是我必须对 MyTableType 成员的排序进行硬编码。如果该顺序发生变化,我必须修改此代码以匹配。第二个是我必须在将所有输入值发送到 postgres 之前将它们转换为字符串(在实际代码中,我有不止一列,所以我不能只更改数据库的签名函数采用双精度[],除非我传入 N 个数组,其中 N 是 MyTableType 上的字段数)。

尽管存在这些缺陷,这已接近理想状态,并且只需要往返数据库一次。

-- 开始编辑 --

自最初 post 以来,我提出了另外四种方法,它们都比上面列出的方法更快。我修改了 Nx slower 数字以反映新的最快方法,如下所示。

方法 #5(105 毫秒 = 慢 1.3 倍): 与 #4 相同,没有动态查询

此方法与方法 #4 的唯一区别是对 "InsertIntoMyTable" 函数的以下更改:

CREATE FUNCTION "InsertIntoMyTable"(entries "MyTableType"[])
    RETURNS SETOF INT AS $$

    DECLARE
        entry "MyTableType";
    BEGIN
        FOREACH entry IN ARRAY entries LOOP
            RETURN QUERY INSERT INTO "MyTable" ("Value")
                VALUES (entry."Value") RETURNING "ID";
        END LOOP;
    END;
$$ LANGUAGE PLPGSQL;

除了 方法 #4 的问题外,这样做的缺点是,在生产环境中,"MyTable" 是分区的。使用这种方法,每个目标分区需要一种方法。

方法 #6(89ms = 慢 1.1 倍): 插入带有数组参数的语句

public List<MyTable> InsertEntries(double[] entries)
{
    const string query =
        "INSERT INTO \"MyTable\" (\"Value\") SELECT a.* FROM " +
            "UNNEST(:entries::\"MyTableType\") a RETURNING \"ID\"";

    // Get the result set without auto-assigned ids
    var result = entries.Select(e => new MyTable { Value = e }).ToList();

    // Convert each entry into a Postgres string
    var entryStrings = result.Select(
        e => string.Format("({0:E16})", e.Value).ToArray();

    // Create a parameter for the array of MyTable entries
    var queryParam = new {entries = entryStrings};

    // Perform the insert
    var ids = connection.Query<int>(query, queryParam, transaction);

    // Assign each id to the result
    ids.ForEach((id, i) => result[i].ID = id);

    // Return the result
    return result;
}

唯一的缺点与 方法 #4 的第一个问题相同。也就是说,它将实现与 "MyTableType" 的顺序相结合。不过,我发现这是我第二喜欢的方法,因为它非常快,并且不需要任何数据库函数即可正常工作。

方法 #7(80 毫秒 = 非常慢): 与 #1 相同,但没有参数

public List<MyTable> InsertEntries(double[] entries)
{
    // Create a variable used to dynamically build the query
    var query = new StringBuilder(
        "INSERT INTO \"MyTable\" (\"Value\") VALUES");

    // Get the result set without auto-assigned ids
    var result = entries.Select(e => new MyTable { Value = e }).ToList();

    // Add each row directly into the insert statement
    for (var i = 0; i < result.Count; ++i)
    {
        entry = result[i];
        query.Append(i == 0 ? ' ' : ',');
        query.AppendFormat("({0:E16})", entry.Value);
    }
    query.Append(" RETURNING \"ID\"");

    // Execute the query, and store the ids
    var ids = connection.Query<int>(query, null, transaction);
    ids.ForEach((id, i) => result[i].ID = id);

    // Return the result
    return result;
}

这是我最喜欢的方法。它只比最快的慢一点(即使有 4000 条记录,它仍然 运行s 在 1 秒内),但不需要特殊的数据库函数或类型。我唯一不喜欢的是我必须对双精度值进行字符串化,只能由 Postgres 再次解析。最好以二进制形式发送值,这样它们会占用 8 个字节,而不是我为它们分配的大量 20 个左右的字节。

方法 #8(80 毫秒): 与 #5 相同,但纯 sql

此方法与方法 #5 的唯一区别是对 "InsertIntoMyTable" 函数的以下更改:

CREATE FUNCTION "InsertIntoMyTable"(
    entries "MyTableType"[]) RETURNS SETOF INT AS $$

    INSERT INTO "MyTable" ("Value")
        SELECT a.* FROM UNNEST(entries) a RETURNING "ID";
$$ LANGUAGE SQL;

这种方法与 #5 一样,每个 "MyTable" 分区需要一个函数。这是最快的,因为可以为每个函数生成一次查询计划,然后再使用。在其他方法中,查询必须被解析,然后计划,然后执行。尽管这是最快的,但我没有选择它,因为在数据库方面比 方法 #7 有额外的要求,而且速度优势很小。