使用 NiFi 提取数据库
Database Extract with NiFi
我想从 table 'nifitest' 的 SQL 数据库中提取所有名为 customers 的记录。我正在为您的帮助人员附上模板。我面临以下问题:
一旦我 运行 'QueryDatabaseTable' 处理器我得到队列中的流文件但是当我清空队列并尝试重新 运行 'QueryDatabaseTable' 处理器时我无法获取记录队列。我该如何解决这个问题?
流程的整体思路是从 mySql 数据库中提取记录并将文件本地存储在我的桌面上。
我已使用以下网站上的模板来实现此目的,并根据我的要求更改了我的 table 名称和数据库连接池服务:-
https://www.batchiq.com/database-extract-with-nifi.html
此外,在合并内容之后,我将文件作为文件类型获取。使用哪个处理器将合并文件存储为 CSV 文件类型 (所有记录存储为 Test.csv 而不是文件类型) 我使用的是 'UpdateAttribute'处理器并将 属性 'filename' 和 值添加为 'Test.csv' 但在输出文件中我只看到一个记录不是所有的记录。如何解决这个问题?
谢谢!
QueryDatabaseTable 将在运行时 存储状态 并在再次运行时拉取已添加到 table 的 only the incremental records
。
要从 table 获取所有记录,您 需要清除处理器的状态 然后只有您才能从 [=101] 获取所有记录=].
How to clear state of QueryDatabaseTable processor?
1. Stop Query Database processor //make sure no threads are running at top right corner
2. Right Click on the processor
3. Go to View state tab
4. Click on clear state //this will clear all the stored state in the processor
有关清除 QueryDatabase table 处理器
状态的更多详细信息,请参阅 this link
-> 另一个问题是 MergeContent 处理器 作为
最小条目数 设置为 100 因此处理器将等到 MergeContent 处理器之前的队列中有 100 个流文件。
使用 Max Bin Age 属性 值 1 min ..etc 这样处理器将强制合并流文件和将合并的流文件传输到合并关系中。
有关 MergeContent 处理器 usage/configuration 的更多详细信息,请参阅 this link。
UPDATE:
1.
如果您想将最终的文件格式转换为csv然后保存到L本地路径。
那么你可以在QueryDatabaseTable
之后使用ConvertRecord处理器,不需要转换Avro --> Json --> Csv
将 ConvertRecord 处理器配置为 Avro Reader 和 CsvRecordSetWriter 控制器服务然后处理器读取 Avro 数据然后转换为 Csv 格式。
要更改流文件的文件名,请使用 UpdateAttribute 处理器将所需的文件名作为 new 属性 添加到处理器 。
流量:
- QueryDatabaseTable //get data from source in avro format
- ConvertRecord //convert Avro format to CSV format
- UpdateAttribute //change filename
- PutFile //store the csv file into local
2.
如果您担心 数据在输出流文件中存储为一行,请将合并内容处理器配置为
Delimiter Strategy Text
Demarcator shift+enter
有关此合并内容定界符策略配置的更多详细信息,请参阅 link。
使用此配置我们不会更改数据格式(数据仍将在 json 中)但我们会在每个 json 记录后添加新行。
我想从 table 'nifitest' 的 SQL 数据库中提取所有名为 customers 的记录。我正在为您的帮助人员附上模板。我面临以下问题: 一旦我 运行 'QueryDatabaseTable' 处理器我得到队列中的流文件但是当我清空队列并尝试重新 运行 'QueryDatabaseTable' 处理器时我无法获取记录队列。我该如何解决这个问题?
流程的整体思路是从 mySql 数据库中提取记录并将文件本地存储在我的桌面上。
我已使用以下网站上的模板来实现此目的,并根据我的要求更改了我的 table 名称和数据库连接池服务:-
https://www.batchiq.com/database-extract-with-nifi.html
此外,在合并内容之后,我将文件作为文件类型获取。使用哪个处理器将合并文件存储为 CSV 文件类型 (所有记录存储为 Test.csv 而不是文件类型) 我使用的是 'UpdateAttribute'处理器并将 属性 'filename' 和 值添加为 'Test.csv' 但在输出文件中我只看到一个记录不是所有的记录。如何解决这个问题?
谢谢!
QueryDatabaseTable 将在运行时 存储状态 并在再次运行时拉取已添加到 table 的 only the incremental records
。
要从 table 获取所有记录,您 需要清除处理器的状态 然后只有您才能从 [=101] 获取所有记录=].
How to clear state of QueryDatabaseTable processor?
1. Stop Query Database processor //make sure no threads are running at top right corner
2. Right Click on the processor
3. Go to View state tab
4. Click on clear state //this will clear all the stored state in the processor
-> 另一个问题是 MergeContent 处理器 作为
最小条目数 设置为 100 因此处理器将等到 MergeContent 处理器之前的队列中有 100 个流文件。
使用 Max Bin Age 属性 值 1 min ..etc 这样处理器将强制合并流文件和将合并的流文件传输到合并关系中。
有关 MergeContent 处理器 usage/configuration 的更多详细信息,请参阅 this link。
UPDATE:
1.
如果您想将最终的文件格式转换为csv然后保存到L本地路径。
那么你可以在QueryDatabaseTable
之后使用ConvertRecord处理器,不需要转换Avro --> Json --> Csv
将 ConvertRecord 处理器配置为 Avro Reader 和 CsvRecordSetWriter 控制器服务然后处理器读取 Avro 数据然后转换为 Csv 格式。
要更改流文件的文件名,请使用 UpdateAttribute 处理器将所需的文件名作为 new 属性 添加到处理器 。
流量:
- QueryDatabaseTable //get data from source in avro format
- ConvertRecord //convert Avro format to CSV format
- UpdateAttribute //change filename
- PutFile //store the csv file into local
2.
如果您担心 数据在输出流文件中存储为一行,请将合并内容处理器配置为
Delimiter Strategy Text
Demarcator shift+enter
有关此合并内容定界符策略配置的更多详细信息,请参阅
使用此配置我们不会更改数据格式(数据仍将在 json 中)但我们会在每个 json 记录后添加新行。