使用 Npgsql 将 IEnumerable 批量插入/复制到表中

Bulk insert / copy an IEnumerable into a table with Npgsql

我有一个接受 IEnumerable 参数的方法，它循环遍历这个 IEnumerable，并把每个元素插入到数据库的表中。像这样：

    /// <summary>
    /// Row-by-row approach: one parameterised INSERT per element of <paramref name="fooData"/>.
    /// </summary>
    /// <param name="fooData">Rows to insert; enumerated exactly once.</param>
    /// <exception cref="ArgumentNullException"><paramref name="fooData"/> is null.</exception>
    /// <remarks>
    /// The element type is <see cref="Foo"/>: the earlier <c>foo</c> did not match the declared class name,
    /// and C# type names are case-sensitive. Executing the statement is left to the caller's connection code;
    /// one round trip per row is what makes this approach slow for thousands of rows.
    /// </remarks>
    public void Write(IEnumerable<Foo> fooData)
    {
        // Same exception type the previous fooData.Count() call raised for a null sequence.
        if (fooData == null)
        {
            throw new ArgumentNullException(nameof(fooData));
        }

        // Both values need the '@' prefix to be command parameters; a bare "col_name" inside VALUES
        // is parsed by PostgreSQL as a column reference, not a parameter.
        const string sql = @"insert into foo (col_id, col_name) values (@col_id, @col_name)";

        // foreach walks the sequence once. The previous `index < fooData.Count()` re-counted
        // (and, for lazy sequences, re-enumerated) the input on every iteration: O(n^2).
        foreach (Foo row in fooData)
        {
            // TODO: execute `sql` with @col_id = row.col_id and @col_name = row.col_name.
        }
    }

Foo 是一个与数据库中的表相对应的类：

      /// <summary>
      /// Row type for the <c>foo</c> table; each property name matches a column name.
      /// </summary>
      public class Foo
      {
          /// <summary>Value for the <c>col_id</c> column.</summary>
          public int col_id
          {
              get;
              set;
          }

          /// <summary>Value for the <c>col_name</c> column.</summary>
          public string col_name
          {
              get;
              set;
          }
      }

我发现当数据有数千行时，用 for 循环逐条插入的效率很低。对于 IEnumerable，有什么更高效的方法可以复制所有这些数据？

我写了一个类，其行为类似于 SQL Server 内置的 SqlBulkCopy 类，但面向 Postgres。它封装了 COPY 命令，以实现快速上传。处理 IEnumerable 的方法如下所示（DataTable 也有类似的方法）。

