使用 PySpark 如何根据 groupby/window/partition 填充列中的值并执行 UDF?
With PySpark how do I populate values in a column based on either groupby/window/partition and perform a UDF?
我正在尝试填充列中的缺失值。 group/partition 中第一行或以下任何行(根据日期按顺序排列)中的配置文件列将具有必须填充到配置文件列的以下单元格中的值。
我曾尝试使用 window 函数对其进行 运行,但无法将 UDF 应用于 window 函数。
valuesA = [('1',"", "20190108"),('1',"", "20190107"),('1',"abcd", "20190106"),('1',"", "20190105"),('1',"", "20190104"),('2',"wxyz", "20190103"),('2',"", "20190102"),('2',"", "20190101")]
TableA = spark.createDataFrame(valuesA,['vid','profile', 'date'])
valuesB = [('1',"null", "20190108"),('1',"null", "20190107"),('1',"abcd", "20190106"),('1',"abcd", "20190105"),('1',"abcd", "20190104"),('2',"wxyz", "20190103"),('2', "wxyz", "20190102"),('2', "wxyz", "20190101")]
TableB = spark.createDataFrame(valuesB,['vid','profile', 'date'])
TableA.show()
TableB.show()
Table A: This is what I have.
+---+-------+--------+
|vid|profile| date|
+---+-------+--------+
| 1| |20190108|
| 1| |20190107|
| 1| abcd|20190106|
| 1| |20190105|
| 1| |20190104|
| 2| wxyz|20190103|
| 2| |20190102|
| 2| |20190101|
+---+-------+--------+
Table B: What I am expecting.
+---+-------+--------+
|vid|profile| date|
+---+-------+--------+
| 1| null|20190108|
| 1| null|20190107|
| 1| abcd|20190106|
| 1| abcd|20190105|
| 1| abcd|20190104|
| 2| wxyz|20190103|
| 2| wxyz|20190102|
| 2| wxyz|20190101|
+---+-------+--------+
您可以使用last
window 功能。
注意 - 首先 withColumn
是用空值替换所有空字符串 - last
函数默认跳过空值,在本例中这就是我们想要的。
from pyspark.sql.window import Window
from pyspark.sql.functions import *
TableB = TableA.withColumn('profile', when(length('profile') == 0, lit(None)).otherwise(col('profile')))\
.withColumn("profile", last(col('profile'), True).over(Window.partitionBy('vid').orderBy(col('date').desc())))
TableB.show()
输出:
+---+-------+--------+
|vid|profile| date|
+---+-------+--------+
| 1| null|20190108|
| 1| null|20190107|
| 1| abcd|20190106|
| 1| abcd|20190105|
| 1| abcd|20190104|
| 2| wxyz|20190103|
| 2| wxyz|20190102|
| 2| wxyz|20190101|
+---+-------+--------+
我正在尝试填充列中的缺失值。 group/partition 中第一行或以下任何行(根据日期按顺序排列)中的配置文件列将具有必须填充到配置文件列的以下单元格中的值。
我曾尝试使用 window 函数对其进行 运行,但无法将 UDF 应用于 window 函数。
valuesA = [('1',"", "20190108"),('1',"", "20190107"),('1',"abcd", "20190106"),('1',"", "20190105"),('1',"", "20190104"),('2',"wxyz", "20190103"),('2',"", "20190102"),('2',"", "20190101")]
TableA = spark.createDataFrame(valuesA,['vid','profile', 'date'])
valuesB = [('1',"null", "20190108"),('1',"null", "20190107"),('1',"abcd", "20190106"),('1',"abcd", "20190105"),('1',"abcd", "20190104"),('2',"wxyz", "20190103"),('2', "wxyz", "20190102"),('2', "wxyz", "20190101")]
TableB = spark.createDataFrame(valuesB,['vid','profile', 'date'])
TableA.show()
TableB.show()
Table A: This is what I have.
+---+-------+--------+
|vid|profile| date|
+---+-------+--------+
| 1| |20190108|
| 1| |20190107|
| 1| abcd|20190106|
| 1| |20190105|
| 1| |20190104|
| 2| wxyz|20190103|
| 2| |20190102|
| 2| |20190101|
+---+-------+--------+
Table B: What I am expecting.
+---+-------+--------+
|vid|profile| date|
+---+-------+--------+
| 1| null|20190108|
| 1| null|20190107|
| 1| abcd|20190106|
| 1| abcd|20190105|
| 1| abcd|20190104|
| 2| wxyz|20190103|
| 2| wxyz|20190102|
| 2| wxyz|20190101|
+---+-------+--------+
您可以使用last
window 功能。
注意 - 首先 withColumn
是用空值替换所有空字符串 - last
函数默认跳过空值,在本例中这就是我们想要的。
from pyspark.sql.window import Window
from pyspark.sql.functions import *
TableB = TableA.withColumn('profile', when(length('profile') == 0, lit(None)).otherwise(col('profile')))\
.withColumn("profile", last(col('profile'), True).over(Window.partitionBy('vid').orderBy(col('date').desc())))
TableB.show()
输出:
+---+-------+--------+
|vid|profile| date|
+---+-------+--------+
| 1| null|20190108|
| 1| null|20190107|
| 1| abcd|20190106|
| 1| abcd|20190105|
| 1| abcd|20190104|
| 2| wxyz|20190103|
| 2| wxyz|20190102|
| 2| wxyz|20190101|
+---+-------+--------+