Issues in data loading into Spark from Json dumped from dynamo db to s3
I need help parsing this data into a Spark DataFrame.
The structure of the JSON file is as follows:
"categories": {
"M": {
"schoolHash": {
"N": "0.27235612"
},
"audioBooksHash": {
"N": "0.7517752"
},
"contk": {
"N": "0.48212662"
}
}
}
What I have done is as follows:
val schema = StructType(Array(
StructField("categories", MapType(StringType, ArrayType(MapType(StringType, DoubleType))),
true),
StructField("nextRequestAt", StringType, true),
StructField("requestId", StringType, true),
StructField("requestedAt", StringType, true),
StructField("status", StringType, true),
StructField("url", StringType, true),
StructField("validUntil", StringType, true)
))
val df = spark.read.option("multiLine", true)
.schema(schema)
.json(s"${PATH}/*-load-dynamodb-data.json")
The output is -> categories = null
+----------+---------------------+---------+---------------------+------+----------------------------------------------------------------------------------------------------------------------------------------+---------------------+
|categories|nextRequestAt |requestId|requestedAt |status|url |validUntil |
+----------+---------------------+---------+---------------------+------+----------------------------------------------------------------------------------------------------------------------------------------+---------------------+
|null |{"N":"1631595262"} |null |{"N":"1629003261857"}|null |{"http://abc"} |{"N":"1636779262"} |
|null |{"N":"1599037109070"}|null |{"N":"1588669021526"}|null |{"S":"http://16k"} |{"N":"1591261022000"}|
+----------+---------------------+---------+---------------------+------+----------------------------------------------------------------------------------------------------------------------------------------+---------------------+
The other columns load correctly since they are simple.
FYI, this data was dumped from DynamoDB.
Any help would be greatly appreciated. Thanks.
Your categories struct uses an ArrayType, but the JSON example you provided does not contain any lists; it only contains dictionaries, i.e. MapType.
Here is an example for the categories struct:
from pyspark.sql.types import MapType, StringType, StructField, StructType

data = [
    {
        "categories": {
            "M": {
                "schoolHash": {"N": "0.27235612"},
                "audioBooksHash": {"N": "0.7517752"},
                "contk": {"N": "0.48212662"},
            }
        }
    }
]

# Three nested maps: wrapper key ("M") -> category name -> DynamoDB type tag ("N") -> value
schema = StructType(
    [
        StructField(
            "categories",
            MapType(
                StringType(),
                MapType(
                    StringType(),
                    MapType(
                        StringType(),
                        StringType(),
                    ),
                ),
            ),
        )
    ]
)

df = spark.createDataFrame(data=data, schema=schema)
Schema:
root
|-- categories: map (nullable = true)
| |-- key: string
| |-- value: map (valueContainsNull = true)
| | |-- key: string
| | |-- value: map (valueContainsNull = true)
| | | |-- key: string
| | | |-- value: string (valueContainsNull = true)
DataFrame:
+--------------------------------------------------------------------------------------------------------+
|categories |
+--------------------------------------------------------------------------------------------------------+
|{M -> {schoolHash -> {N -> 0.27235612}, contk -> {N -> 0.48212662}, audioBooksHash -> {N -> 0.7517752}}}|
+--------------------------------------------------------------------------------------------------------+
Note that the numbers in the JSON are strings, so you cannot use DoubleType in the schema. I suppose you will have to cast them all to DoubleType after an explode. Another option (if possible) would be to preprocess the JSON records before loading them into the DataFrame.
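For example, here is a minimal sketch of the first approach, assuming the df built above (the alias names wrapper_key, category and score are just illustrative):

from pyspark.sql import functions as F

flat = (
    df
    # explode the outer map: key is the DynamoDB wrapper ("M"),
    # value is a map<string, map<string, string>>
    .select(F.explode("categories").alias("wrapper_key", "inner"))
    # explode the inner map: key is the category name,
    # value is the {"N": "0.27235612"} type/value map
    .select(F.explode("inner").alias("category", "value_map"))
    # pull the numeric string out of the "N" entry and cast it to double
    .select("category", F.col("value_map")["N"].cast("double").alias("score"))
)
flat.show(truncate=False)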