使用 NiFi 提取数据库

Database Extract with NiFi

我想从 table 'nifitest' 的 SQL 数据库中提取所有名为 customers 的记录。我正在为您的帮助人员附上模板。我面临以下问题: 一旦我 运行 'QueryDatabaseTable' 处理器我得到队列中的流文件但是当我清空队列并尝试重新 运行 'QueryDatabaseTable' 处理器时我无法获取记录队列。我该如何解决这个问题?

流程的整体思路是从 mySql 数据库中提取记录并将文件本地存储在我的桌面上。

我已使用以下网站上的模板来实现此目的,并根据我的要求更改了我的 table 名称和数据库连接池服务:-

https://www.batchiq.com/database-extract-with-nifi.html

此外,在合并内容之后,我将文件作为文件类型获取。使用哪个处理器将合并文件存储为 CSV 文件类型 (所有记录存储为 Test.csv 而不是文件类型) 我使用的是 'UpdateAttribute'处理器并将 属性 'filename' 值添加为 'Test.csv' 但在输出文件中我只看到一个记录不是所有的记录。如何解决这个问题?

谢谢!

QueryDatabaseTable 将在运行时 存储状态 并在再次运行时拉取已添加到 table 的 only the incremental records

要从 table 获取所有记录,您 需要清除处理器的状态 然后只有您才能从 [=101] 获取所有记录=].

How to clear state of QueryDatabaseTable processor?

 1. Stop Query Database processor //make sure no threads are running at top right corner

 2. Right Click on the processor

 3. Go to View state tab

 4. Click on clear state //this will clear all the stored state in the processor

有关清除 QueryDatabase table 处理器

状态的更多详细信息,请参阅 this link

-> 另一个问题是 MergeContent 处理器 作为

最小条目数 设置为 100 因此处理器将等到 MergeContent 处理器之前的队列中有 100 个流文件。

使用 Max Bin Age 属性1 min ..etc 这样处理器将强制合并流文件和将合并的流文件传输到合并关系中。

有关 MergeContent 处理器 usage/configuration 的更多详细信息,请参阅 this link。

UPDATE:

1.

如果您想将最终的文件格式转换为csv然后保存到L本地路径。

那么你可以在QueryDatabaseTable之后使用ConvertRecord处理器,不需要转换Avro --> Json --> Csv

ConvertRecord 处理器配置为 Avro ReaderCsvRecordSetWriter 控制器服务然后处理器读取 Avro 数据然后转换为 Csv 格式。

要更改流文件的文件名,请使用 UpdateAttribute 处理器将所需的文件名作为 new 属性 添加到处理器

流量:

  - QueryDatabaseTable //get data from source in avro format 
  - ConvertRecord //convert Avro format to CSV format
  - UpdateAttribute //change filename
  - PutFile //store the csv file into local

2.

如果您担心 数据在输出流文件中存储为一行,请将合并内容处理器配置为

Delimiter Strategy Text

Demarcator shift+enter

有关此合并内容定界符策略配置的更多详细信息,请参阅 link。

使用此配置我们不会更改数据格式(数据仍将在 json 中)但我们会在每个 json 记录后添加新行。