在使用 SqlBulkCopy 传输之前修改列内容
Modify column contents before transfer with SqlBulkCopy
我有一个包可以从 uri 下载 csv,然后通过 sqlBulkCopy 将其传输到数据库。
问题是第一列是 csv 中的 unix 时间戳,但在数据库中是日期时间列。列按位置映射,因此它始终是第一列。如何在批量复制过程中将第一个 csv 列从 unix 时间戳转换为 DateTime?
我的代码:
#region Namespaces
using System;
using System.Data;
using Microsoft.SqlServer.Dts.Runtime;
using System.Windows.Forms;
using System.Data.SqlClient;
using System.IO;
#endregion
namespace ST_60ba88432cf3439292163e13136c72ea
{
/// <summary>
/// ScriptMain is the entry point class of the script. Do not change the name, attributes,
/// or parent of this class.
/// </summary>
[Microsoft.SqlServer.Dts.Tasks.ScriptTask.SSISScriptTaskEntryPointAttribute]
public partial class ScriptMain : Microsoft.SqlServer.Dts.Tasks.ScriptTask.VSTARTScriptObjectModelBase
{
#region Help: Using Integration Services variables and parameters in a script
/* To use a variable in this script, first ensure that the variable has been added to
* either the list contained in the ReadOnlyVariables property or the list contained in
* the ReadWriteVariables property of this script task, according to whether or not your
* code needs to write to the variable. To add the variable, save this script, close this instance of
* Visual Studio, and update the ReadOnlyVariables and
* ReadWriteVariables properties in the Script Transformation Editor window.
* To use a parameter in this script, follow the same steps. Parameters are always read-only.
*
* Example of reading from a variable:
* DateTime startTime = (DateTime) Dts.Variables["System::StartTime"].Value;
*
* Example of writing to a variable:
* Dts.Variables["User::myStringVariable"].Value = "new value";
*
* Example of reading from a package parameter:
* int batchId = (int) Dts.Variables["$Package::batchId"].Value;
*
* Example of reading from a project parameter:
* int batchId = (int) Dts.Variables["$Project::batchId"].Value;
*
* Example of reading from a sensitive project parameter:
* int batchId = (int) Dts.Variables["$Project::batchId"].GetSensitiveValue();
* */
#endregion
#region Help: Firing Integration Services events from a script
/* This script task can fire events for logging purposes.
*
* Example of firing an error event:
* Dts.Events.FireError(18, "Process Values", "Bad value", "", 0);
*
* Example of firing an information event:
* Dts.Events.FireInformation(3, "Process Values", "Processing has started", "", 0, ref fireAgain)
*
* Example of firing a warning event:
* Dts.Events.FireWarning(14, "Process Values", "No values received for input", "", 0);
* */
#endregion
#region Help: Using Integration Services connection managers in a script
/* Some types of connection managers can be used in this script task. See the topic
* "Working with Connection Managers Programmatically" for details.
*
* Example of using an ADO.Net connection manager:
* object rawConnection = Dts.Connections["Sales DB"].AcquireConnection(Dts.Transaction);
* SqlConnection myADONETConnection = (SqlConnection)rawConnection;
* //Use the connection in some code here, then release the connection
* Dts.Connections["Sales DB"].ReleaseConnection(rawConnection);
*
* Example of using a File connection manager
* object rawConnection = Dts.Connections["Prices.zip"].AcquireConnection(Dts.Transaction);
* string filePath = (string)rawConnection;
* //Use the connection in some code here, then release the connection
* Dts.Connections["Prices.zip"].ReleaseConnection(rawConnection);
* */
#endregion
/// <summary>
/// This method is called when this script task executes in the control flow.
/// Before returning from this method, set the value of Dts.TaskResult to indicate success or failure.
/// To open Help, press F1.
/// </summary>
public void Main()
{
    // Inputs come from package variables configured on the script task.
    string connectionString = Dts.Variables["User::SqlBulkCopyConnection"].Value.ToString();
    string fullFilePath = Dts.Variables["User::FullFilePath"].Value.ToString();
    string destinationTable = "[dbo].[" + Dts.Variables["User::DestinationTableName"].Value.ToString() + "]";

    // Mark the task successful BEFORE running the load. GetDataTabletFromCSVFile
    // catches its own exceptions and sets Dts.TaskResult to Failure; assigning
    // Success after the call would overwrite that and hide the failure.
    Dts.TaskResult = (int)ScriptResults.Success;

    GetDataTabletFromCSVFile(fullFilePath, connectionString, destinationTable);
}
/// <summary>
/// Loads a CSV file into a DataTable and bulk-copies the table into a SQL Server table.
/// </summary>
/// <remarks>
/// The first non-blank line supplies the column names: column 0 is typed DateTime and every
/// other column float. On each following line the first field is parsed as a Unix timestamp
/// (seconds since 1970-01-01 UTC) and converted to a DateTime before the row is added; the
/// remaining fields are handed to the DataTable as text. Blank lines are skipped.
/// Any exception is reported through Dts.Events.FireError and sets Dts.TaskResult to Failure;
/// it is not rethrown.
/// </remarks>
/// <param name="csv_file_path">Path of the CSV file to read.</param>
/// <param name="connname">Connection string used to open the SqlConnection.</param>
/// <param name="destname">Destination table name passed to SqlBulkCopy.</param>
/// <returns>The DataTable that was built (it may be empty or partial if an error occurred).</returns>
DataTable GetDataTabletFromCSVFile(string csv_file_path, string connname, string destname)
{
    bool fireAgain = true;
    Dts.Events.FireInformation(3, "Create DataTable", "Creation has started", "", 0, ref fireAgain);
    DataTable csvData = new DataTable();
    Dts.Events.FireInformation(3, "Create DataTable", "Creation has completed", "", 0, ref fireAgain);
    try
    {
        Dts.Events.FireInformation(3, "Opening file", "File: " + Dts.Variables["User::FullFilePath"].Value.ToString(), "", 0, ref fireAgain);
        using (TextReader csvReader = File.OpenText(csv_file_path))
        {
            string line;
            int lineNumber = 0;
            while ((line = csvReader.ReadLine()) != null)
            {
                lineNumber++;
                string trimmed = line.Trim();
                if (trimmed.Length == 0)
                {
                    // A blank line (typically the trailing newline) would otherwise become a
                    // one-field row whose empty string cannot be converted to DateTime.
                    continue;
                }

                // Plain comma split: quoted fields containing commas are not supported.
                string[] items = trimmed.Split(',');
                if (csvData.Columns.Count == 0)
                {
                    // Header line: columns map to the destination table by position.
                    csvData.Columns.Add(new DataColumn(items[0], typeof(DateTime)));
                    for (int i = 1; i < items.Length; i++)
                    {
                        csvData.Columns.Add(new DataColumn(items[i], typeof(float)));
                    }
                }
                else
                {
                    csvData.Rows.Add(BuildRow(items, lineNumber));
                }
            }
        }
        // NOTE(review): this writes the full connection string to the SSIS log; if it can
        // contain credentials, consider logging only the server/database instead.
        Dts.Events.FireInformation(3, "Creating SQLConnection", "String: " + Dts.Variables["User::SqlBulkCopyConnection"].Value.ToString(), "", 0, ref fireAgain);
        using (SqlConnection dbConnection = new SqlConnection(connname))
        {
            Dts.Events.FireInformation(3, "Open dbConnection", "connecting", "", 0, ref fireAgain);
            dbConnection.Open();
            Dts.Events.FireInformation(3, "Copying Data", "Copying", "", 0, ref fireAgain);
            using (SqlBulkCopy s = new SqlBulkCopy(dbConnection))
            {
                s.DestinationTableName = destname;
                s.WriteToServer(csvData);
            }
        }
    }
    catch (Exception ex)
    {
        // Log why the upload failed
        Dts.Events.FireError(0, "Upload File", "Upload failed: " + ex.Message, string.Empty, 0);
        // Mark the script task as unsuccessful
        Dts.TaskResult = (int)ScriptResults.Failure;
    }
    return csvData;
}

/// <summary>
/// Builds the value array for one CSV data line, replacing the first field (a Unix
/// timestamp in seconds) with the corresponding DateTime. Other fields are copied as-is.
/// </summary>
/// <exception cref="FormatException">The first field is not a valid number.</exception>
private static object[] BuildRow(string[] items, int lineNumber)
{
    double unixSeconds;
    // Invariant culture: the file is machine-generated, so '.' is the decimal separator
    // regardless of the regional settings of the machine running the package.
    if (!double.TryParse(items[0], System.Globalization.NumberStyles.Float, System.Globalization.CultureInfo.InvariantCulture, out unixSeconds))
    {
        throw new FormatException("Line " + lineNumber + ": '" + items[0] + "' is not a valid Unix timestamp.");
    }

