将字典列表作为输入转换为 pyspark 中的不同列

Convert a list of dictionaries given as input into separate columns in PySpark

我得到的数据有 3 个字符串列(内容看起来像字典列表),我想将其转换为如下所示的列(示例):

输入数据

预期输出,

基本上我需要帮助来转换

[{"name":"vat","value":"20.00"},{"name":"price","value":"160.00"}]

(作为字符串)到列

|VAT|price|
|20|160|

如果对您有帮助,请查看以下内容:

_my_data = [{"name":"vat","value":"20.00"},{"name":"price","value":"160.00"}]
# Every entry is a {"name": ..., "value": ...} pair, so read the "value" key
# directly. Deciding by character class (str.isalpha) is fragile: a
# non-alphabetic name such as "vat_rate" would be mistaken for a value, and
# an alphabetic value would be silently dropped.
values = [entry["value"] for entry in _my_data]
print(values)
#####
['20.00', '160.00']

使用 `from_json` 将字符串解析为 JSON 对象,然后用 `select` 为每一列取出所需的元素。

from pyspark.sql import functions as F

# Parse the JSON text in `kpi` into an array of {name, value} structs, then
# pick each entry by its `name` rather than by its position in the array, so
# the result does not depend on the order of the entries inside the JSON.
(df
    .withColumn('kpi', F.from_json('kpi', 'array<struct<name:string, value:string>>'))
    # filter() keeps only the entries whose name matches; [0] takes the first match.
    .withColumn('vat', F.expr("filter(kpi, x -> x.name = 'vat')[0].value"))
    .withColumn('price', F.expr("filter(kpi, x -> x.name = 'price')[0].value"))
    .select('vat', 'price')
    .show(10, False)
)

+-----+------+
|vat  |price |
+-----+------+
|20.00|160.00|
|10.00|610.00|
+-----+------+

使用 PySpark 的做法如下:

输入数据

from pyspark.sql.functions import *
from pyspark.sql.types import *
# StructType([StructField("name", StringType(), True), StructField("value", StringType(), True)])

# Layout of the sample table: the raw KPI JSON kept as plain text, followed
# by the product id and the product name. All three columns are nullable.
schema = (
    StructType()
    .add("kpi", StringType(), True)
    .add("productid", IntegerType(), True)
    .add("productname", StringType(), True)
)

# One tuple per product: (kpi JSON text, productid, productname).
sample_rows = [
    ('[{"name":"vat","value":"20.00"},{"name":"price","value":"160.00"}]', 1, "tshirt"),
    ('[{"name":"vat","value":"20.00"},{"name":"price","value":"120.00"}]', 2, "cap"),
    ('[{"name":"vat","value":"20.00"},{"name":"price","value":"160.00"}]', 3, "shoes"),
]

df = spark.createDataFrame(sample_rows, schema=schema)

# Register the frame under the name "tbl" so it can also be queried via SQL.
df.createOrReplaceTempView("tbl")

# Print the rows without truncating the long JSON column.
df.show(truncate=False)

+------------------------------------------------------------------+---------+-----------+
|kpi                                                               |productid|productname|
+------------------------------------------------------------------+---------+-----------+
|[{"name":"vat","value":"20.00"},{"name":"price","value":"160.00"}]|1        |tshirt     |
|[{"name":"vat","value":"20.00"},{"name":"price","value":"120.00"}]|2        |cap        |
|[{"name":"vat","value":"20.00"},{"name":"price","value":"160.00"}]|3        |shoes      |
+------------------------------------------------------------------+---------+-----------+

所需输出:

# Parse the KPI text once, straight into an array of {name, value} structs,
# instead of parsing it into an array of strings and then re-parsing every
# element into a map.
kpi_schema = ArrayType(
    StructType([
        StructField("name", StringType(), True),
        StructField("value", StringType(), True),
    ])
)

(df.select(explode(from_json("kpi", kpi_schema)).alias("kpi"), "productid", "productname")
   # After explode there is one row per KPI entry; flatten the struct fields.
   .select("kpi.name", "kpi.value", "productid", "productname")
   # Turn every distinct KPI name into its own column, one row per product.
   .groupBy("productid", "productname").pivot("name").agg(first("value"))
).show(truncate=False)

+---------+-----------+------+-----+
|productid|productname|price |vat  |
+---------+-----------+------+-----+
|1        |tshirt     |160.00|20.00|
|2        |cap        |120.00|20.00|
|3        |shoes      |160.00|20.00|
+---------+-----------+------+-----+