DataTable "Array dimensions exceeded supported range" when requesting dataTable.NewRow()
For some crazy reason, I'm hitting an OutOfMemoryException while streaming data into SQL in reasonably sized chunks, despite barely using any memory:
System.OutOfMemoryException: Exception of type 'System.OutOfMemoryException' was thrown.
at System.Data.DataTable.NewRowArray(Int32 size)
at System.Data.RecordManager.GrowRecordCapacity()
at System.Data.RecordManager.NewRecordBase()
at System.Data.DataTable.NewRecord(Int32 sourceRecord)
at Company.PA.Data.PADbContext.<StreamedSqlBulkCopy>d__22`1.MoveNext() in D:\Agent_A\_work\s\Company.PA.DataLayer\Company.PA.Data\BulkInsert\StreamedSqlBulkCopy.cs:line 46
The error occurs on the call to dataTable.NewRow() in the while loop below, once I pass roughly the 30-millionth row:
/// <summary>Helper to stream a large number of records into SQL without
/// ever having to materialize the entire enumerable into memory at once.</summary>
/// <param name="destinationTableName">The name of the table in the database to copy data to.</param>
/// <param name="dataTable">A new instance of the DataTable class that matches the schema of the table to insert to.
/// This should match exactly (same column names) what is in SQL, for automatic column mapping to work.</param>
/// <param name="sourceData">The enumerable of data that will be used to generate DataRows.</param>
/// <param name="populateRow">A delegate function that populates and returns a new data row for a given record.</param>
/// <param name="memoryBatchSize">The number of DataRows to generate in memory before passing them to SqlBulkCopy.</param>
/// <param name="insertBatchSize">The batch size of inserts performed by the SqlBulkCopy utility.</param>
public async Task StreamedSqlBulkCopy<T>(
    string destinationTableName, DataTable dataTable,
    IEnumerable<T> sourceData, Func<T, DataRow, DataRow> populateRow,
    int memoryBatchSize = 1000000, int insertBatchSize = 5000)
{
    using (SqlConnection connection = new SqlConnection(Database.Connection.ConnectionString))
    {
        connection.Open();
        using (SqlBulkCopy bulkCopy = new SqlBulkCopy(connection, SqlBulkCopyOptions.TableLock, null))
        using (IEnumerator<T> enumerator = sourceData.GetEnumerator())
        {
            // Configure the single SqlBulkCopy instance that will be used to copy all "batches"
            bulkCopy.DestinationTableName = destinationTableName;
            bulkCopy.BatchSize = insertBatchSize;
            bulkCopy.BulkCopyTimeout = _bulkInsertTimeOut;
            foreach (DataColumn column in dataTable.Columns)
                bulkCopy.ColumnMappings.Add(column.ColumnName, column.ColumnName);

            // Begin enumerating over all records, preparing batches no larger than "memoryBatchSize"
            bool hasNext = true;
            while (hasNext)
            {
                DataRow[] batch = new DataRow[memoryBatchSize];
                int filled = 0;
                // Check capacity before advancing the enumerator, so a record is never
                // consumed (and silently dropped) when the batch is already full
                while (filled < memoryBatchSize && (hasNext = enumerator.MoveNext()))
                    batch[filled++] = populateRow(enumerator.Current, dataTable.NewRow());
                // When we reach the end of the enumerable, shrink the final buffer array
                if (filled < memoryBatchSize)
                    Array.Resize(ref batch, filled);
                if (filled > 0)
                    await bulkCopy.WriteToServerAsync(batch);
            }
        }
    }
}
As is hopefully clear, the purpose of the helper above is to stream a (very large) IEnumerable<T> of data into a SQL table using SqlBulkCopy, given a template DataTable that defines the destination schema and a delegate that populates a row for a given element.
Sample usage would be:
public async Task SaveExchangeRates(List<FxRate> fxRates)
{
    var createDate = DateTimeOffset.UtcNow;
    await StreamedSqlBulkCopy("RefData.ExchangeRate",
        GetExchangeRateDataTable(), fxRates, (fx, newRow) =>
        {
            newRow["BaseCurrency"] = "USD";
            newRow["TargetCurrency"] = fx.CurrencyCode;
            newRow["ExchangeDate"] = fx.ExchangeRateDate;
            newRow["DollarValue"] = fx.ValueInUsd;
            return newRow;
        });
}
private DataTable GetExchangeRateDataTable()
{
    var dataTable = new DataTable();
    dataTable.Columns.Add("ExchangeDate", typeof(DateTime));
    dataTable.Columns.Add("BaseCurrency", typeof(string));
    dataTable.Columns.Add("TargetCurrency", typeof(string));
    dataTable.Columns.Add("DollarValue", typeof(double));
    return dataTable;
}
It turns out that even if you are only using the DataTable instance as an empty structure for schema purposes, and even if you never call dataTable.Rows.Add() to actually attach the rows to the table, internally it still increments a counter every time you call NewRow, and apparently even grows a placeholder record array on the assumption that you will eventually insert every one of those rows.
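You can reproduce this in isolation with a minimal sketch like the one below: call NewRow in a loop on an otherwise empty DataTable and watch managed memory climb even though Rows.Count stays at zero (on my understanding, if left to run it eventually dies with the same OutOfMemoryException from NewRowArray):

using System;
using System.Data;

class NewRowLeakDemo
{
    static void Main()
    {
        var dataTable = new DataTable();
        dataTable.Columns.Add("Value", typeof(int));

        for (int i = 1; i <= 30_000_000; i++)
        {
            // The row is never added to dataTable.Rows, yet the table's
            // internal record storage keeps growing with every call
            DataRow row = dataTable.NewRow();
            row["Value"] = i;

            if (i % 5_000_000 == 0)
                Console.WriteLine($"{i:N0} NewRow calls: Rows.Count={dataTable.Rows.Count}, " +
                    $"managed memory={GC.GetTotalMemory(false) / (1024 * 1024):N0} MB");
        }
    }
}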
Anyway, the workaround is to periodically "reset" your template by overwriting it with a clone of itself:
dataTable = dataTable.Clone();
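In the helper above, that single line slots in at the top of the outer batching loop, so every memory batch starts from a fresh template and the previous one, with its bloated internal record array, becomes eligible for garbage collection. A minimal sketch of the change:

bool hasNext = true;
while (hasNext)
{
    // Swap in a fresh, empty clone each batch so the internal record
    // array of the previous template can be garbage collected
    dataTable = dataTable.Clone();
    DataRow[] batch = new DataRow[memoryBatchSize];
    int filled = 0;
    while (filled < memoryBatchSize && (hasNext = enumerator.MoveNext()))
        batch[filled++] = populateRow(enumerator.Current, dataTable.NewRow());
    if (filled < memoryBatchSize)
        Array.Resize(ref batch, filled);
    if (filled > 0)
        await bulkCopy.WriteToServerAsync(batch);
}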
Inelegant indeed, but easier than trying to implement your own IDataReader, which is the only other way to feed SqlBulkCopy. (That said, for anyone else trying to stream into SQL bulk copy who is not constrained, as I was, to avoid third-party libraries: start from the FastMember package and this answer.)
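For reference, the FastMember route looks roughly like the sketch below (untested here; the FxRate property names are taken from my example above, and the constant BaseCurrency column is omitted since it would need a property on the source type or a default in SQL). ObjectReader exposes the enumerable as a forward-only DbDataReader, so SqlBulkCopy streams straight from it without materializing any DataRows:

using System.Collections.Generic;
using System.Data.SqlClient;
using System.Threading.Tasks;
using FastMember;

public async Task FastMemberBulkCopy(string connectionString, IEnumerable<FxRate> fxRates)
{
    using (var connection = new SqlConnection(connectionString))
    {
        connection.Open();
        using (var bulkCopy = new SqlBulkCopy(connection, SqlBulkCopyOptions.TableLock, null))
        // ObjectReader reads the named properties of each FxRate as columns
        using (var reader = ObjectReader.Create(fxRates,
            "ExchangeRateDate", "CurrencyCode", "ValueInUsd"))
        {
            bulkCopy.DestinationTableName = "RefData.ExchangeRate";
            // Map the source property names onto the SQL column names
            bulkCopy.ColumnMappings.Add("ExchangeRateDate", "ExchangeDate");
            bulkCopy.ColumnMappings.Add("CurrencyCode", "TargetCurrency");
            bulkCopy.ColumnMappings.Add("ValueInUsd", "DollarValue");
            await bulkCopy.WriteToServerAsync(reader);
        }
    }
}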
An alternative simplification (at the cost of some extra overhead) is to accept our fate and use the DataTable class itself rather than an array of DataRow, but to Clone() the original table periodically to dodge what appears to be a hard ceiling of 16,777,216 (2^24) rows.
I dislike that DataTable maintains an array for every row you ever create from it, even ones that never end up being added, but since it does, we might as well use that storage instead of allocating our own. Some of the overhead of using a DataTable can be offset by setting its initial capacity (so it never has to grow, i.e. reallocate) and by suppressing as much of its event and constraint machinery as possible:
The relevant changes are below:
bool hasNext = true;
while (hasNext)
{
    using (DataTable tableChunk = dataTable.Clone())
    {
        tableChunk.MinimumCapacity = memoryBatchSize + 1; // Avoid triggering resizing
        tableChunk.BeginLoadData(); // Speeds up adding a large volume of rows a little
        int filled = 0;
        // Check capacity before advancing the enumerator, so a record is never
        // consumed (and silently dropped) when the chunk is already full
        while (filled < memoryBatchSize && (hasNext = enumerator.MoveNext()))
        {
            tableChunk.Rows.Add(populateRow(enumerator.Current, tableChunk.NewRow()));
            filled++;
        }
        if (filled > 0)
            await bulkCopy.WriteToServerAsync(tableChunk);
    }
}