SSIS 违反主键约束错误,列由 HASHBYTES SHA2-256 函数生成

SSIS Violation of PRIMARY Key Constraint Error with Column Generated by HASHBYTES SHA2-256 Function

我遇到了一个超出我的谷歌搜索能力的问题,如果能得到任何帮助,我将不胜感激。

我创建了一个非常简单的 SSIS 包,用于使用 OLEDB 连接将数据从 Azure SQL 数据库中的源 table 复制到 MS SQL 数据库中的目标.目标 table 是根据源数据库生成的脚本创建的(在 SSMS 中,我右键单击数据库名称 -> 任务 -> 生成脚本),因此它们应该是相同的。 SSIS 包不执行任何 t运行sformations——它是 Target table 的简单截断,然后是 OleDBSource 连接到 OleDBDestination 的数据流任务。

但是,我在执行程序包时遇到以下错误:

Hresult: 0x80004005  Description: "Violation of PRIMARY KEY constraint 'PK_HashKey'. Cannot insert
duplicate key in object 'TargetTable'. The duplicate key value is (                          ).".

有问题的主键列的数据类型为 char(32),是使用 'SHA2_256' 算法使用 HASHBYTES 函数生成的。错误消息给出了一个错误的主键示例,基本上是一系列空格。当我查看 Source table 时,它看起来确实有多行,其中主键只是一系列空格。但我猜 Azure SQL 可以以某种方式区分它们,因为 Source table 上没有主键问题。只有当我尝试将数据复制到本地 MS SQL 服务器数据库时,我才遇到主键冲突。

我尝试过的事情:

  1. 已检查两个源上的归类设置是否相同 和目标 table 列、数据库和服务器。

  2. 检查了 SSIS 数据流源的高级编辑器和 确保代码页相同的目标。

  3. 删除了 Target table 上的主键约束,然后移动 使用 SSIS 的数据,然后 运行 查询来比较哈希 t运行sferred 由 SSIS 与完全生成新哈希。约8% table, SQL服务器没有想到t运行sferred hash 匹配新的散列,尽管散列通常看起来很相似。 这是查询:

SELECT CONVERT(CHAR(32), HASHBYTES('SHA2_256', BusinessKey)), Hashkey, BusinessKey 
FROM TargetTable 
WHERE CONVERT(CHAR(32), HASHBYTES('SHA2_256', BusinessKey)) <> HashKey

根据#3,我的猜测是发生了以下情况之一:

  1. SSIS 在从 从源到目标。

  2. Target column/table/database 有一些设置 以不同于源的方式存储 char(32) 数据 column/table/database.

  3. 某种错误。

有人有任何经验可以帮助阐明这个问题吗?

这个

CONVERT(CHAR(32), HASHBYTES('SHA2_256', BusinessKey))

会出问题。它采用 32 字节散列并将其存储为某种 varchar 排序规则中的 32 个代码点。许多代码点是非打印字符,因此检查这些值将一直是一个问题。

而且我不相信 SSIS 或任何其他外部程序会往返甚至显示此类值。许多编程环境使用以 null 结尾的字符串,其中字节 0x00 表示字符串的结尾。并且您的哈希将有大约 12% 的时间具有 0x00 代码点。 P=1-(255/256)^32

将哈希值存储为 BINARY(32) 或 Base64 编码字符串,您的问题就会消失。

这是从我正在处理的示例中提取的一个小实用程序,可能对您有用。我用存储在具有唯一约束的 char(32) 中的 200k 行 table 上的哈希对其进行了测试,它工作正常。

using System;
using System.Collections.Generic;
using System.Data;
using System.Data.SqlClient;
using System.Text;

namespace Microsoft.Samples.SqlServerDataMover
{
    public class Program
    {

        public static void Main(string[] args)
        {
            var srcConnectionString = args[0];
            var destConnectionString = args[1];

            using (var src = new SqlConnection(srcConnectionString))
            using (var dest = new SqlConnection(destConnectionString))
            {
                src.Open();
                dest.Open();

                var cmdTables = src.CreateCommand();
                cmdTables.CommandText = "select schema_name(schema_id) schema_name, name table_name from sys.tables where type_desc = 'USER_TABLE'";

                var tables = new List<(string, string)>();
                using (var rdr = cmdTables.ExecuteReader())
                {
                    while (rdr.Read())
                    {
                        var schema = rdr.GetString(0);
                        var table = rdr.GetString(1);
                        tables.Add((schema, table));
                    }
                }

                foreach (var t in tables)
                {
                    CopyTable(src, dest, t.Item2, t.Item1);
                }

            }
        }
        static string QuoteName(string identifier)
        {
            var sb = new StringBuilder(identifier.Length + 3, 1024);
            sb.Append('[');
            foreach (var c in identifier)
            {
                if (c == ']')
                    sb.Append(']');
                sb.Append(c);
            }
            sb.Append(']');
            return sb.ToString();

        }

        public static void CopyTable(SqlConnection src, SqlConnection dest, string tableName, string schemaName, int batchSize = 10000)
        {
            var opts = SqlBulkCopyOptions.KeepIdentity | SqlBulkCopyOptions.KeepNulls | SqlBulkCopyOptions.TableLock;

            var sql = $"select * from {QuoteName(schemaName)}.{QuoteName(tableName)}";
            var cmd = src.CreateCommand();
            cmd.CommandTimeout = 0;
            cmd.CommandText = sql;

            using (var rdr = cmd.ExecuteReader())
            using (var bc = new SqlBulkCopy(dest, opts, null))
            {
                var schemaDt = rdr.GetSchemaTable();
                //schemaDt.WriteXml(Console.Out);

                var schema = new Dictionary<string, DataRow>();
                foreach (DataRow r in schemaDt.Rows)
                {
                    schema.Add(r[0].ToString(), r);
                }


                bc.BatchSize = batchSize;
                for (int i = 0; i < rdr.FieldCount; i++)
                {
                    var cn = rdr.GetName(i);
                    bool isreadonly = schema[cn].Field<bool>("IsReadOnly");

                    if (!isreadonly)
                    {
                        bc.ColumnMappings.Add(i, cn);
                    }

                }

                bc.NotifyAfter = 10000;

                bc.SqlRowsCopied += (s, a) =>
                {
                    Console.WriteLine($"[{schemaName}].[{tableName}] {a.RowsCopied} Rows Copied.");
                };

                bc.DestinationTableName = $"[{schemaName}].[{tableName}]";
                bc.WriteToServer(rdr);
                Console.WriteLine($"[{schemaName}].[{tableName}] Complete. {1} Rows Copied.");
            }

        }
    }
}

只需将两个连接字符串传递给 运行 它,首先是源,然后是目标。

例如

PS C:\temp\datamover> .\bin\debug\DataMover.exe "Server=xxxxxx.database.windows.net;database=adventureworks;User ID=xxxxxx;Password=xxxxxx" "Server=localhost;database=awcopy;integrated security=true"