将 ExecuteSQL 处理器的结果与 nifi 6.0 中的 Json 内容合并

Merge results of ExecuteSQL processor with Json content in nifi 6.0

我正在处理 json 个包含地理坐标点的对象。我想 运行 这些点针对我在本地的 postgis 服务器,以评估多边形匹配中的点。

我希望用现有的处理器来做到这一点 - 我成功地使用 "EvaluateJsonPath" 处理器将 lat/lon 坐标提取到属性中,并成功地使用 [= 向我的本地 postgis 数据存储发出查询32=]。这给我留下了 avro 响应,然后我可以使用 "ConvertAvroToJSON" 处理器将其转换为 JSON。

我在如何将查询结果与原始 JSON 对象合并回一起方面遇到了概念上的问题。事实上,我有两个具有相同片段 ID 的流文件,理论上我可以将它们与 "mergecontent" 合并在一起,但这让我:

{"my":"original json", "coordinates":[47.38, 179.22]}{"polygon_match":"a123"}

是否有任何建议的策略可以将 SQL 查询的结果合并到原始 json 结构中,所以我的结果应该是这样的:

{"my":"original json", "coordinates":[47.38, 179.22], "polygon_match":"a123"}

我是 运行ning nifi 6.0、postgres 9.5.2 和 postgis 2.2.1。

我在 https://community.hortonworks.com/questions/22090/issue-merging-content-in-nifi.html 中看到了一些关于使用 replaceText 处理器的参考 - 但这似乎是将属性中的内容合并到内容的主体中。我错过了合并原始内容和 SQL 响应的内容,或者从没有内容的 SQL 响应中提取的属性的要点。

编辑:

Groovy 以下脚本似乎可以执行所需的操作。我不是 groovy 编码员,所以欢迎任何改进。

import org.apache.commons.io.IOUtils
import java.nio.charset.*
import groovy.json.JsonSlurper

def flowFile = session.get();
if (flowFile == null) {
    return;
}
def slurper = new JsonSlurper()

flowFile = session.write(flowFile,
    { inputStream, outputStream ->
        def text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
        def obj = slurper.parseText(text)
        def originaljsontext = flowFile.getAttribute('original.json')
        def originaljson = slurper.parseText(originaljsontext)
        originaljson.put("point_polygon_info", obj)
        outputStream.write(groovy.json.JsonOutput.toJson(originaljson).getBytes(StandardCharsets.UTF_8))
    } as StreamCallback)
session.transfer(flowFile, ExecuteScript.REL_SUCCESS)

如果您的原始 JSON 相对较小,可能的方法如下...

  • 在进入 ExecuteSQL 之前使用 ExtractText 将原始 JSON 复制到属性中。
  • 在 ExecuteSQL 和 ConvertAvroToJSON 之后,使用 ExecuteScript 处理器创建一个新的 JSON 文档,该文档将属性中的原始文档与内容中的结果相结合。

我不确定脚本中需要做什么,但我知道其他人已经通过 ExecuteScript 处理器使用 Groovy 和 JsonSlurper 取得了成功。