使用 Pyspark 从数组中读取 JSON 项?
Using Pyspark to read JSON items from an array?
我在从数据块中的 Cosmos DB 中读取项目时遇到了一些问题,它似乎将 JSON 读取为字符串值,并且在将数据从中读取到列时遇到了一些问题。
我有一个名为 ProductRanges 的列,其中连续包含以下值:
[ {
"name": "Red",
"min": 0,
"max": 99,
"value": "Order More"
},
{
"name": "Amber",
"min": 100,
"max": 499,
"value": "Stock OK"
},
{
"name": "Green",
"min": 500,
"max": 1000000,
"value": "Overstocked"
}
]
在 Cosmos DB 中,JSON 文档是有效的,导入数据时数据框中的数据类型是字符串,而不是我期望的 JSON object/struct。
我希望能够计算 "name" 出现的次数并遍历它们以获得最小值、最大值和值项,因为我们可以拥有的范围数可以是超过 3 个。我在 Whosebug 和其他地方浏览了一些 post,但仍然停留在格式上。我尝试使用 explode 并读取基于列值的模式,但它确实显示 'in vaild document',认为这可能是由于 Pyspark 在开始和结束时需要 {},但甚至将其连接在来自 cosmos db 的 SQL 查询仍然以字符串的数据类型结束。
如有指点,将不胜感激
我看到您从 Azure CosmosDB 检索了 JSON 文档并将它们转换为 PySpark DataFrame,但是嵌套的 JSON 文档或数组无法转换为 JSON 中的对象DataFrame 列符合您的预期,因为 pyspark.sql.types
模块中没有定义 JSON 类型,如下所示。
我搜索了一篇文档PySpark: Convert JSON String Column to Array of Object (StructType) in Data Frame
,它是适合您当前案例的解决方案,甚至与您想要的一样,而我正在尝试解决它。
上面的文档显示了如何使用 ArrayType
、StructType
、StructField
和其他基本 PySpark 数据类型将列中的 JSON 字符串转换为组合数据类型通过定义列架构和 UDF,可以在 PySpark 中更轻松地处理它。
这里是示例代码的总结。希望对你有帮助。
source = [{"attr_1": 1, "attr_2": "[{\"a\":1,\"b\":1},{\"a\":2,\"b\":2}]"}, {"attr_1": 2, "attr_2": "[{\"a\":3,\"b\":3},{\"a\":4,\"b\":4}]"}]
JSON通过sqlContext读入一个数据框。输出为:
+------+--------------------+
|attr_1| attr_2|
+------+--------------------+
| 1|[{"a":1,"b":1},{"...|
| 2|[{"a":3,"b":3},{"...|
+------+--------------------+
root
|-- attr_1: long (nullable = true)
|-- attr_2: string (nullable = true)
然后,通过定义列架构和 UDF 转换 attr_2
列。
# Function to convert JSON array string to a list
import json
def parse_json(array_str):
json_obj = json.loads(array_str)
for item in json_obj:
yield (item["a"], item["b"])
# Define the schema
from pyspark.sql.types import ArrayType, IntegerType, StructType, StructField
json_schema = ArrayType(StructType([StructField('a', IntegerType(
), nullable=False), StructField('b', IntegerType(), nullable=False)]))
# Define udf
from pyspark.sql.functions import udf
udf_parse_json = udf(lambda str: parse_json(str), json_schema)
# Generate a new data frame with the expected schema
df_new = df.select(df.attr_1, udf_parse_json(df.attr_2).alias("attr_2"))
df_new.show()
df_new.printSchema()
输出结果如下:
+------+--------------+
|attr_1| attr_2|
+------+--------------+
| 1|[[1,1], [2,2]]|
| 2|[[3,3], [4,4]]|
+------+--------------+
root
|-- attr_1: long (nullable = true)
|-- attr_2: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- a: integer (nullable = false)
| | |-- b: integer (nullable = false)
我在从数据块中的 Cosmos DB 中读取项目时遇到了一些问题,它似乎将 JSON 读取为字符串值,并且在将数据从中读取到列时遇到了一些问题。
我有一个名为 ProductRanges 的列,其中连续包含以下值:
[ {
"name": "Red",
"min": 0,
"max": 99,
"value": "Order More"
},
{
"name": "Amber",
"min": 100,
"max": 499,
"value": "Stock OK"
},
{
"name": "Green",
"min": 500,
"max": 1000000,
"value": "Overstocked"
}
]
在 Cosmos DB 中,JSON 文档是有效的,导入数据时数据框中的数据类型是字符串,而不是我期望的 JSON object/struct。
我希望能够计算 "name" 出现的次数并遍历它们以获得最小值、最大值和值项,因为我们可以拥有的范围数可以是超过 3 个。我在 Whosebug 和其他地方浏览了一些 post,但仍然停留在格式上。我尝试使用 explode 并读取基于列值的模式,但它确实显示 'in vaild document',认为这可能是由于 Pyspark 在开始和结束时需要 {},但甚至将其连接在来自 cosmos db 的 SQL 查询仍然以字符串的数据类型结束。
如有指点,将不胜感激
我看到您从 Azure CosmosDB 检索了 JSON 文档并将它们转换为 PySpark DataFrame,但是嵌套的 JSON 文档或数组无法转换为 JSON 中的对象DataFrame 列符合您的预期,因为 pyspark.sql.types
模块中没有定义 JSON 类型,如下所示。
我搜索了一篇文档PySpark: Convert JSON String Column to Array of Object (StructType) in Data Frame
,它是适合您当前案例的解决方案,甚至与您想要的一样,而我正在尝试解决它。
上面的文档显示了如何使用 ArrayType
、StructType
、StructField
和其他基本 PySpark 数据类型将列中的 JSON 字符串转换为组合数据类型通过定义列架构和 UDF,可以在 PySpark 中更轻松地处理它。
这里是示例代码的总结。希望对你有帮助。
source = [{"attr_1": 1, "attr_2": "[{\"a\":1,\"b\":1},{\"a\":2,\"b\":2}]"}, {"attr_1": 2, "attr_2": "[{\"a\":3,\"b\":3},{\"a\":4,\"b\":4}]"}]
JSON通过sqlContext读入一个数据框。输出为:
+------+--------------------+
|attr_1| attr_2|
+------+--------------------+
| 1|[{"a":1,"b":1},{"...|
| 2|[{"a":3,"b":3},{"...|
+------+--------------------+
root
|-- attr_1: long (nullable = true)
|-- attr_2: string (nullable = true)
然后,通过定义列架构和 UDF 转换 attr_2
列。
# Function to convert JSON array string to a list
import json
def parse_json(array_str):
json_obj = json.loads(array_str)
for item in json_obj:
yield (item["a"], item["b"])
# Define the schema
from pyspark.sql.types import ArrayType, IntegerType, StructType, StructField
json_schema = ArrayType(StructType([StructField('a', IntegerType(
), nullable=False), StructField('b', IntegerType(), nullable=False)]))
# Define udf
from pyspark.sql.functions import udf
udf_parse_json = udf(lambda str: parse_json(str), json_schema)
# Generate a new data frame with the expected schema
df_new = df.select(df.attr_1, udf_parse_json(df.attr_2).alias("attr_2"))
df_new.show()
df_new.printSchema()
输出结果如下:
+------+--------------+
|attr_1| attr_2|
+------+--------------+
| 1|[[1,1], [2,2]]|
| 2|[[3,3], [4,4]]|
+------+--------------+
root
|-- attr_1: long (nullable = true)
|-- attr_2: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- a: integer (nullable = false)
| | |-- b: integer (nullable = false)