C# 数据库中的多个并行插入

C# multiple parallel inserts in database

我有一个大约有 3000 行的数据table。这些行中的每一行都需要插入数据库 table。目前,我正在运行一个 foreach 循环,如下所示:

obj_AseCommand.CommandText = sql_proc;
obj_AseCommand.CommandType = CommandType.StoredProcedure;
obj_AseCommand.Connection = db_Conn;
obj_AseCommand.Connection.Open();

foreach (DataRow dr in dt.Rows)                
{
    obj_AseCommand.Parameters.AddWithValue("@a", dr["a"]);
    obj_AseCommand.Parameters.AddWithValue("@b", dr["b"]);
    obj_AseCommand.Parameters.AddWithValue("@c", dr["c"]);

    obj_AseCommand.ExecuteNonQuery();
    obj_AseCommand.Parameters.Clear();
}

obj_AseCommand.Connection.Close();

请问我如何在数据库中并行执行 SP,因为上述方法大约需要 10 分钟才能插入 3000 行。

编辑

事后看来,使用 Parallel.ForEach 并行化数据库插入有点浪费,因为它还会为每个连接消耗一个线程。可以说,更好的并行解决方案是使用 System.Data Db 操作的异步版本,例如 ExecuteNonQueryAsync , start the executions (concurrently), and then use await Task.WhenAll() to wait upon completion - this will avoid the Thread overhead to the caller, although the overall Db performance won't likely be any quicker. More here

原始答案,多个并行插入数据库

您可以使用 TPL 并行执行此操作,例如特别是 Parallel.ForEach. You will almost certainly want to look at throttling the amount of parallelism by tweaking MaxDegreeOfParalelismlocalInit 重载,这样你就不会淹没你的数据库:

Parallel.ForEach(dt.Rows,
    // Adjust this for optimum throughput vs minimal impact to your other DB users
    new ParallelOptions { MaxDegreeOfParallelism = 4 },
    () =>
    {
        var con = new SqlConnection();
        var cmd = con.CreateCommand();
        cmd.CommandText = sql_proc;
        cmd.CommandType = CommandType.StoredProcedure;
        con.Open();

        cmd.Parameters.Add(new SqlParameter("@a", SqlDbType.Int));
        // NB : Size sensitive parameters must have size
        cmd.Parameters.Add(new SqlParameter("@b", SqlDbType.VarChar, 100));
        cmd.Parameters.Add(new SqlParameter("@c", SqlDbType.Bit));
        // Prepare won't help with SPROCs but can improve plan caching for adhoc sql
        // cmd.Prepare();
        return new {Conn = con, Cmd = cmd};
    },
    (dr, pls, localInit) =>
    {
        localInit.Cmd.Parameters["@a"] = dr["a"];
        localInit.Cmd.Parameters["@b"] = dr["b"];
        localInit.Cmd.Parameters["@c"] = dr["c"];
        localInit.Cmd.ExecuteNonQuery();
        return localInit;
    },
    (localInit) =>
    {
        localInit.Cmd.Dispose();
        localInit.Conn.Dispose();
    });

备注:

  • 除非你真的知道自己在做什么,一般情况下我们应该让TPL来决定并行度。但是,根据资源的争用程度(读取:数据库工作的锁),可能需要限制并发任务的上限(试错可能有用,例如尝试并发 4、8、16 个并发任务等以查看哪个提供了最大的吞吐量,并监控Sql服务器上的锁定和CPU负载。
  • 同样,保留 TPL 的默认分区程序通常足以跨任务对 DataRow 进行分区。
  • 每个任务都需要自己单独的 Sql 连接。
  • 与其在每次调用时创建和处理命令,不如为每个任务创建一次,然后继续重复使用相同的命令,每次只更新参数。
  • 使用 LocalInit / Local Finally lambda 执行每个任务的设置和清理,例如处理命令和连接。
  • 您也可以考虑使用 .Prepare() if you are using AdHoc Sql or Sql versions prior to 2005
  • 我假设枚举 DataTable's 行是线程安全的。当然,您需要仔细检查一下。

旁注:

3000 行 10 分钟是过多的,即使是宽 table 和单线程也是如此。你的程序是做什么的?我假设处理不是微不足道的,因此需要 SPROC,但如果你只是做简单的插入,根据 @3dd 的评论,SqlBulkCopy 将以合理的速度每分钟产生约 1M 行的插入缩小 table.

最好将整个数据table传入数据库

obj_AseCommand.CommandText = sql_proc;
obj_AseCommand.CommandType = CommandType.StoredProcedure;
obj_AseCommand.Connection = db_Conn;
obj_AseCommand.Connection.Open();
obj_AseCommand.Parameters.AddWithValue("@Parametername",DataTable);
obj_AseCommand.ExecuteNonQuery();

在数据库中,您必须创建与您的数据完全匹配的 table 类型 table

CREATE TYPE EmpType AS TABLE 
(
    ID INT, Name VARCHAR(3000), Address VARCHAR(8000), Operation SMALLINT //your columns
)

在存储过程中你可以做这样的事情...

create PROCEDURE demo

@Details EmpType READONLY // it must be read only
AS
BEGIN
    insert into yourtable   //insert data
    select * from @Details 
    END

您可以使用 SqlBulkCopy

指南是here

您可以使用 SqlBulkCopy。请参阅下面的示例代码。 WriteToServer方法,将datatable写入数据库,前提是它们是相同的映射

using (SqlBulkCopy bulkCopy = new SqlBulkCopy(ConSQL)) {
if (ConSQL.State == ConnectionState.Closed) {
    ConSQL.Open();
}

bulkCopy.ColumnMappings.Add(0, 0);
bulkCopy.ColumnMappings.Add(1, 1);
bulkCopy.ColumnMappings.Add(2, 2);

bulkCopy.DestinationTableName = "dbo.TableName";

bulkCopy.WriteToServer(dataTable);

bulkCopy.Close(); //redundant - since using will dispose the object

}