如何在 SSIS 中进行内部连接而不是每个循环?

How to do an inner join rather than for each loop in SSIS?

在 ETL 服务器上我有一个 DW 用户 table。

在产品 OLTP 服务器上,我有销售数据库。我只想为 ETL 服务器上的用户 table 中存在的用户提取销售额。

目前我正在使用执行 SQL 任务将 DW 用户提取到 SSIS System.Object 变量中。然后使用 for each 循环遍历此变量中的每个项目 (userid),并通过数据流任务获取每个用户的 OLTP 销售额 table 并将其转储到 DW 暂存 table。 for each 需要很长时间才能 运行.

我希望能够进行内部联接以便响应更快,但我不能这样做,因为它们位于不同的服务器上。出于同样的原因,我也不能使用全局临时 table 进行内部连接。

我尝试将 DW 用户收集到逗号分隔的字符串变量中,然后使用它(通过 string_split)查询 OLTP,但这在预执行阶段也需要更多时间(不是确定为什么)即使对于少数用户也是如此。

我也知道查找转换,但这也会导致将所有 oltp 行带入 dw etl 服务器以测试查找条件。

是否有任何替代方法能够通过将用户列表放入源中来进行内部联接?

注意:我没有对 OLTP 数据库的写入权限。

根据评论,我认为我们可以使用临时table来解决这个问题。

Can you help me understand this restriction? "Neither can I use a global temp table to make the inner join, for the same reason."

The restriction is since oltp server and dw server are separate so can't have global temp table common to both servers. Hope makes sense.

