如何使用 C#/SQL 批量更新 1000 条记录
How to bulk update 1000 records using C#/SQL
我正在从一个 table 中读取一百万条记录,我对某些列进行了一些修改,我想将它们批量保存回来。我想在每 10000 条记录后执行一次批量更新。
我正在使用 .Net Core 3.1
我的代码:
public void FixText()
{
SqlConnection cnn = new SqlConnection(strConn);
string queryString = "SELECT ClientID, ServerName, Text FROM [dbo].[Client]";
SqlCommand selectCmd = new SqlCommand(queryString, cnn);
cnn.Open();
int j = 0;
SqlDataReader reader = selectCmd.ExecuteReader();
List<Client> clients = new List<Client>();
try
{
while (reader.Read())
{
j++;
ClientID = (int)reader["ClientID"];
ServerName = reader["ServerName"].ToString();
Text = reader["Text"].ToString();
//Modify Text & set ServerName
string Text = UpdateText(Text);
if text.StartWith("doo"){ServerName = "re";}
//perform bulk update
if (j > 10000)
{
Client client = new Client()
{
ClientID = (int)ClientID,
ServerName = ServerName,
Text = Text,
};
clients.Add(client);
//I'm struggling here on what to use to do a bulk update
j = 0;
}
}
}
catch (Exception ex)
{
throw ex;
}
finally
{
reader.Close();
}
cnn.Close();
}
感谢任何帮助!
解决这个问题有两种方法。
您可以使用 User-Defined Types
或 Sending array of record as JSON datatype through store procedure
您有两个选择,使用 MERGE
语句,或 UPDATE
.
我会选择 UPDATE
选项,因为这是最简单的选项。 (这需要 FastMember
nuget)。
private void ExecuteSql(SqlConnection connection , string sql , SqlParameter[] parameters = null)
{
if(connection == null)
{
throw new ArgumentNullException(nameof(connection));
}
if(string.IsNullOrWhiteSpace(sql))
{
throw new ArgumentNullException(nameof(sql));
}
using(var command = new SqlCommand(sql , connection))
{
if(parameters?.Length > 0)
{
command.Parameters.AddRange(parameters);
}
if(connection.State != ConnectionState.Open)
connection.Open();
command.ExecuteNonQuery();
}
}
private void ExecuteBulkCopy<T>(SqlConnection connection , IEnumerable<T> entries , string destinationTableName , string[] columns = null , int batchSize = 1000000)
{
if(connection == null)
{
throw new ArgumentNullException(nameof(connection));
}
if(entries == null)
{
throw new ArgumentNullException(nameof(entries));
}
if(string.IsNullOrWhiteSpace(destinationTableName))
{
throw new ArgumentNullException(nameof(destinationTableName));
}
if(connection.State != ConnectionState.Open)
connection.Open();
using(SqlBulkCopy sbc = new SqlBulkCopy(connection)
{
BulkCopyTimeout = 0 ,
DestinationTableName = destinationTableName ,
BatchSize = batchSize
})
{
using(var reader = ObjectReader.Create(entries , columns))
{
sbc.WriteToServer(reader);
}
}
}
private IEnumerable<Client> GetUpdatedClients(SqlConnection connection)
{
using(var command = new SqlCommand("SELECT ClientID, ServerName, Text FROM [dbo].[Client]", connection))
{
connection.Open();
using(SqlDataReader reader = _connection.ExecuteReader(query , parameters))
{
if(reader.HasRows)
{
while(reader.Read())
{
if(reader.IsDBNull(x)) { continue; }
var clientId = (int)reader["ClientID"];
var serverName = reader["ServerName"]?.ToString();
var text = reader["Text"]?.ToString();
//Modify Text & set ServerName
string textUpdated = UpdateText(text);
if(textUpdated.StartWith("doo"))
{
serverName = "re";
}
var client = new Client()
{
ClientID = clientId,
ServerName = serverName,
Text = textUpdated
};
yield return client;
}
}
}
}
}
private void BulkUpdateClients(SqlConnection connection, IEnumerable<Client> clients)
{
const string dropTempTable = "IF OBJECT_ID('[tempdb].[dbo].[##Client]') IS NOT NULL DROP TABLE [tempdb].[dbo].[##Client];";
// drop temp table if exists
ExecuteSql(connection ,dropTempTable);
// create the temp table
ExecuteSql($"SELECT TOP 1 [ClientID], [ServerName], [Text] INTO [tempdb].[dbo].[##Client] FROM [dbo].[Client];");
// copy rows to the temp table
ExecuteBulkCopy(connection, clients , "[tempdb].[dbo].[##Client]", new[] { "ClientID", "ServerName", "Text" });
// Use UPDATE JOIN
ExecuteSql("UPDATE t1 SET [ServerName] = t2.[ServerName], [Text] = t2.[Text] FROM [dbo].[Client] t1 JOIN [tempdb].[dbo].[##Client] t2 ON t1.[ClientID] = t2.[ClientID];");
// drop temp table
ExecuteSql(connection,dropTempTable);
}
public void BulkUpdateClients()
{
try
{
using(var connection = new SqlConnection(strConn))
{
connection.Open();
var clients = GetUpdatedClients(connection);
// it's important to use the same connection and keep it a live
// otherwise the temp table will be dropped.
BulkUpdate(connection, clients);
}
}
catch(Exception ex)
{
throw ex;
}
}
如果你不需要使用临时table,你可以将其更改为永久table(只需更改临时table名称)。
我正在从一个 table 中读取一百万条记录,我对某些列进行了一些修改,我想将它们批量保存回来。我想在每 10000 条记录后执行一次批量更新。
我正在使用 .Net Core 3.1
我的代码:
public void FixText()
{
SqlConnection cnn = new SqlConnection(strConn);
string queryString = "SELECT ClientID, ServerName, Text FROM [dbo].[Client]";
SqlCommand selectCmd = new SqlCommand(queryString, cnn);
cnn.Open();
int j = 0;
SqlDataReader reader = selectCmd.ExecuteReader();
List<Client> clients = new List<Client>();
try
{
while (reader.Read())
{
j++;
ClientID = (int)reader["ClientID"];
ServerName = reader["ServerName"].ToString();
Text = reader["Text"].ToString();
//Modify Text & set ServerName
string Text = UpdateText(Text);
if text.StartWith("doo"){ServerName = "re";}
//perform bulk update
if (j > 10000)
{
Client client = new Client()
{
ClientID = (int)ClientID,
ServerName = ServerName,
Text = Text,
};
clients.Add(client);
//I'm struggling here on what to use to do a bulk update
j = 0;
}
}
}
catch (Exception ex)
{
throw ex;
}
finally
{
reader.Close();
}
cnn.Close();
}
感谢任何帮助!
解决这个问题有两种方法。
您可以使用 User-Defined Types
或 Sending array of record as JSON datatype through store procedure
您有两个选择,使用 MERGE
语句,或 UPDATE
.
我会选择 UPDATE
选项,因为这是最简单的选项。 (这需要 FastMember
nuget)。
private void ExecuteSql(SqlConnection connection , string sql , SqlParameter[] parameters = null)
{
if(connection == null)
{
throw new ArgumentNullException(nameof(connection));
}
if(string.IsNullOrWhiteSpace(sql))
{
throw new ArgumentNullException(nameof(sql));
}
using(var command = new SqlCommand(sql , connection))
{
if(parameters?.Length > 0)
{
command.Parameters.AddRange(parameters);
}
if(connection.State != ConnectionState.Open)
connection.Open();
command.ExecuteNonQuery();
}
}
private void ExecuteBulkCopy<T>(SqlConnection connection , IEnumerable<T> entries , string destinationTableName , string[] columns = null , int batchSize = 1000000)
{
if(connection == null)
{
throw new ArgumentNullException(nameof(connection));
}
if(entries == null)
{
throw new ArgumentNullException(nameof(entries));
}
if(string.IsNullOrWhiteSpace(destinationTableName))
{
throw new ArgumentNullException(nameof(destinationTableName));
}
if(connection.State != ConnectionState.Open)
connection.Open();
using(SqlBulkCopy sbc = new SqlBulkCopy(connection)
{
BulkCopyTimeout = 0 ,
DestinationTableName = destinationTableName ,
BatchSize = batchSize
})
{
using(var reader = ObjectReader.Create(entries , columns))
{
sbc.WriteToServer(reader);
}
}
}
private IEnumerable<Client> GetUpdatedClients(SqlConnection connection)
{
using(var command = new SqlCommand("SELECT ClientID, ServerName, Text FROM [dbo].[Client]", connection))
{
connection.Open();
using(SqlDataReader reader = _connection.ExecuteReader(query , parameters))
{
if(reader.HasRows)
{
while(reader.Read())
{
if(reader.IsDBNull(x)) { continue; }
var clientId = (int)reader["ClientID"];
var serverName = reader["ServerName"]?.ToString();
var text = reader["Text"]?.ToString();
//Modify Text & set ServerName
string textUpdated = UpdateText(text);
if(textUpdated.StartWith("doo"))
{
serverName = "re";
}
var client = new Client()
{
ClientID = clientId,
ServerName = serverName,
Text = textUpdated
};
yield return client;
}
}
}
}
}
private void BulkUpdateClients(SqlConnection connection, IEnumerable<Client> clients)
{
const string dropTempTable = "IF OBJECT_ID('[tempdb].[dbo].[##Client]') IS NOT NULL DROP TABLE [tempdb].[dbo].[##Client];";
// drop temp table if exists
ExecuteSql(connection ,dropTempTable);
// create the temp table
ExecuteSql($"SELECT TOP 1 [ClientID], [ServerName], [Text] INTO [tempdb].[dbo].[##Client] FROM [dbo].[Client];");
// copy rows to the temp table
ExecuteBulkCopy(connection, clients , "[tempdb].[dbo].[##Client]", new[] { "ClientID", "ServerName", "Text" });
// Use UPDATE JOIN
ExecuteSql("UPDATE t1 SET [ServerName] = t2.[ServerName], [Text] = t2.[Text] FROM [dbo].[Client] t1 JOIN [tempdb].[dbo].[##Client] t2 ON t1.[ClientID] = t2.[ClientID];");
// drop temp table
ExecuteSql(connection,dropTempTable);
}
public void BulkUpdateClients()
{
try
{
using(var connection = new SqlConnection(strConn))
{
connection.Open();
var clients = GetUpdatedClients(connection);
// it's important to use the same connection and keep it a live
// otherwise the temp table will be dropped.
BulkUpdate(connection, clients);
}
}
catch(Exception ex)
{
throw ex;
}
}
如果你不需要使用临时table,你可以将其更改为永久table(只需更改临时table名称)。