NiFi processor to search and replace multiple words in a txt file

I have a txt file that I read in NiFi using FetchSFTP. I also have keys and values in JSON format, shown below, which I receive after a REST call and JoltTransformJSON:

[{
    "Key": "k2s2e2",
    "Value": "Ottawa"
}, {
    "Key": "60601",
    "Value": "Chicago"
}, {
    "Key": "",
    "Value": "London"
}]

How can I replace every occurrence of the keys above in the txt file with their corresponding values?

Example: abc.txt

000 apple stocks at k2s2e2 888
9000 samsung stocks at 60601 9990377
88 nokia devivces at 78889 790888071 hgj 7

Output:

000 apple stocks at Ottawa 888
9000 samsung stocks at Chicago 9990377
88 nokia devivces at 78889 790888071 hgj 7

I tried using ExecuteGroovyScript:

import static groovy.json.JsonOutput.toJson

import java.nio.charset.StandardCharsets

import groovy.json.JsonBuilder
import groovy.json.JsonSlurper

class KeyValue{
    String key
    String value
}

private findAndReplace(KeyValueList) {
    def response
    KeyValueList.each {
        def srcExp = it.key
        def replaceText = it.value

        def inputFilepath = "C:\Project\abc.txt"
        def outputFilepath = "C:\Project\abc_output.txt"

        new File(outputFilepath).withWriter { w ->
            new File(inputFilepath).eachLine { line ->
                w << line.replaceAll(srcExp, replaceText) << '\n'
            }
            response = w
        }
        new File(inputFilepath).text = new File(outputFilepath).text
    }
    return response
}

def flowFile = session.get()
if(!flowFile) return

def KeyValueList = []
//try {
    def is = flowFile.read().withReader("UTF-8") { new JsonSlurper().parse(it) }
    is.each {
        if (it.Key != "") {
            KeyValue keyValue = new KeyValue(key: it.Key, value: it.Value)
            KeyValueList.add(keyValue)
        }
    }
    def retval = findAndReplace(KeyValueList)
    flowFile = session.write(flowFile, { outputStream ->
        outputStream.write(retval.toString().getBytes(StandardCharsets.UTF_8))
    } as OutputStreamCallback)

    session.transfer(flowFile, REL_SUCCESS)
//} catch (Exception e) {
//    log.info(e.getMessage())
//    REL_FAILURE << flowFile
//}

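This is not really an answer to your question, just an attempt to fix your code.

If I understand correctly, you are trying to:

  • read JSON from the flow file
  • read text from a file at some path
  • write the text, with the replacements applied, to the flow file

Code for the ExecuteGroovyScript processor: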

import groovy.json.JsonSlurper

def ff = session.get()
if(!ff) return

// rawIn is the incoming flow file content (the JSON); rawOut becomes the new content
ff.write{rawIn, rawOut->
    def keyValueList = rawIn.withReader("UTF-8"){ new JsonSlurper().parse(it) }

    new File('c:/Project/abc.txt').withReader("UTF-8"){reader->
        rawOut.withWriter("UTF-8"){writer->
            reader.eachLine{line->
               // skip empty keys; replace() matches the key literally, not as a regex
               keyValueList.each{ if(it.Key) line = line.replace(it.Key, it.Value) }
               writer << line << '\n'
            }
        }
    }
}

REL_SUCCESS << ff

I haven't had time to test this...
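
The replacement logic itself can be checked outside NiFi with a plain Groovy script. What follows is only a minimal sketch, assuming the sample JSON and text from the question are inlined as strings. The helper name `replaceKeys` is hypothetical and not part of NiFi or of the code above. It uses `String.replace`, which treats each key as literal text, so a key containing regex metacharacters would not be misinterpreted.

```groovy
import groovy.json.JsonSlurper

// Hypothetical helper (illustration only): applies every non-empty
// Key -> Value pair to a single line of text.
String replaceKeys(String line, List pairs) {
    for (pair in pairs) {
        // An empty Key is falsy in Groovy, so it is skipped.
        if (pair.Key) {
            // String.replace matches the key literally, not as a regular expression.
            line = line.replace(pair.Key, pair.Value)
        }
    }
    return line
}

// Sample data copied from the question.
def json = '''[{"Key": "k2s2e2", "Value": "Ottawa"},
               {"Key": "60601",  "Value": "Chicago"},
               {"Key": "",       "Value": "London"}]'''
def pairs = new JsonSlurper().parseText(json)

def input = '''000 apple stocks at k2s2e2 888
9000 samsung stocks at 60601 9990377
88 nokia devivces at 78889 790888071 hgj 7'''

// Apply the replacements line by line, as the processor script does.
def output = input.readLines().collect { replaceKeys(it, pairs) }.join('\n')
println output
```

Run as a standalone script, this should print the three lines shown under "Output:" in the question. Note that it replaces substrings anywhere in a line; if a key must match only as a whole word, a regex with word boundaries would be needed instead.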