ETL:跨具有不同字段类型的不同来源进行连接

ETL: doing a join across different sources with different field type

对于跨多个源执行 ETL 的应用程序,如何处理“值”可能相同但表示方式可能不同的连接。例如,让我们采用以下虚构的场景:

来源 1

来源 2


假设这些来自两个我无权修改的不同来源(例如,一个可能是 Salesforce 数据,另一个是公司数据库)。 ETL 应用程序如何处理可能以不同方式存储字段类型的连接?

  1. 从每个源读取数据
  2. 转换为相同的数据类型
  3. 加入

全部在您选择的 ETL 工具中。问题在哪里,因为我显然错过了它?...

以下是如何在 Informatica 中实现它。

我在很多遗留数据库中都遇到过这个问题。 tables 驻留在不同来源的事实无关紧要,因为我已经看到这种情况发生在相同的模式、不同的模式以及不同的数据库中。

问题有两个方面:可行性和性能。

可行性

我所知道的所有数据库都支持数据类型转换和转换。他们中的一些人在幕后默默地做着,他们往往做错了。例如,Oracle 在这方面是臭名昭著的,因为它倾向于向错误的方向转换。我建议 始终明确地执行此操作

例如 (PostgreSQL):

create table a (product_id int, name varchar(10));
                                             
insert into a (product_id, name) values (909, 'soda');
                                             
create table b (product_id varchar(10), action_type varchar(10));
                                             
insert into b (product_id, action_type) values ('909', 'click');

在 PostgreSQL 中,以下三个查询有效,它们产生相同的结果集(性能不同):

select * from a join b on a.product_id = b.product_id; -- don't do this

select * from a join b on a.product_id = cast(b.product_id as int);

select * from a join b on cast(a.product_id as varchar) = b.product_id;

如果您键入第一个选项,引擎会在您不知情的情况下悄悄将其转换为第二个或第三个查询。这会产生 unexpected/unnecessary 错误,您可能无法正确解决。同样,始终进行显式转换。

性能

当发挥性能时,重要的是要决定哪个 table 是 驱动 table,哪个是 二级table.

如果您决定 a 成为驾驶者 table,那么您可能会做两件事:

  • b端转换,如:

     select * from a join b on a.product_id = cast(b.product_id as int);
    
  • 可选地通过(如果可以的话)在 cast(b.product_id as int) 上创建表达式索引(或索引虚拟索引)来进一步加快查询速度,如:

     create index ix1 on b ((cast(b.product_id as int)));
    

另一方面,如果您决定 b 是驾驶者 table,那么您可能会这样做:

  • a端转换,如:

     select * from a join b on cast(a.product_id as varchar) = b.product_id;
    
  • 可选地,通过(如果可以的话)在 cast(a.product_id as varchar) 上创建表达式索引(或索引虚拟索引)来进一步加快查询速度,如:

     create index ix2 on a ((cast(a.product_id as varchar))));
    

要决定哪个选项更好,您需要获得两者的执行计划,阅读估计成本,然后做出决定。有时估计的成本并不那么可靠:它们只是 估计的 ,而不是真实的。在关键情况下,我最终会 运行 两个选项进行比较。

我将采用以下方法:

  • 创建识别列类型的功能
  • 将列类型映射到文本表示形式
  • 转换为保存值所需的最大文本长度
  • 加入数据

我在一个数据库的上下文中有相同的场景并应用了这种方法。这是因为从数据库中查询动态列集并对它们执行各种 SQL 操作的功能。

其中一位运算符是UNPIVOT。 T-SQL语句构建是这样的:

SELECT *
FROM 
(
     SELCET RowID
           ,Col001
           ,Col002
           ...
           ,Col00X
     FROM ...
) DS
UNPIVOT
(
  [value] FOR [column] IN ([Col001], [Col002], ... , [Col00X])
) UNPVT;

