Bulk insert / copy IEnumerable into table with Npgsql
I have a method that accepts an IEnumerable, loops over it and inserts each item into a table in the database, something like this:
public void Write(IEnumerable<Foo> fooData)
{
    foreach (var foo in fooData)
    {
        var sql = @"insert into foo (col_id, col_name) values (@col_id, @col_name)";
        using (var cmd = new NpgsqlCommand(sql, conn))
        {
            cmd.Parameters.AddWithValue("col_id", foo.col_id);
            cmd.Parameters.AddWithValue("col_name", foo.col_name);
            cmd.ExecuteNonQuery();
        }
    }
}
Foo is a class that mirrors the table in the db:
public class Foo
{
public int col_id {get;set;}
public string col_name {get;set;}
}
I find that looping and inserting row by row is not efficient once there are thousands of entries. What is a more efficient way to copy all this data when an IEnumerable is involved?
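For reference, the core of the faster approach is Npgsql's binary COPY API (BeginBinaryImport), which the answers below wrap. A minimal sketch for the Foo class above, assuming an open NpgsqlConnection named conn, Npgsql 4.x or later, and using Npgsql / using NpgsqlTypes directives:

public void WriteWithCopy(IEnumerable<Foo> fooData)
{
    // Stream all rows in a single COPY ... FROM STDIN (FORMAT BINARY) operation
    using (var writer = conn.BeginBinaryImport("COPY foo (col_id, col_name) FROM STDIN (FORMAT BINARY)"))
    {
        foreach (var foo in fooData)
        {
            writer.StartRow();
            writer.Write(foo.col_id, NpgsqlDbType.Integer);
            writer.Write(foo.col_name, NpgsqlDbType.Varchar);
        }
        writer.Complete(); // without Complete() the import is rolled back on dispose
    }
}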
I wrote a class that behaves like the built-in SqlBulkCopy class, but for Postgres. It wraps the COPY command to provide fast uploads. The IEnumerable<T> method is shown below (there is a similar method for DataTable).
public void WriteToServer<T>(IEnumerable<T> data)
{
try
{
if (DestinationTableName == null || DestinationTableName == "")
{
throw new ArgumentOutOfRangeException("DestinationTableName", "Destination table must be set");
}
PropertyInfo[] properties = typeof(T).GetProperties();
int colCount = properties.Length;
NpgsqlDbType[] types = new NpgsqlDbType[colCount];
int[] lengths = new int[colCount];
string[] fieldNames = new string[colCount];
using (var cmd = new NpgsqlCommand("SELECT * FROM " + DestinationTableName + " LIMIT 1", conn))
{
using (var rdr = cmd.ExecuteReader())
{
if (rdr.FieldCount != colCount)
{
throw new ArgumentOutOfRangeException("dataTable", "Column count in Destination Table does not match column count in source table.");
}
var columns = rdr.GetColumnSchema();
for (int i = 0; i < colCount; i++)
{
types[i] = (NpgsqlDbType)columns[i].NpgsqlDbType;
lengths[i] = columns[i].ColumnSize == null ? 0 : (int)columns[i].ColumnSize;
fieldNames[i] = columns[i].ColumnName;
}
}
}
var sB = new StringBuilder(fieldNames[0]);
for (int p = 1; p < colCount; p++)
{
sB.Append(", " + fieldNames[p]);
}
using (var writer = conn.BeginBinaryImport("COPY " + DestinationTableName + " (" + sB.ToString() + ") FROM STDIN (FORMAT BINARY)"))
{
foreach (var t in data)
{
writer.StartRow();
for (int i = 0; i < colCount; i++)
{
if (properties[i].GetValue(t) == null)
{
writer.WriteNull();
}
else
{
switch (types[i])
{
case NpgsqlDbType.Bigint:
writer.Write((long)properties[i].GetValue(t), types[i]);
break;
case NpgsqlDbType.Bit:
if (lengths[i] > 1)
{
writer.Write((byte[])properties[i].GetValue(t), types[i]);
}
else
{
writer.Write((byte)properties[i].GetValue(t), types[i]);
}
break;
case NpgsqlDbType.Boolean:
writer.Write((bool)properties[i].GetValue(t), types[i]);
break;
case NpgsqlDbType.Bytea:
writer.Write((byte[])properties[i].GetValue(t), types[i]);
break;
case NpgsqlDbType.Char:
if (properties[i].PropertyType == typeof(string))
{
writer.Write((string)properties[i].GetValue(t), types[i]);
}
else if (properties[i].PropertyType == typeof(Guid))
{
var value = properties[i].GetValue(t).ToString();
writer.Write(value, types[i]);
}
else if (lengths[i] > 1)
{
writer.Write((char[])properties[i].GetValue(t), types[i]);
}
else
{
var s = ((string)properties[i].GetValue(t).ToString()).ToCharArray();
writer.Write(s[0], types[i]);
}
break;
case NpgsqlDbType.Time:
case NpgsqlDbType.Timestamp:
case NpgsqlDbType.TimestampTz:
case NpgsqlDbType.Date:
writer.Write((DateTime)properties[i].GetValue(t), types[i]);
break;
case NpgsqlDbType.Double:
writer.Write((double)properties[i].GetValue(t), types[i]);
break;
case NpgsqlDbType.Integer:
try
{
if (properties[i].PropertyType == typeof(int))
{
writer.Write((int)properties[i].GetValue(t), types[i]);
break;
}
else if (properties[i].PropertyType == typeof(string))
{
var swap = Convert.ToInt32(properties[i].GetValue(t));
writer.Write((int)swap, types[i]);
break;
}
}
catch (Exception ex)
{
string sh = ex.Message;
}
writer.Write((object)properties[i].GetValue(t), types[i]);
break;
case NpgsqlDbType.Interval:
writer.Write((TimeSpan)properties[i].GetValue(t), types[i]);
break;
case NpgsqlDbType.Numeric:
case NpgsqlDbType.Money:
writer.Write((decimal)properties[i].GetValue(t), types[i]);
break;
case NpgsqlDbType.Real:
writer.Write((Single)properties[i].GetValue(t), types[i]);
break;
case NpgsqlDbType.Smallint:
try
{
if (properties[i].PropertyType == typeof(byte))
{
var swap = Convert.ToInt16(properties[i].GetValue(t));
writer.Write((short)swap, types[i]);
break;
}
writer.Write((short)properties[i].GetValue(t), types[i]);
}
catch (Exception ex)
{
string ms = ex.Message;
}
break;
case NpgsqlDbType.Varchar:
case NpgsqlDbType.Text:
writer.Write((string)properties[i].GetValue(t), types[i]);
break;
case NpgsqlDbType.Uuid:
writer.Write((Guid)properties[i].GetValue(t), types[i]);
break;
case NpgsqlDbType.Xml:
writer.Write((string)properties[i].GetValue(t), types[i]);
break;
}
}
}
}
writer.Complete();
}
}
catch (Exception ex)
{
throw new Exception("Error executing NpgSqlBulkCopy.WriteToServer(). See inner exception for details", ex);
}
}
You need to set the DestinationTableName property first, and conn needs to be an open connection.
Essentially, the method uses reflection to get the properties of the type of the list you pass in. Obviously the data types in the table being populated must match! The writer is built up by iterating over the list, and a single bulk insert then happens at the end. I may not have handled all the types you need, but it should be clear how to add any that are missing.
Edit
I was asked to share it, so here is the DataTable equivalent:
public void WriteToServer(DataTable dataTable)
{
try
{
if (DestinationTableName == null || DestinationTableName == "")
{
throw new ArgumentOutOfRangeException("DestinationTableName", "Destination table must be set");
}
int colCount = dataTable.Columns.Count;
NpgsqlDbType[] types = new NpgsqlDbType[colCount];
int[] lengths = new int[colCount];
string[] fieldNames = new string[colCount];
using (var cmd = new NpgsqlCommand("SELECT * FROM " + DestinationTableName + " LIMIT 1", conn))
{
using (var rdr = cmd.ExecuteReader())
{
if (rdr.FieldCount != colCount)
{
throw new ArgumentOutOfRangeException("dataTable", "Column count in Destination Table does not match column count in source table.");
}
var columns = rdr.GetColumnSchema();
for (int i = 0; i < colCount; i++)
{
types[i] = (NpgsqlDbType)columns[i].NpgsqlDbType;
lengths[i] = columns[i].ColumnSize == null ? 0 : (int)columns[i].ColumnSize;
fieldNames[i] = columns[i].ColumnName;
}
}
}
var sB = new StringBuilder(fieldNames[0]);
for (int p = 1; p < colCount; p++)
{
sB.Append(", " + fieldNames[p]);
}
using (var writer = conn.BeginBinaryImport("COPY " + DestinationTableName + " (" + sB.ToString() + ") FROM STDIN (FORMAT BINARY)"))
{
for (int j = 0; j < dataTable.Rows.Count; j++)
{
DataRow dR = dataTable.Rows[j];
writer.StartRow();
for (int i = 0; i < colCount; i++)
{
if (dR[i] == DBNull.Value)
{
writer.WriteNull();
}
else
{
switch (types[i])
{
case NpgsqlDbType.Bigint:
writer.Write((long)dR[i], types[i]);
break;
case NpgsqlDbType.Bit:
if (lengths[i] > 1)
{
writer.Write((byte[])dR[i], types[i]);
}
else
{
writer.Write((byte)dR[i], types[i]);
}
break;
case NpgsqlDbType.Boolean:
writer.Write((bool)dR[i], types[i]);
break;
case NpgsqlDbType.Bytea:
writer.Write((byte[])dR[i], types[i]);
break;
case NpgsqlDbType.Char:
if (dR[i] is string)
{
writer.Write((string)dR[i], types[i]);
}
else if (dR[i] is Guid)
{
var value = dR[i].ToString();
writer.Write(value, types[i]);
}
else if (lengths[i] > 1)
{
writer.Write((char[])dR[i], types[i]);
}
else
{
var s = ((string)dR[i].ToString()).ToCharArray();
writer.Write(s[0], types[i]);
}
break;
case NpgsqlDbType.Time:
case NpgsqlDbType.Timestamp:
case NpgsqlDbType.TimestampTz:
case NpgsqlDbType.Date:
writer.Write((DateTime)dR[i], types[i]);
break;
case NpgsqlDbType.Double:
writer.Write((double)dR[i], types[i]);
break;
case NpgsqlDbType.Integer:
try
{
if (dR[i] is int)
{
writer.Write((int)dR[i], types[i]);
break;
}
else if (dR[i] is string)
{
var swap = Convert.ToInt32(dR[i]);
writer.Write((int)swap, types[i]);
break;
}
}
catch (Exception ex)
{
string sh = ex.Message;
}
writer.Write((object)dR[i], types[i]);
break;
case NpgsqlDbType.Interval:
writer.Write((TimeSpan)dR[i], types[i]);
break;
case NpgsqlDbType.Numeric:
case NpgsqlDbType.Money:
writer.Write((decimal)dR[i], types[i]);
break;
case NpgsqlDbType.Real:
writer.Write((Single)dR[i], types[i]);
break;
case NpgsqlDbType.Smallint:
try
{
if (dR[i] is byte)
{
var swap = Convert.ToInt16(dR[i]);
writer.Write((short)swap, types[i]);
break;
}
writer.Write((short)dR[i], types[i]);
}
catch (Exception ex)
{
string ms = ex.Message;
}
break;
case NpgsqlDbType.Varchar:
case NpgsqlDbType.Text:
writer.Write((string)dR[i], types[i]);
break;
case NpgsqlDbType.Uuid:
writer.Write((Guid)dR[i], types[i]);
break;
case NpgsqlDbType.Xml:
writer.Write((string)dR[i], types[i]);
break;
}
}
}
}
writer.Complete();
}
}
catch (Exception ex)
{
throw new Exception("Error executing NpgSqlBulkCopy.WriteToServer(). See inner exception for details", ex);
}
}
As with the IEnumerable implementation, some data types may be missing, but they are easy to add.
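As an example of extending it, a json/jsonb column could probably be handled by adding two more cases to the switch in the IEnumerable<T> version (a sketch; the same pattern applies to the DataTable version):

case NpgsqlDbType.Json:
case NpgsqlDbType.Jsonb:
    // json/jsonb values can be written from their string representation
    writer.Write((string)properties[i].GetValue(t), types[i]);
    break;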
I use this: https://github.com/PostgreSQLCopyHelper/PostgreSQLCopyHelper and it is very fast.
Here is an example where I am loading a list of instances of my custom class (called single_bar, which has 6 variables rather than the 2 in your Foo class). The copy helper maps the variables in the class to the columns you want written.
// We make a copy helper, to upload to the DB
PostgreSQLCopyHelper<single_bar> copyHelper = new PostgreSQLCopyHelper<single_bar>(temp_table_name)
.MapTimeStamp("bar_time", x => x.bar_time)
.MapReal("open_px", x => x.open_px)
.MapReal("high_px", x => x.high_px)
.MapReal("low_px", x => x.low_px)
.MapReal("close_px", x => x.close_px)
.MapReal("trade_volume", x => x.getrealvol());
// Upload the data to the temp table.
ulong NoUploaded = copyHelper.SaveAll(conn, dataset);
result = Convert.ToInt32(NoUploaded);
I create a temporary table as the target of the copy operation and then upsert from it into the final table.
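The upsert step isn't part of the library; one way to do it is with INSERT ... ON CONFLICT (PostgreSQL 9.5+). A sketch, where the final table name bars and the unique key on bar_time are assumptions made up for this example (temp_table_name and the column names come from the snippet above):

// Sketch: move the rows from the temp table into the final table, updating on key collision.
// "bars" and the unique constraint on bar_time are assumed names for illustration.
var upsertSql =
    "INSERT INTO bars (bar_time, open_px, high_px, low_px, close_px, trade_volume) " +
    "SELECT bar_time, open_px, high_px, low_px, close_px, trade_volume FROM " + temp_table_name + " " +
    "ON CONFLICT (bar_time) DO UPDATE SET " +
    "open_px = EXCLUDED.open_px, high_px = EXCLUDED.high_px, low_px = EXCLUDED.low_px, " +
    "close_px = EXCLUDED.close_px, trade_volume = EXCLUDED.trade_volume;";
using (var cmd = new NpgsqlCommand(upsertSql, conn))
{
    cmd.ExecuteNonQuery();
}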