    object[] row = new object[items.Length];
    row[0] = UnixTimeStampToDateTime(unixSeconds);
    Array.Copy(items, 1, row, 1, items.Length - 1);
    return row;
}

/// <summary>
/// Converts a Unix timestamp (seconds since 1970-01-01T00:00:00Z) to a DateTime in the
/// local time zone of the machine running the script.
/// </summary>
private static DateTime UnixTimeStampToDateTime(double unixTimeStamp)
{
    DateTime epochUtc = new DateTime(1970, 1, 1, 0, 0, 0, 0, DateTimeKind.Utc);
    // NOTE(review): drop ToLocalTime() if the destination column should hold UTC values.
    return epochUtc.AddSeconds(unixTimeStamp).ToLocalTime();
}
#region ScriptResults declaration
/// <summary>
/// This enum provides a convenient shorthand within the scope of this class for setting the
/// result of the script.
///
/// This code was generated automatically.
/// </summary>
enum ScriptResults
{
    // Value assigned to Dts.TaskResult when the script fails.
    Failure = Microsoft.SqlServer.Dts.Runtime.DTSExecResult.Failure,

    // Value assigned to Dts.TaskResult when the script succeeds.
    Success = Microsoft.SqlServer.Dts.Runtime.DTSExecResult.Success
}
#endregion
}
}
您需要在 SSIS 包中添加一个额外的步骤。将源作为 VARCHAR 值读取并包含一个派生列任务,然后再将其写入数据库。
在派生列任务中,您可以编写代码以在不同格式之间进行转换。
利用以下来自 Stack Overflow 帖子的函数,将 Unix 时间戳转换为 DateTime:
/// <summary>
/// Converts a Unix timestamp, expressed in seconds since 1970-01-01T00:00:00Z,
/// to a <see cref="DateTime"/> in the local time zone.
/// </summary>
/// <param name="unixTimeStamp">Seconds elapsed since the Unix epoch.</param>
/// <returns>The corresponding local date and time.</returns>
public static DateTime UnixTimeStampToDateTime(double unixTimeStamp)
{
    DateTime epochUtc = new DateTime(1970, 1, 1, 0, 0, 0, 0, DateTimeKind.Utc);
    DateTime utcMoment = epochUtc.AddSeconds(unixTimeStamp);
    return utcMoment.ToLocalTime();
}
现在,您需要在代码中调用此函数(注意:items[0] 是字符串,必须先解析为 double 再传入):
if (csvData.Columns.Count == 0)
{
    // Header line: first column holds the converted timestamp, the rest are numeric.
    csvData.Columns.Add(new DataColumn(items[0], typeof(DateTime)));
    for (int i = 1; i < items.Length; i++)
    {
        csvData.Columns.Add(new DataColumn(items[i], typeof(float)));
    }
}
else
{
    // items is a string[], so the DateTime cannot be stored back into items[0], and the
    // text must be parsed to a double before UnixTimeStampToDateTime can be called.
    // Build an object[] row instead: converted timestamp first, remaining fields unchanged.
    object[] row = new object[items.Length];
    row[0] = UnixTimeStampToDateTime(
        double.Parse(items[0], System.Globalization.CultureInfo.InvariantCulture));
    Array.Copy(items, 1, row, 1, items.Length - 1);
    csvData.Rows.Add(row);
}
我有一个包可以从 uri 下载 csv,然后通过 sqlBulkCopy 将其传输到数据库。 问题是第一列是 csv 中的 unix 时间戳,但在数据库中是日期时间列。列按位置映射,因此它始终是第一列。如何在批量复制过程中将第一个 csv 列从 unix 时间戳转换为 DateTime?
我的代码:
#region Namespaces
using System;
using System.Data;
using Microsoft.SqlServer.Dts.Runtime;
using System.Windows.Forms;
using System.Data.SqlClient;
using System.IO;
#endregion
namespace ST_60ba88432cf3439292163e13136c72ea
{
/// <summary>
/// ScriptMain is the entry point class of the script. Do not change the name, attributes,
/// or parent of this class.
/// </summary>
[Microsoft.SqlServer.Dts.Tasks.ScriptTask.SSISScriptTaskEntryPointAttribute]
public partial class ScriptMain : Microsoft.SqlServer.Dts.Tasks.ScriptTask.VSTARTScriptObjectModelBase
{
#region Help: Using Integration Services variables and parameters in a script
/* To use a variable in this script, first ensure that the variable has been added to
* either the list contained in the ReadOnlyVariables property or the list contained in
* the ReadWriteVariables property of this script task, according to whether or not your
* code needs to write to the variable. To add the variable, save this script, close this instance of
* Visual Studio, and update the ReadOnlyVariables and
* ReadWriteVariables properties in the Script Transformation Editor window.
* To use a parameter in this script, follow the same steps. Parameters are always read-only.
*
* Example of reading from a variable:
* DateTime startTime = (DateTime) Dts.Variables["System::StartTime"].Value;
*
* Example of writing to a variable:
* Dts.Variables["User::myStringVariable"].Value = "new value";
*
* Example of reading from a package parameter:
* int batchId = (int) Dts.Variables["$Package::batchId"].Value;
*
* Example of reading from a project parameter:
* int batchId = (int) Dts.Variables["$Project::batchId"].Value;
*
* Example of reading from a sensitive project parameter:
* int batchId = (int) Dts.Variables["$Project::batchId"].GetSensitiveValue();
* */
#endregion
#region Help: Firing Integration Services events from a script
/* This script task can fire events for logging purposes.
*
* Example of firing an error event:
* Dts.Events.FireError(18, "Process Values", "Bad value", "", 0);
*
* Example of firing an information event:
* Dts.Events.FireInformation(3, "Process Values", "Processing has started", "", 0, ref fireAgain)
*
* Example of firing a warning event:
* Dts.Events.FireWarning(14, "Process Values", "No values received for input", "", 0);
* */
#endregion
#region Help: Using Integration Services connection managers in a script
/* Some types of connection managers can be used in this script task. See the topic
* "Working with Connection Managers Programmatically" for details.
*
* Example of using an ADO.Net connection manager:
* object rawConnection = Dts.Connections["Sales DB"].AcquireConnection(Dts.Transaction);
* SqlConnection myADONETConnection = (SqlConnection)rawConnection;
* //Use the connection in some code here, then release the connection
* Dts.Connections["Sales DB"].ReleaseConnection(rawConnection);
*
* Example of using a File connection manager
* object rawConnection = Dts.Connections["Prices.zip"].AcquireConnection(Dts.Transaction);
* string filePath = (string)rawConnection;
* //Use the connection in some code here, then release the connection
* Dts.Connections["Prices.zip"].ReleaseConnection(rawConnection);
* */
#endregion
/// <summary>
/// This method is called when this script task executes in the control flow.
/// Before returning from this method, set the value of Dts.TaskResult to indicate success or failure.
/// To open Help, press F1.
/// </summary>
public void Main()
{
    // Inputs come from package variables configured on the script task.
    string connectionString = Dts.Variables["User::SqlBulkCopyConnection"].Value.ToString();
    string fullFilePath = Dts.Variables["User::FullFilePath"].Value.ToString();
    string destinationTable = "[dbo].[" + Dts.Variables["User::DestinationTableName"].Value.ToString() + "]";

    // Mark the task successful BEFORE running the load. GetDataTabletFromCSVFile
    // catches its own exceptions and sets Dts.TaskResult to Failure; assigning
    // Success after the call would overwrite that and hide the failure.
    Dts.TaskResult = (int)ScriptResults.Success;

    GetDataTabletFromCSVFile(fullFilePath, connectionString, destinationTable);
}
/// <summary>
/// Loads a CSV file into a DataTable and bulk-copies the table into a SQL Server table.
/// </summary>
/// <remarks>
/// The first non-blank line supplies the column names: column 0 is typed DateTime and every
/// other column float. On each following line the first field is parsed as a Unix timestamp
/// (seconds since 1970-01-01 UTC) and converted to a DateTime before the row is added; the
/// remaining fields are handed to the DataTable as text. Blank lines are skipped.
/// Any exception is reported through Dts.Events.FireError and sets Dts.TaskResult to Failure;
/// it is not rethrown.
/// </remarks>
/// <param name="csv_file_path">Path of the CSV file to read.</param>
/// <param name="connname">Connection string used to open the SqlConnection.</param>
/// <param name="destname">Destination table name passed to SqlBulkCopy.</param>
/// <returns>The DataTable that was built (it may be empty or partial if an error occurred).</returns>
DataTable GetDataTabletFromCSVFile(string csv_file_path, string connname, string destname)
{
    bool fireAgain = true;
    Dts.Events.FireInformation(3, "Create DataTable", "Creation has started", "", 0, ref fireAgain);
    DataTable csvData = new DataTable();
    Dts.Events.FireInformation(3, "Create DataTable", "Creation has completed", "", 0, ref fireAgain);
    try
    {
        Dts.Events.FireInformation(3, "Opening file", "File: " + Dts.Variables["User::FullFilePath"].Value.ToString(), "", 0, ref fireAgain);
        using (TextReader csvReader = File.OpenText(csv_file_path))
        {
            string line;
            int lineNumber = 0;
            while ((line = csvReader.ReadLine()) != null)
            {
                lineNumber++;
                string trimmed = line.Trim();
                if (trimmed.Length == 0)
                {
                    // A blank line (typically the trailing newline) would otherwise become a
                    // one-field row whose empty string cannot be converted to DateTime.
                    continue;
                }

                // Plain comma split: quoted fields containing commas are not supported.
                string[] items = trimmed.Split(',');
                if (csvData.Columns.Count == 0)
                {
                    // Header line: columns map to the destination table by position.
                    csvData.Columns.Add(new DataColumn(items[0], typeof(DateTime)));
                    for (int i = 1; i < items.Length; i++)
                    {
                        csvData.Columns.Add(new DataColumn(items[i], typeof(float)));
                    }
                }
                else
                {
                    csvData.Rows.Add(BuildRow(items, lineNumber));
                }
            }
        }
        // NOTE(review): this writes the full connection string to the SSIS log; if it can
        // contain credentials, consider logging only the server/database instead.
        Dts.Events.FireInformation(3, "Creating SQLConnection", "String: " + Dts.Variables["User::SqlBulkCopyConnection"].Value.ToString(), "", 0, ref fireAgain);
        using (SqlConnection dbConnection = new SqlConnection(connname))
        {
            Dts.Events.FireInformation(3, "Open dbConnection", "connecting", "", 0, ref fireAgain);
            dbConnection.Open();
            Dts.Events.FireInformation(3, "Copying Data", "Copying", "", 0, ref fireAgain);
            using (SqlBulkCopy s = new SqlBulkCopy(dbConnection))
            {
                s.DestinationTableName = destname;
                s.WriteToServer(csvData);
            }
        }
    }
    catch (Exception ex)
    {
        // Log why the upload failed
        Dts.Events.FireError(0, "Upload File", "Upload failed: " + ex.Message, string.Empty, 0);
        // Mark the script task as unsuccessful
        Dts.TaskResult = (int)ScriptResults.Failure;
    }
    return csvData;
}

/// <summary>
/// Builds the value array for one CSV data line, replacing the first field (a Unix
/// timestamp in seconds) with the corresponding DateTime. Other fields are copied as-is.
/// </summary>
/// <exception cref="FormatException">The first field is not a valid number.</exception>
private static object[] BuildRow(string[] items, int lineNumber)
{
    double unixSeconds;
    // Invariant culture: the file is machine-generated, so '.' is the decimal separator
    // regardless of the regional settings of the machine running the package.
    if (!double.TryParse(items[0], System.Globalization.NumberStyles.Float, System.Globalization.CultureInfo.InvariantCulture, out unixSeconds))
    {
        throw new FormatException("Line " + lineNumber + ": '" + items[0] + "' is not a valid Unix timestamp.");
    }

