pentaho 数据集成中新插入或更新的行数
Newly inserted or updated row count in pentaho data integration
我是 Pentaho 数据集成的新手;我需要将一个数据库作为 ETL 作业集成到另一个位置。我想在 ETL 作业期间计算 insert/updat 的数量,并将该计数插入另一个 table 。谁能帮我解决这个问题?
我不认为有 built-in 功能可以返回迄今为止 PDI 中 Insert/Update 步骤的 受影响的行数 。
尽管如此,大多数数据库供应商都能够为您提供从给定操作中获取受影响的行数 的能力。
例如,在 PostgreSQL 中,它看起来像这样:
/* Count affected rows from INSERT */
WITH inserted_rows AS (
INSERT INTO ...
VALUES
...
RETURNING 1
)
SELECT count(*) FROM inserted_rows;
/* Count affected rows from UPDATE */
WITH updated_rows AS (
UPDATE ...
SET ...
WHERE ...
RETURNING 1
)
SELECT count(*) FROM updated_rows;
但是,您的目标是在 PDI 作业中执行此操作,因此我建议您尝试达到控制 SQL 脚本的程度。
建议:将源数据保存在目标数据库服务器上的文件中,然后使用它,也许具有批量加载功能,到insert/update,然后保存将受影响的行数放入 PDI 变量中。请注意,您可能需要在 Job 的范围内使用 SQL 脚本步骤。
编辑:实施是选择设计的问题,因此建议的解决方案是众多解决方案之一。在非常高的层次上,您可以执行以下操作。
- 转换 I - 从源中提取数据
- 从源头获取数据,无论是数据库还是其他任何东西
- 以适合目标数据库结构的方式准备输出
- 使用文件系统上的文本文件输出步骤保存 CSV 文件
- Parent 工作
- 如果PDI服务器与目标数据库服务器相同:
- 使用执行 SQL 脚本步骤来:
- 从文件中读取数据并执行INSERT/UPDATE
- 将受影响的行数写入 table(理想情况下,此 table 还可以包含操作的 time-stamp,以便您跟踪事情)
- 如果 PDI 服务器与目标数据库服务器不同:
- 上传源数据文件到服务器,例如使用 FTP/SFTP 文件上传步骤
- 使用执行 SQL 脚本步骤来:
- 从文件中读取数据并执行INSERT/UPDATE
- 将受影响的行数写入 table
编辑 2:另一个建议的解决方案
根据@user3123116 的建议,您可以使用比较字段 步骤(如果不是您环境的一部分,请检查市场)。
我看到的唯一缺点是你必须在inserting/updating之前查询目标数据库,这当然是性能较差的。
另请注意,您可以拆分源数据流的输入(COPY,而不是 DISTRIBUTE),然后执行您的 insert/update,但此流 必须等待 字段比较流以结束对目标数据库的查询,否则您可能会得到错误的统计信息。
"Compare Fields" 步骤将采用 2 个流作为比较输入,其输出为 "Identical"、“已更改”、"Added" 和 "Removed" 的 4 个不同流记录。您可以计算这 4 个,然后用 Insert/Update.
处理 "Changed"、"Added" 和 "Removed" 记录
您可以从“转换”设置中的“日志记录”选项执行此操作。请按照以下步骤操作:
- 单击编辑菜单 --> 设置
- 切换到日志记录 选项卡
- Select 左侧菜单中的步骤
- 提供 Log Connection & Log table 名称(Say StepLog )
- Select 记录所需的字段(LINES_OUTPUT - 用于插入计数 & LINES_UPDATED - 更新计数)
- 单击 SQL 按钮并通过单击 执行 按钮创建 table
- 现在所有的步骤都将记录到日志中table(StepLog),您可以使用它进行进一步的操作。
- 享受
我是 Pentaho 数据集成的新手;我需要将一个数据库作为 ETL 作业集成到另一个位置。我想在 ETL 作业期间计算 insert/updat 的数量,并将该计数插入另一个 table 。谁能帮我解决这个问题?
我不认为有 built-in 功能可以返回迄今为止 PDI 中 Insert/Update 步骤的 受影响的行数 。
尽管如此,大多数数据库供应商都能够为您提供从给定操作中获取受影响的行数 的能力。
例如,在 PostgreSQL 中,它看起来像这样:
/* Count affected rows from INSERT */
WITH inserted_rows AS (
INSERT INTO ...
VALUES
...
RETURNING 1
)
SELECT count(*) FROM inserted_rows;
/* Count affected rows from UPDATE */
WITH updated_rows AS (
UPDATE ...
SET ...
WHERE ...
RETURNING 1
)
SELECT count(*) FROM updated_rows;
但是,您的目标是在 PDI 作业中执行此操作,因此我建议您尝试达到控制 SQL 脚本的程度。
建议:将源数据保存在目标数据库服务器上的文件中,然后使用它,也许具有批量加载功能,到insert/update,然后保存将受影响的行数放入 PDI 变量中。请注意,您可能需要在 Job 的范围内使用 SQL 脚本步骤。
编辑:实施是选择设计的问题,因此建议的解决方案是众多解决方案之一。在非常高的层次上,您可以执行以下操作。
- 转换 I - 从源中提取数据
- 从源头获取数据,无论是数据库还是其他任何东西
- 以适合目标数据库结构的方式准备输出
- 使用文件系统上的文本文件输出步骤保存 CSV 文件
- Parent 工作
- 如果PDI服务器与目标数据库服务器相同:
- 使用执行 SQL 脚本步骤来:
- 从文件中读取数据并执行INSERT/UPDATE
- 将受影响的行数写入 table(理想情况下,此 table 还可以包含操作的 time-stamp,以便您跟踪事情)
- 使用执行 SQL 脚本步骤来:
- 如果 PDI 服务器与目标数据库服务器不同:
- 上传源数据文件到服务器,例如使用 FTP/SFTP 文件上传步骤
- 使用执行 SQL 脚本步骤来:
- 从文件中读取数据并执行INSERT/UPDATE
- 将受影响的行数写入 table
- 如果PDI服务器与目标数据库服务器相同:
编辑 2:另一个建议的解决方案
根据@user3123116 的建议,您可以使用比较字段 步骤(如果不是您环境的一部分,请检查市场)。
我看到的唯一缺点是你必须在inserting/updating之前查询目标数据库,这当然是性能较差的。
另请注意,您可以拆分源数据流的输入(COPY,而不是 DISTRIBUTE),然后执行您的 insert/update,但此流 必须等待 字段比较流以结束对目标数据库的查询,否则您可能会得到错误的统计信息。
"Compare Fields" 步骤将采用 2 个流作为比较输入,其输出为 "Identical"、“已更改”、"Added" 和 "Removed" 的 4 个不同流记录。您可以计算这 4 个,然后用 Insert/Update.
处理 "Changed"、"Added" 和 "Removed" 记录您可以从“转换”设置中的“日志记录”选项执行此操作。请按照以下步骤操作:
- 单击编辑菜单 --> 设置
- 切换到日志记录 选项卡
- Select 左侧菜单中的步骤
- 提供 Log Connection & Log table 名称(Say StepLog )
- Select 记录所需的字段(LINES_OUTPUT - 用于插入计数 & LINES_UPDATED - 更新计数)
- 单击 SQL 按钮并通过单击 执行 按钮创建 table
- 现在所有的步骤都将记录到日志中table(StepLog),您可以使用它进行进一步的操作。
- 享受