在使用 SqlBulkCopy 传输之前修改列内容

Modify column contents before transfer with SqlBulkCopy

我有一个包可以从 uri 下载 csv,然后通过 sqlBulkCopy 将其传输到数据库。 问题是第一列是 csv 中的 unix 时间戳,但在数据库中是日期时间列。列按位置映射,因此它始终是第一列。如何在批量复制过程中将第一个 csv 列从 unix 时间戳转换为 DateTime?

我的代码:

#region Namespaces
using System;
using System.Data;
using Microsoft.SqlServer.Dts.Runtime;
using System.Windows.Forms;
using System.Data.SqlClient;
using System.IO;
#endregion

namespace ST_60ba88432cf3439292163e13136c72ea
{
   /// <summary>
   /// ScriptMain is the entry point class of the script.  Do not change the name, attributes,
   /// or parent of this class.
   /// </summary>
   [Microsoft.SqlServer.Dts.Tasks.ScriptTask.SSISScriptTaskEntryPointAttribute]
   public partial class ScriptMain : Microsoft.SqlServer.Dts.Tasks.ScriptTask.VSTARTScriptObjectModelBase
   {
       #region Help:  Using Integration Services variables and parameters in a script
       /* To use a variable in this script, first ensure that the variable has been added to 
        * either the list contained in the ReadOnlyVariables property or the list contained in 
        * the ReadWriteVariables property of this script task, according to whether or not your
        * code needs to write to the variable.  To add the variable, save this script, close this instance of
        * Visual Studio, and update the ReadOnlyVariables and 
        * ReadWriteVariables properties in the Script Transformation Editor window.
        * To use a parameter in this script, follow the same steps. Parameters are always read-only.
        * 
        * Example of reading from a variable:
        *  DateTime startTime = (DateTime) Dts.Variables["System::StartTime"].Value;
        * 
        * Example of writing to a variable:
        *  Dts.Variables["User::myStringVariable"].Value = "new value";
        * 
        * Example of reading from a package parameter:
        *  int batchId = (int) Dts.Variables["$Package::batchId"].Value;
        *  
        * Example of reading from a project parameter:
        *  int batchId = (int) Dts.Variables["$Project::batchId"].Value;
        * 
        * Example of reading from a sensitive project parameter:
        *  int batchId = (int) Dts.Variables["$Project::batchId"].GetSensitiveValue();
        * */

       #endregion

       #region Help:  Firing Integration Services events from a script
       /* This script task can fire events for logging purposes.
        * 
        * Example of firing an error event:
        *  Dts.Events.FireError(18, "Process Values", "Bad value", "", 0);
        * 
        * Example of firing an information event:
        *  Dts.Events.FireInformation(3, "Process Values", "Processing has started", "", 0, ref fireAgain)
        * 
        * Example of firing a warning event:
        *  Dts.Events.FireWarning(14, "Process Values", "No values received for input", "", 0);
        * */
       #endregion

       #region Help:  Using Integration Services connection managers in a script
       /* Some types of connection managers can be used in this script task.  See the topic 
        * "Working with Connection Managers Programatically" for details.
        * 
        * Example of using an ADO.Net connection manager:
        *  object rawConnection = Dts.Connections["Sales DB"].AcquireConnection(Dts.Transaction);
        *  SqlConnection myADONETConnection = (SqlConnection)rawConnection;
        *  //Use the connection in some code here, then release the connection
        *  Dts.Connections["Sales DB"].ReleaseConnection(rawConnection);
        *
        * Example of using a File connection manager
        *  object rawConnection = Dts.Connections["Prices.zip"].AcquireConnection(Dts.Transaction);
        *  string filePath = (string)rawConnection;
        *  //Use the connection in some code here, then release the connection
        *  Dts.Connections["Prices.zip"].ReleaseConnection(rawConnection);
        * */
       #endregion


