Drop the duplicated rows and merge the ids using groupby in pyspark

I have a data frame in which some rows have duplicate ids with different timestamps, and some rows have duplicate ids with the same timestamp but one of the columns (yob, gender) is null. I want to handle this with groupby:

  1. If the same id has different timestamps, pick the most recent timestamp.
  2. If the same id has the same timestamp but either of the columns (yob, gender) is null, merge both rows into a single record without nulls. Below I have pasted the data frame and the desired output.

Input data

from pyspark.sql.functions import col, max as max_
df = sc.parallelize([
    ("e5882", "null", "M", "AD", "9/14/2021 13:50"),
    ("e5882", "null", "M", "AD", "10/22/2021 13:10"),
    ("5cddf", "null", "M", "ED", "9/9/2021 12:00"),
    ("5cddf", "2010", "null", "ED", "9/9/2021 12:00"),
    ("c3882", "null", "M", "BD", "11/27/2021 5:00"), 
    ("c3882", "1975", "null",  "BD", "11/27/2021 5:00"),
    ("9297d","1999", "null", "GF","10/18/2021 7:00"),
    ("9298e","1990","null","GF","10/18/2021 7:00")
]).toDF(["ID", "yob", "gender","country","timestamp"])

Desired output:

The code I used for this question, but it does not give the exact result; some of the ids are missing:

from pyspark.sql import Window
import pyspark.sql.functions as F

w = Window.partitionBy('Id')
# to obtain the most recent date
df1 = df.withColumn('maxB', F.max('timestamp').over(w)).where(F.col('timestamp') == F.col('maxB')).drop('maxB')
# to merge the null columns based on id
(df1.groupBy('Id').agg(*[F.first(x, ignorenulls=True) for x in df1.columns if x != 'Id'])).show()
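
One likely reason the result looks off: timestamp is still a string here, so F.max compares it lexicographically rather than chronologically. A minimal sketch of that pitfall (plain Python, which orders these ASCII strings the same way Spark orders string columns):

# '9/14/2021 13:50' sorts after '10/22/2021 13:10' as a string,
# because '9' > '1' character-wise, so the older row wins the max
a = "9/14/2021 13:50"
b = "10/22/2021 13:10"
print(max(a, b))  # 9/14/2021 13:50 -- the older timestamp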

Using this input dataframe:

df = spark.createDataFrame([
    ("e5882", None, "M", "AD", "9/14/2021 13:50"),
    ("e5882", None, "M", "AD", "10/22/2021 13:10"),
    ("5cddf", None, "M", "ED", "9/9/2021 12:00"),
    ("5cddf", "2010", None, "ED", "9/9/2021 12:00"),
    ("c3882", None, "M", "BD", "11/27/2021 5:00"),
    ("c3882", "1975", None, "BD", "11/27/2021 5:00"),
    ("9297d", None, "M", "GF", "10/18/2021 7:00"),
    ("9297d", "1999", None, "GF", "10/18/2021 7:00"),
    ("9298e", "1990", None, "GF", "10/18/2021 7:00"),
], ["id", "yob", "gender", "country", "timestamp"])

  1. If the same id has different timestamps, pick the most recent timestamp.

Use a window ranking function to get the most recent row for each id. Since you want to merge rows that share the same timestamp, use dense_rank rather than row_number. But first, you need to convert the timestamp strings to TimestampType, otherwise the comparison won't be correct (e.g. '9/9/2021 12:00' > '10/18/2021 7:00' when compared as strings).

from pyspark.sql import Window
import pyspark.sql.functions as F

df_most_recent = df.withColumn(
   "timestamp",
   F.to_timestamp("timestamp", "M/d/yyyy H:mm")
).withColumn(
   "rn",
   F.dense_rank().over(Window.partitionBy("id").orderBy(F.desc("timestamp")))
).filter("rn = 1")
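
With the sample data, df_most_recent keeps a single row for e5882 and 9298e, and both tied rows for 5cddf, c3882 and 9297d, because dense_rank assigns the same rank to rows with equal timestamps. A rough sketch of the intermediate result (row order may vary):

df_most_recent.drop("rn").show(truncate=False)
#+-----+----+------+-------+-------------------+
#|id   |yob |gender|country|timestamp          |
#+-----+----+------+-------+-------------------+
#|5cddf|null|M     |ED     |2021-09-09 12:00:00|
#|5cddf|2010|null  |ED     |2021-09-09 12:00:00|
#|9297d|null|M     |GF     |2021-10-18 07:00:00|
#|9297d|1999|null  |GF     |2021-10-18 07:00:00|
#|9298e|1990|null  |GF     |2021-10-18 07:00:00|
#|c3882|null|M     |BD     |2021-11-27 05:00:00|
#|c3882|1975|null  |BD     |2021-11-27 05:00:00|
#|e5882|null|M     |AD     |2021-10-22 13:10:00|
#+-----+----+------+-------+-------------------+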

  2. If the same ids have the same timestamp but any of the columns (yob, gender) is null, merge both rows into a single record without nulls. Below I have pasted the data frame and desired output.

Now that the above df_most_recent contains one or more rows per id sharing the same most recent timestamp, you can group by id to merge the values of the other columns, like this:

result = df_most_recent.groupBy("id").agg(
    *[F.collect_set(c)[0].alias(c) for c in df.columns if c != 'id']  # collect_set skips nulls
    # or: *[F.first(c, ignorenulls=True).alias(c) for c in df.columns if c != 'id']
)

result.show()
#+-----+----+------+-------+-------------------+
#|id   |yob |gender|country|timestamp          |
#+-----+----+------+-------+-------------------+
#|5cddf|2010|M     |ED     |2021-09-09 12:00:00|
#|9297d|1999|M     |GF     |2021-10-18 07:00:00|
#|9298e|1990|null  |GF     |2021-10-18 07:00:00|
#|c3882|1975|M     |BD     |2021-11-27 05:00:00|
#|e5882|null|M     |AD     |2021-10-22 13:10:00|
#+-----+----+------+-------+-------------------+
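
If you'd rather have the timestamp back in its original M/d/yyyy H:mm string form instead of the parsed TimestampType, one possible final step is date_format (shown here only as an optional sketch):

# convert the parsed timestamp back to the input string format
result_str = result.withColumn(
    "timestamp",
    F.date_format("timestamp", "M/d/yyyy H:mm")
)
result_str.show(truncate=False)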