NiFi - 如何更改 ReplaceText 上的 ReplacementValue?

NiFi - How can i change ReplacementValue on ReplaceText?

我正在使用 ExecuteSQL、SplitAvro、ConvertAvroToJSON、EvaluateJsonPath、ReplaceText、ExecuteSQL。

我正在尝试使用 replaceText 处理器替换流文件中的内容。

现在,我可以这样替换了。 -> 插入值 (${id},'${name}') 。 ReplaceText 进程像这样发送执行 sql :

插入 x 值(1,'xx')

插入 x 值(2,'yy')

插入 x 值(3,'zz')

我正在为每一行发送 INSERT 查询。

但我想像这样发送执行sql进程

插入 x 值 (1,'xx'),(2,'yy'),(3,'zz')

我不确定这是最好的方法,但你可以这样做:

SplitAvro           # i guess you are splitting the records here (here you should get fragment.* attributes)
ConvertAvroToJSON   # converting each record to json
EvaluateJsonPath    # getting id,name values from json
ReplaceText         # (${id}, '${name}')
MergeContent        # merge rows back to single file with header and delimiter
   Binary Concatenation
   Header = Insert into X values
   Demarcator = ,
ExecuteSQL

尽管它不会完全生成您要查找的 SQL,请查看 ConvertRecord and/or JoltTransformRecord -> PutDatabaseRecord。前者用于将你的每条Avro记录变成你想要的形式(idname),而PutDatabaseRecord会使用一个PreparedStatement分批将记录发送到数据库。它可能不如单个 INSERT 高效,但应该比 Split -> Convert -> ExecuteSQL 和每个 FlowFile 单独的 INSERT 更有效。

要真正获得您想要的 SQL,您可能需要一个脚本处理器,例如带有 RecordReader 的 InvokeScriptedProcessor,我有一个关于这个主题的 blog post