使用 SqlBulkCopy 时提供流作为二进制列的数据源
Supplying stream as a source of data for a binary column when SqlBulkCopy is used
如果需要以流方式从SqlServer 中读取数据,可以使用一些功能。例如将 SqlDataReader
与 CommandBehavior.SequentialAccess
一起使用,特别是当需要访问二进制列数据时,可以使用 GetStream(int)
方法:
var cmd = new SqlCommand();
cmd.Connection = connection;
cmd.CommandText = @"select 0x0123456789 as Data";
using (var dr = cmd.ExecuteReader(CommandBehavior.SequentialAccess))
{
dr.Read();
var stream = dr.GetStream(0);
// access stream
}
但是当需要使用 SqlBulkCopy
向 向 SqlServer 提供数据时,如果需要将流作为二进制列的数据源?
我尝试关注
var cmd2 = new SqlCommand();
cmd2.Connection = connection;
cmd2.CommandText = @"create table #Test (ID int, Data varbinary(max))";
cmd2.ExecuteNonQuery();
using (SqlBulkCopy sbc = new SqlBulkCopy(connection, SqlBulkCopyOptions.TableLock, null))
{
sbc.DestinationTableName = "#Test";
sbc.EnableStreaming = true;
sbc.ColumnMappings.Add(0, "ID");
sbc.ColumnMappings.Add(1, "Data");
sbc.WriteToServer(new TestDataReader());
}
其中 TestDataReader
实现 IDataReader
如下:
class TestDataReader : IDataReader
{
public int FieldCount { get { return 2; } }
int rowCount = 1;
public bool Read() { return (rowCount++) < 3; }
public bool IsDBNull(int i) { return false; }
public object GetValue(int i)
{
switch (i)
{
case 0: return rowCount;
case 1: return new byte[] { 0x01, 0x23, 0x45, 0x67, 0x89 };
default: throw new Exception();
}
}
//the rest members of IDataReader
}
它按预期工作。
然而变化
case 1: return new byte[] { 0x01, 0x23, 0x45, 0x67, 0x89 };
至
case 1: return new MemoryStream(new byte[] { 0x01, 0x23, 0x45, 0x67, 0x89 });
导致异常 System.InvalidOperationException
消息
The given value of type MemoryStream from the data source cannot be
converted to type varbinary of the specified target column.
有没有一种方法可以将 Stream
从 IDataReader
(或者可能 DbDataReader
)提供给 SqlBulkCopy
作为二进制列的数据源,而无需复制所有它的数据先进入内存(字节数组)?
见以下代码
static int SendOrders(int totalToSend)
{
using (SqlConnection con = new SqlConnection(connectionString))
{
con.Open();
using (SqlTransaction tran = con.BeginTransaction())
{
var newOrders =
from i in Enumerable.Range(0, totalToSend)
select new Order
{
customer_name = "Customer " + i % 100,
quantity = i % 9,
order_id = i,
order_entry_date = DateTime.Now
};
SqlBulkCopy bc = new SqlBulkCopy(con,
SqlBulkCopyOptions.CheckConstraints |
SqlBulkCopyOptions.FireTriggers |
SqlBulkCopyOptions.KeepNulls, tran);
bc.BatchSize = 1000;
bc.DestinationTableName = "order_queue";
bc.WriteToServer(newOrders.AsDataReader());
tran.Commit();
}
con.Close();
}
return totalToSend;
}
不确定这在任何地方都有记录,但如果对 SqlBulkCopy
源代码进行简短检查,您可能会发现它以不同的方式处理不同的数据读取器。
首先,SqlBulkCopy
确实支持流和 GetStream
,但您可能会注意到 IDataReader
接口不包含 GetStream
方法。因此,当您将自定义 IDataReader
实现提供给 SqlBulkCopy
- 它不会将二进制列视为流式处理,并且不会接受 Stream
类型的值。
另一方面 - DbDataReader
有这种方法。如果您向 SqlBulkCopy
提供 DbDataReader
-inherited class 的实例 - 它将以流式处理方式处理所有二进制列,并将调用 DbDataReader.GetStream
.
所以要解决您的问题 - 像这样从 DbDataReader
继承:
class TestDataReader : DbDataReader
{
public override bool IsDBNull(int ordinal) {
return false;
}
public override int FieldCount { get; } = 2;
int rowCount = 1;
public override bool HasRows { get; } = true;
public override bool IsClosed { get; } = false;
public override bool Read()
{
return (rowCount++) < 3;
}
public override object GetValue(int ordinal) {
switch (ordinal) {
// do not return anything for binary column here - it will not be called
case 0:
return rowCount;
default:
throw new Exception();
}
}
public override Stream GetStream(int ordinal) {
// instead - return your stream here
if (ordinal == 1)
return new MemoryStream(new byte[] {0x01, 0x23, 0x45, 0x67, 0x89});
throw new Exception();
}
// bunch of irrelevant stuff
}
如果需要以流方式从SqlServer 中读取数据,可以使用一些功能。例如将 SqlDataReader
与 CommandBehavior.SequentialAccess
一起使用,特别是当需要访问二进制列数据时,可以使用 GetStream(int)
方法:
var cmd = new SqlCommand();
cmd.Connection = connection;
cmd.CommandText = @"select 0x0123456789 as Data";
using (var dr = cmd.ExecuteReader(CommandBehavior.SequentialAccess))
{
dr.Read();
var stream = dr.GetStream(0);
// access stream
}
但是当需要使用 SqlBulkCopy
向 向 SqlServer 提供数据时,如果需要将流作为二进制列的数据源?
我尝试关注
var cmd2 = new SqlCommand();
cmd2.Connection = connection;
cmd2.CommandText = @"create table #Test (ID int, Data varbinary(max))";
cmd2.ExecuteNonQuery();
using (SqlBulkCopy sbc = new SqlBulkCopy(connection, SqlBulkCopyOptions.TableLock, null))
{
sbc.DestinationTableName = "#Test";
sbc.EnableStreaming = true;
sbc.ColumnMappings.Add(0, "ID");
sbc.ColumnMappings.Add(1, "Data");
sbc.WriteToServer(new TestDataReader());
}
其中 TestDataReader
实现 IDataReader
如下:
class TestDataReader : IDataReader
{
public int FieldCount { get { return 2; } }
int rowCount = 1;
public bool Read() { return (rowCount++) < 3; }
public bool IsDBNull(int i) { return false; }
public object GetValue(int i)
{
switch (i)
{
case 0: return rowCount;
case 1: return new byte[] { 0x01, 0x23, 0x45, 0x67, 0x89 };
default: throw new Exception();
}
}
//the rest members of IDataReader
}
它按预期工作。
然而变化
case 1: return new byte[] { 0x01, 0x23, 0x45, 0x67, 0x89 };
至
case 1: return new MemoryStream(new byte[] { 0x01, 0x23, 0x45, 0x67, 0x89 });
导致异常 System.InvalidOperationException
消息
The given value of type MemoryStream from the data source cannot be converted to type varbinary of the specified target column.
有没有一种方法可以将 Stream
从 IDataReader
(或者可能 DbDataReader
)提供给 SqlBulkCopy
作为二进制列的数据源,而无需复制所有它的数据先进入内存(字节数组)?
见以下代码
static int SendOrders(int totalToSend)
{
using (SqlConnection con = new SqlConnection(connectionString))
{
con.Open();
using (SqlTransaction tran = con.BeginTransaction())
{
var newOrders =
from i in Enumerable.Range(0, totalToSend)
select new Order
{
customer_name = "Customer " + i % 100,
quantity = i % 9,
order_id = i,
order_entry_date = DateTime.Now
};
SqlBulkCopy bc = new SqlBulkCopy(con,
SqlBulkCopyOptions.CheckConstraints |
SqlBulkCopyOptions.FireTriggers |
SqlBulkCopyOptions.KeepNulls, tran);
bc.BatchSize = 1000;
bc.DestinationTableName = "order_queue";
bc.WriteToServer(newOrders.AsDataReader());
tran.Commit();
}
con.Close();
}
return totalToSend;
}
不确定这在任何地方都有记录,但如果对 SqlBulkCopy
源代码进行简短检查,您可能会发现它以不同的方式处理不同的数据读取器。
首先,SqlBulkCopy
确实支持流和 GetStream
,但您可能会注意到 IDataReader
接口不包含 GetStream
方法。因此,当您将自定义 IDataReader
实现提供给 SqlBulkCopy
- 它不会将二进制列视为流式处理,并且不会接受 Stream
类型的值。
另一方面 - DbDataReader
有这种方法。如果您向 SqlBulkCopy
提供 DbDataReader
-inherited class 的实例 - 它将以流式处理方式处理所有二进制列,并将调用 DbDataReader.GetStream
.
所以要解决您的问题 - 像这样从 DbDataReader
继承:
class TestDataReader : DbDataReader
{
public override bool IsDBNull(int ordinal) {
return false;
}
public override int FieldCount { get; } = 2;
int rowCount = 1;
public override bool HasRows { get; } = true;
public override bool IsClosed { get; } = false;
public override bool Read()
{
return (rowCount++) < 3;
}
public override object GetValue(int ordinal) {
switch (ordinal) {
// do not return anything for binary column here - it will not be called
case 0:
return rowCount;
default:
throw new Exception();
}
}
public override Stream GetStream(int ordinal) {
// instead - return your stream here
if (ordinal == 1)
return new MemoryStream(new byte[] {0x01, 0x23, 0x45, 0x67, 0x89});
throw new Exception();
}
// bunch of irrelevant stuff
}