pyspark: How to explode a column of string type into rows and columns of a Spark data frame
I am using Spark 2.3. I have a Spark data frame in the following format:

| person_id | person_attributes
____________________________________________________________________________
| id_1      | "department=Sales__title=Sales_executive__level=junior"
| id_2      | "department=Engineering__title=Software Engineer__level=entry-level"

and so on. The person_attributes column is of type string.

How can I explode this frame to get a data frame like the one below, without the level attribute_key?

| person_id | attribute_key | attribute_value
____________________________________________________________________________
| id_1      | department    | Sales
| id_1      | title         | Sales_executive
| id_2      | department    | Engineering
| id_2      | title         | Software Engineer

This is a large, distributed data frame, so converting to pandas or caching it is not an option.
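For reference, a minimal DataFrame matching this shape can be built like this (the column names person_id and person_attributes come from the question; the SparkSession setup is assumed):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Two sample rows in the same shape as the question
df = spark.createDataFrame(
    [
        ("id_1", "department=Sales__title=Sales_executive__level=junior"),
        ("id_2", "department=Engineering__title=Software Engineer__level=entry-level"),
    ],
    ["person_id", "person_attributes"],
)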
Try this (note: the snippet below is Scala, although the question is tagged pyspark):

import org.apache.spark.sql.functions._

df
  .withColumn("attributes_splitted", split(col("person_attributes"), "__")) // split on delimiter `__`
  .withColumn("exploded", explode(col("attributes_splitted")))              // explode the split column
  .withColumn("temp", split(col("exploded"), "="))                          // split again on delimiter `=`
  .withColumn("attribute_key", col("temp").getItem(0))
  .withColumn("attribute_value", col("temp").getItem(1))
  .drop("attributes_splitted", "exploded", "temp", "person_attributes")
  .show(false)
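Since the question is tagged pyspark, here is a sketch of the same approach translated to Python (my translation, not part of the original answer), with an extra filter added to drop the level rows as the question asks:

from pyspark.sql import functions as F

df.withColumn("attributes_splitted", F.split("person_attributes", "__"))\
  .withColumn("exploded", F.explode("attributes_splitted"))\
  .withColumn("temp", F.split("exploded", "="))\
  .withColumn("attribute_key", F.col("temp").getItem(0))\
  .withColumn("attribute_value", F.col("temp").getItem(1))\
  .filter(F.col("attribute_key") != "level")\
  .drop("attributes_splitted", "exploded", "temp", "person_attributes")\
  .show(truncate=False)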
Spark 2.3

Try this:
from pyspark.sql import functions as F

df.withColumn("arr", F.split("person_attributes", r'=|__'))\
  .withColumn("map", F.create_map(F.lit('department'), F.col("arr")[1],
                                  F.lit('title'), F.col("arr")[3]))\
  .select("person_id", F.explode("map").alias("attribute_key", "attribute_value"))\
  .show(truncate=False)
#+---------+-------------+-----------------+
#|person_id|attribute_key|attribute_value |
#+---------+-------------+-----------------+
#|id_1 |department |Sales |
#|id_1 |title |Sales_executive |
#|id_2 |department |Engineering |
#|id_2 |title |Software Engineer|
#+---------+-------------+-----------------+
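Note that this version hard-codes the department and title keys and assumes every row lists its attributes in the same order, which is why arr[1] and arr[3] can be read positionally; the Spark 2.4+ version below pairs keys with values generically instead.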
Spark 2.4+

Try this (map_from_arrays needs Spark 2.4+; note that the two-argument (x, i) -> ... form of the higher-order filter function was only added in Spark 3.0):
from pyspark.sql import functions as F

df.withColumn("arr", F.split("person_attributes", r'=|__'))\
  .withColumn("map", F.map_from_arrays(F.expr("filter(arr, (x, i) -> i % 2 = 0)"),
                                       F.expr("filter(arr, (x, i) -> i % 2 != 0)")))\
  .select("person_id", F.explode("map").alias("attribute_key", "attribute_value"))\
  .filter("attribute_key != 'level'")\
  .show(truncate=False)
#+---------+-------------+-----------------+
#|person_id|attribute_key|attribute_value |
#+---------+-------------+-----------------+
#|id_1 |department |Sales |
#|id_1 |title |Sales_executive |
#|id_2 |department |Engineering |
#|id_2 |title |Software Engineer|
#+---------+-------------+-----------------+
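Splitting on both = and __ produces an alternating key/value array, so the even-indexed elements become the map keys and the odd-indexed elements the map values; exploding that map yields one row per attribute, and the final filter drops the level rows. Unlike the create_map version, this works for any set of attribute keys.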