JSON 使用 Apache NiFi 进行数据操作转换
JSON Transformation with data manipulation using Apache NiFi
我想对下面的示例 JSON 做一些基本的转换,我想将 timeStamp 标签的值更改为日期格式,并想添加一个值为 created_ts
的新标签current_timestamp
使用 NiFi 达到我预期的 JSON 输出。
样本JSON:
{"name": "SAMPLE_NAME","timeStamp": "1477307252000","value": "-0.06279052","quality": "1090"}
预期 JSON:
{"name": "SAMPLE_NAME","timeStamp": "2016-11-08 14:46:13.674","value": "-0.06279052","quality": "1090","created_ts":"2016-11-08 14:46:13.674"}
能否请您帮助了解在 Apache 中遵循的详细步骤 NiFi/HDF。
数据转换未实现。
查看官方文档:
https://github.com/bazaarvoice/jolt#stock-transforms
股票转换
股票转换是:
shift : copy data from the input tree and put it the output tree
default : apply default values to the tree
remove : remove data from the tree
sort : sort the Map key values alphabetically ( for debugging and human readability )
cardinality : "fix" the cardinality of input data. Eg, the "urls" element is usually a List,
but if there is only one, then it is a String
目前,所有 Stock 转换仅影响数据的 "structure"。
要进行数据操作,您需要编写 Java 代码。
如果您编写 Java "data manipulation" 代码来实现 Transform 接口,那么您可以将您的代码插入到转换链中。
所以,为了完成你的任务,我看到了两个主要变体:
V1:
使用以下处理器的顺序:
EvaluateJsonPath -> UpdateAttributes -> AttributesToJSON
在 EvaluateJsonPath
中用 $.name
、$.timeStamp
、...
等表达式定义每个字段属性
在UpdateAttributes
中转换timeStamp
的格式并定义新的属性:
attribute | value/expression
-----------------------------------------------------------
timeStamp | timeStamp:format('yyyy-MM-dd HH:mm:ss.SSS')
created_ts | now():format('yyyy-MM-dd HH:mm:ss.SSS')
在AttributesToJSON
中定义Attributes List
作为json对象存储到文件内容中
V2: 使用 ExecuteScript
处理器,代码如下:
import groovy.json.JsonSlurper
import groovy.json.JsonBuilder
def ff = session.get()
if(!ff)return
ff = session.write(ff, {rawIn, rawOut->
// transform streams into reader and writer
rawIn.withReader("UTF-8"){reader->
rawOut.withWriter("UTF-8"){writer->
//parse reader into Map
def json = new JsonSlurper().parse(reader)
//change/set values
json.timeStamp = new Date(json.timeStamp as Long).format('yyyy-MM-dd HH:mm:ss.SSS')
json.created_ts = new Date().format('yyyy-MM-dd HH:mm:ss.SSS')
//write changed object to writer
new JsonBuilder(json).writeTo(writer)
}
}
} as StreamCallback)
session.transfer(ff, REL_SUCCESS)
我想对下面的示例 JSON 做一些基本的转换,我想将 timeStamp 标签的值更改为日期格式,并想添加一个值为 created_ts
的新标签current_timestamp
使用 NiFi 达到我预期的 JSON 输出。
样本JSON:
{"name": "SAMPLE_NAME","timeStamp": "1477307252000","value": "-0.06279052","quality": "1090"}
预期 JSON:
{"name": "SAMPLE_NAME","timeStamp": "2016-11-08 14:46:13.674","value": "-0.06279052","quality": "1090","created_ts":"2016-11-08 14:46:13.674"}
能否请您帮助了解在 Apache 中遵循的详细步骤 NiFi/HDF。
数据转换未实现。
查看官方文档:
https://github.com/bazaarvoice/jolt#stock-transforms
股票转换
股票转换是:
shift : copy data from the input tree and put it the output tree
default : apply default values to the tree
remove : remove data from the tree
sort : sort the Map key values alphabetically ( for debugging and human readability )
cardinality : "fix" the cardinality of input data. Eg, the "urls" element is usually a List,
but if there is only one, then it is a String
目前,所有 Stock 转换仅影响数据的 "structure"。
要进行数据操作,您需要编写 Java 代码。
如果您编写 Java "data manipulation" 代码来实现 Transform 接口,那么您可以将您的代码插入到转换链中。
所以,为了完成你的任务,我看到了两个主要变体:
V1:
使用以下处理器的顺序:
EvaluateJsonPath -> UpdateAttributes -> AttributesToJSON
在 EvaluateJsonPath
中用 $.name
、$.timeStamp
、...
在UpdateAttributes
中转换timeStamp
的格式并定义新的属性:
attribute | value/expression
-----------------------------------------------------------
timeStamp | timeStamp:format('yyyy-MM-dd HH:mm:ss.SSS')
created_ts | now():format('yyyy-MM-dd HH:mm:ss.SSS')
在AttributesToJSON
中定义Attributes List
作为json对象存储到文件内容中
V2: 使用 ExecuteScript
处理器,代码如下:
import groovy.json.JsonSlurper
import groovy.json.JsonBuilder
def ff = session.get()
if(!ff)return
ff = session.write(ff, {rawIn, rawOut->
// transform streams into reader and writer
rawIn.withReader("UTF-8"){reader->
rawOut.withWriter("UTF-8"){writer->
//parse reader into Map
def json = new JsonSlurper().parse(reader)
//change/set values
json.timeStamp = new Date(json.timeStamp as Long).format('yyyy-MM-dd HH:mm:ss.SSS')
json.created_ts = new Date().format('yyyy-MM-dd HH:mm:ss.SSS')
//write changed object to writer
new JsonBuilder(json).writeTo(writer)
}
}
} as StreamCallback)
session.transfer(ff, REL_SUCCESS)