如何将 Json 转换为 CSV 并将其发送到大查询或 google 云存储桶
How to convert Json to CSV and send it to big query or google cloud bucket
我是 nifi 的新手,我想将大量 json 数据转换为 csv 格式。
这是我目前正在做的,但这不是预期的结果。
这些是步骤:
使用 InvokeHTTP 创建 access_token 和发送请求 body 的进程(这部分工作正常我不会命名进程,因为这是预期的结果)并获得响应 body json.
json 响应示例:
[
{
"results":[
{
"customer":{
"resourceName":"customers/123456789",
"id":"11111111"
},
"campaign":{
"resourceName":"customers/123456789/campaigns/222456422222",
"name":"asdaasdasdad",
"id":"456456546546"
},
"adGroup":{
"resourceName":"customers/456456456456/adGroups/456456456456",
"id":"456456456546",
"name":"asdasdasdasda"
},
"metrics":{
"clicks":"11",
"costMicros":"43068982",
"impressions":"2079"
},
"segments":{
"device":"DESKTOP",
"date":"2021-11-22"
},
"incomeRangeView":{
"resourceName":"customers/456456456/incomeRangeViews/456456546~456456456"
}
},
{
"customer":{
"resourceName":"customers/123456789",
"id":"11111111"
},
"campaign":{
"resourceName":"customers/123456789/campaigns/222456422222",
"name":"asdasdasdasd",
"id":"456456546546"
},
"adGroup":{
"resourceName":"customers/456456456456/adGroups/456456456456",
"id":"456456456546",
"name":"asdasdasdas"
},
"metrics":{
"clicks":"11",
"costMicros":"43068982",
"impressions":"2079"
},
"segments":{
"device":"DESKTOP",
"date":"2021-11-22"
},
"incomeRangeView":{
"resourceName":"customers/456456456/incomeRangeViews/456456546~456456456"
}
},
....etc....
]
}
]
现在我正在使用:
===>SplitJson ($[].results[])==>具有此规范的 JoltTransformJSON:
[{
"operation": "shift",
"spec": {
"customer": {
"id": "customer_id"
},
"campaign": {
"id": "campaign_id",
"name": "campaign_name"
},
"adGroup": {
"id": "ad_group_id",
"name": "ad_group_name"
},
"metrics": {
"clicks": "clicks",
"costMicros": "cost",
"impressions": "impressions"
},
"segments": {
"device": "device",
"date": "date"
},
"incomeRangeView": {
"resourceName": "keywords_id"
}
}
}]
==>> MergeContent(这是我不知道如何解决的问题)
合并策略:碎片整理
合并格式:二进制串联
属性策略只保留公共属性
Bins 的最大数量 5(我尝试了 10 个相同的结果)
分隔符策略:文本
Header: [
页脚:]
分界符:,
我得到的结果是什么?
我得到一个 json 文件,其中包含部分 json 数据
示例:我在 1 个 json 文件中有 50k customer_ids,所以我想将此数据发送到大查询 table 并将所有 ID 都放在同一字段“customer_id”下.
MergeContent 使用拆分 json 文件并将它们合并,但我仍然会为每个文件获得 10k customer_ids,即我有 5 个文件,而不是 1 个 50k customer_ids
在 MergeContent 之后,我使用 ==>>ConvertRecord 以及这些设置:
RecordReaderJsonTreeReader(Schema访问策略:InferSchema)
记录器 CsvRecordWriter (
Schema 写策略:不写 Schema
架构访问策略:继承记录架构
CSV 格式:微软 Excel
包括 Header 行:true
字符集 UTF-8
)
==>>UpdateAttribute (custom prop: filename: ${filename}.csv) ==>> PutGCSObject(并将数据放入 google 桶中(这一步工作正常 - 我能够将文件放在那里))
使用这种方法我无法将数据发送到大查询(在 MergeContent 之后我尝试使用 PutBigQueryBatch 并在 bq sheel 中使用此命令来获取我需要的模式:
bq show --format=prettyjson some_data_set.some_table_in_that_data_set | jq '.schema.fields'
我根据需要填写了所有字段并加载文件类型:如果我将其转换为 CSV,我尝试了 NEWLINE_DELIMITED_JSON 或 CSV(我没有收到错误,但没有数据上传到 table )
)
我做错了什么?我基本上想以每个字段数据都在相同字段名称下的方式映射数据
您缺少的技巧是使用记录。
不要使用 X>SplitJson>JoltTransformJson>Merge>Convert>X,而是尝试使用 X>JoltTransformRecord>X 和 JSON Reader 和 CSV 编写器。这跳过了很多效率低下的问题。
如果您真的需要拆分(除非完全必要,否则您应该避免拆分和合并),您可以改用 MergeRecord - 再次使用 JSON Reader 和 CSV 编写器。这将使您的流程 X>Split>Jolt>MergeRecord>X.
我是 nifi 的新手,我想将大量 json 数据转换为 csv 格式。 这是我目前正在做的,但这不是预期的结果。
这些是步骤:
使用 InvokeHTTP 创建 access_token 和发送请求 body 的进程(这部分工作正常我不会命名进程,因为这是预期的结果)并获得响应 body json.
json 响应示例:
[
{
"results":[
{
"customer":{
"resourceName":"customers/123456789",
"id":"11111111"
},
"campaign":{
"resourceName":"customers/123456789/campaigns/222456422222",
"name":"asdaasdasdad",
"id":"456456546546"
},
"adGroup":{
"resourceName":"customers/456456456456/adGroups/456456456456",
"id":"456456456546",
"name":"asdasdasdasda"
},
"metrics":{
"clicks":"11",
"costMicros":"43068982",
"impressions":"2079"
},
"segments":{
"device":"DESKTOP",
"date":"2021-11-22"
},
"incomeRangeView":{
"resourceName":"customers/456456456/incomeRangeViews/456456546~456456456"
}
},
{
"customer":{
"resourceName":"customers/123456789",
"id":"11111111"
},
"campaign":{
"resourceName":"customers/123456789/campaigns/222456422222",
"name":"asdasdasdasd",
"id":"456456546546"
},
"adGroup":{
"resourceName":"customers/456456456456/adGroups/456456456456",
"id":"456456456546",
"name":"asdasdasdas"
},
"metrics":{
"clicks":"11",
"costMicros":"43068982",
"impressions":"2079"
},
"segments":{
"device":"DESKTOP",
"date":"2021-11-22"
},
"incomeRangeView":{
"resourceName":"customers/456456456/incomeRangeViews/456456546~456456456"
}
},
....etc....
]
}
]
现在我正在使用: ===>SplitJson ($[].results[])==>具有此规范的 JoltTransformJSON:
[{
"operation": "shift",
"spec": {
"customer": {
"id": "customer_id"
},
"campaign": {
"id": "campaign_id",
"name": "campaign_name"
},
"adGroup": {
"id": "ad_group_id",
"name": "ad_group_name"
},
"metrics": {
"clicks": "clicks",
"costMicros": "cost",
"impressions": "impressions"
},
"segments": {
"device": "device",
"date": "date"
},
"incomeRangeView": {
"resourceName": "keywords_id"
}
}
}]
==>> MergeContent(这是我不知道如何解决的问题) 合并策略:碎片整理 合并格式:二进制串联 属性策略只保留公共属性 Bins 的最大数量 5(我尝试了 10 个相同的结果) 分隔符策略:文本 Header: [ 页脚:] 分界符:,
我得到的结果是什么? 我得到一个 json 文件,其中包含部分 json 数据 示例:我在 1 个 json 文件中有 50k customer_ids,所以我想将此数据发送到大查询 table 并将所有 ID 都放在同一字段“customer_id”下.
MergeContent 使用拆分 json 文件并将它们合并,但我仍然会为每个文件获得 10k customer_ids,即我有 5 个文件,而不是 1 个 50k customer_ids
在 MergeContent 之后,我使用 ==>>ConvertRecord 以及这些设置: RecordReaderJsonTreeReader(Schema访问策略:InferSchema) 记录器 CsvRecordWriter ( Schema 写策略:不写 Schema 架构访问策略:继承记录架构 CSV 格式:微软 Excel 包括 Header 行:true 字符集 UTF-8 )
==>>UpdateAttribute (custom prop: filename: ${filename}.csv) ==>> PutGCSObject(并将数据放入 google 桶中(这一步工作正常 - 我能够将文件放在那里))
使用这种方法我无法将数据发送到大查询(在 MergeContent 之后我尝试使用 PutBigQueryBatch 并在 bq sheel 中使用此命令来获取我需要的模式:
bq show --format=prettyjson some_data_set.some_table_in_that_data_set | jq '.schema.fields'
我根据需要填写了所有字段并加载文件类型:如果我将其转换为 CSV,我尝试了 NEWLINE_DELIMITED_JSON 或 CSV(我没有收到错误,但没有数据上传到 table ) )
我做错了什么?我基本上想以每个字段数据都在相同字段名称下的方式映射数据
您缺少的技巧是使用记录。
不要使用 X>SplitJson>JoltTransformJson>Merge>Convert>X,而是尝试使用 X>JoltTransformRecord>X 和 JSON Reader 和 CSV 编写器。这跳过了很多效率低下的问题。
如果您真的需要拆分(除非完全必要,否则您应该避免拆分和合并),您可以改用 MergeRecord - 再次使用 JSON Reader 和 CSV 编写器。这将使您的流程 X>Split>Jolt>MergeRecord>X.