Explode array values into multiple columns using PySpark

I am new to pyspark and I want to explode the array values in such a way that each value gets assigned to a new column. I tried using explode but I could not get the desired output. Below is my output.

This is the code:

from pyspark.sql import *
from pyspark.sql.functions import explode

if __name__ == "__main__":
    spark = SparkSession.builder \
        .master("local[3]") \
        .appName("DataOps") \
        .getOrCreate()

    # multiLine is needed because each JSON object spans several lines
    dataFrameJSON = spark.read \
        .option("multiLine", True) \
        .option("mode", "PERMISSIVE") \
        .json("data.json")

    dataFrameJSON.printSchema()

    # explode turns each element of values.line into its own row
    sub_DF = dataFrameJSON.select(explode("values.line").alias("new_values"))
    sub_DF.printSchema()

    sub_DF2 = sub_DF.select("new_values.*")
    sub_DF2.printSchema()
    sub_DF.show(truncate=False)

    new_DF = sub_DF2.select("id", "period.*", "property")
    new_DF.show(truncate=False)
    new_DF.printSchema()

Here is the data:

{
        "values" : {
            "line" : [
                {
                    "id" : 1,
                    "period" : {
                        "start_ts" : "2020-01-01T00:00:00",
                        "end_ts" : "2020-01-01T00:15:00"
                    },
                    "property" : [
                        {
                            "name" : "PID",
                            "val" : "P120E12345678"
                        },
                        {
                            "name" : "EngID",
                            "val" : "PANELID00000000"
                        },
                        {
                            "name" : "TownIstat",
                            "val" : "12058091"
                        },
                        {
                            "name" : "ActiveEng",
                            "val" : "5678.1"
                        }
                    ]
                }
            ]
        }
}

Could you include the data instead of screenshots?

In the meantime, assuming df is the dataframe being used, what we need to do is create a new dataframe, extracting the vals from the property array into new columns, and finally drop the property column:

from pyspark.sql.functions import col

output_df = df.withColumn("PID", col("property")[0].val) \
    .withColumn("EngID", col("property")[1].val) \
    .withColumn("TownIstat", col("property")[2].val) \
    .withColumn("ActiveEng", col("property")[3].val) \
    .drop("property")

If the element is of ArrayType instead (that is, each entry of property is itself an array of [name, val] strings rather than a struct), use the following; [0][1] selects the val of the first element:

from pyspark.sql.functions import col

output_df = df.withColumn("PID", col("property")[0][1]) \
    .withColumn("EngID", col("property")[1][1]) \
    .withColumn("TownIstat", col("property")[2][1]) \
    .withColumn("ActiveEng", col("property")[3][1]) \
    .drop("property")

Explode will break the array out into new rows, not columns; see: pyspark explode
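
To illustrate that point, here is a minimal sketch (hypothetical data and names, not from the question) showing that explode yields one row per array element:

from pyspark.sql import SparkSession
from pyspark.sql.functions import explode
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, ArrayType

spark = SparkSession.builder.master("local[1]").appName("explode-demo").getOrCreate()

# Hypothetical frame shaped like the question's data: an array of name/val structs
demo_schema = StructType([
    StructField("id", IntegerType()),
    StructField("property", ArrayType(StructType([
        StructField("name", StringType()),
        StructField("val", StringType())])))])

demo_df = spark.createDataFrame([(1, [("PID", "P123"), ("EngID", "PA111")])], schema=demo_schema)

# explode turns each element of 'property' into its own row
demo_df.select("id", explode("property").alias("p")) \
       .select("id", "p.name", "p.val") \
       .show()

# Expected shape (two rows, not two new columns):
# +---+-----+-----+
# | id| name|  val|
# +---+-----+-----+
# |  1|  PID| P123|
# |  1|EngID|PA111|
# +---+-----+-----+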

Here is a generic solution that works even when the JSON is messy (elements ordered differently, or some elements missing).

You first have to flatten the 'property' column, then clean it with regexp_replace and split it, and finally pivot. This also avoids hard-coding the new column names.

Build the dataframe:

from pyspark.sql.types import *
from pyspark.sql.functions import col, split, regexp_replace, first

schema = StructType([StructField("id", IntegerType()), StructField("start_ts", StringType()), StructField("end_ts", StringType()), \
    StructField("property", ArrayType(StructType(  [StructField("name", StringType()),  StructField("val", StringType())]    )))])

data = [[1, "2010", "2020", [["PID", "P123"], ["Eng", "PA111"], ["Town", "999"], ["Act", "123.1"]]],\
         [2, "2011", "2012", [["PID", "P456"], ["Eng", "PA222"], ["Town", "777"], ["Act", "234.1"]]]]

df = spark.createDataFrame(data,schema=schema)

df.show(truncate=False)
+---+--------+------+------------------------------------------------------+    
|id |start_ts|end_ts|property                                              |
+---+--------+------+------------------------------------------------------+
|1  |2010    |2020  |[[PID, P123], [Eng, PA111], [Town, 999], [Act, 123.1]]|
|2  |2011    |2012  |[[PID, P456], [Eng, PA222], [Town, 777], [Act, 234.1]]|
+---+--------+------+------------------------------------------------------+

Flatten and pivot:

# Flatten: emit one row per element of the 'property' array, cast to string
df_flatten = df.rdd.flatMap(lambda x: [(x[0], x[1], x[2], y) for y in x[3]]).toDF(['id', 'start_ts', 'end_ts', 'property'])\
            .select('id', 'start_ts', 'end_ts', col("property").cast("string"))

# Strip the brackets from the stringified struct, then split it into name/val
df_split = df_flatten.select('id', 'start_ts', 'end_ts', regexp_replace(df_flatten.property, r"[\[\]]", "").alias("replaced_col"))\
                .withColumn("arr", split(col("replaced_col"), ", "))\
                .select(col("arr")[0].alias("col1"), col("arr")[1].alias("col2"), 'id', 'start_ts', 'end_ts')

# Pivot the names into columns, then join the rest of the original frame back in
final_df = df_split.groupby(df_split.id)\
                        .pivot("col1")\
                        .agg(first("col2"))\
                        .join(df, 'id').drop("property")

Output:

final_df.show()
+---+-----+-----+----+----+--------+------+
| id|  Act|  Eng| PID|Town|start_ts|end_ts|
+---+-----+-----+----+----+--------+------+
|  1|123.1|PA111|P123| 999|    2010|  2020|
|  2|234.1|PA222|P456| 777|    2011|  2012|
+---+-----+-----+----+----+--------+------+
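
As a side note (not from the original answers): with the same df built above, the string/regexp round trip can be skipped by exploding the struct array directly and pivoting on the name field. A minimal sketch:

from pyspark.sql.functions import col, explode, first

# Explode the struct array into rows, then pivot the name/val pairs back into columns
exploded = df.select('id', 'start_ts', 'end_ts', explode('property').alias('p')) \
             .select('id', 'start_ts', 'end_ts',
                     col('p.name').alias('name'), col('p.val').alias('val'))

pivoted = exploded.groupBy('id', 'start_ts', 'end_ts') \
                  .pivot('name') \
                  .agg(first('val'))

pivoted.show()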