Convert JSON array into CSV using Apache NiFi
I want to convert a JSON array into CSV format. The number of array elements is dynamic for each row. I tried the following flow (the flow file XML is attached to the post):

GetFile --> ConvertRecord --> UpdateAttribute --> PutFile

Are there any other options?
JSON format:

{
  "LogData": {
    "Location": "APAC",
    "product": "w1"
  },
  "Outcome": [
    {
      "limit": "0",
      "pri": "3",
      "result": "pass"
    },
    {
      "limit": "1",
      "pri": "2",
      "result": "pass"
    },
    {
      "limit": "5",
      "priority": "1",
      "result": "fail"
    }
  ],
  "attr": {
    "vers": "1",
    "datetime": "2018-01-10 00:36:00"
  }
}
Expected output in CSV format:

location,product,limit,pri,result,vers,datetime
APAC,w1,0,3,pass,1,2018-01-10 00:36:00
APAC,w1,1,2,pass,1,2018-01-10 00:36:00
APAC,w1,5,1,fail,1,2018-01-10 00:36:00
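The flattening being asked for is simple to state: copy the `LogData` and `attr` fields onto each `Outcome` element, one CSV row per element. A minimal Python sketch of that logic (outside NiFi, e.g. as a sanity check of the expected output; all field names are taken from the sample JSON above):

```python
import csv
import io
import json

sample = """
{"LogData": {"Location": "APAC", "product": "w1"},
 "Outcome": [
   {"limit": "0", "pri": "3", "result": "pass"},
   {"limit": "1", "pri": "2", "result": "pass"},
   {"limit": "5", "priority": "1", "result": "fail"}],
 "attr": {"vers": "1", "datetime": "2018-01-10 00:36:00"}}
"""

def flatten(doc):
    """Yield one flat dict per Outcome element, with parent fields copied in."""
    for outcome in doc["Outcome"]:
        row = {"location": doc["LogData"]["Location"],
               "product": doc["LogData"]["product"]}
        row.update(outcome)       # limit / pri / result from the element itself
        row.update(doc["attr"])   # vers / datetime from the trailing record
        yield row

fields = ["location", "product", "limit", "pri", "result", "vers", "datetime"]
buf = io.StringIO()
# extrasaction="ignore" drops keys outside the schema (e.g. the third
# element's "priority", which does not match the declared "pri" field)
writer = csv.DictWriter(buf, fieldnames=fields, extrasaction="ignore")
writer.writeheader()
writer.writerows(flatten(json.loads(sample)))
print(buf.getvalue())
```

Note that the third element spells the field `priority` rather than `pri`, so its `pri` column comes out empty here, which matches the behavior seen in the attached flow's output below.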
Output of the attached flow:

LogData,Outcome,attr
"MapRecord[{product=w1, Location=APAC}]","[MapRecord[{limit=0, result=pass, pri=3}], MapRecord[{limit=1, result=pass, pri=2}], MapRecord[{limit=5, result=fail}]]","MapRecord[{datetime=2018-01-10 00:36:00, vers=1}]"
ConvertRecord -- I am using JsonTreeReader and CSVRecordSetWriter, configured as follows:

JsonTreeReader controller service configuration:

CSVRecordSetWriter controller service configuration:

AvroSchemaRegistry controller service configuration:
Avro schema:

{
  "name": "myschema",
  "type": "record",
  "namespace": "myschema",
  "fields": [
    {
      "name": "LogData",
      "type": {
        "name": "LogData",
        "type": "record",
        "fields": [
          {"name": "Location", "type": "string"},
          {"name": "product", "type": "string"}
        ]
      }
    },
    {
      "name": "Outcome",
      "type": {
        "type": "array",
        "items": {
          "name": "Outcome_record",
          "type": "record",
          "fields": [
            {"name": "limit", "type": "string"},
            {"name": "pri", "type": ["string", "null"]},
            {"name": "result", "type": "string"}
          ]
        }
      }
    },
    {
      "name": "attr",
      "type": {
        "name": "attr",
        "type": "record",
        "fields": [
          {"name": "vers", "type": "string"},
          {"name": "datetime", "type": "string"}
        ]
      }
    }
  ]
}
It seems you need a Jolt transformation before converting to CSV; otherwise this won't work as expected.

Try this spec in a JoltTransformJSON processor placed before ConvertRecord:
[
  {
    "operation": "shift",
    "spec": {
      "Outcome": {
        "*": {
          "@(3,LogData.Location)": "[#2].location",
          "@(3,LogData.product)": "[#2].product",
          "@(3,attr.vers)": "[#2].vers",
          "@(3,attr.datetime)": "[#2].datetime",
          "*": "[#2].&"
        }
      }
    }
  }
]
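To see what that shift produces, its effect can be emulated in plain Python: each `Outcome` element becomes one flat top-level record (the `[#2]` target), with the parent `LogData` and `attr` values grabbed via the `@(3,...)` references and the element's own fields copied through by `"*": "[#2].&"`. This is a sketch of the spec's effect on the sample input, not a Jolt implementation:

```python
import json

doc = json.loads("""
{"LogData": {"Location": "APAC", "product": "w1"},
 "Outcome": [{"limit": "0", "pri": "3", "result": "pass"},
             {"limit": "1", "pri": "2", "result": "pass"}],
 "attr": {"vers": "1", "datetime": "2018-01-10 00:36:00"}}
""")

# For each Outcome[i], build output[i] with parent fields plus the
# element's own fields, mirroring the shift spec above.
shifted = [
    {"location": doc["LogData"]["Location"],
     "product": doc["LogData"]["product"],
     "vers": doc["attr"]["vers"],
     "datetime": doc["attr"]["datetime"],
     **outcome}
    for outcome in doc["Outcome"]
]
print(json.dumps(shifted, indent=2))
```

The result is a flat JSON array, so ConvertRecord then only needs a flat Avro schema (location, product, limit, pri, result, vers, datetime) rather than the nested one shown in the question.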