Issues loading data into Spark from JSON dumped from DynamoDB to S3

I need help parsing this JSON data into a Spark DataFrame.

The JSON file has the following structure:

"categories": {
    "M": {
      "schoolHash": {
        "N": "0.27235612"
      },
      "audioBooksHash": {
        "N": "0.7517752"
      },
      "contk": {
        "N": "0.48212662"
      }
    }
}

Here is what I did:

val schema = StructType(Array(
      StructField("categories", MapType(StringType, ArrayType(MapType(StringType, DoubleType))),
        true),
      StructField("nextRequestAt", StringType, true),
      StructField("requestId", StringType, true),
      StructField("requestedAt", StringType, true),
      StructField("status", StringType, true),
      StructField("url", StringType, true),
      StructField("validUntil", StringType, true)


    ))
    val df = spark.read.option("multiLine", true)
        .schema(schema)
        .json(s"${PATH}/*-load-dynamodb-data.json")

The output shows categories = null:

+----------+---------------------+---------+---------------------+------+----------------------------------------------------------------------------------------------------------------------------------------+---------------------+
|categories|nextRequestAt        |requestId|requestedAt          |status|url                                                                                                                                     |validUntil           |
+----------+---------------------+---------+---------------------+------+----------------------------------------------------------------------------------------------------------------------------------------+---------------------+
|null      |{"N":"1631595262"}   |null     |{"N":"1629003261857"}|null  |{"http://abc"}                                                                                                                          |{"N":"1636779262"}   |
|null      |{"N":"1599037109070"}|null     |{"N":"1588669021526"}|null  |{"S":"http://16k"}                                                                                                                      |{"N":"1591261022000"}|
+----------+---------------------+---------+---------------------+------+----------------------------------------------------------------------------------------------------------------------------------------+---------------------+

The other columns load correctly because they are simple.

FYI, this data was dumped from DynamoDB.

Any help would be greatly appreciated. Thanks.

Your categories struct has an ArrayType, but the JSON example you provided does not contain any lists; it only contains dictionaries, i.e. MapType.

Here is an example with the categories structure:

from pyspark.sql import SparkSession
from pyspark.sql.types import MapType, StringType, StructField, StructType

spark = SparkSession.builder.getOrCreate()

# A sample record mirroring the DynamoDB export: three nested maps,
# categories -> type tag ("M") -> attribute name -> {"N": "<number as string>"}
data = [
    {
        "categories": {
            "M": {
                "schoolHash": {"N": "0.27235612"},
                "audioBooksHash": {"N": "0.7517752"},
                "contk": {"N": "0.48212662"},
            }
        }
    }
]
schema = StructType(
    [
        StructField(
            "categories",
            MapType(
                StringType(),
                MapType(
                    StringType(),
                    MapType(
                        StringType(),
                        StringType(),
                    ),
                ),
            ),
        )
    ]
)

df = spark.createDataFrame(data=data, schema=schema)

The schema:

root
 |-- categories: map (nullable = true)
 |    |-- key: string
 |    |-- value: map (valueContainsNull = true)
 |    |    |-- key: string
 |    |    |-- value: map (valueContainsNull = true)
 |    |    |    |-- key: string
 |    |    |    |-- value: string (valueContainsNull = true)

The DataFrame:

+--------------------------------------------------------------------------------------------------------+
|categories                                                                                              |
+--------------------------------------------------------------------------------------------------------+
|{M -> {schoolHash -> {N -> 0.27235612}, contk -> {N -> 0.48212662}, audioBooksHash -> {N -> 0.7517752}}}|
+--------------------------------------------------------------------------------------------------------+

Note that the numbers in the JSON are strings, so you cannot use DoubleType in the schema. I guess you will have to cast them all to DoubleType after the explode, as sketched below. Another approach (if possible) is to preprocess the JSON records before loading them into the DataFrame.
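For the cast-after-explode route, here is a minimal sketch against the df built above; the column aliases (type_tag, attrs, category, wrapped, score) are illustrative names, not anything that comes from your data:

from pyspark.sql import functions as F

# Explode the outer map (type tag -> attributes), then the inner map
# (attribute name -> {"N": "..."}), and finally cast the "N" string to double.
scores = (
    df.select(F.explode("categories").alias("type_tag", "attrs"))
      .select(F.explode("attrs").alias("category", "wrapped"))
      .select("category", F.col("wrapped")["N"].cast("double").alias("score"))
)
scores.show(truncate=False)

For the preprocessing route, a sketch that recursively strips the DynamoDB type tags before the records ever reach Spark; the unwrap helper is hypothetical and only handles the "N", "S", and "M" tags visible in your example:

import json

def unwrap(value):
    # Hypothetical helper: collapse DynamoDB-typed values into plain Python ones.
    if isinstance(value, dict):
        if set(value) == {"N"}:
            return float(value["N"])  # numbers arrive as strings
        if set(value) == {"S"}:
            return value["S"]
        if set(value) == {"M"}:
            return {k: unwrap(v) for k, v in value["M"].items()}
        return {k: unwrap(v) for k, v in value.items()}
    return value

record = json.loads('{"categories": {"M": {"schoolHash": {"N": "0.27235612"}}}}')
print(unwrap(record))  # {'categories': {'schoolHash': 0.27235612}}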