重命名 JSON 中的无效键

rename invalid keys from JSON

我在 NIFI 中有以下流程,JSON 中有 (1000+) 个对象。

invokeHTTP->SPLIT JSON->putMongo

Flow 工作正常,直到我在 json 中收到一些带有“.”的键。在名字里。例如"spark.databricks.acl.dfAclsEnabled".

我目前的解决方案不是最优的,我记下了错误的键,并使用多个替换文本处理器来替换“。”和 ”_”。我没有使用 REGEX,我使用的是字符串文字 find/replace。所以每次我在 putMongo 处理器中遇到故障时,我都会插入新的 replaceText 处理器。

这是不可维护的。我想知道我是否可以为此使用 JOLT?关于输入 JSON.

的一些信息

1) 没有固定的结构,唯一确定的是。一切都将在事件数组中。但是事件对象本身是自由形式的。

2) 最大列表大小 = 1000。

3) 第 3 方 JSON,所以我不能要求更改格式。

此外,带“.”的键可以出现在任何地方。所以我正在寻找可以在所有级别进行清理然后重命名的 JOLT 规范。

{
  "events": [
    {
            "cluster_id": "0717-035521-puny598",
            "timestamp": 1531896847915,
            "type": "EDITED",
            "details": {
                "previous_attributes": {
                    "cluster_name": "Kylo",
                    "spark_version": "4.1.x-scala2.11",
                    "spark_conf": {
                        "spark.databricks.acl.dfAclsEnabled": "true",
                        "spark.databricks.repl.allowedLanguages": "python,sql"
                    },
                    "node_type_id": "Standard_DS3_v2",
                    "driver_node_type_id": "Standard_DS3_v2",
                    "autotermination_minutes": 10,
                    "enable_elastic_disk": true,
                    "cluster_source": "UI"
                },
                "attributes": {
                    "cluster_name": "Kylo",
                    "spark_version": "4.1.x-scala2.11",
                    "node_type_id": "Standard_DS3_v2",
                    "driver_node_type_id": "Standard_DS3_v2",
                    "autotermination_minutes": 10,
                    "enable_elastic_disk": true,
                    "cluster_source": "UI"
                },
                "previous_cluster_size": {
                    "autoscale": {
                        "min_workers": 1,
                        "max_workers": 8
                    }
                },
                "cluster_size": {
                    "autoscale": {
                        "min_workers": 1,
                        "max_workers": 8
                    }
                },
                "user": ""
            }
        },
    {
      "cluster_id": "0717-035521-puny598",
      "timestamp": 1535540053785,
      "type": "TERMINATING",
      "details": {
        "reason": {
          "code": "INACTIVITY",
          "parameters": {
            "inactivity_duration_min": "15"
          }
        }
      }
    },
    {
      "cluster_id": "0717-035521-puny598",
      "timestamp": 1535537117300,
      "type": "EXPANDED_DISK",
      "details": {
        "previous_disk_size": 29454626816,
        "disk_size": 136828809216,
        "free_space": 17151311872,
        "instance_id": "6cea5c332af94d7f85aff23e5d8cea37"
      }
    }
  ]
}

我使用 ReplaceTextRouteOnContent 创建了 a template 来执行此任务。循环是必需的,因为正则表达式在每次传递时仅替换 JSON 键中的第一个 .。您可能能够改进它以在一次通过中执行所有替换,但是在使用前瞻和后视组对正则表达式进行模糊测试几分钟后,重新路由更快。我验证了这适用于您提供的 JSON,并且 JSON 具有不同行上的键和值(: 两行):

...
"spark_conf": {
                        "spark.databricks.acl.dfAclsEnabled":
 "true",
                        "spark.databricks.repl.allowedLanguages"
: "python,sql"
                    },
...

您还可以使用带有 Groovy 的 ExecuteScript 处理器来摄取 JSON,快速过滤所有包含 . 的 JSON 键,执行collect 操作进行替换,如果您希望单个处理器一次性完成此操作,请将密钥重新插入 JSON 数据中。