Nifi 将 sql 转换为 json 结构化

Nifi convert sql to json structured

我正在尝试找出一种从 SQL 中获取数据并将其格式化为特定 json 格式的方法,但我在 nifi 中很难做到这一点。

table 中的数据如下所示。

{
    "location_id": "123456",
    "name": "My Organization",
    "address_1": "Address 1",
    "address_2": "Suite 123",
    "city": "My City",
    "state": "FL",
    "zip_code": "33333",
    "description": "",
    "longitude": "-2222.132131321332113",
    "latitude": "111.21321321321321321",
    "type": "data type"
}

我想把它转换成这样的格式。

{
    "type": "FeatureCollection",
    "features": [
        {
            "geometry": {
                "type": "Point",
                "coordinates": [
                    $.longitude,
                    $.latitude
                ]
            },
            "type": "Feature",
            "properties": {
                "name": $.name,
                "phone": $.phone_number,
                "address1": $.address_1,
                "address2": $.address_2,
                "city": $.city,
                "state": $.state,
                "zip": $.zip_code,
                "type": $.type
            }
        }
    ]
}

这就是我目前所拥有的,如果我以一种奇怪的方式做这件事,一定要告诉我。

我想我可以将所有这些拆分成单个记录jsons 以这种格式格式化它们。

{
            "geometry": {
                "type": "Point",
                "coordinates": [
                    $.longitude,
                    $.latitude
                ]
            },
            "type": "Feature",
            "properties": {
                "name": $.name,
                "phone": $.phone_number,
                "address1": $.address_1,
                "address2": $.address_2,
                "city": $.city,
                "state": $.state,
                "zip": $.zip_code,
                "type": $.type
            }
        }

然后将所有记录合并在一起,并围绕此合并回来

{
        "type": "FeatureCollection",
        "features": [
        ]
}

我觉得我做这件事很奇怪,只是不知道该怎么做哈哈。

尝试使用 JsonRecordSetWriter 而不是 ExecuteSQL 来执行 ExecuteSQLRecord,这将允许您将行输出为 JSON 对象而无需转换 to/from Avro。如果你没有太多行(这会导致内存不足错误),你可以使用 JoltTransformJSON 来完成整个转换(不拆分行),链规范如下:

[
  {
    "operation": "shift",
    "spec": {
      "#FeatureCollection": "type",
      "*": {
        "#Feature": "features[&1].type",
        "name": "features[&1].properties.name",
        "address_1": "features[&1].properties.address_1",
        "address_2": "features[&1].properties.address_2",
        "city": "features[&1].properties.city",
        "state": "features[&1].properties.state",
        "zip_code": "features[&1].properties.zip",
        "type": "features[&1].properties.type",
        "longitude": "features[&1].geometry.coordinates.longitude",
        "latitude": "features[&1].geometry.coordinates.latitude"
      }
    }
  }
]

如果行太多,您可以使用 SplitJson 将它们拆分成较小的块,然后使用 JoltTransformJSON(使用上述规范)然后使用 MergeRecord 将它们合并回一个大数组。要将它们嵌套到 features 字段中,您可以使用 ReplaceText 来 "wrap" 外部 JSON 对象中的数组,但这也可能导致内存不足错误。