在 NiFi 的 QueryDatabaseTable 中执行多个 Select *

Executing multiple Select * in QueryDatabaseTable in NiFi

我要执行select * from table1, select * from table2, select * from table3,...select * from table80.....(基本上从80个不同的表中提取数据并发送数据到 Elasticsearch(Kibana) 中的 80 个不同索引。

我可以在一个查询数据库 Table 中给出多个 select * statement 然后将其路由到不同的索引吗?如果是,流程会怎样?

QueryDatabaseTable 不允许 传入连接 所以它是 not possible.

但您可以通过以下流程实现相同的用例

流量:

1. ListDatabaseTables
2. RouteOnAttribute //*optional* filter only required tables
3. GenerateTableFetch //to generate pages of sql queries and store state
4. RemoteProcessGroup (or) Load balance connection
5. ExecuteSql //run more than one concurrent task if needed
6. further processing
7. PutElasticSearch.

此外,如果您不想 运行 流量 incrementally 然后删除 GenerateTableFetch 处理器

  • 配置ExecuteSql处理器select查询为

    select * from ${db.table.schema}.${db.table.name}

一些有用的参考资料:

  • GenerateTableFetch link1 link2

  • 增量 运行 ExecuteSQL 处理器而不使用 GenerateTableFetch link

您可以采用多种方法来解决此问题。

  • 如果您的 table 字面上是 table1table2 等,您可以简单地生成 80 个流文件,每个流文件在属性中都有一个唯一的整数值(即 table_count) 并使用 GenerateTableFetchExecuteSQL 通过表达式语言
  • 创建使用此属性的查询
  • 如果 table 名称是不连续的(即 usersaddresses 等),您可以从一个文件中读取每行列出的名称或使用 ListDatabaseTables 查询数据库中的名称。然后,您可以执行简单的文本处理,将创建的流文件拆分为一个 table 并继续如上