我们要做的一般模式是

  1. 执行 SQL 任务以在 OLTP 服务器上创建一个临时 table
  2. 填充新临时文件的数据流任务table。来源 = DW。目标 = OLTP。确保延迟验证 = True
  3. 修改现有的数据流。将源修改为使用临时 table 的查询,即 SELECT S.* FROM oltp.sales AS S WHERE EXISTS (SELECT * FROM #SalesPerson AS SP WHERE SP.UserId = S.UserId); Ensure Delay Validation = True

关于使用临时 tables 的长格式回答(全局设置元数据,此后常规)

临时tables,住在tempdb。您的 OLTP 和 DW 连接管理器可能不指向 tempdb。为了能够在 SSIS 中引用本地或全局的临时 table,您需要为明确指向 tempdb 的同一服务器定义一个额外的连接管理器,以便您可以使用 source/destination 组件(技术上准确但愚蠢)。或者,您使用 SSIS 变量来保存 table 的名称,并在 source/destination 组件中使用 ~From Variable~ 命名选项(最佳选项,最大灵活性)。

详尽示例

我将使用 WideWorldImporters 作为我的 OLTP 系统,使用 WideWorldImportersDW 作为我的 DW 系统。

一次性任务

打开 SQL Server Management Studio、SSMS,然后连接到您的 OLTP 系统。定义一个具有唯一名称和预期结构的全局临时 table。保持连接打开,以便 table 结构在初始开发期间保持完整。

我使用了下面的语句。

DROP TABLE IF EXISTS #SO_70530036; 
CREATE TABLE #SO_70530036(EmployeeId int NOT NULL);

跟踪您的查询,因为我们稍后会使用它,但正如我在我的 SSIS 答案中所提倡的那样,执行最小的任务,测试它是否有效,然后继续下一个。这是调试的唯一方法。

连接管理器

定义两个 OLE DB 连接管理器。 WWI_DW 使用指向命名实例 DEV2019UTF8,WWI_OLTP 指向 DEV2019EXPRESS。右键单击 WWI_OLTP 和 select Properties。找到 属性 RetainSameConnection 并将其从默认的 False 翻转为 True。这确保在整个包中使用相同的连接。由于连接消失时临时 table 超出范围,关闭并重新打开包中的连接将导致致命错误。

这两个数据库在不同的实例上,所以我们不能欺骗和直接混合数据。

变量

在 SSIS 中定义 4 个变量,均为 String 类型。

  • TempTableName - 我使用了值 ##SO_70530036 但使用您在一次性任务部分指定的任何值。

  • QuerySourceEmployees - 这将是查询您运行 生成的候选数据集以进入临时table。我用了 SELECT TOP (3) E.[WWI Employee ID] AS EmployeeId FROM Dimension.Employee AS E WHERE E.[Is SalesPerson] = CAST(1 AS bit);

  • QueryDefineTables - 还记得准时任务中的 drop/create 语句吗?我们将使用它们的本质,但使用表达式生成器让我们动态交换 table 名称。我在“表达式”部分单击了省略号,...,并使用了以下内容 "DROP TABLE IF EXISTS " + @[User::TempTableName] + "; CREATE TABLE " + @[User::TempTableName] + "( EmployeeId int NOT NULL);" 您应该能够从行中复制值并将其粘贴到 SSMS 中以确认其有效。

  • QuerySales - 这是您要用来提取过滤后的销售数据集的实际查询。同样,我们将使用表达式允许我们动态引用临时 table 名称。表达式的美化版本看起来像

    SELECT SI.InvoiceID , SI.SalespersonPersonID , SO.OrderID , SOL.StockItemID , SOL.Quantity , SOL.OrderLineID 从 Sales.Invoices 作为 SI 内部联接 Sales.Orders 这样 ON SO.OrderID = SI.OrderID 内部联接 Sales.OrderLines作为太阳 ON SO.OrderID = SOL.OrderID 在哪里 EXISTS (SELECT * FROM " + @[User::TempTableName] + " AS TT WHERE TT.EmployeeID = SI.SalespersonPersonID);"

同样,您应该能够从三个查询和 运行 中独立提取值并验证它们是否有效。

执行SQL任务

将执行 SQL 任务添加到控制流。我将我的命名为 SQL Create temporary table 我的连接管理器是 WWI_OLTP 我将 SQLSourceType 更改为 Variable 并且 SourceVariable 是 User::QueryDefineTables

每次你的包 运行s,它要做的第一件事就是创建临时文件 table。这很好,因为 SSIS 是元数据驱动的 ETL 引擎,如果 table 不存在,接下来的两个步骤将失败。

数据流任务 - 启动泵

此数据流是我们将 DW 数据传输回 OLTP 系统的地方,因此可以在源系统中进行过滤。

将数据流任务拖到控制流上。我将我的命名为 DFT Load Temp,在你点击它之前,右键单击任务并找到 DelayValidation 属性 并将其从默认的 False 更改为 True。通常,包会在实际执行开始之前验证所有元数据,因为您希望在任何数据开始移动之前知道一切都很好。由于我们使用的是临时 tables,我们需要告诉执行引擎“相信我们,它会准备好”

在数据流任务中双击。

添加 OLE DB 源。我将我的命名为 OLESRC SourceEmployees 我使用连接管理器 WWI_DW。我的数据访问模式更改为 SQL command from variable 然后我 select 我的变量 User::QuerySourceEmployees

添加 OLE DB 目标。我将我的命名为 OLEDST TempTableName 并双击进行配置。连接管理器是 WWI_OLTP,而且由于 table 存在于 tempdb 中,我们不能从下拉列表中 select。将数据访问模式更改为 Table name or view name variable - fast load,然后 select 您的变量名称 User::TempTableName。单击“映射”选项卡并确保源列映射到目标列。

数据流任务 - 传输数据

最后,我们将提取源数据,并根据目标系统中的数据进行很好的过滤。

添加 OLE DB 源。我把它命名为OLESRC QuerySales。连接管理器是 WWI_OLTP。数据访问模式再次更改为 SQL command from variable,变量名称为 User::QuerySales

从这里开始,做任何你需要做的事情来让奇迹发生。

而不是 27 万行未过滤的查询

我有67k,因为临时工只有3个员工table。

参考包

等等,还有更多!

关闭 visual studio,重新打开并尝试触及数据流中的某些内容。突然,到处都是红色的X!每当您关闭数据流组件时,它都会触发重新验证元数据操作并猜测是什么,它无法执行此操作,因为与临时 table 的连接已消失。 该包 运行 很好,它不会抛出 VS_NEEDSNEWMETADATA 但 editing/maintenance 变得很痛苦。

如果您从全局临时 table 切换到本地,请将 table 名称变量的值切换回全局,然后 运行 SSMS 中的定义语句。一旦完成,您就可以继续编辑包了。

我向您保证,一旦您设置了元数据并通过变量查询 source/destination.

,本地临时 table 就可以正常工作

不需要全局临时 table hack,或 SET FMTONLY OFF hack(不再有效)。

只需在 SQL 查询中使用 WITH RESULT SETS 指定结果集元数据。例如

EXEC ('

create table #t
( 
  ID INT,
  Name VARCHAR(150),
  Number VARCHAR(15)
)
insert into #t (Id, Name, Number)
select object_id, name, 12
from sys.objects

select * from #t

')
WITH RESULT SETS
(
 ( 
  ID INT,
  Name VARCHAR(150),
  Number VARCHAR(15)
 ) 
) 

如果您需要参数化查询,则需要注意一点,因为 SSIS 发现参数的方式存在一些限制。 SSIS 运行 sp_describe_undeclared_parameters, which doesn't really work with batches that call sp_executesql,因为 sp_executesql 有一种非常独特的参数处理方式,您无法使用用户存储过程复制这种方式。

因此,要参数化查询,您需要使用“从变量查询”和 SSIS 表达式将参数值传递到查询中,或者将所有这些 TSQL 推送到存储过程中。