Nifi - 截断并加载到 mysql db table

Nifi - Truncate and Load to mysql db table

我正在从 SFTP 站点读取 CSV 文件并使用 Nifi 将其加载到 mysql 数据库。

我有以下工作流程,似乎工作正常。我只需要一些帮助来弄清楚如何在开始加载数据之前 运行 对 table 进行分类。

尼菲流:

ListSFTP -> FetchSFTP -> InferAvroSchema -> ConvertCSVtoAvro -> ConvertAvrotoJSON -> SplitJSON -> ConvertJSONtoSQL -> PutSQL

这个流程似乎工作正常,但每次我 运行 这个,我需要 table 先被 t运行 cated 然后开始加载。

有人可以帮我提供一些有关如何实现此目标的信息。或者有没有比我写的更好的流程,请指教

谢谢, 阿迪尔

在进入 "truncate before insert" 部分之前,我建议用 PutDatabaseRecord 替换从 ConvertCSVtoAvro 到 PutSQL 的所有内容。它基本上一起执行 "ConvertXtoSQL->PutSQL",并减轻了所有这些转换的需要,只是为了执行 SQL 语句。

插入部分之前的截断有点棘手。从 NiFi 1.5.0(在撰写本文时尚未发布)开始,通过 NIFI-4522,您将能够在执行 SQL 命令时保留流文件内容。这意味着在 PutDatabaseRecord 之前,您可以将 "SQL Statement" 设置为 "TRUNCATE myTable" 的 PutSQL。 PutSQL 将发出 TRUNCATE 但继续传递流文件。

目前我能想到的唯一解决方法是 ExecuteScript 处理器。如果使用 PutDatabaseRecord,您可能必须按名称获取 DBCPConnectionPool(请参阅连接上的 my blog post for an example) and execute the TRUNCATE statement yourself, then pass along the incoming flow file. If using your flow above with PutSQL, you could get the incoming flow file, then write out a "TRUNCATE myTable" flow file ONLY if the fragment.index is zero and you're using a Prioritizer,然后传输传入的流文件。