NiFi - 如何更改 ReplaceText 上的 ReplacementValue?
NiFi - How can i change ReplacementValue on ReplaceText?
我正在使用 ExecuteSQL、SplitAvro、ConvertAvroToJSON、EvaluateJsonPath、ReplaceText、ExecuteSQL。
我正在尝试使用 replaceText 处理器替换流文件中的内容。
现在,我可以这样替换了。 -> 插入值 (${id},'${name}') 。 ReplaceText 进程像这样发送执行 sql :
插入 x 值(1,'xx')
插入 x 值(2,'yy')
插入 x 值(3,'zz')
我正在为每一行发送 INSERT 查询。
但我想像这样发送执行sql进程
插入 x 值 (1,'xx'),(2,'yy'),(3,'zz')
我不确定这是最好的方法,但你可以这样做:
SplitAvro # i guess you are splitting the records here (here you should get fragment.* attributes)
ConvertAvroToJSON # converting each record to json
EvaluateJsonPath # getting id,name values from json
ReplaceText # (${id}, '${name}')
MergeContent # merge rows back to single file with header and delimiter
Binary Concatenation
Header = Insert into X values
Demarcator = ,
ExecuteSQL
尽管它不会完全生成您要查找的 SQL,请查看 ConvertRecord and/or JoltTransformRecord -> PutDatabaseRecord。前者用于将你的每条Avro记录变成你想要的形式(id
,name
),而PutDatabaseRecord会使用一个PreparedStatement分批将记录发送到数据库。它可能不如单个 INSERT 高效,但应该比 Split -> Convert -> ExecuteSQL 和每个 FlowFile 单独的 INSERT 更有效。
要真正获得您想要的 SQL,您可能需要一个脚本处理器,例如带有 RecordReader 的 InvokeScriptedProcessor,我有一个关于这个主题的 blog post。
我正在使用 ExecuteSQL、SplitAvro、ConvertAvroToJSON、EvaluateJsonPath、ReplaceText、ExecuteSQL。
我正在尝试使用 replaceText 处理器替换流文件中的内容。
现在,我可以这样替换了。 -> 插入值 (${id},'${name}') 。 ReplaceText 进程像这样发送执行 sql :
插入 x 值(1,'xx')
插入 x 值(2,'yy')
插入 x 值(3,'zz')
我正在为每一行发送 INSERT 查询。
但我想像这样发送执行sql进程
插入 x 值 (1,'xx'),(2,'yy'),(3,'zz')
我不确定这是最好的方法,但你可以这样做:
SplitAvro # i guess you are splitting the records here (here you should get fragment.* attributes)
ConvertAvroToJSON # converting each record to json
EvaluateJsonPath # getting id,name values from json
ReplaceText # (${id}, '${name}')
MergeContent # merge rows back to single file with header and delimiter
Binary Concatenation
Header = Insert into X values
Demarcator = ,
ExecuteSQL
尽管它不会完全生成您要查找的 SQL,请查看 ConvertRecord and/or JoltTransformRecord -> PutDatabaseRecord。前者用于将你的每条Avro记录变成你想要的形式(id
,name
),而PutDatabaseRecord会使用一个PreparedStatement分批将记录发送到数据库。它可能不如单个 INSERT 高效,但应该比 Split -> Convert -> ExecuteSQL 和每个 FlowFile 单独的 INSERT 更有效。
要真正获得您想要的 SQL,您可能需要一个脚本处理器,例如带有 RecordReader 的 InvokeScriptedProcessor,我有一个关于这个主题的 blog post。