Multithreaded SqlDataReader: timeout expired

The scenario: I want to copy a table's data from server A to server B.

The data volume is large, so I use SqlBulkCopy to do it.

Here is my idea:

Producer: enumerate all the page numbers.

Consumer: take a page number, read that page of data from server A, and write the data reader to server B.
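
The paging arithmetic behind this idea can be sketched as follows. This is only a minimal illustration, assuming 1-based page numbers and 1-based row numbers as in the code further down. The `Paging` class and its method names are hypothetical and do not appear in the original program, and the sketch uses integer arithmetic where the program uses `Math.Ceiling` on a `double`.

```csharp
using System;

static class Paging
{
    // Number of pages needed to cover rowCount rows at batchSize rows per page
    // (integer ceiling division).
    public static int TotalPages(long rowCount, int batchSize)
    {
        if (batchSize <= 0) throw new ArgumentOutOfRangeException("batchSize");
        return (int)((rowCount + batchSize - 1) / batchSize);
    }

    // Inclusive, 1-based row range covered by a 1-based page number.
    public static Tuple<long, long> RowRange(int page, int batchSize)
    {
        long start = (long)(page - 1) * batchSize + 1;
        long end = (long)page * batchSize;
        return Tuple.Create(start, end);
    }
}
```

With 25 rows and a batch size of 10, this gives 3 pages, and page 3 covers rows 21 through 30; the last page may extend past the final row, which is harmless when the range is only used in a `BETWEEN`-style filter.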

The problem I ran into:

After running for a long time, it throws an exception:

Timeout expired. The timeout period elapsed prior to completion of the operation


[SqlException (0x80131904): Timeout expired.  The timeout period elapsed prior to completion of the operation or the server is not responding.]
System.Data.SqlClient.SqlConnection.OnError(SqlException exception, Boolean breakConnection) +1948826
System.Data.SqlClient.SqlInternalConnection.OnError(SqlException exception, Boolean breakConnection) +4844747
System.Data.SqlClient.TdsParser.ThrowExceptionAndWarning(TdsParserStateObject stateObj) +194
System.Data.SqlClient.TdsParser.Run(RunBehavior runBehavior, SqlCommand cmdHandler, SqlDataReader dataStream, BulkCopySimpleResultSet bulkCopyHandler, TdsParserStateObject stateObj) +2392
System.Data.SqlClient.SqlDataReader.ConsumeMetaData() +33
System.Data.SqlClient.SqlDataReader.get_MetaData() +83
System.Data.SqlClient.SqlCommand.FinishExecuteReader(SqlDataReader ds, RunBehavior runBehavior, String resetOptionsString) +297
System.Data.SqlClient.SqlCommand.RunExecuteReaderTds(CommandBehavior cmdBehavior, RunBehavior runBehavior, Boolean returnStream, Boolean async) +954
System.Data.SqlClient.SqlCommand.RunExecuteReader(CommandBehavior cmdBehavior, RunBehavior runBehavior, Boolean returnStream, String method, DbAsyncResult result) +162
System.Data.SqlClient.SqlCommand.RunExecuteReader(CommandBehavior cmdBehavior, RunBehavior runBehavior, Boolean returnStream, String method) +32
System.Data.SqlClient.SqlCommand.ExecuteReader(CommandBehavior behavior, String method) +141
System.Data.SqlClient.SqlCommand.ExecuteDbDataReader(CommandBehavior behavior) +12
System.Data.Common.DbCommand.ExecuteReader() +12
System.Data.Linq.SqlClient.SqlProvider.Execute(Expression query, QueryInfo queryInfo, IObjectReaderFactory factory, Object[] parentArgs, Object[] userArgs, ICompiledSubQuery[] subQueries, Object lastResult) +332

I can't find what is wrong with my code.

Here is my code:

Program.cs

class Program
{
    static BlockingCollection<int> pcCollection = new BlockingCollection<int>();
    static string strRemoteConn = ConfigurationManager.AppSettings["RemoteConnStr"];
    static string strLocalConn = ConfigurationManager.AppSettings["LocalConnStr"];
    static string strCommandSql = ConfigurationManager.AppSettings["CommandSQL"];
    static string strTableName = ConfigurationManager.AppSettings["TableName"];
    static int batchSize = Int32.Parse(ConfigurationManager.AppSettings["CommitBatchSize"]);
    static int taskCount = Int32.Parse(ConfigurationManager.AppSettings["TaskCount"]);

    static object s_consumer = new object();

    static void Main(string[] args)
    {
        try
        {
            var watch = Stopwatch.StartNew();

            var tableCount = 0D;

            using (var connection = new SqlConnection(strRemoteConn))
            using (SqlCommand cmd = connection.CreateCommand())
            {
                connection.Open();
                cmd.CommandText = string.Format(@"SELECT
                                                    Total_Rows= SUM(st.row_count)
                                                FROM
                                                    sys.dm_db_partition_stats st
                                                WHERE
                                                    object_name(object_id) = '{0}' AND (index_id < 2)", strTableName);
                cmd.CommandTimeout = 300;

                tableCount = Double.Parse(cmd.ExecuteScalar().ToString());
            }

            var totalPages = (int)Math.Ceiling(tableCount / batchSize);

            var listPageRn = Enumerable.Range(1, totalPages);

            var listPartPage = listPageRn.Split(taskCount).ToList();

            var listProducerTask = new List<Task>();
            var listConsumerTask = new List<Task>();

            var consumerTask = taskCount;

            for (int i = 1; i <= consumerTask; i++)
            {
                var taskFlag = i;
                var consumer = Task.Factory.StartNew(() =>
                {
                    ConsumerAction(taskFlag.ToString());

                }, TaskCreationOptions.LongRunning);

                listConsumerTask.Add(consumer);
            }

            var producerTaskIndex = 1;
            foreach (var item in listPartPage)
            {
                var tmpIndex = producerTaskIndex.ToString();
                var producer = Task.Factory.StartNew(() =>
                {
                    ProducerAction(item, tmpIndex);
                });

                listProducerTask.Add(producer);
                producerTaskIndex++;
            }

            Task.WaitAll(listProducerTask.ToArray());
            pcCollection.CompleteAdding();
            Task.WaitAll(listConsumerTask.ToArray());

            watch.Stop();
            var mins = watch.ElapsedMilliseconds / 1000 / 60;
            Console.WriteLine("All Batch Insert Time Elapsed:\t {0} mins", mins);
        }
        catch (AggregateException ex)
        {
            using (StreamWriter writer = File.AppendText("BatchError.txt"))
            {
                writer.WriteLine("Error Time: {0}", DateTime.Now);
                foreach (var exception in ex.InnerExceptions)
                {
                    writer.WriteLine("Error: {0}", exception.Message);
                    writer.WriteLine("Source: {0}", exception.Source);
                    writer.WriteLine("Track: {0}", exception.StackTrace);
                }
            }
            throw;

        }
        catch (Exception ex)
        {
            using (StreamWriter writer = File.AppendText("BatchError.txt"))
            {
                writer.WriteLine("Error Time: {0}", DateTime.Now);
                writer.WriteLine("Error: {0}", ex.Message);
                writer.WriteLine("Source: {0}", ex.Source);
                writer.WriteLine("Track: {0}", ex.StackTrace);
            }
            throw;
        }

        Console.ReadLine();
    }

    static void ProducerAction(IEnumerable<int> source, string taskFlag = "1")
    {
        foreach (var item in source)
        {
            Console.WriteLine("Producer-{0} processing item batch {1}", taskFlag, item);

            pcCollection.Add(item);
        }
    }

    static void ConsumerAction(string taskFlag = "")
    {
        foreach (var item in pcCollection.GetConsumingEnumerable())
        {
            Console.WriteLine("consumer-{0} processing item", taskFlag);
            var processing = new ManageBatchProcessing
            {
                LocalConnStr = strLocalConn,
                RemoteConnStr = strRemoteConn,
                BatchSize = batchSize,
                TableName = strTableName,
                CommandSql = strCommandSql
            };

            processing.ProcessDatabase(item);
        }
    }
}

ManageBatchProcessing.cs

public class ManageBatchProcessing
{
    public string LocalConnStr { get; set; }
    public string RemoteConnStr { get; set; }

    public string CommandSql { get; set; }
    public int BatchSize { get; set; }
    public string TableName { get; set; }

    public void ProcessDatabase(int item)
    {
        var watch = new Stopwatch();
        watch.Start();

        var start = (item - 1) * this.BatchSize + 1;
        var end = item * this.BatchSize;
        var strCommandSql = string.Format(this.CommandSql, start, end);
        using (var remoteConn = new SqlConnection(this.RemoteConnStr))
        using (var localConn = new SqlConnection(this.LocalConnStr))
        {
            remoteConn.Open();
            localConn.Open();

            using (var command = new SqlCommand(strCommandSql, remoteConn))
            using (var dataReader = command.ExecuteReader())
            {
                command.CommandTimeout = 0;
                using (var bulkCopy = new SqlBulkCopy(localConn))
                {
                    bulkCopy.DestinationTableName = this.TableName;
                    bulkCopy.BulkCopyTimeout = 0;
                    bulkCopy.WriteToServer(dataReader);
                    bulkCopy.Close();
                }
            }

            remoteConn.Close();
            localConn.Close();
        }

        watch.Stop();

        var totalSeconds = (double)watch.ElapsedMilliseconds / 1000;
        Console.WriteLine("\t\t\t -------------------------------------------------");
        Console.ForegroundColor = ConsoleColor.Green;
        Console.WriteLine("\t\t\t insert target table done {0} s", totalSeconds.ToString("#.##"));
        Console.ResetColor();
        Console.WriteLine("\t\t\t -------------------------------------------------");
    }

    public DataTable RetriveToDatabase(int item)
    {
        var start = (item - 1) * this.BatchSize + 1;
        var end = item * this.BatchSize;
        var dataTable = new DataTable();

        using (var connection = new SqlConnection(this.RemoteConnStr))
        {
            var watch = new Stopwatch();
            watch.Start();

            using (var adapter = new SqlDataAdapter(string.Format(this.CommandSql, start, end), connection))
            {
                adapter.SelectCommand.CommandTimeout = 3600;
                adapter.Fill(dataTable);
            }

            watch.Stop();

            var totalSeconds = (double)watch.ElapsedMilliseconds / 1000;
            Console.WriteLine("\t\t\t -------------------------------------------------");
            Console.ForegroundColor = ConsoleColor.Red;
            Console.WriteLine("\t\t\t convert datareader to table done {0} s", totalSeconds.ToString("#.##"));
            Console.ResetColor();
            Console.WriteLine("\t\t\t -------------------------------------------------");
            return dataTable;
        }
    }

    public void WriteToDatabase(IDataReader reader)
    {
        using (var connection = new SqlConnection(this.LocalConnStr))
        {
            var watch = new Stopwatch();
            watch.Start();

            connection.Open();

            using (var bulkCopy = new SqlBulkCopy(connection, SqlBulkCopyOptions.TableLock | SqlBulkCopyOptions.FireTriggers | SqlBulkCopyOptions.UseInternalTransaction, null))
            {
                bulkCopy.DestinationTableName = this.TableName;
                bulkCopy.BulkCopyTimeout = 0;
                bulkCopy.WriteToServer(reader);
            }

            connection.Close();

            watch.Stop();

            var totalSeconds = (double)watch.ElapsedMilliseconds / 1000;
            Console.WriteLine("\t\t\t -------------------------------------------------");
            Console.ForegroundColor = ConsoleColor.Green;
            Console.WriteLine("\t\t\t insert target table done {0} s", totalSeconds.ToString("#.##"));
            Console.ResetColor();
            Console.WriteLine("\t\t\t -------------------------------------------------");
        }
    }

    public void WriteToDatabase(DataTable dataTable)
    {
        using (var connection = new SqlConnection(this.LocalConnStr))
        {
            var watch = new Stopwatch();
            watch.Start();

            connection.Open();

            using (var bulkCopy = new SqlBulkCopy(connection, SqlBulkCopyOptions.TableLock | SqlBulkCopyOptions.FireTriggers | SqlBulkCopyOptions.UseInternalTransaction, null))
            {
                bulkCopy.DestinationTableName = this.TableName;
                bulkCopy.BulkCopyTimeout = 0;
                bulkCopy.WriteToServer(dataTable);
            }

            connection.Close();

            watch.Stop();

            var totalSeconds = (double)watch.ElapsedMilliseconds / 1000;
            Console.WriteLine("\t\t\t -------------------------------------------------");
            Console.ForegroundColor = ConsoleColor.Green;
            Console.WriteLine("\t\t\t insert target table done {0} s", totalSeconds.ToString("#.##"));
            Console.ResetColor();
            Console.WriteLine("\t\t\t -------------------------------------------------");
        }
    }

}
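
The `Split` extension method that `Main` calls on `listPageRn` is not shown in the question. The following is a hypothetical stand-in, assuming its purpose is to divide the page numbers into at most `taskCount` groups, one per producer task. The class name and the round-robin strategy are assumptions; the original helper may partition differently.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

static class EnumerableExtensions
{
    // Hypothetical replacement for the Split helper used in Main.
    // Deals the items round-robin into at most `parts` groups; if the source
    // has fewer items than `parts`, fewer groups are returned.
    public static IEnumerable<IEnumerable<T>> Split<T>(this IEnumerable<T> source, int parts)
    {
        if (source == null) throw new ArgumentNullException("source");
        if (parts <= 0) throw new ArgumentOutOfRangeException("parts");

        return source
            .Select((item, index) => new { item, index })
            .GroupBy(x => x.index % parts)
            .Select(g => g.Select(x => x.item));
    }
}
```

Under this assumption, splitting pages 1 to 7 into 3 parts yields the groups {1, 4, 7}, {2, 5}, and {3, 6}. Since all producers only add page numbers to the same `BlockingCollection<int>`, how the pages are grouped should not affect which pages the consumers eventually process.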

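I finally figured out what was wrong with my code.

command.CommandTimeout = 0;

was in the wrong place. It was set after command.ExecuteReader() had already been called, so the query still ran with the default 30-second command timeout.

Here is the corrected version: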

using (var command = new SqlCommand(strCommandSql, remoteConn))
{
    command.CommandTimeout = 0;

    using (var dataReader = command.ExecuteReader())
    {
        using (var bulkCopy = new SqlBulkCopy(this.LocalConnStr, SqlBulkCopyOptions.TableLock | SqlBulkCopyOptions.UseInternalTransaction))
        {
            bulkCopy.DestinationTableName = this.TableName;
            bulkCopy.BulkCopyTimeout = 0;
            bulkCopy.WriteToServer(dataReader);
            bulkCopy.Close();
        }
    }
}
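
One way to make this ordering mistake harder to repeat is to set the timeout at the moment the command is created. The sketch below assumes a small factory helper; `CommandFactory` and `Create` are hypothetical names, not part of the original program or of ADO.NET. It relies only on the documented `SqlCommand.CommandTimeout` property, whose default is 30 seconds and where 0 means no limit.

```csharp
using System.Data.SqlClient;

static class CommandFactory
{
    // Hypothetical helper: returns a command whose timeout is already set,
    // so the caller cannot execute it under the 30-second default by accident.
    public static SqlCommand Create(string sql, SqlConnection connection, int timeoutSeconds)
    {
        var command = new SqlCommand(sql, connection);
        command.CommandTimeout = timeoutSeconds; // 0 = wait indefinitely
        return command;
    }
}
```

In `ProcessDatabase`, the command could then be obtained with `CommandFactory.Create(strCommandSql, remoteConn, 0)` inside the `using` statement. A finite value such as 3600 may be a safer choice than 0, because it still bounds how long a stuck query can block a consumer task.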