在 NiFi 的 QueryDatabaseTable 中执行多个 Select *
Executing multiple Select * in QueryDatabaseTable in NiFi
我要执行select * from table1
, select * from table2
, select * from table3
,...select * from table80
.....(基本上从80个不同的表中提取数据并发送数据到 Elasticsearch(Kibana) 中的 80 个不同索引。
我可以在一个查询数据库 Table 中给出多个 select * statement
然后将其路由到不同的索引吗?如果是,流程会怎样?
QueryDatabaseTable
不允许 传入连接 所以它是 not possible
.
但您可以通过以下流程实现相同的用例
流量:
1. ListDatabaseTables
2. RouteOnAttribute //*optional* filter only required tables
3. GenerateTableFetch //to generate pages of sql queries and store state
4. RemoteProcessGroup (or) Load balance connection
5. ExecuteSql //run more than one concurrent task if needed
6. further processing
7. PutElasticSearch.
此外,如果您不想 运行 流量 incrementally
然后删除 GenerateTableFetch
处理器
配置ExecuteSql
处理器select查询为
select * from ${db.table.schema}.${db.table.name}
一些有用的参考资料:
您可以采用多种方法来解决此问题。
- 如果您的 table 字面上是
table1
、table2
等,您可以简单地生成 80 个流文件,每个流文件在属性中都有一个唯一的整数值(即 table_count
) 并使用 GenerateTableFetch
和 ExecuteSQL
通过表达式语言 创建使用此属性的查询
- 如果 table 名称是不连续的(即
users
、addresses
等),您可以从一个文件中读取每行列出的名称或使用 ListDatabaseTables
查询数据库中的名称。然后,您可以执行简单的文本处理,将创建的流文件拆分为一个 table 并继续如上
我要执行select * from table1
, select * from table2
, select * from table3
,...select * from table80
.....(基本上从80个不同的表中提取数据并发送数据到 Elasticsearch(Kibana) 中的 80 个不同索引。
我可以在一个查询数据库 Table 中给出多个 select * statement
然后将其路由到不同的索引吗?如果是,流程会怎样?
QueryDatabaseTable
不允许 传入连接 所以它是 not possible
.
但您可以通过以下流程实现相同的用例
流量:
1. ListDatabaseTables
2. RouteOnAttribute //*optional* filter only required tables
3. GenerateTableFetch //to generate pages of sql queries and store state
4. RemoteProcessGroup (or) Load balance connection
5. ExecuteSql //run more than one concurrent task if needed
6. further processing
7. PutElasticSearch.
此外,如果您不想 运行 流量 incrementally
然后删除 GenerateTableFetch
处理器
配置
ExecuteSql
处理器select查询为select * from ${db.table.schema}.${db.table.name}
一些有用的参考资料:
您可以采用多种方法来解决此问题。
- 如果您的 table 字面上是
table1
、table2
等,您可以简单地生成 80 个流文件,每个流文件在属性中都有一个唯一的整数值(即table_count
) 并使用GenerateTableFetch
和ExecuteSQL
通过表达式语言 创建使用此属性的查询
- 如果 table 名称是不连续的(即
users
、addresses
等),您可以从一个文件中读取每行列出的名称或使用ListDatabaseTables
查询数据库中的名称。然后,您可以执行简单的文本处理,将创建的流文件拆分为一个 table 并继续如上