/// <summary>
/// Bulk-loads <paramref name="data"/> into <c>DestinationTableName</c> using PostgreSQL's binary
/// <c>COPY ... FROM STDIN</c>, instead of issuing one INSERT per row.
/// </summary>
/// <typeparam name="T">Row type; its public properties supply the column values.</typeparam>
/// <param name="data">Rows to upload; enumerated exactly once.</param>
/// <remarks>
/// <para><c>DestinationTableName</c> must be set and <c>conn</c> must be an open connection.</para>
/// <para>The destination table must have exactly as many columns as <typeparamref name="T"/> has public
/// properties. When every destination column has a distinct property with the same name (case-insensitive),
/// values are matched by name; otherwise they are matched by position, which depends on
/// <see cref="Type.GetProperties()"/> order, and .NET does not guarantee that order.</para>
/// <para><c>DestinationTableName</c> is concatenated into SQL unquoted (so it may be schema-qualified);
/// it must never come from untrusted input.</para>
/// </remarks>
/// <exception cref="Exception">Wraps every failure; the original error is the inner exception.</exception>
public void WriteToServer<T>(IEnumerable<T> data)
{
    try
    {
        if (string.IsNullOrEmpty(DestinationTableName))
        {
            throw new ArgumentOutOfRangeException("DestinationTableName", "Destination table must be set");
        }
        if (data == null)
        {
            throw new ArgumentNullException(nameof(data));
        }

        PropertyInfo[] properties = typeof(T).GetProperties();
        int colCount = properties.Length;

        NpgsqlDbType[] types = new NpgsqlDbType[colCount];
        int[] lengths = new int[colCount];
        string[] fieldNames = new string[colCount];

        // Read the destination's column names, types and sizes. LIMIT 0 suffices because only the
        // result-set metadata is used, never a data row.
        using (var cmd = new NpgsqlCommand("SELECT * FROM " + DestinationTableName + " LIMIT 0", conn))
        using (var rdr = cmd.ExecuteReader())
        {
            if (rdr.FieldCount != colCount)
            {
                throw new ArgumentOutOfRangeException(nameof(data), "Column count in Destination Table does not match column count in source table.");
            }
            var columns = rdr.GetColumnSchema();
            for (int i = 0; i < colCount; i++)
            {
                types[i] = (NpgsqlDbType)columns[i].NpgsqlDbType;
                lengths[i] = columns[i].ColumnSize ?? 0;
                fieldNames[i] = columns[i].ColumnName;
            }
        }

        // sources[i] is the property that feeds destination column i. Match by column name when every
        // column finds a distinct property; otherwise keep the positional mapping.
        PropertyInfo[] sources = new PropertyInfo[colCount];
        var used = new HashSet<PropertyInfo>();
        bool byName = true;
        for (int i = 0; i < colCount && byName; i++)
        {
            string column = fieldNames[i];
            PropertyInfo match = Array.Find(properties, p => string.Equals(p.Name, column, StringComparison.OrdinalIgnoreCase));
            byName = match != null && used.Add(match);
            sources[i] = match;
        }
        if (!byName)
        {
            sources = properties;
        }

        // Double-quote each column name (escaping embedded quotes) so mixed-case or reserved-word
        // names are neither case-folded nor rejected by COPY.
        string[] quotedNames = new string[colCount];
        for (int i = 0; i < colCount; i++)
        {
            quotedNames[i] = "\"" + fieldNames[i].Replace("\"", "\"\"") + "\"";
        }
        string copySql = "COPY " + DestinationTableName + " (" + string.Join(", ", quotedNames) + ") FROM STDIN (FORMAT BINARY)";

        using (var writer = conn.BeginBinaryImport(copySql))
        {
            foreach (T item in data)
            {
                writer.StartRow();

                for (int i = 0; i < colCount; i++)
                {
                    // Read the property once per cell rather than once per check and cast.
                    object value = sources[i].GetValue(item);
                    NpgsqlDbType type = types[i];

                    if (value == null)
                    {
                        writer.WriteNull();
                        continue;
                    }

                    // Every branch writes exactly one value, so each row always has colCount fields.
                    // Branches test the value's runtime type (the previous PropertyInfo.GetType() checks
                    // compared the reflection object's own type and could never match).
                    switch (type)
                    {
                        case NpgsqlDbType.Bigint:
                            writer.Write((long)value, type);
                            break;
                        case NpgsqlDbType.Bit:
                            if (lengths[i] > 1)
                            {
                                writer.Write((byte[])value, type);
                            }
                            else
                            {
                                writer.Write((byte)value, type);
                            }
                            break;
                        case NpgsqlDbType.Boolean:
                            writer.Write((bool)value, type);
                            break;
                        case NpgsqlDbType.Bytea:
                            writer.Write((byte[])value, type);
                            break;
                        case NpgsqlDbType.Char:
                            if (value is string)
                            {
                                writer.Write((string)value, type);
                            }
                            else if (value is Guid)
                            {
                                writer.Write(value.ToString(), type);
                            }
                            else if (lengths[i] > 1)
                            {
                                writer.Write((char[])value, type);
                            }
                            else
                            {
                                writer.Write(value.ToString()[0], type);
                            }
                            break;
                        case NpgsqlDbType.Time:
                        case NpgsqlDbType.Timestamp:
                        case NpgsqlDbType.TimestampTz:
                        case NpgsqlDbType.Date:
                            writer.Write((DateTime)value, type);
                            break;
                        case NpgsqlDbType.Double:
                            writer.Write((double)value, type);
                            break;
                        case NpgsqlDbType.Integer:
                            if (value is int)
                            {
                                writer.Write((int)value, type);
                            }
                            else if (value is string)
                            {
                                // Numeric text is parsed with the invariant culture; a malformed value now
                                // surfaces as a FormatException instead of being swallowed.
                                writer.Write(Convert.ToInt32((string)value, System.Globalization.CultureInfo.InvariantCulture), type);
                            }
                            else
                            {
                                writer.Write(value, type);
                            }
                            break;
                        case NpgsqlDbType.Interval:
                            writer.Write((TimeSpan)value, type);
                            break;
                        case NpgsqlDbType.Numeric:
                        case NpgsqlDbType.Money:
                            writer.Write((decimal)value, type);
                            break;
                        case NpgsqlDbType.Real:
                            writer.Write((float)value, type);
                            break;
                        case NpgsqlDbType.Smallint:
                            // byte is widened explicitly (unboxing a byte as short would throw). Any other
                            // mismatched type now throws instead of silently leaving the column unwritten.
                            writer.Write(value is byte ? (short)(byte)value : (short)value, type);
                            break;
                        case NpgsqlDbType.Varchar:
                        case NpgsqlDbType.Text:
                        case NpgsqlDbType.Xml:
                            writer.Write((string)value, type);
                            break;
                        case NpgsqlDbType.Uuid:
                            writer.Write((Guid)value, type);
                            break;
                        default:
                            // No dedicated case: let Npgsql convert it rather than skip the column,
                            // which would leave the row one field short and abort the COPY.
                            writer.Write(value, type);
                            break;
                    }
                }
            }
            writer.Complete();
        }
    }
    catch (Exception ex)
    {
        throw new Exception("Error executing NpgSqlBulkCopy.WriteToServer().  See inner exception for details", ex);
    }
}

