使用 SqlBulkCopy 时提供流作为二进制列的数据源

Supplying stream as a source of data for a binary column when SqlBulkCopy is used

如果需要以流方式从SqlServer 中读取数据,可以使用一些功能。例如将 SqlDataReaderCommandBehavior.SequentialAccess 一起使用,特别是当需要访问二进制列数据时,可以使用 GetStream(int) 方法:

var cmd = new SqlCommand();
cmd.Connection = connection;
cmd.CommandText = @"select 0x0123456789 as Data";

using (var dr = cmd.ExecuteReader(CommandBehavior.SequentialAccess))
{
    dr.Read();

    var stream = dr.GetStream(0);
    // access stream
}

但是当需要使用 SqlBulkCopy SqlServer 提供数据时,如果需要将流作为二进制列的数据源?

我尝试关注

var cmd2 = new SqlCommand();
cmd2.Connection = connection;
cmd2.CommandText = @"create table #Test (ID int, Data varbinary(max))";
cmd2.ExecuteNonQuery();

using (SqlBulkCopy sbc = new SqlBulkCopy(connection, SqlBulkCopyOptions.TableLock, null))
{
    sbc.DestinationTableName = "#Test";
    sbc.EnableStreaming = true;

    sbc.ColumnMappings.Add(0, "ID");
    sbc.ColumnMappings.Add(1, "Data");

    sbc.WriteToServer(new TestDataReader());
}

其中 TestDataReader 实现 IDataReader 如下:

class TestDataReader : IDataReader
{
    public int FieldCount { get { return 2; } }
    int rowCount = 1;
    public bool Read() { return (rowCount++) < 3; }
    public bool IsDBNull(int i) { return false; }

    public object GetValue(int i)
    {
        switch (i)
        {
            case 0: return rowCount;
            case 1: return new byte[] { 0x01, 0x23, 0x45, 0x67, 0x89 };
            default: throw new Exception();
        }
    }

    //the rest members of IDataReader
}

它按预期工作。

然而变化

case 1: return new byte[] { 0x01, 0x23, 0x45, 0x67, 0x89 };

case 1: return new MemoryStream(new byte[] { 0x01, 0x23, 0x45, 0x67, 0x89 });

导致异常 System.InvalidOperationException 消息

The given value of type MemoryStream from the data source cannot be converted to type varbinary of the specified target column.

有没有一种方法可以将 StreamIDataReader(或者可能 DbDataReader)提供给 SqlBulkCopy 作为二进制列的数据源,而无需复制所有它的数据先进入内存(字节数组)?

见以下代码

static int SendOrders(int totalToSend)
    {
      using (SqlConnection con = new SqlConnection(connectionString))
      {
        con.Open();
        using (SqlTransaction tran = con.BeginTransaction())
        {
          var newOrders =
                  from i in Enumerable.Range(0, totalToSend)
                  select new Order
                  {
                    customer_name = "Customer " + i % 100,
                    quantity = i % 9,
                    order_id = i,
                    order_entry_date = DateTime.Now
                  };

          SqlBulkCopy bc = new SqlBulkCopy(con,
            SqlBulkCopyOptions.CheckConstraints |
            SqlBulkCopyOptions.FireTriggers |
            SqlBulkCopyOptions.KeepNulls, tran);

          bc.BatchSize = 1000;
          bc.DestinationTableName = "order_queue";
          bc.WriteToServer(newOrders.AsDataReader()); 

          tran.Commit();
        }
        con.Close();

      }

      return totalToSend;

    }

不确定这在任何地方都有记录,但如果对 SqlBulkCopy 源代码进行简短检查,您可能会发现它以不同的方式处理不同的数据读取器。 首先,SqlBulkCopy 确实支持流和 GetStream,但您可能会注意到 IDataReader 接口不包含 GetStream 方法。因此,当您将自定义 IDataReader 实现提供给 SqlBulkCopy - 它不会将二进制列视为流式处理,并且不会接受 Stream 类型的值。

另一方面 - DbDataReader 有这种方法。如果您向 SqlBulkCopy 提供 DbDataReader-inherited class 的实例 - 它将以流式处理方式处理所有二进制列,并将调用 DbDataReader.GetStream.

所以要解决您的问题 - 像这样从 DbDataReader 继承:

class TestDataReader : DbDataReader
{
    public override bool IsDBNull(int ordinal) {
        return false;
    }

    public override int FieldCount { get; } = 2;
    int rowCount = 1;

    public override bool HasRows { get; } = true;
    public override bool IsClosed { get; } = false;

    public override bool Read()
    {
        return (rowCount++) < 3;
    }

    public override object GetValue(int ordinal) {
        switch (ordinal) {
            // do not return anything for binary column here - it will not be called
            case 0:
                return rowCount;
            default:
                throw new Exception();
        }
    }

    public override Stream GetStream(int ordinal) {
        // instead - return your stream here
        if (ordinal == 1)
            return new MemoryStream(new byte[] {0x01, 0x23, 0x45, 0x67, 0x89});
        throw new Exception();
    }
    // bunch of irrelevant stuff

}