Convert JSON array into CSV using Apache NiFi

I want to convert a JSON array into CSV format. The number of elements in the array is dynamic for each row. I tried the following flow (the flow file XML is attached to the post).

GetFile --> ConvertRecord --> UpdateAttribute --> PutFile

Is there another option?

JSON format:

```json
{
  "LogData": {
    "Location": "APAC",
    "product": "w1"
  },
  "Outcome": [
    {
      "limit": "0",
      "pri": "3",
      "result": "pass"
    },
    {
      "limit": "1",
      "pri": "2",
      "result": "pass"
    },
    {
      "limit": "5",
      "priority": "1",
      "result": "fail"
    }
  ],
  "attr": {
    "vers": "1",
    "datetime": "2018-01-10 00:36:00"
  }
}
```

Expected output in CSV format:

```
location,   product,    limit,  pri,    result, vers,   datetime
APAC        w1          0       3       pass    1       2018-01-10 00:36:00
APAC        w1          1       2       pass    1       2018-01-10 00:36:00
APAC        w1          5       1       fail    1       2018-01-10 00:36:00
```
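For comparison outside NiFi (e.g. from a scripting processor), the desired flattening can be sketched in plain Python. This is illustrative only; note that the third `Outcome` entry in the posted JSON uses `"priority"` rather than `"pri"`, so the sketch falls back from one key to the other, an assumption based on the expected output above.

```python
import csv
import io
import json

# The JSON sample from the question (third record has "priority", not "pri")
raw = """
{
  "LogData": {"Location": "APAC", "product": "w1"},
  "Outcome": [
    {"limit": "0", "pri": "3", "result": "pass"},
    {"limit": "1", "pri": "2", "result": "pass"},
    {"limit": "5", "priority": "1", "result": "fail"}
  ],
  "attr": {"vers": "1", "datetime": "2018-01-10 00:36:00"}
}
"""
doc = json.loads(raw)

out = io.StringIO()
writer = csv.writer(out)
writer.writerow(["location", "product", "limit", "pri", "result", "vers", "datetime"])

# One CSV row per Outcome element, with the shared LogData/attr values repeated
for rec in doc["Outcome"]:
    writer.writerow([
        doc["LogData"]["Location"],
        doc["LogData"]["product"],
        rec.get("limit"),
        # assume "priority" in the third record means "pri"
        rec.get("pri", rec.get("priority")),
        rec.get("result"),
        doc["attr"]["vers"],
        doc["attr"]["datetime"],
    ])
print(out.getvalue())
```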

Output of the attached flow (header row followed by one record):

```
LogData,Outcome,attr
"MapRecord[{product=w1, Location=APAC}]","[MapRecord[{limit=0, result=pass, pri=3}], MapRecord[{limit=1, result=pass, pri=2}], MapRecord[{limit=5, result=fail}]]","MapRecord[{datetime=2018-01-10 00:36:00, vers=1}]"
```

ConvertRecord -- I am using JsonTreeReader and CSVRecordSetWriter, configured as follows:

JsonTreeReader controller service configuration:
CSVRecordSetWriter controller service configuration:
AvroSchemaRegistry controller service configuration:

Avro schema:

```json
{
  "name": "myschema",
  "type": "record",
  "namespace": "myschema",
  "fields": [
    {
      "name": "LogData",
      "type": {
        "name": "LogData",
        "type": "record",
        "fields": [
          {"name": "Location", "type": "string"},
          {"name": "product", "type": "string"}
        ]
      }
    },
    {
      "name": "Outcome",
      "type": {
        "type": "array",
        "items": {
          "name": "Outcome_record",
          "type": "record",
          "fields": [
            {"name": "limit", "type": "string"},
            {"name": "pri", "type": ["string", "null"]},
            {"name": "result", "type": "string"}
          ]
        }
      }
    },
    {
      "name": "attr",
      "type": {
        "name": "attr",
        "type": "record",
        "fields": [
          {"name": "vers", "type": "string"},
          {"name": "datetime", "type": "string"}
        ]
      }
    }
  ]
}
```

It seems you need to apply a Jolt transformation before converting to CSV; otherwise this won't work as expected.

Try this spec in a JoltTransformJSON processor before ConvertRecord:

```json
[
  {
    "operation": "shift",
    "spec": {
      "Outcome": {
        "*": {
          "@(3,LogData.Location)": "[#2].location",
          "@(3,LogData.product)": "[#2].product",
          "@(3,attr.vers)": "[#2].vers",
          "@(3,attr.datetime)": "[#2].datetime",
          "*": "[#2].&"
        }
      }
    }
  }
]
```
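To see what the shift produces, here is a rough Python equivalent (illustrative only, not NiFi code): each `Outcome` element is copied out with the shared `LogData`/`attr` values merged in, which is what the `@(3,...)` lookups do, while `**rec` plays the role of the catch-all `"*": "[#2].&"` rule. Note that the stray `"priority"` key in the third record passes through unchanged under that catch-all rule, so it will not land in the `pri` column unless the input is corrected.

```python
import json

# The JSON sample from the question
doc = {
    "LogData": {"Location": "APAC", "product": "w1"},
    "Outcome": [
        {"limit": "0", "pri": "3", "result": "pass"},
        {"limit": "1", "pri": "2", "result": "pass"},
        {"limit": "5", "priority": "1", "result": "fail"},
    ],
    "attr": {"vers": "1", "datetime": "2018-01-10 00:36:00"},
}

# Build one flat object per Outcome element, mirroring the shift spec:
# shared fields come from LogData/attr, the rest are copied verbatim.
flattened = [
    {
        "location": doc["LogData"]["Location"],
        "product": doc["LogData"]["product"],
        "vers": doc["attr"]["vers"],
        "datetime": doc["attr"]["datetime"],
        **rec,
    }
    for rec in doc["Outcome"]
]
print(json.dumps(flattened, indent=2))
```

The resulting flat array of uniform records is something CSVRecordSetWriter can serialize row by row without any further restructuring.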