寻找 pyspark 的倒数 arrays_zip
Looking for an inverse of pyspark's arrays_zip
我有以下格式不正确的输入数据框:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
spark = SparkSession.builder.master("local").getOrCreate()
input_df = spark.createDataFrame(
[
('Alice;Bob;Carol',),
('12;13;14',),
('5;;7',),
('1;;3',),
(';;3',)
],
['data']
)
input_df.show()
# +---------------+
# | data|
# +---------------+
# |Alice;Bob;Carol|
# | 12;13;14|
# | 5;;7|
# | 1;;3|
# | ;;3|
# +---------------+
实际输入的是一个以分号分隔的 CSV 文件,其中一列包含一个人的值。每个人可以有不同数量的值。这里,Alice 有 3 个值,Bob 只有一个,Carol 有四个值。
我想在 PySpark 中将其转换为一个输出数据框,每个人都有一个数组,在此示例中,输出为:
result = spark.createDataFrame(
[
("Alice", [12, 5, 1]),
("Bob", [13,]),
("Carol", [14, 7, 3, 3])
],
['name', 'values']
)
result.show()
# +-----+-------------+
# | name| values|
# +-----+-------------+
# |Alice| [12, 5, 1]|
# | Bob| [13]|
# |Carol|[14, 7, 3, 3]|
# +-----+-------------+
我该怎么做?我想这将是 F.arrays_zip()
、F.split()
and/or F.explode()
的某种组合,但我想不通。
我目前被困在这里,这是我目前的尝试:
(input_df
.withColumn('splits', F.split(F.col('data'), ';'))
.drop('data')
).show()
# +-------------------+
# | splits|
# +-------------------+
# |[Alice, Bob, Carol]|
# | [12, 13, 14]|
# | [5, , 7]|
# | [1, , 3]|
# | [, , 3]|
# +-------------------+
Solution for Spark-2.4+:
使用groupBy
将所有行合并为一行使用collect_list
然后拆分创建新专栏。
- 使用
arrays_zip
压缩数组并创建嵌套数组[key,[values]]
- 最后
explode
嵌套数组。
Example:
df.show()
#+---------------+
#| data|
#+---------------+
#|Alice;Bob;Carol|
#| 12;13;14|
#| 5;;7|
#| 1;;3|
#| ;;3|
#+---------------+
from pyspark.sql.functions import *
df.agg(split(concat_ws("|",collect_list(col("data"))),"\|").alias("tmp")).\
withColumn("col1",split(element_at(col("tmp"),1),";")).\
withColumn("col2",split(element_at(col("tmp"),2),";")).\
withColumn("col3",split(element_at(col("tmp"),3),";")).\
withColumn("col4",split(element_at(col("tmp"),4),";")).\
withColumn("zip",arrays_zip(col("col1"),arrays_zip(col("col2"),col("col3"),col("col4")))).\
selectExpr("explode(zip)as tmp").\
selectExpr("tmp.*").\
toDF("name","values").\
show(10,False)
#+-----+----------+
#|name |values |
#+-----+----------+
#|Alice|[12, 5, 1]|
#|Bob |[13, , ] |
#|Carol|[14, 7, 3]|
#+-----+----------+
对于 spark < 2.4
使用 对于 arrays_zip 使用 getItem(<n>)
而不是 element_at
函数。
我建议将数据读取为 ;
分隔的 csv,然后处理以获得 name
和 values
列,如下所示-
Please note that this code is written in scala but similar code can be implemented in pyspark with minimal change
加载 ;
分隔的 csv
val data =
"""
|Alice;Bob;Carol
| 12;13;14
| 5;;7
| 1;;3
| ;;3
""".stripMargin
val stringDS = data.split(System.lineSeparator())
.map(_.split("\;").map(_.replaceAll("""^[ \t]+|[ \t]+$""", "")).mkString(";"))
.toSeq.toDS()
val df = spark.read
.option("sep", ";")
.option("inferSchema", "true")
.option("header", "true")
.option("nullValue", "null")
.csv(stringDS)
df.printSchema()
df.show(false)
/**
* root
* |-- Alice: integer (nullable = true)
* |-- Bob: integer (nullable = true)
* |-- Carol: integer (nullable = true)
*
* +-----+----+-----+
* |Alice|Bob |Carol|
* +-----+----+-----+
* |12 |13 |14 |
* |5 |null|7 |
* |1 |null|3 |
* |null |null|3 |
* +-----+----+-----+
*/
导出 name
和 values
列
val columns = df.columns.map(c => expr(s"named_struct('name', '$c', 'values', collect_list($c))"))
df.select(array(columns: _*).as("array"))
.selectExpr("inline_outer(array)")
.show(false)
/**
* +-----+-------------+
* |name |values |
* +-----+-------------+
* |Alice|[12, 5, 1] |
* |Bob |[13] |
* |Carol|[14, 7, 3, 3]|
* +-----+-------------+
*/
一种方法是将第一行读作 header 然后逆透视数据
df1 = spark.createDataFrame([(12,13,14),(5,None,7),(1,None,3),(None,None,3)], ['Alice','Bob','Carol'])
df1.show()
+-----+----+-----+
|Alice| Bob|Carol|
+-----+----+-----+
| 12| 13| 14|
| 5|null| 7|
| 1|null| 3|
| null|null| 3|
+-----+----+-----+
df1.select(f.expr('''stack(3,'Alice',Alice,'Bob',Bob,'Carol',Carol) as (Name,Value)'''))\
.groupBy('Name').agg(f.collect_list('value').alias('Value')).orderBy('Name').show()
+-----+-------------+
| Name| Value|
+-----+-------------+
|Alice| [12, 5, 1]|
| Bob| [13]|
|Carol|[14, 7, 3, 3]|
+-----+-------------+
要动态传递列,请使用以下代码
cols = ','.join([f"'{i[0]}',{i[1]}" for i in zip(df1.columns,df1.columns)])
df1.select(f.expr(f'''stack(3,{cols}) as (Name,Value)''')).groupBy('Name').agg(f.collect_list('value').alias('Value')).orderBy('Name').show()
+-----+-------------+
| Name| Value|
+-----+-------------+
|Alice| [12, 5, 1]|
| Bob| [13]|
|Carol|[14, 7, 3, 3]|
+-----+-------------+
我有以下格式不正确的输入数据框:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
spark = SparkSession.builder.master("local").getOrCreate()
input_df = spark.createDataFrame(
[
('Alice;Bob;Carol',),
('12;13;14',),
('5;;7',),
('1;;3',),
(';;3',)
],
['data']
)
input_df.show()
# +---------------+
# | data|
# +---------------+
# |Alice;Bob;Carol|
# | 12;13;14|
# | 5;;7|
# | 1;;3|
# | ;;3|
# +---------------+
实际输入的是一个以分号分隔的 CSV 文件,其中一列包含一个人的值。每个人可以有不同数量的值。这里,Alice 有 3 个值,Bob 只有一个,Carol 有四个值。
我想在 PySpark 中将其转换为一个输出数据框,每个人都有一个数组,在此示例中,输出为:
result = spark.createDataFrame(
[
("Alice", [12, 5, 1]),
("Bob", [13,]),
("Carol", [14, 7, 3, 3])
],
['name', 'values']
)
result.show()
# +-----+-------------+
# | name| values|
# +-----+-------------+
# |Alice| [12, 5, 1]|
# | Bob| [13]|
# |Carol|[14, 7, 3, 3]|
# +-----+-------------+
我该怎么做?我想这将是 F.arrays_zip()
、F.split()
and/or F.explode()
的某种组合,但我想不通。
我目前被困在这里,这是我目前的尝试:
(input_df
.withColumn('splits', F.split(F.col('data'), ';'))
.drop('data')
).show()
# +-------------------+
# | splits|
# +-------------------+
# |[Alice, Bob, Carol]|
# | [12, 13, 14]|
# | [5, , 7]|
# | [1, , 3]|
# | [, , 3]|
# +-------------------+
Solution for Spark-2.4+:
使用groupBy
将所有行合并为一行使用collect_list
然后拆分创建新专栏。
- 使用
arrays_zip
压缩数组并创建嵌套数组[key,[values]]
- 最后
explode
嵌套数组。
Example:
df.show()
#+---------------+
#| data|
#+---------------+
#|Alice;Bob;Carol|
#| 12;13;14|
#| 5;;7|
#| 1;;3|
#| ;;3|
#+---------------+
from pyspark.sql.functions import *
df.agg(split(concat_ws("|",collect_list(col("data"))),"\|").alias("tmp")).\
withColumn("col1",split(element_at(col("tmp"),1),";")).\
withColumn("col2",split(element_at(col("tmp"),2),";")).\
withColumn("col3",split(element_at(col("tmp"),3),";")).\
withColumn("col4",split(element_at(col("tmp"),4),";")).\
withColumn("zip",arrays_zip(col("col1"),arrays_zip(col("col2"),col("col3"),col("col4")))).\
selectExpr("explode(zip)as tmp").\
selectExpr("tmp.*").\
toDF("name","values").\
show(10,False)
#+-----+----------+
#|name |values |
#+-----+----------+
#|Alice|[12, 5, 1]|
#|Bob |[13, , ] |
#|Carol|[14, 7, 3]|
#+-----+----------+
对于 spark < 2.4
使用 getItem(<n>)
而不是 element_at
函数。
我建议将数据读取为 ;
分隔的 csv,然后处理以获得 name
和 values
列,如下所示-
Please note that this code is written in scala but similar code can be implemented in pyspark with minimal change
加载 ;
分隔的 csv
val data =
"""
|Alice;Bob;Carol
| 12;13;14
| 5;;7
| 1;;3
| ;;3
""".stripMargin
val stringDS = data.split(System.lineSeparator())
.map(_.split("\;").map(_.replaceAll("""^[ \t]+|[ \t]+$""", "")).mkString(";"))
.toSeq.toDS()
val df = spark.read
.option("sep", ";")
.option("inferSchema", "true")
.option("header", "true")
.option("nullValue", "null")
.csv(stringDS)
df.printSchema()
df.show(false)
/**
* root
* |-- Alice: integer (nullable = true)
* |-- Bob: integer (nullable = true)
* |-- Carol: integer (nullable = true)
*
* +-----+----+-----+
* |Alice|Bob |Carol|
* +-----+----+-----+
* |12 |13 |14 |
* |5 |null|7 |
* |1 |null|3 |
* |null |null|3 |
* +-----+----+-----+
*/
导出 name
和 values
列
val columns = df.columns.map(c => expr(s"named_struct('name', '$c', 'values', collect_list($c))"))
df.select(array(columns: _*).as("array"))
.selectExpr("inline_outer(array)")
.show(false)
/**
* +-----+-------------+
* |name |values |
* +-----+-------------+
* |Alice|[12, 5, 1] |
* |Bob |[13] |
* |Carol|[14, 7, 3, 3]|
* +-----+-------------+
*/
一种方法是将第一行读作 header 然后逆透视数据
df1 = spark.createDataFrame([(12,13,14),(5,None,7),(1,None,3),(None,None,3)], ['Alice','Bob','Carol'])
df1.show()
+-----+----+-----+
|Alice| Bob|Carol|
+-----+----+-----+
| 12| 13| 14|
| 5|null| 7|
| 1|null| 3|
| null|null| 3|
+-----+----+-----+
df1.select(f.expr('''stack(3,'Alice',Alice,'Bob',Bob,'Carol',Carol) as (Name,Value)'''))\
.groupBy('Name').agg(f.collect_list('value').alias('Value')).orderBy('Name').show()
+-----+-------------+
| Name| Value|
+-----+-------------+
|Alice| [12, 5, 1]|
| Bob| [13]|
|Carol|[14, 7, 3, 3]|
+-----+-------------+
要动态传递列,请使用以下代码
cols = ','.join([f"'{i[0]}',{i[1]}" for i in zip(df1.columns,df1.columns)])
df1.select(f.expr(f'''stack(3,{cols}) as (Name,Value)''')).groupBy('Name').agg(f.collect_list('value').alias('Value')).orderBy('Name').show()
+-----+-------------+
| Name| Value|
+-----+-------------+
|Alice| [12, 5, 1]|
| Bob| [13]|
|Carol|[14, 7, 3, 3]|
+-----+-------------+