使用 Pyspark 从数组中读取 JSON 项?

Using Pyspark to read JSON items from an array?

我在从数据块中的 Cosmos DB 中读取项目时遇到了一些问题,它似乎将 JSON 读取为字符串值,并且在将数据从中读取到列时遇到了一些问题。

我有一个名为 ProductRanges 的列,其中连续包含以下值:

[   {
        "name": "Red",
        "min": 0,
        "max": 99,
        "value": "Order More"
    },
    {
        "name": "Amber",
        "min": 100,
        "max": 499,
        "value": "Stock OK"
    },
    {
        "name": "Green",
        "min": 500,
        "max": 1000000,
        "value": "Overstocked"
    }
]

在 Cosmos DB 中,JSON 文档是有效的,导入数据时数据框中的数据类型是字符串,而不是我期望的 JSON object/struct。

我希望能够计算 "name" 出现的次数并遍历它们以获得最小值、最大值和值项,因为我们可以拥有的范围数可以是超过 3 个。我在 Whosebug 和其他地方浏览了一些 post,但仍然停留在格式上。我尝试使用 explode 并读取基于列值的模式,但它确实显示 'in vaild document',认为这可能是由于 Pyspark 在开始和结束时需要 {},但甚至将其连接在来自 cosmos db 的 SQL 查询仍然以字符串的数据类型结束。

如有指点,将不胜感激

我看到您从 Azure CosmosDB 检索了 JSON 文档并将它们转换为 PySpark DataFrame,但是嵌套的 JSON 文档或数组无法转换为 JSON 中的对象DataFrame 列符合您的预期,因为 pyspark.sql.types 模块中没有定义 JSON 类型,如下所示。

我搜索了一篇文档PySpark: Convert JSON String Column to Array of Object (StructType) in Data Frame,它是适合您当前案例的解决方案,甚至与您想要的一样,而我正在尝试解决它。​​

上面的文档显示了如何使用 ArrayTypeStructTypeStructField 和其他基本 PySpark 数据类型将列中的 JSON 字符串转换为组合数据类型通过定义列架构和 UDF,可以在 PySpark 中更轻松地处理它。

这里是示例代码的总结。希望对你有帮助。

source = [{"attr_1": 1, "attr_2": "[{\"a\":1,\"b\":1},{\"a\":2,\"b\":2}]"}, {"attr_1": 2, "attr_2": "[{\"a\":3,\"b\":3},{\"a\":4,\"b\":4}]"}]

JSON通过sqlContext读入一个数据框。输出为:

+------+--------------------+

|attr_1|              attr_2|

+------+--------------------+

|     1|[{"a":1,"b":1},{"...|

|     2|[{"a":3,"b":3},{"...|

+------+--------------------+


root
  |-- attr_1: long (nullable = true)
  |-- attr_2: string (nullable = true)

然后,通过定义列架构和 UDF 转换 attr_2 列。

# Function to convert JSON array string to a list
import json

def parse_json(array_str):
    json_obj = json.loads(array_str)
    for item in json_obj:
        yield (item["a"], item["b"])

# Define the schema
from pyspark.sql.types import ArrayType, IntegerType, StructType, StructField

json_schema = ArrayType(StructType([StructField('a', IntegerType(
), nullable=False), StructField('b', IntegerType(), nullable=False)]))

# Define udf
from pyspark.sql.functions import udf

udf_parse_json = udf(lambda str: parse_json(str), json_schema)

# Generate a new data frame with the expected schema

df_new = df.select(df.attr_1, udf_parse_json(df.attr_2).alias("attr_2"))
df_new.show()
df_new.printSchema()

输出结果如下:

+------+--------------+

|attr_1|        attr_2|

+------+--------------+

|     1|[[1,1], [2,2]]|

|     2|[[3,3], [4,4]]|

+------+--------------+


root
  |-- attr_1: long (nullable = true)
  |-- attr_2: array (nullable = true)
  |    |-- element: struct (containsNull = true)
  |    |    |-- a: integer (nullable = false)
  |    |    |-- b: integer (nullable = false)