问题是 UNPIVOT IN 子句中的所有列必须具有相同的类型。当然,最简单的解决方法是将所有列 CAST/CONVERT 更改为 NVARCHAR(MAX),因为它几乎可以存储所有内容,我们也这样做了,但是查询的执行时间很长。

因此,如果最大的类型可以保留在 VARCHAR(X)NVARCHNAR(X) 中以将列转换为它会更好。我编写了一个简单的 SQL CLR Aggregate 函数,因为我在 table 中拥有所有列名称和类型,并且希望快速清晰地获取目标类型。看起来像这样:

using System;
using System.Data;
using System.Data.Sql;
using Microsoft.SqlServer.Server;
using System.Data.SqlTypes;
using System.Collections.Generic;
using System.Text;
using System.IO;
using System.Linq;

[Serializable]
[
    Microsoft.SqlServer.Server.SqlUserDefinedAggregate
    (
        Microsoft.SqlServer.Server.Format.UserDefined,
        IsInvariantToNulls = true,
        IsInvariantToDuplicates = false,
        IsInvariantToOrder = false,
        MaxByteSize = -1
    )
]
/// <summary>
/// Returns the data type with highest precedence. The date types comes in the "[system_type_name]" format.
/// </summary>
public class AnalysisCustomRollupsGetHighestDataTypeConverstionValue : Microsoft.SqlServer.Server.IBinarySerialize
{
    private Dictionary<string, KeyValuePair<string, int>> dataTypesMapping;
    private List<KeyValuePair<string, int>> destinationDataTypes;
    private String[] dataTypesWithoutPredifinedLength;

    public void Init()
    {
        // for the following data types the length is extracting from the input value
        dataTypesWithoutPredifinedLength = new string[] {"nvarchar", "nchar", "varchar", "char"};

        // each data type is mapped to its string corresponding value ("-1" is "MAX", "0" is defined by source)
        dataTypesMapping = new Dictionary<string, KeyValuePair<string, int>> {
                                                                                {"user-defined data types", new KeyValuePair<string, int> ("NVARCHAR", -1)},
                                                                                {"sql_variant", new KeyValuePair<string, int> ("NVARCHAR", -1)},
                                                                                {"xml", new KeyValuePair<string, int> ("NVARCHAR", -1)},
                                                                                {"datetimeoffset", new KeyValuePair<string, int> ("VARCHAR", 34)},
                                                                                {"datetime2", new KeyValuePair<string, int> ("VARCHAR", 27)},
                                                                                {"datetime", new KeyValuePair<string, int> ("VARCHAR", 19)},
                                                                                {"smalldatetime", new KeyValuePair<string, int> ("VARCHAR", 19)},
                                                                                {"date", new KeyValuePair<string, int> ("VARCHAR", 19)},
                                                                                {"time", new KeyValuePair<string, int> ("VARCHAR", 16)},
                                                                                {"float", new KeyValuePair<string, int> ("VARCHAR", 48)},
                                                                                {"real", new KeyValuePair<string, int> ("VARCHAR", 48)},
                                                                                {"decimal", new KeyValuePair<string, int> ("VARCHAR", 48)},
                                                                                {"money", new KeyValuePair<string, int> ("VARCHAR", 19)},
                                                                                {"smallmoney", new KeyValuePair<string, int> ("VARCHAR", 10)},
                                                                                {"bigint", new KeyValuePair<string, int> ("VARCHAR", 26)},
                                                                                {"int", new KeyValuePair<string, int> ("VARCHAR", 14)},
                                                                                {"smallint", new KeyValuePair<string, int> ("VARCHAR", 7)},
                                                                                {"tinyint", new KeyValuePair<string, int> ("VARCHAR", 3)},
                                                                                {"bit", new KeyValuePair<string, int> ("VARCHAR", 1)},
                                                                                {"ntext", new KeyValuePair<string, int> ("NVARCHAR", -1)},
                                                                                {"text", new KeyValuePair<string, int> ("VARCHAR", -1)},
                                                                                {"image", new KeyValuePair<string, int> ("VARCHAR", -1)},
                                                                                {"timestamp", new KeyValuePair<string, int> ("VARCHAR", 8)},
                                                                                {"uniqueidentifier", new KeyValuePair<string, int> ("VARCHAR", 36)},
                                                                                {"nvarchar", new KeyValuePair<string, int> ("NVARCHAR", 0)},
                                                                                {"nchar", new KeyValuePair<string, int> ("NVARCHAR", 0)},
                                                                                {"varchar", new KeyValuePair<string, int> ("VARCHAR", 0)},
                                                                                {"char", new KeyValuePair<string, int> ("VARCHAR", 0)},
                                                                                {"varbinary", new KeyValuePair<string, int> ("NVARCHAR", -1)},
                                                                                {"binary", new KeyValuePair<string, int> ("NVARCHAR", -1)}
                                                                            };

        destinationDataTypes = new List<KeyValuePair<string, int>>();

    }

