多线程sqldatareader时间过期
multithreading sqldatareader time expired
场景如下:
我想从服务器 A 向服务器 B 插入数据 table。
数据量很大,我会用SqlBulkCopy来完成。
这是我的想法:
producer: get all page number
consumer: get page number, paging data from server A and write
datareader to server
我遇到的问题:
当运行时间长了,会抛出异常:
Timeout expired. The timeout period elapsed prior to completion of the operation
[SqlException (0x80131904): Timeout expired. The timeout period elapsed prior to completion of the operation or the server is not responding.] System.Data.SqlClient.SqlConnection.OnError(SqlException exception, Boolean breakConnection) +1948826
System.Data.SqlClient.SqlInternalConnection.OnError(SqlException exception, Boolean breakConnection) +4844747
System.Data.SqlClient.TdsParser.ThrowExceptionAndWarning(TdsParserStateObject stateObj) +194
System.Data.SqlClient.TdsParser.Run(RunBehavior runBehavior, SqlCommand cmdHandler, SqlDataReader dataStream, BulkCopySimpleResultSet bulkCopyHandler, TdsParserStateObject stateObj) +2392
System.Data.SqlClient.SqlDataReader.ConsumeMetaData() +33
System.Data.SqlClient.SqlDataReader.get_MetaData() +83
System.Data.SqlClient.SqlCommand.FinishExecuteReader(SqlDataReader ds, RunBehavior runBehavior, String resetOptionsString) +297
System.Data.SqlClient.SqlCommand.RunExecuteReaderTds(CommandBehavior cmdBehavior, RunBehavior runBehavior, Boolean returnStream, Boolean async) +954
System.Data.SqlClient.SqlCommand.RunExecuteReader(CommandBehavior cmdBehavior, RunBehavior runBehavior, Boolean returnStream, String method, DbAsyncResult result) +162
System.Data.SqlClient.SqlCommand.RunExecuteReader(CommandBehavior cmdBehavior, RunBehavior runBehavior, Boolean returnStream, String method) +32
System.Data.SqlClient.SqlCommand.ExecuteReader(CommandBehavior behavior, String method) +141
System.Data.SqlClient.SqlCommand.ExecuteDbDataReader(CommandBehavior behavior) +12
System.Data.Common.DbCommand.ExecuteReader() +12
System.Data.Linq.SqlClient.SqlProvider.Execute(Expression query, QueryInfo queryInfo, IObjectReaderFactory factory, Object[] parentArgs, Object[] userArgs, ICompiledSubQuery[] subQueries, Object lastResult) +332
我找不到我的代码有什么问题。
这是我的代码:
Program.cs
class Program
{
static BlockingCollection<int> pcCollection = new BlockingCollection<int>();
static string strRemoteConn = ConfigurationManager.AppSettings["RemoteConnStr"];
static string strLocalConn = ConfigurationManager.AppSettings["LocalConnStr"];
static string strCommandSql = ConfigurationManager.AppSettings["CommandSQL"];
static string strTableName = ConfigurationManager.AppSettings["TableName"];
static int batchSize = Int32.Parse(ConfigurationManager.AppSettings["CommitBatchSize"]);
static int taskCount = Int32.Parse(ConfigurationManager.AppSettings["TaskCount"]);
static object s_consumer = new object();
static void Main(string[] args)
{
try
{
var watch = Stopwatch.StartNew();
var tableCount = 0D;
using (var connection = new SqlConnection(strRemoteConn))
using (SqlCommand cmd = connection.CreateCommand())
{
connection.Open();
cmd.CommandText = string.Format(@"SELECT
Total_Rows= SUM(st.row_count)
FROM
sys.dm_db_partition_stats st
WHERE
object_name(object_id) = '{0}' AND (index_id < 2)", strTableName);
cmd.CommandTimeout = 300;
tableCount = Double.Parse(cmd.ExecuteScalar().ToString());
}
var totalPages = (int)Math.Ceiling(tableCount / batchSize);
var listPageRn = Enumerable.Range(1, totalPages);
var listPartPage = listPageRn.Split(taskCount).ToList();
var listProducerTask = new List<Task>();
var listConsumerTask = new List<Task>();
var consumerTask = taskCount;
for (int i = 1; i <= consumerTask; i++)
{
var taskFlag = i;
var consumer = Task.Factory.StartNew(() =>
{
ConsumerAction(taskFlag.ToString());
}, TaskCreationOptions.LongRunning);
listConsumerTask.Add(consumer);
}
var producerTaskIndex = 1;
foreach (var item in listPartPage)
{
var tmpIndex = producerTaskIndex.ToString();
var producer = Task.Factory.StartNew(() =>
{
ProducerAction(item, tmpIndex);
});
listProducerTask.Add(producer);
producerTaskIndex++;
}
Task.WaitAll(listProducerTask.ToArray());
pcCollection.CompleteAdding();
Task.WaitAll(listConsumerTask.ToArray());
watch.Stop();
var mins = watch.ElapsedMilliseconds / 1000 / 60;
Console.WriteLine("All Batch Insert Time Elapsed:\t {0} mins", mins);
}
catch (AggregateException ex)
{
using (StreamWriter writer = File.AppendText("BatchError.txt"))
{
writer.WriteLine("Error Time: {0}", DateTime.Now);
foreach (var exception in ex.InnerExceptions)
{
writer.WriteLine("Error: {0}", exception.Message);
writer.WriteLine("Source: {0}", exception.Source);
writer.WriteLine("Track: {0}", exception.StackTrace);
}
}
throw;
}
catch (Exception ex)
{
using (StreamWriter writer = File.AppendText("BatchError.txt"))
{
writer.WriteLine("Error Time: {0}", DateTime.Now);
writer.WriteLine("Error: {0}", ex.Message);
writer.WriteLine("Source: {0}", ex.Source);
writer.WriteLine("Track: {0}", ex.StackTrace);
}
throw;
}
Console.ReadLine();
}
static void ProducerAction(IEnumerable<int> source, string taskFlag = "1")
{
foreach (var item in source)
{
Console.WriteLine("Producer-{0} processing item batch {1}", taskFlag, item);
pcCollection.Add(item);
}
}
static void ConsumerAction(string taskFlag = "")
{
foreach (var item in pcCollection.GetConsumingEnumerable())
{
Console.WriteLine("consumer-{0} processing item", taskFlag);
var processing = new ManageBatchProcessing
{
LocalConnStr = strLocalConn,
RemoteConnStr = strRemoteConn,
BatchSize = batchSize,
TableName = strTableName,
CommandSql = strCommandSql
};
processing.ProcessDatabase(item);
}
}
ManageBatchProcessing.cs
public class ManageBatchProcessing
{
public string LocalConnStr { get; set; }
public string RemoteConnStr { get; set; }
public string CommandSql { get; set; }
public int BatchSize { get; set; }
public string TableName { get; set; }
public void ProcessDatabase(int item)
{
var watch = new Stopwatch();
watch.Start();
var start = (item - 1) * this.BatchSize + 1;
var end = item * this.BatchSize;
var strCommandSql = string.Format(this.CommandSql, start, end);
using (var remoteConn = new SqlConnection(this.RemoteConnStr))
using (var localConn = new SqlConnection(this.LocalConnStr))
{
remoteConn.Open();
localConn.Open();
using (var command = new SqlCommand(strCommandSql, remoteConn))
using (var dataReader = command.ExecuteReader())
{
command.CommandTimeout = 0;
using (var bulkCopy = new SqlBulkCopy(localConn))
{
bulkCopy.DestinationTableName = this.TableName;
bulkCopy.BulkCopyTimeout = 0;
bulkCopy.WriteToServer(dataReader);
bulkCopy.Close();
}
}
remoteConn.Close();
localConn.Close();
}
watch.Stop();
var totalSeconds = (double)watch.ElapsedMilliseconds / 1000;
Console.WriteLine("\t\t\t -------------------------------------------------");
Console.ForegroundColor = ConsoleColor.Green;
Console.WriteLine("\t\t\t insert target table done {0} s", totalSeconds.ToString("#.##"));
Console.ResetColor();
Console.WriteLine("\t\t\t -------------------------------------------------");
}
public DataTable RetriveToDatabase(int item)
{
var start = (item - 1) * this.BatchSize + 1;
var end = item * this.BatchSize;
var dataTable = new DataTable();
using (var connection = new SqlConnection(this.RemoteConnStr))
{
var watch = new Stopwatch();
watch.Start();
using (var adapter = new SqlDataAdapter(string.Format(this.CommandSql, start, end), connection))
{
adapter.SelectCommand.CommandTimeout = 3600;
adapter.Fill(dataTable);
}
watch.Stop();
var totalSeconds = (double)watch.ElapsedMilliseconds / 1000;
Console.WriteLine("\t\t\t -------------------------------------------------");
Console.ForegroundColor = ConsoleColor.Red;
Console.WriteLine("\t\t\t convert datareader to table done {0} s", totalSeconds.ToString("#.##"));
Console.ResetColor();
Console.WriteLine("\t\t\t -------------------------------------------------");
return dataTable;
}
}
public void WriteToDatabase(IDataReader reader)
{
using (var connection = new SqlConnection(this.LocalConnStr))
{
var watch = new Stopwatch();
watch.Start();
connection.Open();
using (var bulkCopy = new SqlBulkCopy(connection, SqlBulkCopyOptions.TableLock | SqlBulkCopyOptions.FireTriggers | SqlBulkCopyOptions.UseInternalTransaction, null))
{
bulkCopy.DestinationTableName = this.TableName;
bulkCopy.BulkCopyTimeout = 0;
bulkCopy.WriteToServer(reader);
}
connection.Close();
watch.Stop();
var totalSeconds = (double)watch.ElapsedMilliseconds / 1000;
Console.WriteLine("\t\t\t -------------------------------------------------");
Console.ForegroundColor = ConsoleColor.Green;
Console.WriteLine("\t\t\t insert target table done {0} s", totalSeconds.ToString("#.##"));
Console.ResetColor();
Console.WriteLine("\t\t\t -------------------------------------------------");
}
}
public void WriteToDatabase(DataTable dataTable)
{
using (var connection = new SqlConnection(this.LocalConnStr))
{
var watch = new Stopwatch();
watch.Start();
connection.Open();
using (var bulkCopy = new SqlBulkCopy(connection, SqlBulkCopyOptions.TableLock | SqlBulkCopyOptions.FireTriggers | SqlBulkCopyOptions.UseInternalTransaction, null))
{
bulkCopy.DestinationTableName = this.TableName;
bulkCopy.BulkCopyTimeout = 0;
bulkCopy.WriteToServer(dataTable);
}
connection.Close();
watch.Stop();
var totalSeconds = (double)watch.ElapsedMilliseconds / 1000;
Console.WriteLine("\t\t\t -------------------------------------------------");
Console.ForegroundColor = ConsoleColor.Green;
Console.WriteLine("\t\t\t insert target table done {0} s", totalSeconds.ToString("#.##"));
Console.ResetColor();
Console.WriteLine("\t\t\t -------------------------------------------------");
}
}
}
我终于知道我的代码出了什么问题了。
command.CommandTimeout = 0;
放错地方了。
这里是正确的版本:
using (var command = new SqlCommand(strCommandSql, remoteConn))
{
command.CommandTimeout = 0;
using (var dataReader = command.ExecuteReader())
{
using (var bulkCopy = new SqlBulkCopy(this.LocalConnStr, SqlBulkCopyOptions.TableLock | SqlBulkCopyOptions.UseInternalTransaction))
{
bulkCopy.DestinationTableName = this.TableName;
bulkCopy.BulkCopyTimeout = 0;
bulkCopy.WriteToServer(dataReader);
bulkCopy.Close();
}
}
}
场景如下: 我想从服务器 A 向服务器 B 插入数据 table。
数据量很大,我会用SqlBulkCopy来完成。
这是我的想法:
producer: get all page number
consumer: get page number, paging data from server A and write datareader to server
我遇到的问题:
当运行时间长了,会抛出异常:
Timeout expired. The timeout period elapsed prior to completion of the operation
[SqlException (0x80131904): Timeout expired. The timeout period elapsed prior to completion of the operation or the server is not responding.] System.Data.SqlClient.SqlConnection.OnError(SqlException exception, Boolean breakConnection) +1948826
System.Data.SqlClient.SqlInternalConnection.OnError(SqlException exception, Boolean breakConnection) +4844747
System.Data.SqlClient.TdsParser.ThrowExceptionAndWarning(TdsParserStateObject stateObj) +194
System.Data.SqlClient.TdsParser.Run(RunBehavior runBehavior, SqlCommand cmdHandler, SqlDataReader dataStream, BulkCopySimpleResultSet bulkCopyHandler, TdsParserStateObject stateObj) +2392
System.Data.SqlClient.SqlDataReader.ConsumeMetaData() +33
System.Data.SqlClient.SqlDataReader.get_MetaData() +83
System.Data.SqlClient.SqlCommand.FinishExecuteReader(SqlDataReader ds, RunBehavior runBehavior, String resetOptionsString) +297
System.Data.SqlClient.SqlCommand.RunExecuteReaderTds(CommandBehavior cmdBehavior, RunBehavior runBehavior, Boolean returnStream, Boolean async) +954
System.Data.SqlClient.SqlCommand.RunExecuteReader(CommandBehavior cmdBehavior, RunBehavior runBehavior, Boolean returnStream, String method, DbAsyncResult result) +162
System.Data.SqlClient.SqlCommand.RunExecuteReader(CommandBehavior cmdBehavior, RunBehavior runBehavior, Boolean returnStream, String method) +32
System.Data.SqlClient.SqlCommand.ExecuteReader(CommandBehavior behavior, String method) +141
System.Data.SqlClient.SqlCommand.ExecuteDbDataReader(CommandBehavior behavior) +12
System.Data.Common.DbCommand.ExecuteReader() +12
System.Data.Linq.SqlClient.SqlProvider.Execute(Expression query, QueryInfo queryInfo, IObjectReaderFactory factory, Object[] parentArgs, Object[] userArgs, ICompiledSubQuery[] subQueries, Object lastResult) +332
我找不到我的代码有什么问题。
这是我的代码:
Program.cs
class Program
{
static BlockingCollection<int> pcCollection = new BlockingCollection<int>();
static string strRemoteConn = ConfigurationManager.AppSettings["RemoteConnStr"];
static string strLocalConn = ConfigurationManager.AppSettings["LocalConnStr"];
static string strCommandSql = ConfigurationManager.AppSettings["CommandSQL"];
static string strTableName = ConfigurationManager.AppSettings["TableName"];
static int batchSize = Int32.Parse(ConfigurationManager.AppSettings["CommitBatchSize"]);
static int taskCount = Int32.Parse(ConfigurationManager.AppSettings["TaskCount"]);
static object s_consumer = new object();
static void Main(string[] args)
{
try
{
var watch = Stopwatch.StartNew();
var tableCount = 0D;
using (var connection = new SqlConnection(strRemoteConn))
using (SqlCommand cmd = connection.CreateCommand())
{
connection.Open();
cmd.CommandText = string.Format(@"SELECT
Total_Rows= SUM(st.row_count)
FROM
sys.dm_db_partition_stats st
WHERE
object_name(object_id) = '{0}' AND (index_id < 2)", strTableName);
cmd.CommandTimeout = 300;
tableCount = Double.Parse(cmd.ExecuteScalar().ToString());
}
var totalPages = (int)Math.Ceiling(tableCount / batchSize);
var listPageRn = Enumerable.Range(1, totalPages);
var listPartPage = listPageRn.Split(taskCount).ToList();
var listProducerTask = new List<Task>();
var listConsumerTask = new List<Task>();
var consumerTask = taskCount;
for (int i = 1; i <= consumerTask; i++)
{
var taskFlag = i;
var consumer = Task.Factory.StartNew(() =>
{
ConsumerAction(taskFlag.ToString());
}, TaskCreationOptions.LongRunning);
listConsumerTask.Add(consumer);
}
var producerTaskIndex = 1;
foreach (var item in listPartPage)
{
var tmpIndex = producerTaskIndex.ToString();
var producer = Task.Factory.StartNew(() =>
{
ProducerAction(item, tmpIndex);
});
listProducerTask.Add(producer);
producerTaskIndex++;
}
Task.WaitAll(listProducerTask.ToArray());
pcCollection.CompleteAdding();
Task.WaitAll(listConsumerTask.ToArray());
watch.Stop();
var mins = watch.ElapsedMilliseconds / 1000 / 60;
Console.WriteLine("All Batch Insert Time Elapsed:\t {0} mins", mins);
}
catch (AggregateException ex)
{
using (StreamWriter writer = File.AppendText("BatchError.txt"))
{
writer.WriteLine("Error Time: {0}", DateTime.Now);
foreach (var exception in ex.InnerExceptions)
{
writer.WriteLine("Error: {0}", exception.Message);
writer.WriteLine("Source: {0}", exception.Source);
writer.WriteLine("Track: {0}", exception.StackTrace);
}
}
throw;
}
catch (Exception ex)
{
using (StreamWriter writer = File.AppendText("BatchError.txt"))
{
writer.WriteLine("Error Time: {0}", DateTime.Now);
writer.WriteLine("Error: {0}", ex.Message);
writer.WriteLine("Source: {0}", ex.Source);
writer.WriteLine("Track: {0}", ex.StackTrace);
}
throw;
}
Console.ReadLine();
}
static void ProducerAction(IEnumerable<int> source, string taskFlag = "1")
{
foreach (var item in source)
{
Console.WriteLine("Producer-{0} processing item batch {1}", taskFlag, item);
pcCollection.Add(item);
}
}
static void ConsumerAction(string taskFlag = "")
{
foreach (var item in pcCollection.GetConsumingEnumerable())
{
Console.WriteLine("consumer-{0} processing item", taskFlag);
var processing = new ManageBatchProcessing
{
LocalConnStr = strLocalConn,
RemoteConnStr = strRemoteConn,
BatchSize = batchSize,
TableName = strTableName,
CommandSql = strCommandSql
};
processing.ProcessDatabase(item);
}
}
ManageBatchProcessing.cs
public class ManageBatchProcessing
{
public string LocalConnStr { get; set; }
public string RemoteConnStr { get; set; }
public string CommandSql { get; set; }
public int BatchSize { get; set; }
public string TableName { get; set; }
public void ProcessDatabase(int item)
{
var watch = new Stopwatch();
watch.Start();
var start = (item - 1) * this.BatchSize + 1;
var end = item * this.BatchSize;
var strCommandSql = string.Format(this.CommandSql, start, end);
using (var remoteConn = new SqlConnection(this.RemoteConnStr))
using (var localConn = new SqlConnection(this.LocalConnStr))
{
remoteConn.Open();
localConn.Open();
using (var command = new SqlCommand(strCommandSql, remoteConn))
using (var dataReader = command.ExecuteReader())
{
command.CommandTimeout = 0;
using (var bulkCopy = new SqlBulkCopy(localConn))
{
bulkCopy.DestinationTableName = this.TableName;
bulkCopy.BulkCopyTimeout = 0;
bulkCopy.WriteToServer(dataReader);
bulkCopy.Close();
}
}
remoteConn.Close();
localConn.Close();
}
watch.Stop();
var totalSeconds = (double)watch.ElapsedMilliseconds / 1000;
Console.WriteLine("\t\t\t -------------------------------------------------");
Console.ForegroundColor = ConsoleColor.Green;
Console.WriteLine("\t\t\t insert target table done {0} s", totalSeconds.ToString("#.##"));
Console.ResetColor();
Console.WriteLine("\t\t\t -------------------------------------------------");
}
public DataTable RetriveToDatabase(int item)
{
var start = (item - 1) * this.BatchSize + 1;
var end = item * this.BatchSize;
var dataTable = new DataTable();
using (var connection = new SqlConnection(this.RemoteConnStr))
{
var watch = new Stopwatch();
watch.Start();
using (var adapter = new SqlDataAdapter(string.Format(this.CommandSql, start, end), connection))
{
adapter.SelectCommand.CommandTimeout = 3600;
adapter.Fill(dataTable);
}
watch.Stop();
var totalSeconds = (double)watch.ElapsedMilliseconds / 1000;
Console.WriteLine("\t\t\t -------------------------------------------------");
Console.ForegroundColor = ConsoleColor.Red;
Console.WriteLine("\t\t\t convert datareader to table done {0} s", totalSeconds.ToString("#.##"));
Console.ResetColor();
Console.WriteLine("\t\t\t -------------------------------------------------");
return dataTable;
}
}
public void WriteToDatabase(IDataReader reader)
{
using (var connection = new SqlConnection(this.LocalConnStr))
{
var watch = new Stopwatch();
watch.Start();
connection.Open();
using (var bulkCopy = new SqlBulkCopy(connection, SqlBulkCopyOptions.TableLock | SqlBulkCopyOptions.FireTriggers | SqlBulkCopyOptions.UseInternalTransaction, null))
{
bulkCopy.DestinationTableName = this.TableName;
bulkCopy.BulkCopyTimeout = 0;
bulkCopy.WriteToServer(reader);
}
connection.Close();
watch.Stop();
var totalSeconds = (double)watch.ElapsedMilliseconds / 1000;
Console.WriteLine("\t\t\t -------------------------------------------------");
Console.ForegroundColor = ConsoleColor.Green;
Console.WriteLine("\t\t\t insert target table done {0} s", totalSeconds.ToString("#.##"));
Console.ResetColor();
Console.WriteLine("\t\t\t -------------------------------------------------");
}
}
public void WriteToDatabase(DataTable dataTable)
{
using (var connection = new SqlConnection(this.LocalConnStr))
{
var watch = new Stopwatch();
watch.Start();
connection.Open();
using (var bulkCopy = new SqlBulkCopy(connection, SqlBulkCopyOptions.TableLock | SqlBulkCopyOptions.FireTriggers | SqlBulkCopyOptions.UseInternalTransaction, null))
{
bulkCopy.DestinationTableName = this.TableName;
bulkCopy.BulkCopyTimeout = 0;
bulkCopy.WriteToServer(dataTable);
}
connection.Close();
watch.Stop();
var totalSeconds = (double)watch.ElapsedMilliseconds / 1000;
Console.WriteLine("\t\t\t -------------------------------------------------");
Console.ForegroundColor = ConsoleColor.Green;
Console.WriteLine("\t\t\t insert target table done {0} s", totalSeconds.ToString("#.##"));
Console.ResetColor();
Console.WriteLine("\t\t\t -------------------------------------------------");
}
}
}
我终于知道我的代码出了什么问题了。
command.CommandTimeout = 0;
放错地方了。
这里是正确的版本:
using (var command = new SqlCommand(strCommandSql, remoteConn))
{
command.CommandTimeout = 0;
using (var dataReader = command.ExecuteReader())
{
using (var bulkCopy = new SqlBulkCopy(this.LocalConnStr, SqlBulkCopyOptions.TableLock | SqlBulkCopyOptions.UseInternalTransaction))
{
bulkCopy.DestinationTableName = this.TableName;
bulkCopy.BulkCopyTimeout = 0;
bulkCopy.WriteToServer(dataReader);
bulkCopy.Close();
}
}
}