Groovy script (NiFi) - xml select 子节点的动态属性

Groovy script (NiFi) - xml select dynamically attributes of child node

我有这样的 xml 结构:

    <Site >
        <Za id1="id1">
            <Infos>
                <Info>
                    <EndPoint foo="value-name" />
                </Info>
            </Infos>
            <Samples>
                <Sample date="date" attribute1="5.44" attribute2="234" attribute3="8.45"/>
                <Sample date="date" attribute1="7.45" attribute5="8.45"/>
            </Samples>
        </Za>
        <Za id2="id2">
            <Infos>
                <Info>
                    <EndPoint foo="value-name" />
                </Info>
            </Infos>
            <Samples>
                <Sample date="date" attribute1="5.44" attribute2="234" attribute3="8.45"/>
                <Sample date="date" attribute1="7.45" attribute5="8.45" attribute6="7.45" attribute7="8.45"/>
            </Samples>
        </Za>
    </Site>

期望的输出是这样的:

 {"time":"date1","name":"id1_attribute1","value":5.44}
 {"time":"date1","name":"id1_attribute2","value":234}
 {"time":"date1","name":"id1_attribute3","value":8.45}
 {"time":"date2","name":"id1_attribute4","value":7.45}
 {"time":"date2","name":"id1_attribute5","value":8.45}
 {"time":"date3","name":"id2_attribute1","value":5.44}
 .
 .
 .

我通过(在 NiFi 中列出并获取 ftp 处理器获取文件,但我无法打印我想要的输出。

我试图通过 相关问题中的这段代码获得我想要的输出,但我不确定如何更改它以使其正确。

所以代码如下:

import org.apache.nifi.flowfile.FlowFile;
import org.apache.commons.io.IOUtils
import org.apache.nifi.processor.io.InputStreamCallback
import org.apache.nifi.processor.io.StreamCallback
import java.nio.charset.StandardCharsets
import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
import org.w3c.dom.Document;
import groovy.xml.dom.DOMCategory
import groovy.json.JsonGenerator

def flowFile

try {

  flowFile = session.get()

  DocumentBuilderFactory dbFactory = 
DocumentBuilderFactory.newInstance();
DocumentBuilder dBuilder = dbFactory.newDocumentBuilder();
Document doc = null

session.read(flowFile, {inputStream ->
    doc =  dBuilder.parse(inputStream)
} as InputStreamCallback)

def root = doc.documentElement
def sb = new StringBuilder()
def jsonGenerator = new 
 JsonGenerator.Options().disableUnicodeEscaping().build()

// get a specific attribute
use(DOMCategory) {
    root['ExportData']['Site']['*'].findAll { node ->
        def data = new LinkedHashMap()
        data.id = node['@id1']
        sb.append(jsonGenerator.toJson(data))
        sb.append('\n')
    }   
 }

  // get all attributes of Sample under Samples
  use(DOMCategory) {
    root['ExportData']['Site']['Kapta']['Samples']['*'].findAll { 
  node ->
        def data = new LinkedHashMap()
        data.NodeName = node.name()
        def attributesMap = node.attributes()
        for (int x = 0; x < attributesMap.getLength(); x++) {
            data.AttrName = attributesMap.item(x).getNodeName();
            data.AttrValue = attributesMap.item(x).getNodeValue();
            sb.append(jsonGenerator.toJson(data))
            sb.append('\n')
        }
                
   }
 }   

 flowFile = session.write(flowFile, {inputStream, outputStream ->
    
 outputStream.write(sb.toString().getBytes(StandardCharsets.UTF_8))
 } as StreamCallback)

 session.transfer(flowFile, REL_SUCCESS)

 } catch (Exception e) {
   log.error('',e)
   session.transfer(flowFile, REL_FAILURE)
 }

此代码输出一个属性 id,然后动态地输出所有示例属性。我想像上面描述的那样打印每个 id,它的示例属性。

非常感谢您的时间和精力!

ExecuteGroovyScript 处理器的代码

import groovy.json.JsonBuilder

def ff = session.get()
if(!ff) return

ff.write{streamIn, streamOut->
    def xml = new XmlParser().parse(streamIn)
    def json = xml.ExportData.Site.Kapta.Samples.Sample.collectMany{sample->
        def attr = sample.attributes()
        def date = attr.remove('date')
        //use regexp to find id attribute by prefix `id`
        def id = sample.parent().parent().attributes().find{ k,v-> k =~ "^id.*" }.value
        attr.collect{k,v->
            [
                time: date,
                name: "${id}_${k}",
                value: new BigDecimal(v),
            ]
        }
    }
    streamOut.withWriter("UTF-8"){w-> new JsonBuilder(json).writeTo(w) }
}
ff."mime.type" = "application/json"
REL_SUCCESS<<ff

输出:

[
    {
        "time": "date1",
        "name": "id1_attribute1",
        "value": 5.44
    },
    {
        "time": "date1",
        "name": "id1_attribute2",
        "value": 234
    },
    {
        "time": "date1",
        "name": "id1_attribute3",
        "value": 8.45
    },
    {
        "time": "date2",
        "name": "id1_attribute1",
        "value": 7.45
    },
    {
        "time": "date2",
        "name": "id1_attribute5",
        "value": 8.45
    },
    {
        "time": "date3",
        "name": "id2_attribute1",
        "value": 5.44
    },
    {
        "time": "date3",
        "name": "id2_attribute2",
        "value": 234
    },
    {
        "time": "date3",
        "name": "id2_attribute3",
        "value": 8.45
    },
    {
        "time": "date4",
        "name": "id2_attribute1",
        "value": 7.45
    },
    {
        "time": "date4",
        "name": "id2_attribute5",
        "value": 8.45
    },
    {
        "time": "date4",
        "name": "id2_attribute6",
        "value": 7.45
    },
    {
        "time": "date4",
        "name": "id2_attribute7",
        "value": 8.45
    }
]