SqlBulkCopy.WriteToServerAsync 不尊重 `await` 关键字。为什么?
SqlBulkCopy.WriteToServerAsync does not respect the `await` keyword. Why?
SqlBulkCopy.WriteToServerAsync 不遵守 `await` 关键字,这是为什么?
这是我的代码:
/// <summary>
/// Bulk-copies batches of rows into the "LegalContractorTemps" table inside one transaction,
/// then executes the dbo.LegalContractorsDataSynchronize stored procedure and commits.
/// </summary>
/// <param name="readXmlInBatches">Callback that receives the per-batch writer delegate and is expected to invoke it for each batch.</param>
/// <param name="hashKey">Value bound to the @hashKey parameter of the stored procedure.</param>
/// <param name="hash">Value bound to the @hash parameter of the stored procedure.</param>
public async Task UpdateDBWithXML(Action<Func<DataTable, Task>> readXmlInBatches, string hashKey, string hash)
{
// Both the transaction and the bulk-copy object are disposed when the using scope ends.
using (var transaction = this.Context.Database.BeginTransaction(IsolationLevel.ReadUncommitted))
using (var bulk = new SqlBulkCopy((SqlConnection)this.Connection, SqlBulkCopyOptions.Default, (SqlTransaction)transaction.UnderlyingTransaction))
{
//this.Context.Database.ExecuteSqlCommand("DELETE FROM [dbo].[LegalContractorTemps]");
bulk.DestinationTableName = "LegalContractorTemps";
// NOTE(review): readXmlInBatches is an Action, so it returns nothing that could be awaited here.
// Execution continues to the stored-procedure call as soon as the delegate returns, whether or
// not the Tasks produced by the lambda below have completed.
readXmlInBatches(async (DataTable table) =>
{
// Column mappings are registered once, on the first batch; each column maps to the same name.
if (bulk.ColumnMappings.Count == 0)
{
foreach (DataColumn column in table.Columns)
{
bulk.ColumnMappings.Add(new SqlBulkCopyColumnMapping(column.ColumnName, column.ColumnName));
}
}
// This await suspends only the lambda; the code that invoked the lambda gets the pending Task
// back and decides for itself whether to wait for it.
await bulk.WriteToServerAsync(table);
});
await this.Context.Database.ExecuteSqlCommandAsync(
"EXECUTE dbo.LegalContractorsDataSynchronize @hashKey, @hash",
new SqlParameter("@hashKey", hashKey),
new SqlParameter("@hash", hash)
);
transaction.Commit();
}
}
我把下面这个方法作为 `readXmlInBatches` 参数传入:
/// <summary>
/// Reads the XML file at pathToXml with an XmlReader, adds one DataTable row per RECORD element,
/// and hands the table to <paramref name="processBatch"/> each time it holds batchSize rows,
/// plus once more at the end for any remaining rows.
/// </summary>
/// <param name="processBatch">Delegate invoked with the filled table; it returns a Task.</param>
public void ReadXMLInBatches(Func<DataTable, Task> processBatch)
{
int batchSize = 10000;
// A single DataTable instance is reused for every batch.
var table = new DataTable();
foreach (var col in columnNames)
{
table.Columns.Add(col);
}
using (var reader = new StreamReader(pathToXml, Encoding.GetEncoding(encoding)))
using (var xmlReader = XmlReader.Create(reader))
{
string lastElement = null;
DataRow lastRow = null;
while (xmlReader.Read())
{
switch (xmlReader.NodeType)
{
case XmlNodeType.Element:
if (xmlReader.Name == "RECORD")
{
// The flush check runs when a new RECORD starts, before its row is added.
if (table.Rows.Count >= batchSize)
{
// NOTE(review): the Task returned by processBatch is discarded, so the Clear() on the
// next line runs as soon as the delegate returns, even if that Task is still pending.
processBatch(table);
table.Rows.Clear();
}
lastRow = table.Rows.Add();
}
lastElement = xmlReader.Name;
break;
case XmlNodeType.Text:
// ReadMember is defined elsewhere; presumably it stores the text into the column of lastRow
// that corresponds to lastElement (TODO confirm).
ReadMember(lastRow, lastElement, xmlReader.Value);
break;
}
}
// Flush whatever is left after the last full batch.
if (table.Rows.Count > 0)
{
// NOTE(review): same pattern as above; the returned Task is not awaited before Clear().
processBatch(table);
table.Rows.Clear();
}
}
}
我在 XML 中有大约 170 万条记录。在我的程序读取了几批后,我收到错误:
System.Data.RowNotInTableException: 'This row has been removed from a table and does not have any data. BeginEdit() will allow creation of new data in this row.'
我查看了 SqlBulkCopy 的源代码,并找到了抛出该错误的方法:
// Starts a bulk copy that uses the given DataTable as its row source and returns the Task
// produced by WriteRowSourceToServerAsync.
// Throws ArgumentNullException when table is null, and SQL.BulkLoadPendingOperation() when
// _isBulkCopyingInProgress is already set. The method is not marked async, so both guard
// exceptions are thrown directly to the caller rather than through the returned Task.
public Task WriteToServerAsync(DataTable table, DataRowState rowState, CancellationToken cancellationToken) {
Task resultTask = null;
SqlConnection.ExecutePermission.Demand();
if (table == null) {
throw new ArgumentNullException("table");
}
// Rejects a second call while a previous bulk copy is still flagged as in progress.
if (_isBulkCopyingInProgress){
throw SQL.BulkLoadPendingOperation();
}
SqlStatistics statistics = Statistics;
try {
statistics = SqlStatistics.StartTimer(Statistics);
// rowState of 0 or Deleted: skip only Deleted rows. Otherwise: skip every state not in rowState, plus Deleted.
_rowStateToSkip = ((rowState == 0) || (rowState == DataRowState.Deleted)) ? DataRowState.Deleted : ~rowState | DataRowState.Deleted;
_rowSource = table;
_SqlDataReaderRowSource = null;
_dataTableSource = table;
_rowSourceType = ValueSourceType.DataTable;
// The enumerator is taken over the caller's table.Rows collection itself; no copy of the rows is made here.
_rowEnumerator = table.Rows.GetEnumerator();
_isAsyncBulkCopy = true;
resultTask = WriteRowSourceToServerAsync(table.Columns.Count, cancellationToken); //It returns Task since _isAsyncBulkCopy = true;
}
finally {
SqlStatistics.StopTimer(statistics);
}
return resultTask;
}
我注意到了字段 `_isBulkCopyingInProgress`,并决定在调试时检查它。我发现错误被抛出时,该字段的值为 `true`。这怎么可能?我原本预期批量插入会先完成(之后执行才会继续,`WriteToServerAsync` 才会被第二次调用),因为我在这里加了 `await`:`await bulk.WriteToServerAsync(table);`。
我可能遗漏了什么?
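您正在将一个异步函数传递给 `ReadXMLInBatches`,但该方法内部并没有等待(await)它执行完成。因此,`processBatch(table)` 一返回尚未完成的 Task,紧随其后的 `table.Rows.Clear()` 就会立即执行,而此时 `WriteToServerAsync` 仍在读取这些行,这很可能就是 `RowNotInTableException` 的来源;同时,`ReadXMLInBatches` 也可能在所有 `WriteToServerAsync` 调用完成之前就返回。
请尝试以下更改: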
// Sketch of the suggested async variant of the batch reader; unchanged parts are elided with "//...".
// It returns a Task so that callers can await the whole read.
public async Task ReadXMLInBatchesAsync(Func<DataTable, Task> processBatch)
{
//...
// Awaiting the batch Task keeps the statements that follow from running until it has completed.
await processBatch(table);
//...
}
// Sketch of the suggested caller: readXmlInBatches now returns a Task, so it can be awaited.
// The lambda body and the rest of the method are elided with "//...".
public async Task UpdateDBWithXML(Func<Func<DataTable, Task>, Task> readXmlInBatches, string hashKey, string hash)
{
//...
// Awaits the reader itself, so the code after this call runs only once the reader's Task has completed.
await readXmlInBatches(async (DataTable table) =>
//...
}
SqlBulkCopy.WriteToServerAsync 不遵守 `await` 关键字,这是为什么?
这是我的代码:
/// <summary>
/// Bulk-copies batches of rows into the "LegalContractorTemps" table inside one transaction,
/// then executes the dbo.LegalContractorsDataSynchronize stored procedure and commits.
/// </summary>
/// <param name="readXmlInBatches">Callback that receives the per-batch writer delegate and is expected to invoke it for each batch.</param>
/// <param name="hashKey">Value bound to the @hashKey parameter of the stored procedure.</param>
/// <param name="hash">Value bound to the @hash parameter of the stored procedure.</param>
public async Task UpdateDBWithXML(Action<Func<DataTable, Task>> readXmlInBatches, string hashKey, string hash)
{
// Both the transaction and the bulk-copy object are disposed when the using scope ends.
using (var transaction = this.Context.Database.BeginTransaction(IsolationLevel.ReadUncommitted))
using (var bulk = new SqlBulkCopy((SqlConnection)this.Connection, SqlBulkCopyOptions.Default, (SqlTransaction)transaction.UnderlyingTransaction))
{
//this.Context.Database.ExecuteSqlCommand("DELETE FROM [dbo].[LegalContractorTemps]");
bulk.DestinationTableName = "LegalContractorTemps";
// NOTE(review): readXmlInBatches is an Action, so it returns nothing that could be awaited here.
// Execution continues to the stored-procedure call as soon as the delegate returns, whether or
// not the Tasks produced by the lambda below have completed.
readXmlInBatches(async (DataTable table) =>
{
// Column mappings are registered once, on the first batch; each column maps to the same name.
if (bulk.ColumnMappings.Count == 0)
{
foreach (DataColumn column in table.Columns)
{
bulk.ColumnMappings.Add(new SqlBulkCopyColumnMapping(column.ColumnName, column.ColumnName));
}
}
// This await suspends only the lambda; the code that invoked the lambda gets the pending Task
// back and decides for itself whether to wait for it.
await bulk.WriteToServerAsync(table);
});
await this.Context.Database.ExecuteSqlCommandAsync(
"EXECUTE dbo.LegalContractorsDataSynchronize @hashKey, @hash",
new SqlParameter("@hashKey", hashKey),
new SqlParameter("@hash", hash)
);
transaction.Commit();
}
}
我把下面这个方法作为 `readXmlInBatches` 参数传入:
/// <summary>
/// Reads the XML file at pathToXml with an XmlReader, adds one DataTable row per RECORD element,
/// and hands the table to <paramref name="processBatch"/> each time it holds batchSize rows,
/// plus once more at the end for any remaining rows.
/// </summary>
/// <param name="processBatch">Delegate invoked with the filled table; it returns a Task.</param>
public void ReadXMLInBatches(Func<DataTable, Task> processBatch)
{
int batchSize = 10000;
// A single DataTable instance is reused for every batch.
var table = new DataTable();
foreach (var col in columnNames)
{
table.Columns.Add(col);
}
using (var reader = new StreamReader(pathToXml, Encoding.GetEncoding(encoding)))
using (var xmlReader = XmlReader.Create(reader))
{
string lastElement = null;
DataRow lastRow = null;
while (xmlReader.Read())
{
switch (xmlReader.NodeType)
{
case XmlNodeType.Element:
if (xmlReader.Name == "RECORD")
{
// The flush check runs when a new RECORD starts, before its row is added.
if (table.Rows.Count >= batchSize)
{
// NOTE(review): the Task returned by processBatch is discarded, so the Clear() on the
// next line runs as soon as the delegate returns, even if that Task is still pending.
processBatch(table);
table.Rows.Clear();
}
lastRow = table.Rows.Add();
}
lastElement = xmlReader.Name;
break;
case XmlNodeType.Text:
// ReadMember is defined elsewhere; presumably it stores the text into the column of lastRow
// that corresponds to lastElement (TODO confirm).
ReadMember(lastRow, lastElement, xmlReader.Value);
break;
}
}
// Flush whatever is left after the last full batch.
if (table.Rows.Count > 0)
{
// NOTE(review): same pattern as above; the returned Task is not awaited before Clear().
processBatch(table);
table.Rows.Clear();
}
}
}
我在 XML 中有大约 170 万条记录。在我的程序读取了几批后,我收到错误:
System.Data.RowNotInTableException: 'This row has been removed from a table and does not have any data. BeginEdit() will allow creation of new data in this row.'
我查看了 SqlBulkCopy 的源代码,并找到了抛出该错误的方法:
// Starts a bulk copy that uses the given DataTable as its row source and returns the Task
// produced by WriteRowSourceToServerAsync.
// Throws ArgumentNullException when table is null, and SQL.BulkLoadPendingOperation() when
// _isBulkCopyingInProgress is already set. The method is not marked async, so both guard
// exceptions are thrown directly to the caller rather than through the returned Task.
public Task WriteToServerAsync(DataTable table, DataRowState rowState, CancellationToken cancellationToken) {
Task resultTask = null;
SqlConnection.ExecutePermission.Demand();
if (table == null) {
throw new ArgumentNullException("table");
}
// Rejects a second call while a previous bulk copy is still flagged as in progress.
if (_isBulkCopyingInProgress){
throw SQL.BulkLoadPendingOperation();
}
SqlStatistics statistics = Statistics;
try {
statistics = SqlStatistics.StartTimer(Statistics);
// rowState of 0 or Deleted: skip only Deleted rows. Otherwise: skip every state not in rowState, plus Deleted.
_rowStateToSkip = ((rowState == 0) || (rowState == DataRowState.Deleted)) ? DataRowState.Deleted : ~rowState | DataRowState.Deleted;
_rowSource = table;
_SqlDataReaderRowSource = null;
_dataTableSource = table;
_rowSourceType = ValueSourceType.DataTable;
// The enumerator is taken over the caller's table.Rows collection itself; no copy of the rows is made here.
_rowEnumerator = table.Rows.GetEnumerator();
_isAsyncBulkCopy = true;
resultTask = WriteRowSourceToServerAsync(table.Columns.Count, cancellationToken); //It returns Task since _isAsyncBulkCopy = true;
}
finally {
SqlStatistics.StopTimer(statistics);
}
return resultTask;
}
我注意到了字段 `_isBulkCopyingInProgress`,并决定在调试时检查它。我发现错误被抛出时,该字段的值为 `true`。这怎么可能?我原本预期批量插入会先完成(之后执行才会继续,`WriteToServerAsync` 才会被第二次调用),因为我在这里加了 `await`:`await bulk.WriteToServerAsync(table);`。
我可能遗漏了什么?
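您正在将一个异步函数传递给 `ReadXMLInBatches`,但该方法内部并没有等待(await)它执行完成。因此,`processBatch(table)` 一返回尚未完成的 Task,紧随其后的 `table.Rows.Clear()` 就会立即执行,而此时 `WriteToServerAsync` 仍在读取这些行,这很可能就是 `RowNotInTableException` 的来源;同时,`ReadXMLInBatches` 也可能在所有 `WriteToServerAsync` 调用完成之前就返回。
请尝试以下更改: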
// Sketch of the suggested async variant of the batch reader; unchanged parts are elided with "//...".
// It returns a Task so that callers can await the whole read.
public async Task ReadXMLInBatchesAsync(Func<DataTable, Task> processBatch)
{
//...
// Awaiting the batch Task keeps the statements that follow from running until it has completed.
await processBatch(table);
//...
}
// Sketch of the suggested caller: readXmlInBatches now returns a Task, so it can be awaited.
// The lambda body and the rest of the method are elided with "//...".
public async Task UpdateDBWithXML(Func<Func<DataTable, Task>, Task> readXmlInBatches, string hashKey, string hash)
{
//...
// Awaits the reader itself, so the code after this call runs only once the reader's Task has completed.
await readXmlInBatches(async (DataTable table) =>
//...
}