过滤 DataFrame 以删除 pyspark 中的重复值
Filter DataFrame to delete duplicate values in pyspark
我有以下数据框
date | value | ID
--------------------------------------
2021-12-06 15:00:00 25 1
2021-12-06 15:15:00 35 1
2021-11-30 00:00:00 20 2
2021-11-25 00:00:00 10 2
我想加入这个 DF 和另一个这样的 DF:
idUser | Name | Gender
-------------------
1 John M
2 Anne F
我的预期输出是:
ID | Name | Gender | Value
---------------------------
1 John M 35
2 Anne F 20
我需要的是:仅获取第一个数据帧的最新值,并将该值与我的第二个数据帧连接起来。虽然,我的 spark 脚本加入了两个值:
我的代码:
df = df1.select(
col("date"),
col("value"),
col("ID"),
).OrderBy(
col("ID").asc(),
col("date").desc(),
).groupBy(
col("ID"), col("date").cast(StringType()).substr(0,10).alias("date")
).agg (
max(col("value")).alias("value")
)
final_df = df2.join(
df,
(col("idUser") == col("ID")),
how="left"
)
当我执行此连接时(格式化列在此 post 中被抽象化)我有以下输出:
ID | Name | Gender | Value
---------------------------
1 John M 35
2 Anne F 20
2 Anne F 10
我使用 substr
删除小时和分钟以仅按日期过滤。但是当我在不同的日子里有相同的 ID 时,我的输出 df 有 2 个值而不是最近的值。我该如何解决这个问题?
注意:我只使用 pyspark 函数来执行此操作(我现在想使用 spark.sql(...)
)。
您可以在 pysaprk
中使用 window
和 row_number
函数
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number
windowSpec = Window.partitionBy("ID").orderBy("date").desc()
df1_latest_val = df1.withColumn("row_number", row_number().over(windowSpec)).filter(
f.col("row_number") == 1
)
table df1_latest_val
的输出看起来像这样
date | value | ID | row_number |
-----------------------------------------------------
2021-12-06 15:15:00 35 1 1
2021-11-30 00:00:00 20 2 1
现在您将拥有带有最新 val 的 df,您可以直接将其与另一个 table。
我有以下数据框
date | value | ID
--------------------------------------
2021-12-06 15:00:00 25 1
2021-12-06 15:15:00 35 1
2021-11-30 00:00:00 20 2
2021-11-25 00:00:00 10 2
我想加入这个 DF 和另一个这样的 DF:
idUser | Name | Gender
-------------------
1 John M
2 Anne F
我的预期输出是:
ID | Name | Gender | Value
---------------------------
1 John M 35
2 Anne F 20
我需要的是:仅获取第一个数据帧的最新值,并将该值与我的第二个数据帧连接起来。虽然,我的 spark 脚本加入了两个值:
我的代码:
df = df1.select(
col("date"),
col("value"),
col("ID"),
).OrderBy(
col("ID").asc(),
col("date").desc(),
).groupBy(
col("ID"), col("date").cast(StringType()).substr(0,10).alias("date")
).agg (
max(col("value")).alias("value")
)
final_df = df2.join(
df,
(col("idUser") == col("ID")),
how="left"
)
当我执行此连接时(格式化列在此 post 中被抽象化)我有以下输出:
ID | Name | Gender | Value
---------------------------
1 John M 35
2 Anne F 20
2 Anne F 10
我使用 substr
删除小时和分钟以仅按日期过滤。但是当我在不同的日子里有相同的 ID 时,我的输出 df 有 2 个值而不是最近的值。我该如何解决这个问题?
注意:我只使用 pyspark 函数来执行此操作(我现在想使用 spark.sql(...)
)。
您可以在 pysaprk
中使用window
和 row_number
函数
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number
windowSpec = Window.partitionBy("ID").orderBy("date").desc()
df1_latest_val = df1.withColumn("row_number", row_number().over(windowSpec)).filter(
f.col("row_number") == 1
)
table df1_latest_val
的输出看起来像这样
date | value | ID | row_number |
-----------------------------------------------------
2021-12-06 15:15:00 35 1 1
2021-11-30 00:00:00 20 2 1
现在您将拥有带有最新 val 的 df,您可以直接将其与另一个 table。