您需要先设置 DestinationTableName 属性，并且 conn 必须是一个已打开的连接。

本质上，该方法使用反射（Reflection）获取所传入列表的元素类型的属性。显然，目标表各列的数据类型必须与之匹配！写入器（writer）在遍历列表时逐行写入数据，最后一次性完成批量插入。我可能没有处理您需要的所有类型，但在缺少某些类型的地方，如何添加它们应该是显而易见的。

编辑

有人请我分享，下面是 DataTable 版本的等效实现：

/// <summary>
/// Bulk-loads every row of <paramref name="dataTable"/> into <c>DestinationTableName</c> using PostgreSQL's
/// binary <c>COPY ... FROM STDIN</c>, instead of issuing one INSERT per row.
/// </summary>
/// <param name="dataTable">Source rows; must have exactly as many columns as the destination table.</param>
/// <remarks>
/// <para><c>DestinationTableName</c> must be set and <c>conn</c> must be an open connection.</para>
/// <para>When every destination column is found exactly once among the <see cref="DataColumn"/> names,
/// values are matched by name; otherwise they are matched by position.</para>
/// <para><c>DestinationTableName</c> is concatenated into SQL unquoted (so it may be schema-qualified);
/// it must never come from untrusted input.</para>
/// </remarks>
/// <exception cref="Exception">Wraps every failure; the original error is the inner exception.</exception>
public void WriteToServer(DataTable dataTable)
{
    try
    {
        if (string.IsNullOrEmpty(DestinationTableName))
        {
            throw new ArgumentOutOfRangeException("DestinationTableName", "Destination table must be set");
        }
        if (dataTable == null)
        {
            throw new ArgumentNullException(nameof(dataTable));
        }
        int colCount = dataTable.Columns.Count;

        NpgsqlDbType[] types = new NpgsqlDbType[colCount];
        int[] lengths = new int[colCount];
        string[] fieldNames = new string[colCount];

        // Read the destination's column names, types and sizes. LIMIT 0 suffices because only the
        // result-set metadata is used, never a data row.
        using (var cmd = new NpgsqlCommand("SELECT * FROM " + DestinationTableName + " LIMIT 0", conn))
        using (var rdr = cmd.ExecuteReader())
        {
            if (rdr.FieldCount != colCount)
            {
                throw new ArgumentOutOfRangeException("dataTable", "Column count in Destination Table does not match column count in source table.");
            }
            var columns = rdr.GetColumnSchema();
            for (int i = 0; i < colCount; i++)
            {
                types[i] = (NpgsqlDbType)columns[i].NpgsqlDbType;
                lengths[i] = columns[i].ColumnSize ?? 0;
                fieldNames[i] = columns[i].ColumnName;
            }
        }

        // sourceIndex[i] is the DataTable column that feeds destination column i. Match by name when
        // every destination column is found and no source column is used twice; otherwise keep the
        // positional mapping.
        int[] sourceIndex = new int[colCount];
        bool[] taken = new bool[colCount];
        bool byName = true;
        for (int i = 0; i < colCount && byName; i++)
        {
            int match = dataTable.Columns.IndexOf(fieldNames[i]);
            byName = match >= 0 && !taken[match];
            if (byName)
            {
                taken[match] = true;
                sourceIndex[i] = match;
            }
        }
        if (!byName)
        {
            for (int i = 0; i < colCount; i++)
            {
                sourceIndex[i] = i;
            }
        }

        // Double-quote each column name (escaping embedded quotes) so mixed-case or reserved-word
        // names are neither case-folded nor rejected by COPY.
        string[] quotedNames = new string[colCount];
        for (int i = 0; i < colCount; i++)
        {
            quotedNames[i] = "\"" + fieldNames[i].Replace("\"", "\"\"") + "\"";
        }
        string copySql = "COPY " + DestinationTableName + " (" + string.Join(", ", quotedNames) + ") FROM STDIN (FORMAT BINARY)";

        using (var writer = conn.BeginBinaryImport(copySql))
        {
            foreach (DataRow row in dataTable.Rows)
            {
                writer.StartRow();

                for (int i = 0; i < colCount; i++)
                {
                    // Read the cell once rather than once per check and cast.
                    object value = row[sourceIndex[i]];
                    NpgsqlDbType type = types[i];

                    if (value == null || value == DBNull.Value)
                    {
                        writer.WriteNull();
                        continue;
                    }

                    // Every branch writes exactly one value, so each row always has colCount fields.
                    switch (type)
                    {
                        case NpgsqlDbType.Bigint:
                            writer.Write((long)value, type);
                            break;
                        case NpgsqlDbType.Bit:
                            if (lengths[i] > 1)
                            {
                                writer.Write((byte[])value, type);
                            }
                            else
                            {
                                writer.Write((byte)value, type);
                            }
                            break;
                        case NpgsqlDbType.Boolean:
                            writer.Write((bool)value, type);
                            break;
                        case NpgsqlDbType.Bytea:
                            writer.Write((byte[])value, type);
                            break;
                        case NpgsqlDbType.Char:
                            if (value is string)
                            {
                                writer.Write((string)value, type);
                            }
                            else if (value is Guid)
                            {
                                writer.Write(value.ToString(), type);
                            }
                            else if (lengths[i] > 1)
                            {
                                writer.Write((char[])value, type);
                            }
                            else
                            {
                                writer.Write(value.ToString()[0], type);
                            }
                            break;
                        case NpgsqlDbType.Time:
                        case NpgsqlDbType.Timestamp:
                        case NpgsqlDbType.TimestampTz:
                        case NpgsqlDbType.Date:
                            writer.Write((DateTime)value, type);
                            break;
                        case NpgsqlDbType.Double:
                            writer.Write((double)value, type);
                            break;
                        case NpgsqlDbType.Integer:
                            if (value is int)
                            {
                                writer.Write((int)value, type);
                            }
                            else if (value is string)
                            {
                                // Numeric text is parsed with the invariant culture; a malformed value now
                                // surfaces as a FormatException instead of being swallowed.
                                writer.Write(Convert.ToInt32((string)value, System.Globalization.CultureInfo.InvariantCulture), type);
                            }
                            else
                            {
                                writer.Write(value, type);
                            }
                            break;
                        case NpgsqlDbType.Interval:
                            writer.Write((TimeSpan)value, type);
                            break;
                        case NpgsqlDbType.Numeric:
                        case NpgsqlDbType.Money:
                            writer.Write((decimal)value, type);
                            break;
                        case NpgsqlDbType.Real:
                            writer.Write((float)value, type);
                            break;
                        case NpgsqlDbType.Smallint:
                            // byte is widened explicitly (unboxing a byte as short would throw). Any other
                            // mismatched type now throws instead of silently leaving the column unwritten.
                            writer.Write(value is byte ? (short)(byte)value : (short)value, type);
                            break;
                        case NpgsqlDbType.Varchar:
                        case NpgsqlDbType.Text:
                        case NpgsqlDbType.Xml:
                            writer.Write((string)value, type);
                            break;
                        case NpgsqlDbType.Uuid:
                            writer.Write((Guid)value, type);
                            break;
                        default:
                            // No dedicated case: let Npgsql convert it rather than skip the column,
                            // which would leave the row one field short and abort the COPY.
                            writer.Write(value, type);
                            break;
                    }
                }
            }
            writer.Complete();
        }
    }
    catch (Exception ex)
    {
        throw new Exception("Error executing NpgSqlBulkCopy.WriteToServer().  See inner exception for details", ex);
    }
}

与 IEnumerable 版本一样，这里可能也缺少一些数据类型，但很容易扩展。

我使用的是 https://github.com/PostgreSQLCopyHelper/PostgreSQLCopyHelper ，它非常快。

下面是一个例子：我要加载一个自定义类实例的列表（该类名为 single_bar，有 6 个成员，而您的 Foo 类只有 2 个）。CopyHelper 会把类中的成员映射到您要写入的列。

            // Describe how each single_bar member maps onto a column of the temp table.
            var copyHelper = new PostgreSQLCopyHelper<single_bar>(temp_table_name)
                .MapTimeStamp("bar_time", x => x.bar_time)
                .MapReal("open_px", x => x.open_px)
                .MapReal("high_px", x => x.high_px)
                .MapReal("low_px", x => x.low_px)
                .MapReal("close_px", x => x.close_px)
                .MapReal("trade_volume", x => x.getrealvol());

            // Upload every item of `dataset` to the temp table; the return value is kept as the uploaded-row count.
            ulong uploadedCount = copyHelper.SaveAll(conn, dataset);
            result = Convert.ToInt32(uploadedCount);

我先创建一个临时表作为复制操作的目标，然后再从临时表向最终的表执行 upsert（插入或更新）。