Explode array values into multiple columns using PySpark
I am new to pyspark and I want to explode array values so that each value is assigned to a new column. I tried using explode but could not get the desired output. Below is my output.
This is the code:
from pyspark.sql import *
from pyspark.sql.functions import explode

if __name__ == "__main__":
    spark = SparkSession.builder \
        .master("local[3]") \
        .appName("DataOps") \
        .getOrCreate()

    dataFrameJSON = spark.read \
        .option("multiLine", True) \
        .option("mode", "PERMISSIVE") \
        .json("data.json")

    dataFrameJSON.printSchema()

    sub_DF = dataFrameJSON.select(explode("values.line").alias("new_values"))
    sub_DF.printSchema()

    sub_DF2 = sub_DF.select("new_values.*")
    sub_DF2.printSchema()
    sub_DF.show(truncate=False)

    new_DF = sub_DF2.select("id", "period.*", "property")
    new_DF.show(truncate=False)
    new_DF.printSchema()
This is the data:
{
    "values" : {
        "line" : [
            {
                "id" : 1,
                "period" : {
                    "start_ts" : "2020-01-01T00:00:00",
                    "end_ts" : "2020-01-01T00:15:00"
                },
                "property" : [
                    {
                        "name" : "PID",
                        "val" : "P120E12345678"
                    },
                    {
                        "name" : "EngID",
                        "val" : "PANELID00000000"
                    },
                    {
                        "name" : "TownIstat",
                        "val" : "12058091"
                    },
                    {
                        "name" : "ActiveEng",
                        "val" : "5678.1"
                    }
                ]
            }
        ]
    }
}
Could you include the data itself instead of screenshots?
In the meantime, assuming df is the DataFrame being used, what we need to do is create a new DataFrame, extracting the vals from the property array into new columns, and finally dropping the property column:
from pyspark.sql.functions import col
output_df = df.withColumn("PID", col("property")[0].val).withColumn("EngID", col("property")[1].val).withColumn("TownIstat", col("property")[2].val).withColumn("ActiveEng", col("property")[3].val).drop("property")
If element is of type ArrayType, use the following instead:
from pyspark.sql.functions import col
output_df = df.withColumn("PID", col("property")[0][1]).withColumn("EngID", col("property")[1][1]).withColumn("TownIstat", col("property")[2][1]).withColumn("ActiveEng", col("property")[3][1]).drop("property")
explode breaks an array into new rows, not columns; see: pyspark explode
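To illustrate the difference, here is a small sketch (with hypothetical sample data) of what explode actually does: it produces one output row per array element, keeping the other columns.

from pyspark.sql import SparkSession
from pyspark.sql.functions import explode

spark = SparkSession.builder.master("local[1]").appName("explode-demo").getOrCreate()

sample = spark.createDataFrame(
    [(1, [("PID", "P123"), ("EngID", "PA111")])],
    "id INT, property ARRAY<STRUCT<name: STRING, val: STRING>>")

# explode turns each array element into its own row
sample.select("id", explode("property").alias("p")) \
      .select("id", "p.name", "p.val") \
      .show()
# +---+-----+-----+
# | id| name|  val|
# +---+-----+-----+
# |  1|  PID| P123|
# |  1|EngID|PA111|
# +---+-----+-----+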
Here is a generic solution that works even when the JSON is messy (elements in a different order, or some elements missing).
You have to flatten first, use regexp_replace to split the 'property' column, and finally pivot. This also avoids hard-coding the new column names.
Build the DataFrame:
from pyspark.sql.types import *
from pyspark.sql.functions import col, regexp_replace, split, first

schema = StructType([
    StructField("id", IntegerType()),
    StructField("start_ts", StringType()),
    StructField("end_ts", StringType()),
    StructField("property", ArrayType(StructType([
        StructField("name", StringType()),
        StructField("val", StringType())
    ])))
])

data = [[1, "2010", "2020", [["PID", "P123"], ["Eng", "PA111"], ["Town", "999"], ["Act", "123.1"]]],
        [2, "2011", "2012", [["PID", "P456"], ["Eng", "PA222"], ["Town", "777"], ["Act", "234.1"]]]]

df = spark.createDataFrame(data, schema=schema)
df.show(truncate=False)
+---+--------+------+------------------------------------------------------+
|id |start_ts|end_ts|property |
+---+--------+------+------------------------------------------------------+
|1 |2010 |2020 |[[PID, P123], [Eng, PA111], [Town, 999], [Act, 123.1]]|
|2 |2011 |2012 |[[PID, P456], [Eng, PA222], [Town, 777], [Act, 234.1]]|
+---+--------+------+------------------------------------------------------+
Flatten and pivot:
df_flatten = df.rdd.flatMap(lambda x: [(x[0], x[1], x[2], y) for y in x[3]]) \
    .toDF(['id', 'start_ts', 'end_ts', 'property']) \
    .select('id', 'start_ts', 'end_ts', col("property").cast("string"))

df_split = df_flatten.select('id', 'start_ts', 'end_ts',
                             regexp_replace(df_flatten.property, r"[\[\]]", "").alias("replaced_col")) \
    .withColumn("arr", split(col("replaced_col"), ", ")) \
    .select(col("arr")[0].alias("col1"), col("arr")[1].alias("col2"), 'id', 'start_ts', 'end_ts')

final_df = df_split.groupby(df_split.id) \
    .pivot("col1") \
    .agg(first("col2")) \
    .join(df, 'id').drop("property")
Output:
final_df.show()
+---+-----+-----+----+----+--------+------+
| id| Act| Eng| PID|Town|start_ts|end_ts|
+---+-----+-----+----+----+--------+------+
| 1|123.1|PA111|P123| 999| 2010| 2020|
| 2|234.1|PA222|P456| 777| 2011| 2012|
+---+-----+-----+----+----+--------+------+
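A closely related variant of the same flatten-and-pivot idea (just a sketch, not part of the original answer) avoids the string cast and regexp_replace by exploding the structs and pivoting directly on the name field:

from pyspark.sql import functions as F

# one row per (name, val) struct, with the struct fields pulled out as plain columns
exploded = df.select("id", "start_ts", "end_ts", F.explode("property").alias("p")) \
             .select("id", "start_ts", "end_ts",
                     F.col("p.name").alias("name"), F.col("p.val").alias("val"))

# pivot on the property name, taking the first val for each name
pivoted = exploded.groupBy("id", "start_ts", "end_ts") \
                  .pivot("name") \
                  .agg(F.first("val"))
pivoted.show()
# Expected output (row order may vary):
# +---+--------+------+-----+-----+----+----+
# | id|start_ts|end_ts|  Act|  Eng| PID|Town|
# +---+--------+------+-----+-----+----+----+
# |  1|    2010|  2020|123.1|PA111|P123| 999|
# |  2|    2011|  2012|234.1|PA222|P456| 777|
# +---+--------+------+-----+-----+----+----+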