在 PySpark 中标记重复项
Labelling duplicates in PySpark
我正在尝试根据他们的组在我的 PySpark DataFrame 中标记重复项,同时拥有完整长度的数据框。下面是一个示例代码。
data= [
("A", "2018-01-03"),
("A", "2018-01-03"),
("A", "2018-01-03"),
("B", "2019-01-03"),
("B", "2019-01-03"),
("B", "2019-01-03"),
("C", "2020-01-03"),
("C", "2020-01-03"),
("C", "2020-01-03"),
]
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
spark= SparkSession.builder.getOrCreate()
df= spark.createDataFrame(data=data, schema=["Group", "Date"])
df= df.withColumn("Date", F.to_date("Date", "yyyy-MM-dd"))
from pyspark.sql import Window
windowSpec= Window.partitionBy("Group").orderBy(F.asc("Date"))
df.withColumn("group_number", F.dense_rank().over(windowSpec)).orderBy("Date").show()
这是我当前的输出,虽然它是正确的,因为代码根据其组对“日期”进行排名,但这不是我想要的结果。
+-----+----------+------------+
|Group| Date|group_number|
+-----+----------+------------+
| A|2018-01-03| 1|
| A|2018-01-03| 1|
| A|2018-01-03| 1|
| B|2019-01-03| 1|
| B|2019-01-03| 1|
| B|2019-01-03| 1|
| C|2020-01-03| 1|
| C|2020-01-03| 1|
| C|2020-01-03| 1|
+-----+----------+------------+
我希望我的输出看起来像这样
+-----+----------+------------+
|Group| Date|group_number|
+-----+----------+------------+
| A|2018-01-03| 1|
| A|2018-01-03| 1|
| A|2018-01-03| 1|
| B|2019-01-03| 2|
| B|2019-01-03| 2|
| B|2019-01-03| 2|
| C|2020-01-03| 3|
| C|2020-01-03| 3|
| C|2020-01-03| 3|
+-----+----------+------------+
有什么建议吗?我找到了 this post 但这只是一个二进制解决方案!我的数据集中有 2 个以上的组。
声明 windowSpec 时不需要使用 partitionBy 函数。通过在 partionBy 中指定“组”列,您告诉程序根据“日期”为每个分区执行 dense_rank()。所以输出是正确的。如果我们看 A 组,他们有相同的日期,因此他们的 group_rank 都是 1。继续看 B 组,他们都有相同的日期,因此他们的组排名为 1。
因此,快速解决问题的方法是删除 windowSpec 中的 partionBy。
编辑:如果您要按组列进行分组,以下是另一种解决方案:您可以使用用户定义函数 (UDF) 作为 df.withColumn() 中的第二个自变量参数。在此 UDF 中,您将像普通函数一样指定 input/output。像这样:
import pyspark.sql.functions import udf
def new_column(group):
return ord(group) - 64 # Unicode integer equivalent as A is 65
funct = udf(new_column, IntegerType())
df.withColumn("group_number", funct(df["Group"])).orderBy("Date").show()
如果要为日期使用 UDF,则需要一些方法来跟踪日期。一个例子:
import datetime
date_dict = {}
def new_column(date_obj):
if len(date_dict) > 0 and date_dict[date_obj.strftime("%Y-%m-%d")]:
return date_dict[date_obj.strftime("%Y-%m-%d")]
date_dict[date_obj.strftime("%Y-%m-%d")] = len(date_obj.strftime("%Y-%m-%d")) + 1
return date_dict[date_obj.strftime("%Y-%m-%d")]
你想要的是对不在每个组中的所有组进行排名,这样你就不需要按 Window
进行分区,只需按 Group
和 Date
排序即可给你想要的输出:
windowSpec = Window.orderBy(F.asc("Group"), F.asc("Date"))
df.withColumn("group_number", F.dense_rank().over(windowSpec)).orderBy("Date").show()
#+-----+----------+------------+
#|Group| Date|group_number|
#+-----+----------+------------+
#| A|2018-01-03| 1|
#| A|2018-01-03| 1|
#| A|2018-01-03| 1|
#| B|2019-01-03| 2|
#| B|2019-01-03| 2|
#| B|2019-01-03| 2|
#| C|2020-01-03| 3|
#| C|2020-01-03| 3|
#| C|2020-01-03| 3|
#+-----+----------+------------+
你肯定不需要任何 UDF 正如其他答案所建议的那样。
我正在尝试根据他们的组在我的 PySpark DataFrame 中标记重复项,同时拥有完整长度的数据框。下面是一个示例代码。
data= [
("A", "2018-01-03"),
("A", "2018-01-03"),
("A", "2018-01-03"),
("B", "2019-01-03"),
("B", "2019-01-03"),
("B", "2019-01-03"),
("C", "2020-01-03"),
("C", "2020-01-03"),
("C", "2020-01-03"),
]
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
spark= SparkSession.builder.getOrCreate()
df= spark.createDataFrame(data=data, schema=["Group", "Date"])
df= df.withColumn("Date", F.to_date("Date", "yyyy-MM-dd"))
from pyspark.sql import Window
windowSpec= Window.partitionBy("Group").orderBy(F.asc("Date"))
df.withColumn("group_number", F.dense_rank().over(windowSpec)).orderBy("Date").show()
这是我当前的输出,虽然它是正确的,因为代码根据其组对“日期”进行排名,但这不是我想要的结果。
+-----+----------+------------+
|Group| Date|group_number|
+-----+----------+------------+
| A|2018-01-03| 1|
| A|2018-01-03| 1|
| A|2018-01-03| 1|
| B|2019-01-03| 1|
| B|2019-01-03| 1|
| B|2019-01-03| 1|
| C|2020-01-03| 1|
| C|2020-01-03| 1|
| C|2020-01-03| 1|
+-----+----------+------------+
我希望我的输出看起来像这样
+-----+----------+------------+
|Group| Date|group_number|
+-----+----------+------------+
| A|2018-01-03| 1|
| A|2018-01-03| 1|
| A|2018-01-03| 1|
| B|2019-01-03| 2|
| B|2019-01-03| 2|
| B|2019-01-03| 2|
| C|2020-01-03| 3|
| C|2020-01-03| 3|
| C|2020-01-03| 3|
+-----+----------+------------+
有什么建议吗?我找到了 this post 但这只是一个二进制解决方案!我的数据集中有 2 个以上的组。
声明 windowSpec 时不需要使用 partitionBy 函数。通过在 partionBy 中指定“组”列,您告诉程序根据“日期”为每个分区执行 dense_rank()。所以输出是正确的。如果我们看 A 组,他们有相同的日期,因此他们的 group_rank 都是 1。继续看 B 组,他们都有相同的日期,因此他们的组排名为 1。
因此,快速解决问题的方法是删除 windowSpec 中的 partionBy。
编辑:如果您要按组列进行分组,以下是另一种解决方案:您可以使用用户定义函数 (UDF) 作为 df.withColumn() 中的第二个自变量参数。在此 UDF 中,您将像普通函数一样指定 input/output。像这样:
import pyspark.sql.functions import udf
def new_column(group):
return ord(group) - 64 # Unicode integer equivalent as A is 65
funct = udf(new_column, IntegerType())
df.withColumn("group_number", funct(df["Group"])).orderBy("Date").show()
如果要为日期使用 UDF,则需要一些方法来跟踪日期。一个例子:
import datetime
date_dict = {}
def new_column(date_obj):
if len(date_dict) > 0 and date_dict[date_obj.strftime("%Y-%m-%d")]:
return date_dict[date_obj.strftime("%Y-%m-%d")]
date_dict[date_obj.strftime("%Y-%m-%d")] = len(date_obj.strftime("%Y-%m-%d")) + 1
return date_dict[date_obj.strftime("%Y-%m-%d")]
你想要的是对不在每个组中的所有组进行排名,这样你就不需要按 Window
进行分区,只需按 Group
和 Date
排序即可给你想要的输出:
windowSpec = Window.orderBy(F.asc("Group"), F.asc("Date"))
df.withColumn("group_number", F.dense_rank().over(windowSpec)).orderBy("Date").show()
#+-----+----------+------------+
#|Group| Date|group_number|
#+-----+----------+------------+
#| A|2018-01-03| 1|
#| A|2018-01-03| 1|
#| A|2018-01-03| 1|
#| B|2019-01-03| 2|
#| B|2019-01-03| 2|
#| B|2019-01-03| 2|
#| C|2020-01-03| 3|
#| C|2020-01-03| 3|
#| C|2020-01-03| 3|
#+-----+----------+------------+
你肯定不需要任何 UDF 正如其他答案所建议的那样。