Pentaho Data Integration dynamic connection (read connection from database)
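Pentaho Data Integration: CE 6.1.0.1-196
I am new to Pentaho Data Integration.
I need to run the same query against multiple databases.
I created a table in the master database to store the connection information for the other databases that need to be queried.
The table structure is below.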
SQL> desc database_connection;
Name          Type          Nullable Default Comments
------------- ------------- -------- ------- --------
DATABASE_NAME VARCHAR2(32)  Y
JDBC_URL      VARCHAR2(512) Y
USERNAME      VARCHAR2(32)  Y
PASSWORD      VARCHAR2(32)  Y
ENABLED       VARCHAR2(1)   Y
Sample data:
DATABASE_NAME: XPTO
JDBC_URL: (DESCRIPTION = (ADDRESS = (PROTOCOL = TCP)(HOST = xptosrv.xyz.com)(PORT = 1521))(LOAD_BALANCE = ON)(FAILOVER = ON)(CONNECT_DATA = (SERVER = DEDICATED)(SERVICE_NAME = XPTO.XYZ.COM)(FAILOVER_MODE = (TYPE = SELECT)(METHOD = BASIC)(RETRIES = 180)(DELAY = 5))))
USERNAME: SYSTEM
PASSWORD: blablabla
ENABLED: Y
My .ktr files:
(set_variables.ktr)
Table input ---> Copy rows to result
Query run by the Table input step against the master database:
select database_name, jdbc_url, username, password from database_connection where enabled = 'Y'
(db_query.ktr)
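Table input ---> Table output
The query in the Table input step runs on each of the (multiple) databases, and the Table output step stores the data in the master database.
My .kjb files: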
(run_for_each_row.kjb)
Start ---> Transformation ---> Success
Transformation filename: ${Internal.Job.Filename.Directory}/db_query.ktr
Job properties, Parameters:
DATABASE_NAME
JDBC_URL
PASSWORD
USERNAME
(master_job.kjb)
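Start ---> Transformation ---> Job for each row ---> Success
Transformation filename: ${Internal.Job.Filename.Directory}/set_variables.ktr
Job for each row filename: ${Internal.Job.Filename.Directory}/run_for_each_row.kjb
Job for each row ... Advanced tab
Copy previous results to parameters -> checked
Execute for every input row -> checked
Job for each row ... Parameters: DATABASE_NAME, JDBC_URL, PASSWORD, USERNAME
Execution log: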
2016/10/06 10:36:15 - Spoon - Iniciando o job...
2016/10/06 10:36:15 - master_job - Início da execução do job
2016/10/06 10:36:15 - master_job - Starting entry [Transformation]
2016/10/06 10:36:15 - Transformation - Loading transformation from XML file [file:///D:/pdi/set_variables.ktr]
2016/10/06 10:36:15 - cfgbuilder - Warning: The configuration parameter [org] is not supported by the default configuration builder for scheme: sftp
2016/10/06 10:36:15 - cfgbuilder - Warning: The configuration parameter [org] is not supported by the default configuration builder for scheme: sftp
2016/10/06 10:36:15 - cfgbuilder - Warning: The configuration parameter [org] is not supported by the default configuration builder for scheme: sftp
2016/10/06 10:36:15 - set_variables - Expedindo in?cio para transforma??o [set_variables]
2016/10/06 10:36:15 - Table input.0 - Finished reading query, closing connection.
2016/10/06 10:36:15 - Copy rows to result.0 - Finished processing (I=0, O=0, R=6, W=6, U=0, E=0)
2016/10/06 10:36:15 - Table input.0 - Finished processing (I=6, O=0, R=0, W=6, U=0, E=0)
2016/10/06 10:36:15 - master_job - Starting entry [Job for each row]
2016/10/06 10:36:15 - cfgbuilder - Warning: The configuration parameter [org] is not supported by the default configuration builder for scheme: sftp
2016/10/06 10:36:15 - cfgbuilder - Warning: The configuration parameter [org] is not supported by the default configuration builder for scheme: sftp
2016/10/06 10:36:15 - cfgbuilder - Warning: The configuration parameter [org] is not supported by the default configuration builder for scheme: sftp
2016/10/06 10:36:15 - cfgbuilder - Warning: The configuration parameter [org] is not supported by the default configuration builder for scheme: sftp
2016/10/06 10:36:15 - slave_job - Starting entry [Transformation]
2016/10/06 10:36:15 - Transformation - Loading transformation from XML file [file:///D:/pdi/db_query.ktr]
2016/10/06 10:36:15 - cfgbuilder - Warning: The configuration parameter [org] is not supported by the default configuration builder for scheme: sftp
2016/10/06 10:36:15 - cfgbuilder - Warning: The configuration parameter [org] is not supported by the default configuration builder for scheme: sftp
2016/10/06 10:36:15 - cfgbuilder - Warning: The configuration parameter [org] is not supported by the default configuration builder for scheme: sftp
2016/10/06 10:36:15 - db_query - Expedindo in?cio para transforma??o [db_query]
2016/10/06 10:36:15 - Table input.0 - ERROR (version 6.1.0.1-196, build 1 from 2016-04-07 12.08.49 by buildguy) : An error occurred, processing will be stopped:
2016/10/06 10:36:15 - Table input.0 - Error occurred while trying to connect to the database
2016/10/06 10:36:15 - Table input.0 -
2016/10/06 10:36:15 - Table input.0 - Error connecting to database: (using class oracle.jdbc.driver.OracleDriver)
2016/10/06 10:36:15 - Table input.0 - Erro de ES: Connect identifier was empty.
2016/10/06 10:36:15 - Table input.0 - ERROR (version 6.1.0.1-196, build 1 from 2016-04-07 12.08.49 by buildguy) : Erro inicializando step [Table input]
2016/10/06 10:36:15 - Table output.0 - Connected to database [REPORT] (commit=1000)
2016/10/06 10:36:15 - db_query - ERROR (version 6.1.0.1-196, build 1 from 2016-04-07 12.08.49 by buildguy) : Step [Table input.0] falhou durante inicializa??o!
2016/10/06 10:36:15 - Table input.0 - Finished reading query, closing connection.
2016/10/06 10:36:15 - Transformation - ERROR (version 6.1.0.1-196, build 1 from 2016-04-07 12.08.49 by buildguy) : Unable to prepare for execution of the transformation
2016/10/06 10:36:15 - Transformation - ERROR (version 6.1.0.1-196, build 1 from 2016-04-07 12.08.49 by buildguy) : org.pentaho.di.core.exception.KettleException:
2016/10/06 10:36:15 - Transformation - Falhou a inicializa??o de pelo menos um step. A Execu??o n?o pode sere iniciada!
2016/10/06 10:36:15 - Transformation -
2016/10/06 10:36:15 - Transformation -
2016/10/06 10:36:15 - Transformation - at org.pentaho.di.trans.Trans.prepareExecution(Trans.java:1142)
2016/10/06 10:36:15 - Transformation - at org.pentaho.di.trans.Trans.execute(Trans.java:612)
2016/10/06 10:36:15 - Transformation - at org.pentaho.di.job.entries.trans.JobEntryTrans.execute(JobEntryTrans.java:1097)
2016/10/06 10:36:15 - Transformation - at org.pentaho.di.job.Job.execute(Job.java:723)
2016/10/06 10:36:15 - Transformation - at org.pentaho.di.job.Job.execute(Job.java:864)
2016/10/06 10:36:15 - Transformation - at org.pentaho.di.job.Job.execute(Job.java:608)
2016/10/06 10:36:15 - Transformation - at org.pentaho.di.job.entries.job.JobEntryJobRunner.run(JobEntryJobRunner.java:69)
2016/10/06 10:36:15 - Transformation - at java.lang.Thread.run(Thread.java:745)
2016/10/06 10:36:15 - slave_job - Finished job entry [Transformation] (result=[false])
2016/10/06 10:36:15 - master_job - Finished job entry [Job for each row] (result=[false])
2016/10/06 10:36:15 - master_job - Finished job entry [Transformation] (result=[false])
2016/10/06 10:36:15 - master_job - Job execution finished
2016/10/06 10:36:15 - Spoon - O Job finalizou.
The data from the database_connection table is being read:
2016/10/06 10:36:15 - set_variables - Expedindo in?cio para transforma??o [set_variables]
2016/10/06 10:36:15 - Table input.0 - Finished reading query, closing connection.
2016/10/06 10:36:15 - Copy rows to result.0 - Finished processing (I=0, O=0, R=6, W=6, U=0, E=0)
2016/10/06 10:36:15 - Table input.0 - Finished processing (I=6, O=0, R=0, W=6, U=0, E=0)
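But I don't know what I am doing wrong; this data is not being passed as parameters.
I would appreciate any help, as I have been stuck on this problem for several days.
The examples I found on Stack Overflow and the Pentaho forums didn't help me much.
Project files (https://github.com/scarlosantos/pdi)
Thanks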
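In "set_variables.ktr", use the Set Variables step instead of Copy rows to result, and use the variables in the connection properties. PDI replaces those variables at run time, which gives you a dynamic database connection.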
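To illustrate what "variables in the connection properties" amounts to, here is a minimal Python sketch. It is not PDI code: PDI does this substitution internally. The function name resolve, the property keys and the placeholder row values are assumptions made up for the illustration.

```python
from string import Template


def resolve(properties, variables):
    """Return a copy of properties with ${NAME} placeholders filled in.

    Names that are not in variables are left untouched, which makes a
    missing variable easy to spot in the result.
    """
    return {key: Template(value).safe_substitute(variables)
            for key, value in properties.items()}


# Connection properties written once, using variable placeholders.
connection = {
    "url": "${JDBC_URL}",
    "user": "${USERNAME}",
    "password": "${PASSWORD}",
}

# Hypothetical rows, shaped like the database_connection table.
rows = [
    {"DATABASE_NAME": "XPTO", "JDBC_URL": "descriptor-for-xpto",
     "USERNAME": "SYSTEM", "PASSWORD": "blablabla"},
    {"DATABASE_NAME": "OTHER", "JDBC_URL": "descriptor-for-other",
     "USERNAME": "SYSTEM", "PASSWORD": "secret"},
]

# One resolved set of connection properties per row.
resolved = [resolve(connection, row) for row in rows]
for row, props in zip(rows, resolved):
    print(row["DATABASE_NAME"], "->", props["url"], props["user"])
```

The point of the sketch is only that the connection is defined once and each row supplies the values; a placeholder that stays unresolved means the variable never reached the transformation.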
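This exact use case is well explained in the FAQ Beginner Section.
In short:
0) Check that you have all the drivers.
1) Don't forget to specify the names of these variables in the transformations and jobs (right-click anywhere, Properties, Parameters). They are defined at the job scope level.
2) Important: go to the View tab (in the left pane; you are most likely in Design) and share the connection, so that PDI knows about the connection in any transformation/job.
3) Edit the connection and, in the Host Name, Database Name, ... boxes, write ${HOST}, ${DATABASE_NAME}, ... or whatever names you gave your variables. If you did step (1), just press Ctrl-Space and select from the drop-down menu.
4) Edit the file named C:\Users\yourname\.kettle\shared.xml with Notepad. It is even fun, as long as you keep a copy of the last working version. And if you are brave enough, you can even generate this file with PDI.
Now, you raise an interesting question: you seem to connect with a jdbc-url. You can do that in PDI (with the Generic Database Connection), but that way PDI does not know which SQL dialect you are using. So if you get some funny errors in the flow, make sure you SELECT *, do not use lazy conversion, and look at the types with Right-click/Output Fields.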
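A side note on the jdbc-url point: the JDBC_URL sample value in the question is an Oracle Net connect descriptor, not a complete JDBC URL. With the Oracle thin driver, a complete URL is normally that descriptor prefixed with jdbc:oracle:thin:@. The following is a minimal Python sketch of that prefixing, not something taken from the project; the function name to_thin_url is hypothetical and the descriptor is an abbreviated version of the sample row.

```python
ORACLE_THIN_PREFIX = "jdbc:oracle:thin:@"


def to_thin_url(value):
    """Return a thin-driver JDBC URL for a descriptor or an existing URL."""
    value = value.strip()
    if not value:
        # An empty value means there is nothing to connect to.
        raise ValueError("connect identifier is empty")
    if value.lower().startswith("jdbc:"):
        # Already a full JDBC URL, leave it unchanged.
        return value
    return ORACLE_THIN_PREFIX + value


# Abbreviated form of the sample descriptor (assumption for illustration).
descriptor = (
    "(DESCRIPTION=(ADDRESS=(PROTOCOL=TCP)(HOST=xptosrv.xyz.com)(PORT=1521))"
    "(CONNECT_DATA=(SERVICE_NAME=XPTO.XYZ.COM)))"
)
url = to_thin_url(descriptor)
print(url)
```

If the table stores bare descriptors, the prefix can equally be written once in the Custom Connection URL of the generic connection, followed by the variable, so the stored values stay unchanged.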