    object[] row = new object[items.Length];
    row[0] = UnixTimeStampToDateTime(unixSeconds);
    Array.Copy(items, 1, row, 1, items.Length - 1);
    return row;
}

/// <summary>
/// Converts a Unix timestamp (seconds since 1970-01-01T00:00:00Z) to a DateTime in the
/// local time zone of the machine running the script.
/// </summary>
private static DateTime UnixTimeStampToDateTime(double unixTimeStamp)
{
    DateTime epochUtc = new DateTime(1970, 1, 1, 0, 0, 0, 0, DateTimeKind.Utc);
    // NOTE(review): drop ToLocalTime() if the destination column should hold UTC values.
    return epochUtc.AddSeconds(unixTimeStamp).ToLocalTime();
}
#region ScriptResults declaration
/// <summary>
/// This enum provides a convenient shorthand within the scope of this class for setting the
/// result of the script.
///
/// This code was generated automatically.
/// </summary>
enum ScriptResults
{
    // Value assigned to Dts.TaskResult when the script fails.
    Failure = Microsoft.SqlServer.Dts.Runtime.DTSExecResult.Failure,

    // Value assigned to Dts.TaskResult when the script succeeds.
    Success = Microsoft.SqlServer.Dts.Runtime.DTSExecResult.Success
}
#endregion
}
}
您需要在 SSIS 包中添加一个额外的步骤。将源作为 VARCHAR 值读取并包含一个派生列任务,然后再将其写入数据库。
在派生列任务中,您可以编写代码以在不同格式之间进行转换。
利用以下来自 Stack Overflow 帖子的函数,将 Unix 时间戳转换为 DateTime:
/// <summary>
/// Converts a Unix timestamp, expressed in seconds since 1970-01-01T00:00:00Z,
/// to a <see cref="DateTime"/> in the local time zone.
/// </summary>
/// <param name="unixTimeStamp">Seconds elapsed since the Unix epoch.</param>
/// <returns>The corresponding local date and time.</returns>
public static DateTime UnixTimeStampToDateTime(double unixTimeStamp)
{
    DateTime epochUtc = new DateTime(1970, 1, 1, 0, 0, 0, 0, DateTimeKind.Utc);
    DateTime utcMoment = epochUtc.AddSeconds(unixTimeStamp);
    return utcMoment.ToLocalTime();
}
现在,您需要在代码中调用此函数(注意:items[0] 是字符串,必须先解析为 double 再传入):
if (csvData.Columns.Count == 0)
{
    // Header line: first column holds the converted timestamp, the rest are numeric.
    csvData.Columns.Add(new DataColumn(items[0], typeof(DateTime)));
    for (int i = 1; i < items.Length; i++)
    {
        csvData.Columns.Add(new DataColumn(items[i], typeof(float)));
    }
}
else
{
    // items is a string[], so the DateTime cannot be stored back into items[0], and the
    // text must be parsed to a double before UnixTimeStampToDateTime can be called.
    // Build an object[] row instead: converted timestamp first, remaining fields unchanged.
    object[] row = new object[items.Length];
    row[0] = UnixTimeStampToDateTime(
        double.Parse(items[0], System.Globalization.CultureInfo.InvariantCulture));
    Array.Copy(items, 1, row, 1, items.Length - 1);
    csvData.Rows.Add(row);
}