在 PySpark 中标记重复项

Labelling duplicates in PySpark

我正在尝试根据他们的组在我的 PySpark DataFrame 中标记重复项,同时拥有完整长度的数据框。下面是一个示例代码。

data= [
    ("A", "2018-01-03"),
    ("A", "2018-01-03"),
    ("A", "2018-01-03"),
    ("B", "2019-01-03"),
    ("B", "2019-01-03"),
    ("B", "2019-01-03"),
    ("C", "2020-01-03"),
    ("C", "2020-01-03"),
    ("C", "2020-01-03"),
]

from pyspark.sql import SparkSession
import pyspark.sql.functions as F

spark= SparkSession.builder.getOrCreate()

df= spark.createDataFrame(data=data, schema=["Group", "Date"])
df= df.withColumn("Date", F.to_date("Date", "yyyy-MM-dd"))


from pyspark.sql import Window
windowSpec= Window.partitionBy("Group").orderBy(F.asc("Date"))

df.withColumn("group_number", F.dense_rank().over(windowSpec)).orderBy("Date").show()

这是我当前的输出,虽然它是正确的,因为代码根据其组对“日期”进行排名,但这不是我想要的结果。

+-----+----------+------------+
|Group|      Date|group_number|
+-----+----------+------------+
|    A|2018-01-03|           1|
|    A|2018-01-03|           1|
|    A|2018-01-03|           1|
|    B|2019-01-03|           1|
|    B|2019-01-03|           1|
|    B|2019-01-03|           1|
|    C|2020-01-03|           1|
|    C|2020-01-03|           1|
|    C|2020-01-03|           1|
+-----+----------+------------+

我希望我的输出看起来像这样

+-----+----------+------------+
|Group|      Date|group_number|
+-----+----------+------------+
|    A|2018-01-03|           1|
|    A|2018-01-03|           1|
|    A|2018-01-03|           1|
|    B|2019-01-03|           2|
|    B|2019-01-03|           2|
|    B|2019-01-03|           2|
|    C|2020-01-03|           3|
|    C|2020-01-03|           3|
|    C|2020-01-03|           3|
+-----+----------+------------+

有什么建议吗?我找到了 this post 但这只是一个二进制解决方案!我的数据集中有 2 个以上的组。

声明 windowSpec 时不需要使用 partitionBy 函数。通过在 partionBy 中指定“组”列,您告诉程序根据“日期”为每个分区执行 dense_rank()。所以输出是正确的。如果我们看 A 组,他们有相同的日期,因此他们的 group_rank 都是 1。继续看 B 组,他们都有相同的日期,因此他们的组排名为 1。

因此,快速解决问题的方法是删除 windowSpec 中的 partionBy。

编辑:如果您要按组列进行分组,以下是另一种解决方案:您可以使用用户定义函数 (UDF) 作为 df.withColumn() 中的第二个自变量参数。在此 UDF 中,您将像普通函数一样指定 input/output。像这样:

import pyspark.sql.functions import udf

def new_column(group):
  return ord(group) - 64 # Unicode integer equivalent as A is 65

funct = udf(new_column, IntegerType())

df.withColumn("group_number", funct(df["Group"])).orderBy("Date").show()

如果要为日期使用 UDF,则需要一些方法来跟踪日期。一个例子:

import datetime

date_dict = {}
def new_column(date_obj):
   if len(date_dict) > 0 and date_dict[date_obj.strftime("%Y-%m-%d")]:
     return date_dict[date_obj.strftime("%Y-%m-%d")]
   date_dict[date_obj.strftime("%Y-%m-%d")] = len(date_obj.strftime("%Y-%m-%d")) + 1
   return date_dict[date_obj.strftime("%Y-%m-%d")]

你想要的是对不在每个组中的所有组进行排名,这样你就不需要按 Window 进行分区,只需按 GroupDate 排序即可给你想要的输出:

windowSpec = Window.orderBy(F.asc("Group"), F.asc("Date"))

df.withColumn("group_number", F.dense_rank().over(windowSpec)).orderBy("Date").show()

#+-----+----------+------------+
#|Group|      Date|group_number|
#+-----+----------+------------+
#|    A|2018-01-03|           1|
#|    A|2018-01-03|           1|
#|    A|2018-01-03|           1|
#|    B|2019-01-03|           2|
#|    B|2019-01-03|           2|
#|    B|2019-01-03|           2|
#|    C|2020-01-03|           3|
#|    C|2020-01-03|           3|
#|    C|2020-01-03|           3|
#+-----+----------+------------+

你肯定不需要任何 UDF 正如其他答案所建议的那样。