Pentaho Data Integration dynamic connection (read connection from database)

Pentaho Data Integration: CE 6.1.0.1-196

I am new to Pentaho Data Integration. I need to run the same query against multiple databases. I created a table in the master database to hold the connection information of the other databases that need to be queried. The table structure is below.

SQL> desc database_connection;
Name          Type          Nullable Default Comments 
------------- ------------- -------- ------- -------- 
DATABASE_NAME VARCHAR2(32)  Y                         
JDBC_URL      VARCHAR2(512) Y                         
USERNAME      VARCHAR2(32)  Y                         
PASSWORD      VARCHAR2(32)  Y
ENABLED       VARCHAR2(1)   Y   
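For reference, the equivalent DDL, reconstructed as a sketch from the DESC output above (column names and types are taken directly from it):

```sql
-- Connection registry table in the master database
-- (reconstructed from the DESC output above)
CREATE TABLE database_connection (
  database_name VARCHAR2(32),
  jdbc_url      VARCHAR2(512),
  username      VARCHAR2(32),
  password      VARCHAR2(32),
  enabled       VARCHAR2(1)
);
```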

Sample data:

DATABASE_NAME: XPTO
JDBC_URL: (DESCRIPTION = (ADDRESS = (PROTOCOL = TCP)(HOST = xptosrv.xyz.com)(PORT = 1521))(LOAD_BALANCE = ON)(FAILOVER = ON)(CONNECT_DATA = (SERVER = DEDICATED)(SERVICE_NAME = XPTO.XYZ.COM)(FAILOVER_MODE = (TYPE = SELECT)(METHOD = BASIC)(RETRIES = 180)(DELAY = 5))))
USERNAME: SYSTEM
PASSWORD: blablabla
ENABLED: Y

My .ktr files:

(set_variables.ktr)

Table input ---> Copy rows to result

The Table input step runs the following query against the master database:

select database_name, jdbc_url, username, password from database_connection where enabled = 'Y'

(db_query.ktr)

Table input ---> Table output

The query in this Table input step should run against each of the (multiple) databases, and the data is stored by the Table output step (in the master database).


My .kjb files:

(run_for_each_row.kjb)

Start ---> Transformation ---> Success

Transformation filename: ${Internal.Job.Filename.Directory}/db_query.ktr

Job properties, Parameters tab:

DATABASE_NAME, JDBC_URL, PASSWORD, USERNAME

(master_job.kjb)

Start ---> Transformation ---> Job for each row ---> Success

Transformation filename: ${Internal.Job.Filename.Directory}/set_variables.ktr

Job for each row, filename: ${Internal.Job.Filename.Directory}/run_for_each_row.kjb

Job for each row... "Advanced" tab: Copy previous results to parameters -> checked; Execute for every input row -> checked

Job for each row... Parameters: DATABASE_NAME, JDBC_URL, PASSWORD, USERNAME

Execution log:

2016/10/06 10:36:15 - Spoon - Iniciando o job...
2016/10/06 10:36:15 - master_job - Início da execução do job
2016/10/06 10:36:15 - master_job - Starting entry [Transformation]
2016/10/06 10:36:15 - Transformation - Loading transformation from XML file [file:///D:/pdi/set_variables.ktr]
2016/10/06 10:36:15 - cfgbuilder - Warning: The configuration parameter [org] is not supported by the default configuration builder for scheme: sftp
2016/10/06 10:36:15 - cfgbuilder - Warning: The configuration parameter [org] is not supported by the default configuration builder for scheme: sftp
2016/10/06 10:36:15 - cfgbuilder - Warning: The configuration parameter [org] is not supported by the default configuration builder for scheme: sftp
2016/10/06 10:36:15 - set_variables - Expedindo início para transformação [set_variables]
2016/10/06 10:36:15 - Table input.0 - Finished reading query, closing connection.
2016/10/06 10:36:15 - Copy rows to result.0 - Finished processing (I=0, O=0, R=6, W=6, U=0, E=0)
2016/10/06 10:36:15 - Table input.0 - Finished processing (I=6, O=0, R=0, W=6, U=0, E=0)
2016/10/06 10:36:15 - master_job - Starting entry [Job for each row]
2016/10/06 10:36:15 - cfgbuilder - Warning: The configuration parameter [org] is not supported by the default configuration builder for scheme: sftp
2016/10/06 10:36:15 - cfgbuilder - Warning: The configuration parameter [org] is not supported by the default configuration builder for scheme: sftp
2016/10/06 10:36:15 - cfgbuilder - Warning: The configuration parameter [org] is not supported by the default configuration builder for scheme: sftp
2016/10/06 10:36:15 - cfgbuilder - Warning: The configuration parameter [org] is not supported by the default configuration builder for scheme: sftp
2016/10/06 10:36:15 - slave_job - Starting entry [Transformation]
2016/10/06 10:36:15 - Transformation - Loading transformation from XML file [file:///D:/pdi/db_query.ktr]
2016/10/06 10:36:15 - cfgbuilder - Warning: The configuration parameter [org] is not supported by the default configuration builder for scheme: sftp
2016/10/06 10:36:15 - cfgbuilder - Warning: The configuration parameter [org] is not supported by the default configuration builder for scheme: sftp
2016/10/06 10:36:15 - cfgbuilder - Warning: The configuration parameter [org] is not supported by the default configuration builder for scheme: sftp
2016/10/06 10:36:15 - db_query - Expedindo início para transformação [db_query]
2016/10/06 10:36:15 - Table input.0 - ERROR (version 6.1.0.1-196, build 1 from 2016-04-07 12.08.49 by buildguy) : An error occurred, processing will be stopped: 
2016/10/06 10:36:15 - Table input.0 - Error occurred while trying to connect to the database
2016/10/06 10:36:15 - Table input.0 - 
2016/10/06 10:36:15 - Table input.0 - Error connecting to database: (using class oracle.jdbc.driver.OracleDriver)
2016/10/06 10:36:15 - Table input.0 - Erro de ES: Connect identifier was empty.
2016/10/06 10:36:15 - Table input.0 - ERROR (version 6.1.0.1-196, build 1 from 2016-04-07 12.08.49 by buildguy) : Erro inicializando step [Table input]
2016/10/06 10:36:15 - Table output.0 - Connected to database [REPORT] (commit=1000)
2016/10/06 10:36:15 - db_query - ERROR (version 6.1.0.1-196, build 1 from 2016-04-07 12.08.49 by buildguy) : Step [Table input.0] falhou durante inicialização!
2016/10/06 10:36:15 - Table input.0 - Finished reading query, closing connection.
2016/10/06 10:36:15 - Transformation - ERROR (version 6.1.0.1-196, build 1 from 2016-04-07 12.08.49 by buildguy) : Unable to prepare for execution of the transformation
2016/10/06 10:36:15 - Transformation - ERROR (version 6.1.0.1-196, build 1 from 2016-04-07 12.08.49 by buildguy) : org.pentaho.di.core.exception.KettleException: 
2016/10/06 10:36:15 - Transformation - Falhou a inicialização de pelo menos um step. A Execução não pode ser iniciada!
2016/10/06 10:36:15 - Transformation - 
2016/10/06 10:36:15 - Transformation - 
2016/10/06 10:36:15 - Transformation -  at org.pentaho.di.trans.Trans.prepareExecution(Trans.java:1142)
2016/10/06 10:36:15 - Transformation -  at org.pentaho.di.trans.Trans.execute(Trans.java:612)
2016/10/06 10:36:15 - Transformation -  at org.pentaho.di.job.entries.trans.JobEntryTrans.execute(JobEntryTrans.java:1097)
2016/10/06 10:36:15 - Transformation -  at org.pentaho.di.job.Job.execute(Job.java:723)
2016/10/06 10:36:15 - Transformation -  at org.pentaho.di.job.Job.execute(Job.java:864)
2016/10/06 10:36:15 - Transformation -  at org.pentaho.di.job.Job.execute(Job.java:608)
2016/10/06 10:36:15 - Transformation -  at org.pentaho.di.job.entries.job.JobEntryJobRunner.run(JobEntryJobRunner.java:69)
2016/10/06 10:36:15 - Transformation -  at java.lang.Thread.run(Thread.java:745)
2016/10/06 10:36:15 - slave_job - Finished job entry [Transformation] (result=[false])
2016/10/06 10:36:15 - master_job - Finished job entry [Job for each row] (result=[false])
2016/10/06 10:36:15 - master_job - Finished job entry [Transformation] (result=[false])
2016/10/06 10:36:15 - master_job - Job execution finished
2016/10/06 10:36:15 - Spoon - O Job finalizou.

Reading the data from the database_connection table works:

2016/10/06 10:36:15 - set_variables - Expedindo início para transformação [set_variables]
2016/10/06 10:36:15 - Table input.0 - Finished reading query, closing connection.
2016/10/06 10:36:15 - Copy rows to result.0 - Finished processing (I=0, O=0, R=6, W=6, U=0, E=0)
2016/10/06 10:36:15 - Table input.0 - Finished processing (I=6, O=0, R=0, W=6, U=0, E=0)

But I don't know what I am doing wrong: this data is not being passed as parameters.

Any help is appreciated, as I have been struggling with this problem for a few days now.

The examples I found on Stack Overflow and the Pentaho forums did not help me much.

Project files: https://github.com/scarlosantos/pdi

Thanks

In "set_variables.ktr", use a Set Variables step instead of Copy rows to result, and use variables in the connection properties; they will be substituted at run time, and you will have a dynamic database connection.

This exact use case is well explained in the FAQ Beginner Section.

In short:

0) Check that you have all the drivers.

1) Don't forget to declare the names of these variables in the transformations and jobs (right-click anywhere, Properties, Parameters). Note that they are defined at job-scope level.

2) Important: go to the View tab (in the left pane; you are most likely in Design) and share the connection, so that PDI knows about your connection in any transformation/job.

3) Edit the connection and, in the Host Name, Database Name... boxes, write ${HOST}, ${DATABASE_NAME}... or whatever names you gave your variables. If you did step (1), just press Ctrl-Space in the box and select from the drop-down.

4) Edit the file named C:\Users\yourname\.kettle\shared.xml with Notepad. It is even a good idea to keep a copy of the last working version. And, if you are brave enough, you can even generate this file with PDI.
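A minimal sketch of what such a shared.xml connection entry might look like, assuming variables named HOST, PORT, DATABASE_NAME, USERNAME and PASSWORD (the connection name is illustrative, and the exact tag set may vary between PDI versions, so it is safest to let PDI generate the file once and then edit it):

```xml
<?xml version="1.0" encoding="UTF-8"?>
<sharedobjects>
  <connection>
    <name>dynamic_oracle</name>
    <type>ORACLE</type>
    <access>Native</access>
    <!-- variables are substituted at run time -->
    <server>${HOST}</server>
    <port>${PORT}</port>
    <database>${DATABASE_NAME}</database>
    <username>${USERNAME}</username>
    <password>${PASSWORD}</password>
  </connection>
</sharedobjects>
```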

Now, you raise an interesting issue: you seem to be concatenating the jdbc-url. You can do that in PDI (with a Generic database connection), but that way PDI does not know which SQL dialect you are using. So, if you get some funny errors down the stream, make sure you SELECT *, do not use lazy conversion, and check the column types with Right-click / Output Fields.
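Since the JDBC_URL column in the question stores a full TNS descriptor, a Generic database connection for this case could be sketched as follows (the driver class and URL prefix are the standard Oracle thin-driver values; the variable name matches the question's parameter):

```
Custom connection URL:    jdbc:oracle:thin:@${JDBC_URL}
Custom driver class name: oracle.jdbc.driver.OracleDriver
```

With this setup, the "Connect identifier was empty" error in the log above is what you would expect to see whenever ${JDBC_URL} has not actually been substituted, which is a quick way to verify whether the parameter is reaching the inner transformation.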