Drop the duplicated rows and merge the ids using groupby in pyspark
I have a data frame where some rows have duplicate ids but different timestamps, and some rows have duplicate ids with the same timestamp but with one of the following columns (yob and gender) null. Now I want to do the following using groupby:
- If the same id has different timestamps, pick up the most recent timestamp.
- If the same ids have the same timestamp but any of the columns (yob and gender) is null, merge both rows into a single record without nulls. Below I have pasted the data frame and the desired output.
Input data
from pyspark.sql.functions import col, max as max_
df = sc.parallelize([
("e5882", "null", "M", "AD", "9/14/2021 13:50"),
("e5882", "null", "M", "AD", "10/22/2021 13:10"),
("5cddf", "null", "M", "ED", "9/9/2021 12:00"),
("5cddf", "2010", "null", "ED", "9/9/2021 12:00"),
("c3882", "null", "M", "BD", "11/27/2021 5:00"),
("c3882", "1975", "null", "BD", "11/27/2021 5:00"),
("9297d","1999", "null", "GF","10/18/2021 7:00"),
("9298e","1990","null","GF","10/18/2021 7:00")
]).toDF(["ID", "yob", "gender","country","timestamp"])
Desired output:
The code I used for this question, which does not give accurate results; some of the ids are missing:
from pyspark.sql import Window
import pyspark.sql.functions as F

w = Window.partitionBy('Id')
# to obtain the recent date (note: timestamp is still a string here,
# so max() compares lexicographically rather than chronologically)
df1 = df.withColumn('maxB', F.max('timestamp').over(w)).where(F.col('timestamp') == F.col('maxB')).drop('maxB')
# to merge the null columns based on id
(df1.groupBy('Id').agg(*[F.first(x, ignorenulls=True) for x in df1.columns if x != 'Id'])).show()
Using this input dataframe:
df = spark.createDataFrame([
("e5882", None, "M", "AD", "9/14/2021 13:50"),
("e5882", None, "M", "AD", "10/22/2021 13:10"),
("5cddf", None, "M", "ED", "9/9/2021 12:00"),
("5cddf", "2010", None, "ED", "9/9/2021 12:00"),
("c3882", None, "M", "BD", "11/27/2021 5:00"),
("c3882", "1975", None, "BD", "11/27/2021 5:00"),
("9297d", None, "M", "GF", "10/18/2021 7:00"),
("9297d", "1999", None, "GF", "10/18/2021 7:00"),
("9298e", "1990", None, "GF", "10/18/2021 7:00"),
], ["id", "yob", "gender", "country", "timestamp"])
- If the same id has different timestamps, pick up the most recent timestamp.
Use a window ranking function to get the most recent row per id. Since you want to merge rows that share the same timestamp, use dense_rank rather than row_number. But first you need to convert the timestamp strings to TimestampType, otherwise the comparison will be incorrect (as strings, '9/9/2021 12:00' > '10/18/2021 7:00').
from pyspark.sql import Window
import pyspark.sql.functions as F

df_most_recent = df.withColumn(
    "timestamp",
    F.to_timestamp("timestamp", "M/d/yyyy H:mm")  # string -> TimestampType
).withColumn(
    "rn",
    F.dense_rank().over(Window.partitionBy("id").orderBy(F.desc("timestamp")))
).filter("rn = 1")  # keep all rows tied for the most recent timestamp
- If the same ids have the same timestamp but any of the columns (yob and gender) is null, merge both rows into a single record without nulls. Below I have pasted the data frame and the desired output.
Now that df_most_recent above contains, for each id, one or more rows that all share the same most recent timestamp, you can group by id to merge the values of the other columns, like this:
result = df_most_recent.groupBy("id").agg(
*[F.collect_set(c)[0].alias(c) for c in df.columns if c!='id']
# or *[F.first(c).alias(c) for c in df.columns if c!='id']
)
result.show()
#+-----+----+------+-------+-------------------+
#|id |yob |gender|country|timestamp |
#+-----+----+------+-------+-------------------+
#|5cddf|2010|M |ED |2021-09-09 12:00:00|
#|9297d|1999|M |GF |2021-10-18 07:00:00|
#|9298e|1990|null |GF |2021-10-18 07:00:00|
#|c3882|1975|M |BD |2021-11-27 05:00:00|
#|e5882|null|M |AD |2021-10-22 13:10:00|
#+-----+----+------+-------+-------------------+
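For reference, the two steps can be chained into a single pipeline. This is the same logic as above, just combined, using first with ignorenulls=True for the merge:

from pyspark.sql import Window
import pyspark.sql.functions as F

result = (
    df.withColumn("timestamp", F.to_timestamp("timestamp", "M/d/yyyy H:mm"))  # string -> TimestampType
      .withColumn("rn", F.dense_rank().over(
          Window.partitionBy("id").orderBy(F.desc("timestamp"))))  # ties share rank 1
      .filter("rn = 1")
      .drop("rn")
      .groupBy("id")
      .agg(*[F.first(c, ignorenulls=True).alias(c)  # take a non-null value per column
             for c in df.columns if c != "id"])
)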