重命名 JSON 中的无效键
rename invalid keys from JSON
我在 NIFI 中有以下流程,JSON 中有 (1000+) 个对象。
invokeHTTP->SPLIT JSON->putMongo
Flow 工作正常,直到我在 json 中收到一些带有“.”的键。在名字里。例如"spark.databricks.acl.dfAclsEnabled".
我目前的解决方案不是最优的,我记下了错误的键,并使用多个替换文本处理器来替换“。”和 ”_”。我没有使用 REGEX,我使用的是字符串文字 find/replace。所以每次我在 putMongo 处理器中遇到故障时,我都会插入新的 replaceText 处理器。
这是不可维护的。我想知道我是否可以为此使用 JOLT?关于输入 JSON.
的一些信息
1) 没有固定的结构,唯一确定的是。一切都将在事件数组中。但是事件对象本身是自由形式的。
2) 最大列表大小 = 1000。
3) 第 3 方 JSON,所以我不能要求更改格式。
此外,带“.”的键可以出现在任何地方。所以我正在寻找可以在所有级别进行清理然后重命名的 JOLT 规范。
{
"events": [
{
"cluster_id": "0717-035521-puny598",
"timestamp": 1531896847915,
"type": "EDITED",
"details": {
"previous_attributes": {
"cluster_name": "Kylo",
"spark_version": "4.1.x-scala2.11",
"spark_conf": {
"spark.databricks.acl.dfAclsEnabled": "true",
"spark.databricks.repl.allowedLanguages": "python,sql"
},
"node_type_id": "Standard_DS3_v2",
"driver_node_type_id": "Standard_DS3_v2",
"autotermination_minutes": 10,
"enable_elastic_disk": true,
"cluster_source": "UI"
},
"attributes": {
"cluster_name": "Kylo",
"spark_version": "4.1.x-scala2.11",
"node_type_id": "Standard_DS3_v2",
"driver_node_type_id": "Standard_DS3_v2",
"autotermination_minutes": 10,
"enable_elastic_disk": true,
"cluster_source": "UI"
},
"previous_cluster_size": {
"autoscale": {
"min_workers": 1,
"max_workers": 8
}
},
"cluster_size": {
"autoscale": {
"min_workers": 1,
"max_workers": 8
}
},
"user": ""
}
},
{
"cluster_id": "0717-035521-puny598",
"timestamp": 1535540053785,
"type": "TERMINATING",
"details": {
"reason": {
"code": "INACTIVITY",
"parameters": {
"inactivity_duration_min": "15"
}
}
}
},
{
"cluster_id": "0717-035521-puny598",
"timestamp": 1535537117300,
"type": "EXPANDED_DISK",
"details": {
"previous_disk_size": 29454626816,
"disk_size": 136828809216,
"free_space": 17151311872,
"instance_id": "6cea5c332af94d7f85aff23e5d8cea37"
}
}
]
}
我使用 ReplaceText
和 RouteOnContent
创建了 a template 来执行此任务。循环是必需的,因为正则表达式在每次传递时仅替换 JSON 键中的第一个 .
。您可能能够改进它以在一次通过中执行所有替换,但是在使用前瞻和后视组对正则表达式进行模糊测试几分钟后,重新路由更快。我验证了这适用于您提供的 JSON,并且 JSON 具有不同行上的键和值(:
两行):
...
"spark_conf": {
"spark.databricks.acl.dfAclsEnabled":
"true",
"spark.databricks.repl.allowedLanguages"
: "python,sql"
},
...
您还可以使用带有 Groovy 的 ExecuteScript
处理器来摄取 JSON,快速过滤所有包含 .
的 JSON 键,执行collect
操作进行替换,如果您希望单个处理器一次性完成此操作,请将密钥重新插入 JSON 数据中。
我在 NIFI 中有以下流程,JSON 中有 (1000+) 个对象。
invokeHTTP->SPLIT JSON->putMongo
Flow 工作正常,直到我在 json 中收到一些带有“.”的键。在名字里。例如"spark.databricks.acl.dfAclsEnabled".
我目前的解决方案不是最优的,我记下了错误的键,并使用多个替换文本处理器来替换“。”和 ”_”。我没有使用 REGEX,我使用的是字符串文字 find/replace。所以每次我在 putMongo 处理器中遇到故障时,我都会插入新的 replaceText 处理器。
这是不可维护的。我想知道我是否可以为此使用 JOLT?关于输入 JSON.
的一些信息1) 没有固定的结构,唯一确定的是。一切都将在事件数组中。但是事件对象本身是自由形式的。
2) 最大列表大小 = 1000。
3) 第 3 方 JSON,所以我不能要求更改格式。
此外,带“.”的键可以出现在任何地方。所以我正在寻找可以在所有级别进行清理然后重命名的 JOLT 规范。
{
"events": [
{
"cluster_id": "0717-035521-puny598",
"timestamp": 1531896847915,
"type": "EDITED",
"details": {
"previous_attributes": {
"cluster_name": "Kylo",
"spark_version": "4.1.x-scala2.11",
"spark_conf": {
"spark.databricks.acl.dfAclsEnabled": "true",
"spark.databricks.repl.allowedLanguages": "python,sql"
},
"node_type_id": "Standard_DS3_v2",
"driver_node_type_id": "Standard_DS3_v2",
"autotermination_minutes": 10,
"enable_elastic_disk": true,
"cluster_source": "UI"
},
"attributes": {
"cluster_name": "Kylo",
"spark_version": "4.1.x-scala2.11",
"node_type_id": "Standard_DS3_v2",
"driver_node_type_id": "Standard_DS3_v2",
"autotermination_minutes": 10,
"enable_elastic_disk": true,
"cluster_source": "UI"
},
"previous_cluster_size": {
"autoscale": {
"min_workers": 1,
"max_workers": 8
}
},
"cluster_size": {
"autoscale": {
"min_workers": 1,
"max_workers": 8
}
},
"user": ""
}
},
{
"cluster_id": "0717-035521-puny598",
"timestamp": 1535540053785,
"type": "TERMINATING",
"details": {
"reason": {
"code": "INACTIVITY",
"parameters": {
"inactivity_duration_min": "15"
}
}
}
},
{
"cluster_id": "0717-035521-puny598",
"timestamp": 1535537117300,
"type": "EXPANDED_DISK",
"details": {
"previous_disk_size": 29454626816,
"disk_size": 136828809216,
"free_space": 17151311872,
"instance_id": "6cea5c332af94d7f85aff23e5d8cea37"
}
}
]
}
我使用 ReplaceText
和 RouteOnContent
创建了 a template 来执行此任务。循环是必需的,因为正则表达式在每次传递时仅替换 JSON 键中的第一个 .
。您可能能够改进它以在一次通过中执行所有替换,但是在使用前瞻和后视组对正则表达式进行模糊测试几分钟后,重新路由更快。我验证了这适用于您提供的 JSON,并且 JSON 具有不同行上的键和值(:
两行):
...
"spark_conf": {
"spark.databricks.acl.dfAclsEnabled":
"true",
"spark.databricks.repl.allowedLanguages"
: "python,sql"
},
...
您还可以使用带有 Groovy 的 ExecuteScript
处理器来摄取 JSON,快速过滤所有包含 .
的 JSON 键,执行collect
操作进行替换,如果您希望单个处理器一次性完成此操作,请将密钥重新插入 JSON 数据中。