使用 NiFi 生成包含日语字符的 XML 文件

Generate XML files with Japanese characters using NiFi

我正在使用 NiFi 中的 Python ExecuteScript 处理器将 JSON 有效负载转换为 XML 文件。 JSON 看起来像这样:

{
  "Header": {
    "Att1": 1,
    "Att2": "value2",
    "Att3": "1",
    "Att4": "경기00자123"
  }
}

将 JSON 转换为 XML 的 python 脚本如下:

import json
import xml.etree.ElementTree as ET
import java.io
from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import StreamCallback

class ModJSON(StreamCallback):
    """Stream callback that rewrites a JSON flow-file body as XML.

    The JSON document is expected to have a top-level "Header" object with
    the keys "Att1" .. "Att4"; each becomes a child element of
    <headerinfo><headerfile>.
    """

    def __init__(self):
        pass

    def process(self, inputStream, outputStream):
        """Read JSON from inputStream and write the XML form to outputStream."""
        # Decode the whole incoming content as UTF-8 text and parse it.
        text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
        data = json.loads(text)
        # Fixed skeleton: <headerinfo><headerfile>...</headerfile></headerinfo>
        root = ET.Element("headerinfo")
        entity = ET.SubElement(root, "headerfile")
        ET.SubElement(entity, "Att1").text = str(data["Header"]["Att1"])
        ET.SubElement(entity, "Att2").text = str(data["Header"]["Att2"])
        ET.SubElement(entity, "Att3").text = str(data["Header"]["Att3"])
        # Att4 is encoded to UTF-8 bytes *before* being handed to ElementTree.
        # NOTE(review): the surrounding write-up reports that this value comes
        # out wrong in the XML; presumably the byte string is re-interpreted
        # during serialization under Jython -- confirm.
        ET.SubElement(entity, "Att4").text = data["Header"]["Att4"].encode("utf8")
        # Serialize to an in-memory string, then push it to the Java stream.
        xmlNew = ET.tostring(root)
        outputStream.write(bytearray(xmlNew))

# `session`, `REL_SUCCESS` and `REL_FAILURE` are not defined in this script;
# presumably they are bound by the NiFi ExecuteScript processor -- confirm.
flowFile = session.get()
# Nothing to do when no flow file was returned.
if flowFile != None:
    try :
        # Replace the content via the ModJSON callback, then rename the file.
        # Each call returns the updated flow file, so the name is rebound.
        flowFile = session.write(flowFile, ModJSON())
        flowFile = session.putAttribute(flowFile, "filename", 'headerfile.xml')
        session.transfer(flowFile, REL_SUCCESS)
        session.commit()
    except Exception as e:
        # Record the error text on the flow file and route it to failure.
        flowFile = session.putAttribute(flowFile,'python_error', str(e))
        session.transfer(flowFile, REL_FAILURE)

无论我如何尝试用日文字符对 Att4 进行编码,结果 XML 中看起来像这样:

京都111を3

我该如何修改代码来解决这个问题?我尝试了很多不同的方法,但似乎都不起作用。

jython 中的字节字符串似乎存在问题 - 它们会自动转换为编码不正确的 str 对象。

但是 ElementTree 有一个 write 函数,可以写入类文件(file-like)对象,而 OutputStream(Java 对象)实际上实现了 write 方法——所以,我们可以让 ElementTree 直接写入 OutputStream。

import json
import xml.etree.ElementTree as ET
from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import StreamCallback

class ModJSON(StreamCallback):
    """Stream callback that rewrites a JSON flow-file body as XML.

    The JSON document must contain a top-level "Header" object with the keys
    "Att1" .. "Att4". Each value becomes the text of a same-named child of
    <headerinfo><headerfile>. A missing key raises KeyError.
    """

    def process(self, inputStream, outputStream):
        """Read JSON from inputStream and write UTF-8 XML to outputStream."""
        text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
        header = json.loads(text)["Header"]
        root = ET.Element("headerinfo")
        entity = ET.SubElement(root, "headerfile")
        for name in ("Att1", "Att2", "Att3", "Att4"):
            # Format through a unicode template instead of str(): under
            # Python 2 / Jython, str() on a non-ASCII unicode value raises
            # UnicodeEncodeError, while numbers still render as with str().
            # The 1-tuple guards against a value being treated as format args.
            ET.SubElement(entity, name).text = u"%s" % (header[name],)
        # Let ElementTree encode and write straight to the Java stream rather
        # than building an intermediate byte string in Python.
        ET.ElementTree(root).write(outputStream, encoding='utf-8')

# `session`, `REL_SUCCESS` and `REL_FAILURE` are not defined in this script;
# presumably they are bound by the NiFi ExecuteScript processor -- confirm.
flowFile = session.get()
# Identity test is the idiomatic None check; skip when nothing was returned.
if flowFile is not None:
    try:
        # Each session call returns the updated flow file, so rebind the name
        # to keep the failure branch working with the latest reference.
        flowFile = session.write(flowFile, ModJSON())
        flowFile = session.putAttribute(flowFile, "filename", 'headerfile.xml')
        session.transfer(flowFile, REL_SUCCESS)
        session.commit()
    except Exception as e:
        # Record the error text on the flow file and route it to failure.
        flowFile = session.putAttribute(flowFile, 'python_error', str(e))
        session.transfer(flowFile, REL_FAILURE)