    public void Accumulate(SqlString value)
    {
        string[] buffer;
        string currentDataTypeName;
        int currentDataLength;
        
        if (value.IsNull)
        {
            return;
        }

        buffer = value.Value.Split('(', ')');
     
        currentDataTypeName = buffer[0].ToLower();
        
        // length is extracting from the source value
        if (dataTypesWithoutPredifinedLength.Contains(currentDataTypeName))
        {
            if(buffer[1].ToUpper() == "MAX")
            {
                buffer[1] = "-1";
            }

            Int32.TryParse(buffer[1], out currentDataLength);

            destinationDataTypes.Add(new KeyValuePair<string, int>(currentDataTypeName, currentDataLength));
        }
        // length is predefined
        else
        {
            destinationDataTypes.Add(new KeyValuePair<string, int>(dataTypesMapping[currentDataTypeName].Key, dataTypesMapping[currentDataTypeName].Value));
        }
    }

    public void Merge(AnalysisCustomRollupsGetHighestDataTypeConverstionValue other)
    {
        destinationDataTypes = destinationDataTypes.Union(other.destinationDataTypes).ToList();
    }

    public SqlString Terminate()
    {
        string output;
        string length;
  
        length = (destinationDataTypes.OrderBy(x => x.Value).First().Value == -1 ? "MAX" : destinationDataTypes.OrderByDescending(x => x.Value).First().Value.ToString());
        
        output = (destinationDataTypes.Exists(x => String.Equals(x.Key.ToUpper(), "NVARCHAR")) ? "NVARCHAR" : "VARCHAR") + "(" + length + ")";
      
        return new SqlString(output);
    }

    public void Read(BinaryReader r)
    {
        if (r == null) throw new ArgumentNullException("r");

        int count = r.ReadInt32();
        destinationDataTypes = new List<KeyValuePair<string, int>>(count);

        for (int i = 0; i < count; i++)
        {
            destinationDataTypes.Add(new KeyValuePair<string, int> (r.ReadString(), r.ReadInt32()));
        }
    }

    public void Write(BinaryWriter w)
    {
        if (w == null) throw new ArgumentNullException("w");

        w.Write(destinationDataTypes.Count);
        foreach (KeyValuePair<string, int> record in destinationDataTypes)
        {
            w.Write(record.Key);
            w.Write(record.Value);
        }
    }
}

它允许我这样做:

SELECT [dbo].[AnalysisCustomRollupsGetHighestDataTypeConverstionValue] ([column_type])
FROM 
(
    VALUES ('VARCHAR(5)')
          ,('INT')
          ,('SMALLMONEY')
) DS ([column_type]);

其中 returns VARCHAR(14).

我想在您的 ETL 过程中这会更容易实现。

更难的是日期的处理。在我的上下文中,所有日期都以 YYYY-MM-DDTHH-MM-SS 这种格式作为字符串出现。如果您需要加入日期,并且其中一些日期以 2010 5th Maysomething crazy enter in input without any validation 等不同格式的字符串形式出现,您需要先将它们转换为日期,然后再转换为字符串。