       /// <summary>
       /// This method is called when this script task executes in the control flow.
       /// Before returning from this method, set the value of Dts.TaskResult to indicate success or failure.
       /// To open Help, press F1.
       /// </summary>
       public void Main()
       {
           // TODO: Add your code here
           string connectionString = Dts.Variables["User::SqlBulkCopyConnection"].Value.ToString();
           string FullFilePath = Dts.Variables["User::FullFilePath"].Value.ToString();
           string Destination = "[dbo].[" + Dts.Variables["User::DestinationTableName"].Value.ToString() + "]";
           GetDataTabletFromCSVFile(FullFilePath, connectionString, Destination);
           Dts.TaskResult = (int)ScriptResults.Success;
       }
        DataTable GetDataTabletFromCSVFile(string csv_file_path, string connname, string destname)
       {
           bool fireAgain = true;
           Dts.Events.FireInformation(3, "Create DataTable", "Creation has started", "", 0, ref fireAgain);
           DataTable csvData = new DataTable();
           Dts.Events.FireInformation(3, "Create DataTable", "Creation has completed", "", 0, ref fireAgain);
           try
           {
               Dts.Events.FireInformation(3, "Opening file", "File: " + Dts.Variables["User::FullFilePath"].Value.ToString(), "", 0, ref fireAgain);
               using (TextReader csvReader = File.OpenText(csv_file_path))
               {
                   string line;
                   while ((line = csvReader.ReadLine()) != null)
                   {
                       string[] items = line.Trim().Split(',');
                       if (csvData.Columns.Count == 0)
                       {
                           csvData.Columns.Add(new DataColumn(items[0], typeof(DateTime)));
                           for (int i = 1; i < items.Length; i++)
                           {
                               csvData.Columns.Add(new DataColumn(items[i], typeof(float)));
                           }
                       }
                       else
                       {
                           csvData.Rows.Add(items);
                       }
                   }
               }
               Dts.Events.FireInformation(3, "Creating SQLConnection", "String: " + Dts.Variables["User::SqlBulkCopyConnection"].Value.ToString(), "", 0, ref fireAgain);
               using (SqlConnection dbConnection = new SqlConnection(connname))
               {
                   Dts.Events.FireInformation(3, "Open dbConnection", "connecting", "", 0, ref fireAgain);
                   dbConnection.Open();
                   Dts.Events.FireInformation(3, "Copying Data", "Copying", "", 0, ref fireAgain);
                   using (SqlBulkCopy s = new SqlBulkCopy(dbConnection))
                   {
                       s.DestinationTableName = destname;
                       s.WriteToServer(csvData);
                   }
               }
           }
           catch (Exception ex)
           {
               // Logging why Upload failed
               Dts.Events.FireError(0, "Upload File", "Upload failed: " + ex.Message, string.Empty, 0);

               // Quit Script Task unsuccesful
               Dts.TaskResult = (int)ScriptResults.Failure;
           }
           return csvData;
       }

       #region ScriptResults declaration
       /// <summary>
       /// This enum provides a convenient shorthand within the scope of this class for setting the
       /// result of the script.
       /// 
       /// This code was generated automatically.
       /// </summary>
       enum ScriptResults
       {
           Success = Microsoft.SqlServer.Dts.Runtime.DTSExecResult.Success,
           Failure = Microsoft.SqlServer.Dts.Runtime.DTSExecResult.Failure
       };
       #endregion

   }
}

您需要在 SSIS 包中添加一个额外的步骤。将源作为 VARCHAR 值读取并包含一个派生列任务,然后再将其写入数据库。

在派生列任务中,您可以编写代码以在不同格式之间进行转换。

利用以下 SO post 将 unix 时间戳转换为日期时间。

public static DateTime UnixTimeStampToDateTime( double unixTimeStamp )
{
    // Unix timestamp is seconds past epoch
    System.DateTime dtDateTime = new DateTime(1970,1,1,0,0,0,0,System.DateTimeKind.Utc);
    dtDateTime = dtDateTime.AddSeconds( unixTimeStamp ).ToLocalTime();
    return dtDateTime;
}

现在,您需要在代码中调用此函数

 if (csvData.Columns.Count == 0)
                       {
                           csvData.Columns.Add(new DataColumn(items[0], typeof(DateTime)));
                           for (int i = 1; i < items.Length; i++)
                           {
                               csvData.Columns.Add(new DataColumn(items[i], typeof(float)));
                           }
                       }
                       else
                       {
                           items[0] = UnixTimeStampToDateTime(items[0]); //call function
                           csvData.Rows.Add(items);
                       }