ETL:跨具有不同字段类型的不同来源进行连接
ETL: doing a join across different sources with different field type
对于跨多个源执行 ETL 的应用程序,如何处理“值”可能相同但表示方式可能不同的连接。例如,让我们采用以下虚构的场景:
来源 1
- ProductID:
909
// 一个整数
- 产品:
"Soda"
来源 2
- ProductID:
"909"
// 字符串
- 动作类型:
"Click"
假设这些来自两个我无权修改的不同来源(例如,一个可能是 Salesforce 数据,另一个是公司数据库)。 ETL 应用程序如何处理可能以不同方式存储字段类型的连接?
- 从每个源读取数据
- 转换为相同的数据类型
- 加入
全部在您选择的 ETL 工具中。问题在哪里,因为我显然错过了它?...
以下是如何在 Informatica 中实现它。
我在很多遗留数据库中都遇到过这个问题。 tables 驻留在不同来源的事实无关紧要,因为我已经看到这种情况发生在相同的模式、不同的模式以及不同的数据库中。
问题有两个方面:可行性和性能。
可行性
我所知道的所有数据库都支持数据类型转换和转换。他们中的一些人在幕后默默地做着,他们往往做错了。例如,Oracle 在这方面是臭名昭著的,因为它倾向于向错误的方向转换。我建议 始终明确地执行此操作。
例如 (PostgreSQL):
create table a (product_id int, name varchar(10));
insert into a (product_id, name) values (909, 'soda');
create table b (product_id varchar(10), action_type varchar(10));
insert into b (product_id, action_type) values ('909', 'click');
在 PostgreSQL 中,以下三个查询有效,它们产生相同的结果集(性能不同):
select * from a join b on a.product_id = b.product_id; -- don't do this
select * from a join b on a.product_id = cast(b.product_id as int);
select * from a join b on cast(a.product_id as varchar) = b.product_id;
如果您键入第一个选项,引擎会在您不知情的情况下悄悄将其转换为第二个或第三个查询。这会产生 unexpected/unnecessary 错误,您可能无法正确解决。同样,始终进行显式转换。
性能
当发挥性能时,重要的是要决定哪个 table 是 驱动 table,哪个是 二级table.
如果您决定 a
成为驾驶者 table,那么您可能会做两件事:
在b
端转换,如:
select * from a join b on a.product_id = cast(b.product_id as int);
可选地通过(如果可以的话)在 cast(b.product_id as int)
上创建表达式索引(或索引虚拟索引)来进一步加快查询速度,如:
create index ix1 on b ((cast(b.product_id as int)));
另一方面,如果您决定 b
是驾驶者 table,那么您可能会这样做:
在a
端转换,如:
select * from a join b on cast(a.product_id as varchar) = b.product_id;
可选地,通过(如果可以的话)在 cast(a.product_id as varchar)
上创建表达式索引(或索引虚拟索引)来进一步加快查询速度,如:
create index ix2 on a ((cast(a.product_id as varchar))));
要决定哪个选项更好,您需要获得两者的执行计划,阅读估计成本,然后做出决定。有时估计的成本并不那么可靠:它们只是 估计的 ,而不是真实的。在关键情况下,我最终会 运行 两个选项进行比较。
我将采用以下方法:
- 创建识别列类型的功能
- 将列类型映射到文本表示形式
- 转换为保存值所需的最大文本长度
- 加入数据
我在一个数据库的上下文中有相同的场景并应用了这种方法。这是因为从数据库中查询动态列集并对它们执行各种 SQL 操作的功能。
其中一位运算符是UNPIVOT。 T-SQL语句构建是这样的:
SELECT *
FROM
(
SELCET RowID
,Col001
,Col002
...
,Col00X
FROM ...
) DS
UNPIVOT
(
[value] FOR [column] IN ([Col001], [Col002], ... , [Col00X])
) UNPVT;
问题是 UNPIVOT
IN
子句中的所有列必须具有相同的类型。当然,最简单的解决方法是将所有列 CAST/CONVERT
更改为 NVARCHAR(MAX)
,因为它几乎可以存储所有内容,我们也这样做了,但是查询的执行时间很长。
因此,如果最大的类型可以保留在 VARCHAR(X)
或 NVARCHNAR(X)
中以将列转换为它会更好。我编写了一个简单的 SQL CLR Aggregate 函数,因为我在 table 中拥有所有列名称和类型,并且希望快速清晰地获取目标类型。看起来像这样:
using System;
using System.Data;
using System.Data.Sql;
using Microsoft.SqlServer.Server;
using System.Data.SqlTypes;
using System.Collections.Generic;
using System.Text;
using System.IO;
using System.Linq;
[Serializable]
[
Microsoft.SqlServer.Server.SqlUserDefinedAggregate
(
Microsoft.SqlServer.Server.Format.UserDefined,
IsInvariantToNulls = true,
IsInvariantToDuplicates = false,
IsInvariantToOrder = false,
MaxByteSize = -1
)
]
/// <summary>
/// Returns the data type with highest precedence. The date types comes in the "[system_type_name]" format.
/// </summary>
public class AnalysisCustomRollupsGetHighestDataTypeConverstionValue : Microsoft.SqlServer.Server.IBinarySerialize
{
private Dictionary<string, KeyValuePair<string, int>> dataTypesMapping;
private List<KeyValuePair<string, int>> destinationDataTypes;
private String[] dataTypesWithoutPredifinedLength;
public void Init()
{
// for the following data types the length is extracting from the input value
dataTypesWithoutPredifinedLength = new string[] {"nvarchar", "nchar", "varchar", "char"};
// each data type is mapped to its string corresponding value ("-1" is "MAX", "0" is defined by source)
dataTypesMapping = new Dictionary<string, KeyValuePair<string, int>> {
{"user-defined data types", new KeyValuePair<string, int> ("NVARCHAR", -1)},
{"sql_variant", new KeyValuePair<string, int> ("NVARCHAR", -1)},
{"xml", new KeyValuePair<string, int> ("NVARCHAR", -1)},
{"datetimeoffset", new KeyValuePair<string, int> ("VARCHAR", 34)},
{"datetime2", new KeyValuePair<string, int> ("VARCHAR", 27)},
{"datetime", new KeyValuePair<string, int> ("VARCHAR", 19)},
{"smalldatetime", new KeyValuePair<string, int> ("VARCHAR", 19)},
{"date", new KeyValuePair<string, int> ("VARCHAR", 19)},
{"time", new KeyValuePair<string, int> ("VARCHAR", 16)},
{"float", new KeyValuePair<string, int> ("VARCHAR", 48)},
{"real", new KeyValuePair<string, int> ("VARCHAR", 48)},
{"decimal", new KeyValuePair<string, int> ("VARCHAR", 48)},
{"money", new KeyValuePair<string, int> ("VARCHAR", 19)},
{"smallmoney", new KeyValuePair<string, int> ("VARCHAR", 10)},
{"bigint", new KeyValuePair<string, int> ("VARCHAR", 26)},
{"int", new KeyValuePair<string, int> ("VARCHAR", 14)},
{"smallint", new KeyValuePair<string, int> ("VARCHAR", 7)},
{"tinyint", new KeyValuePair<string, int> ("VARCHAR", 3)},
{"bit", new KeyValuePair<string, int> ("VARCHAR", 1)},
{"ntext", new KeyValuePair<string, int> ("NVARCHAR", -1)},
{"text", new KeyValuePair<string, int> ("VARCHAR", -1)},
{"image", new KeyValuePair<string, int> ("VARCHAR", -1)},
{"timestamp", new KeyValuePair<string, int> ("VARCHAR", 8)},
{"uniqueidentifier", new KeyValuePair<string, int> ("VARCHAR", 36)},
{"nvarchar", new KeyValuePair<string, int> ("NVARCHAR", 0)},
{"nchar", new KeyValuePair<string, int> ("NVARCHAR", 0)},
{"varchar", new KeyValuePair<string, int> ("VARCHAR", 0)},
{"char", new KeyValuePair<string, int> ("VARCHAR", 0)},
{"varbinary", new KeyValuePair<string, int> ("NVARCHAR", -1)},
{"binary", new KeyValuePair<string, int> ("NVARCHAR", -1)}
};
destinationDataTypes = new List<KeyValuePair<string, int>>();
}
public void Accumulate(SqlString value)
{
string[] buffer;
string currentDataTypeName;
int currentDataLength;
if (value.IsNull)
{
return;
}
buffer = value.Value.Split('(', ')');
currentDataTypeName = buffer[0].ToLower();
// length is extracting from the source value
if (dataTypesWithoutPredifinedLength.Contains(currentDataTypeName))
{
if(buffer[1].ToUpper() == "MAX")
{
buffer[1] = "-1";
}
Int32.TryParse(buffer[1], out currentDataLength);
destinationDataTypes.Add(new KeyValuePair<string, int>(currentDataTypeName, currentDataLength));
}
// length is predefined
else
{
destinationDataTypes.Add(new KeyValuePair<string, int>(dataTypesMapping[currentDataTypeName].Key, dataTypesMapping[currentDataTypeName].Value));
}
}
public void Merge(AnalysisCustomRollupsGetHighestDataTypeConverstionValue other)
{
destinationDataTypes = destinationDataTypes.Union(other.destinationDataTypes).ToList();
}
public SqlString Terminate()
{
string output;
string length;
length = (destinationDataTypes.OrderBy(x => x.Value).First().Value == -1 ? "MAX" : destinationDataTypes.OrderByDescending(x => x.Value).First().Value.ToString());
output = (destinationDataTypes.Exists(x => String.Equals(x.Key.ToUpper(), "NVARCHAR")) ? "NVARCHAR" : "VARCHAR") + "(" + length + ")";
return new SqlString(output);
}
public void Read(BinaryReader r)
{
if (r == null) throw new ArgumentNullException("r");
int count = r.ReadInt32();
destinationDataTypes = new List<KeyValuePair<string, int>>(count);
for (int i = 0; i < count; i++)
{
destinationDataTypes.Add(new KeyValuePair<string, int> (r.ReadString(), r.ReadInt32()));
}
}
public void Write(BinaryWriter w)
{
if (w == null) throw new ArgumentNullException("w");
w.Write(destinationDataTypes.Count);
foreach (KeyValuePair<string, int> record in destinationDataTypes)
{
w.Write(record.Key);
w.Write(record.Value);
}
}
}
它允许我这样做:
SELECT [dbo].[AnalysisCustomRollupsGetHighestDataTypeConverstionValue] ([column_type])
FROM
(
VALUES ('VARCHAR(5)')
,('INT')
,('SMALLMONEY')
) DS ([column_type]);
其中 returns VARCHAR(14)
.
我想在您的 ETL 过程中这会更容易实现。
更难的是日期的处理。在我的上下文中,所有日期都以 YYYY-MM-DDTHH-MM-SS
这种格式作为字符串出现。如果您需要加入日期,并且其中一些日期以 2010 5th May
或 something crazy enter in input without any validation
等不同格式的字符串形式出现,您需要先将它们转换为日期,然后再转换为字符串。
对于跨多个源执行 ETL 的应用程序,如何处理“值”可能相同但表示方式可能不同的连接。例如,让我们采用以下虚构的场景:
来源 1
- ProductID:
909
// 一个整数 - 产品:
"Soda"
来源 2
- ProductID:
"909"
// 字符串 - 动作类型:
"Click"
假设这些来自两个我无权修改的不同来源(例如,一个可能是 Salesforce 数据,另一个是公司数据库)。 ETL 应用程序如何处理可能以不同方式存储字段类型的连接?
- 从每个源读取数据
- 转换为相同的数据类型
- 加入
全部在您选择的 ETL 工具中。问题在哪里,因为我显然错过了它?...
以下是如何在 Informatica 中实现它。
我在很多遗留数据库中都遇到过这个问题。 tables 驻留在不同来源的事实无关紧要,因为我已经看到这种情况发生在相同的模式、不同的模式以及不同的数据库中。
问题有两个方面:可行性和性能。
可行性
我所知道的所有数据库都支持数据类型转换和转换。他们中的一些人在幕后默默地做着,他们往往做错了。例如,Oracle 在这方面是臭名昭著的,因为它倾向于向错误的方向转换。我建议 始终明确地执行此操作。
例如 (PostgreSQL):
create table a (product_id int, name varchar(10));
insert into a (product_id, name) values (909, 'soda');
create table b (product_id varchar(10), action_type varchar(10));
insert into b (product_id, action_type) values ('909', 'click');
在 PostgreSQL 中,以下三个查询有效,它们产生相同的结果集(性能不同):
select * from a join b on a.product_id = b.product_id; -- don't do this
select * from a join b on a.product_id = cast(b.product_id as int);
select * from a join b on cast(a.product_id as varchar) = b.product_id;
如果您键入第一个选项,引擎会在您不知情的情况下悄悄将其转换为第二个或第三个查询。这会产生 unexpected/unnecessary 错误,您可能无法正确解决。同样,始终进行显式转换。
性能
当发挥性能时,重要的是要决定哪个 table 是 驱动 table,哪个是 二级table.
如果您决定 a
成为驾驶者 table,那么您可能会做两件事:
在
b
端转换,如:select * from a join b on a.product_id = cast(b.product_id as int);
可选地通过(如果可以的话)在
cast(b.product_id as int)
上创建表达式索引(或索引虚拟索引)来进一步加快查询速度,如:create index ix1 on b ((cast(b.product_id as int)));
另一方面,如果您决定 b
是驾驶者 table,那么您可能会这样做:
在
a
端转换,如:select * from a join b on cast(a.product_id as varchar) = b.product_id;
可选地,通过(如果可以的话)在
cast(a.product_id as varchar)
上创建表达式索引(或索引虚拟索引)来进一步加快查询速度,如:create index ix2 on a ((cast(a.product_id as varchar))));
要决定哪个选项更好,您需要获得两者的执行计划,阅读估计成本,然后做出决定。有时估计的成本并不那么可靠:它们只是 估计的 ,而不是真实的。在关键情况下,我最终会 运行 两个选项进行比较。
我将采用以下方法:
- 创建识别列类型的功能
- 将列类型映射到文本表示形式
- 转换为保存值所需的最大文本长度
- 加入数据
我在一个数据库的上下文中有相同的场景并应用了这种方法。这是因为从数据库中查询动态列集并对它们执行各种 SQL 操作的功能。
其中一位运算符是UNPIVOT。 T-SQL语句构建是这样的:
SELECT *
FROM
(
SELCET RowID
,Col001
,Col002
...
,Col00X
FROM ...
) DS
UNPIVOT
(
[value] FOR [column] IN ([Col001], [Col002], ... , [Col00X])
) UNPVT;
问题是 UNPIVOT
IN
子句中的所有列必须具有相同的类型。当然,最简单的解决方法是将所有列 CAST/CONVERT
更改为 NVARCHAR(MAX)
,因为它几乎可以存储所有内容,我们也这样做了,但是查询的执行时间很长。
因此,如果最大的类型可以保留在 VARCHAR(X)
或 NVARCHNAR(X)
中以将列转换为它会更好。我编写了一个简单的 SQL CLR Aggregate 函数,因为我在 table 中拥有所有列名称和类型,并且希望快速清晰地获取目标类型。看起来像这样:
using System;
using System.Data;
using System.Data.Sql;
using Microsoft.SqlServer.Server;
using System.Data.SqlTypes;
using System.Collections.Generic;
using System.Text;
using System.IO;
using System.Linq;
[Serializable]
[
Microsoft.SqlServer.Server.SqlUserDefinedAggregate
(
Microsoft.SqlServer.Server.Format.UserDefined,
IsInvariantToNulls = true,
IsInvariantToDuplicates = false,
IsInvariantToOrder = false,
MaxByteSize = -1
)
]
/// <summary>
/// Returns the data type with highest precedence. The date types comes in the "[system_type_name]" format.
/// </summary>
public class AnalysisCustomRollupsGetHighestDataTypeConverstionValue : Microsoft.SqlServer.Server.IBinarySerialize
{
private Dictionary<string, KeyValuePair<string, int>> dataTypesMapping;
private List<KeyValuePair<string, int>> destinationDataTypes;
private String[] dataTypesWithoutPredifinedLength;
public void Init()
{
// for the following data types the length is extracting from the input value
dataTypesWithoutPredifinedLength = new string[] {"nvarchar", "nchar", "varchar", "char"};
// each data type is mapped to its string corresponding value ("-1" is "MAX", "0" is defined by source)
dataTypesMapping = new Dictionary<string, KeyValuePair<string, int>> {
{"user-defined data types", new KeyValuePair<string, int> ("NVARCHAR", -1)},
{"sql_variant", new KeyValuePair<string, int> ("NVARCHAR", -1)},
{"xml", new KeyValuePair<string, int> ("NVARCHAR", -1)},
{"datetimeoffset", new KeyValuePair<string, int> ("VARCHAR", 34)},
{"datetime2", new KeyValuePair<string, int> ("VARCHAR", 27)},
{"datetime", new KeyValuePair<string, int> ("VARCHAR", 19)},
{"smalldatetime", new KeyValuePair<string, int> ("VARCHAR", 19)},
{"date", new KeyValuePair<string, int> ("VARCHAR", 19)},
{"time", new KeyValuePair<string, int> ("VARCHAR", 16)},
{"float", new KeyValuePair<string, int> ("VARCHAR", 48)},
{"real", new KeyValuePair<string, int> ("VARCHAR", 48)},
{"decimal", new KeyValuePair<string, int> ("VARCHAR", 48)},
{"money", new KeyValuePair<string, int> ("VARCHAR", 19)},
{"smallmoney", new KeyValuePair<string, int> ("VARCHAR", 10)},
{"bigint", new KeyValuePair<string, int> ("VARCHAR", 26)},
{"int", new KeyValuePair<string, int> ("VARCHAR", 14)},
{"smallint", new KeyValuePair<string, int> ("VARCHAR", 7)},
{"tinyint", new KeyValuePair<string, int> ("VARCHAR", 3)},
{"bit", new KeyValuePair<string, int> ("VARCHAR", 1)},
{"ntext", new KeyValuePair<string, int> ("NVARCHAR", -1)},
{"text", new KeyValuePair<string, int> ("VARCHAR", -1)},
{"image", new KeyValuePair<string, int> ("VARCHAR", -1)},
{"timestamp", new KeyValuePair<string, int> ("VARCHAR", 8)},
{"uniqueidentifier", new KeyValuePair<string, int> ("VARCHAR", 36)},
{"nvarchar", new KeyValuePair<string, int> ("NVARCHAR", 0)},
{"nchar", new KeyValuePair<string, int> ("NVARCHAR", 0)},
{"varchar", new KeyValuePair<string, int> ("VARCHAR", 0)},
{"char", new KeyValuePair<string, int> ("VARCHAR", 0)},
{"varbinary", new KeyValuePair<string, int> ("NVARCHAR", -1)},
{"binary", new KeyValuePair<string, int> ("NVARCHAR", -1)}
};
destinationDataTypes = new List<KeyValuePair<string, int>>();
}
public void Accumulate(SqlString value)
{
string[] buffer;
string currentDataTypeName;
int currentDataLength;
if (value.IsNull)
{
return;
}
buffer = value.Value.Split('(', ')');
currentDataTypeName = buffer[0].ToLower();
// length is extracting from the source value
if (dataTypesWithoutPredifinedLength.Contains(currentDataTypeName))
{
if(buffer[1].ToUpper() == "MAX")
{
buffer[1] = "-1";
}
Int32.TryParse(buffer[1], out currentDataLength);
destinationDataTypes.Add(new KeyValuePair<string, int>(currentDataTypeName, currentDataLength));
}
// length is predefined
else
{
destinationDataTypes.Add(new KeyValuePair<string, int>(dataTypesMapping[currentDataTypeName].Key, dataTypesMapping[currentDataTypeName].Value));
}
}
public void Merge(AnalysisCustomRollupsGetHighestDataTypeConverstionValue other)
{
destinationDataTypes = destinationDataTypes.Union(other.destinationDataTypes).ToList();
}
public SqlString Terminate()
{
string output;
string length;
length = (destinationDataTypes.OrderBy(x => x.Value).First().Value == -1 ? "MAX" : destinationDataTypes.OrderByDescending(x => x.Value).First().Value.ToString());
output = (destinationDataTypes.Exists(x => String.Equals(x.Key.ToUpper(), "NVARCHAR")) ? "NVARCHAR" : "VARCHAR") + "(" + length + ")";
return new SqlString(output);
}
public void Read(BinaryReader r)
{
if (r == null) throw new ArgumentNullException("r");
int count = r.ReadInt32();
destinationDataTypes = new List<KeyValuePair<string, int>>(count);
for (int i = 0; i < count; i++)
{
destinationDataTypes.Add(new KeyValuePair<string, int> (r.ReadString(), r.ReadInt32()));
}
}
public void Write(BinaryWriter w)
{
if (w == null) throw new ArgumentNullException("w");
w.Write(destinationDataTypes.Count);
foreach (KeyValuePair<string, int> record in destinationDataTypes)
{
w.Write(record.Key);
w.Write(record.Value);
}
}
}
它允许我这样做:
SELECT [dbo].[AnalysisCustomRollupsGetHighestDataTypeConverstionValue] ([column_type])
FROM
(
VALUES ('VARCHAR(5)')
,('INT')
,('SMALLMONEY')
) DS ([column_type]);
其中 returns VARCHAR(14)
.
我想在您的 ETL 过程中这会更容易实现。
更难的是日期的处理。在我的上下文中,所有日期都以 YYYY-MM-DDTHH-MM-SS
这种格式作为字符串出现。如果您需要加入日期,并且其中一些日期以 2010 5th May
或 something crazy enter in input without any validation
等不同格式的字符串形式出现,您需要先将它们转换为日期,然后再转换为字符串。