How to drop duplicates from PySpark Dataframe and change the remaining column value to null
I am new to Pyspark.
I have a Pyspark dataframe and I want to drop duplicates based on the id and timestamp columns. I then want to replace the reading value for the duplicate id with null. I do not want to use Pandas. Please see below:
Dataframe:
id reading timestamp
1 13015 2018-03-22 08:00:00.000
1 14550 2018-03-22 09:00:00.000
1 14570 2018-03-22 09:00:00.000
2 15700 2018-03-22 08:00:00.000
2 16700 2018-03-22 09:00:00.000
2 18000 2018-03-22 10:00:00.000
Desired output:
id reading timestamp
1 13015 2018-03-22 08:00:00.000
1 Null 2018-03-22 09:00:00.000
2 15700 2018-03-22 08:00:00.000
2 16700 2018-03-22 09:00:00.000
2 18000 2018-03-22 10:00:00.000
What do I need to add to this code:
df.dropDuplicates(['id','timestamp'])
Any help would be much appreciated. Thank you very much.
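For anyone who wants to reproduce this, the sample dataframe can be built in PySpark along these lines (a minimal sketch, assuming an active SparkSession named spark and keeping timestamp as a plain string):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Sample data from the question above
df = spark.createDataFrame(
    [
        (1, 13015, "2018-03-22 08:00:00.000"),
        (1, 14550, "2018-03-22 09:00:00.000"),
        (1, 14570, "2018-03-22 09:00:00.000"),
        (2, 15700, "2018-03-22 08:00:00.000"),
        (2, 16700, "2018-03-22 09:00:00.000"),
        (2, 18000, "2018-03-22 10:00:00.000"),
    ],
    ["id", "reading", "timestamp"],
)
df.show(truncate=False)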
In Scala this can be done by grouping and replacing the "reading" value with null where the count is greater than one:
import org.apache.spark.sql.functions._
import spark.implicits._

val df = Seq(
  (1, 13015, "2018-03-22 08:00:00.000"),
  (1, 14550, "2018-03-22 09:00:00.000"),
  (1, 14570, "2018-03-22 09:00:00.000"),
  (2, 15700, "2018-03-22 08:00:00.000"),
  (2, 16700, "2018-03-22 09:00:00.000"),
  (2, 18000, "2018-03-22 10:00:00.000")
).toDF("id", "reading", "timestamp")

// group by (id, timestamp) and null out "reading" where the group has more than one row
df
  .groupBy("id", "timestamp")
  .agg(
    min("reading").alias("reading"),
    count("reading").alias("readingCount")
  )
  .withColumn("reading", when($"readingCount" > 1, null).otherwise($"reading"))
  .drop("readingCount")
  .show(false)
The output is:
+---+-----------------------+-------+
|id |timestamp |reading|
+---+-----------------------+-------+
|2 |2018-03-22 09:00:00.000|16700 |
|1 |2018-03-22 08:00:00.000|13015 |
|1 |2018-03-22 09:00:00.000|null |
|2 |2018-03-22 10:00:00.000|18000 |
|2 |2018-03-22 08:00:00.000|15700 |
+---+-----------------------+-------+
Guess it can be easily converted to Python.
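For reference, a rough PySpark sketch of the same groupBy approach (an untested translation of the Scala snippet, assuming df holds the sample data and an active SparkSession):

from pyspark.sql import functions as F

# Same idea: group by (id, timestamp), keep one reading per group,
# and null it out when the group contained more than one row.
(df.groupBy("id", "timestamp")
   .agg(F.min("reading").alias("reading"),
        F.count("reading").alias("readingCount"))
   .withColumn("reading",
               F.when(F.col("readingCount") > 1, F.lit(None)).otherwise(F.col("reading")))
   .drop("readingCount")
   .show(truncate=False))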
One way is to use a Window function to count the duplicates over the partition id, timestamp and then update reading based on that count (once the duplicate rows have their reading nulled they become identical, so dropDuplicates collapses them into one):
from pyspark.sql import Window
from pyspark.sql.functions import col, count, first, lit, when

w = Window.partitionBy("id", "timestamp").orderBy("timestamp")

df.select(col("id"),
          when(count("*").over(w) > lit(1), lit(None)).otherwise(col("reading")).alias("reading"),
          col("timestamp")
         ) \
  .dropDuplicates(["id", "reading", "timestamp"]).show(truncate=False)
Or using a group by:
df.groupBy("id", "timestamp").agg(first("reading").alias("reading"), count("*").alias("cn")) \
  .withColumn("reading", when(col("cn") > lit(1), lit(None)).otherwise(col("reading"))) \
  .select(*df.columns) \
  .show(truncate=False)
Which gives:
+---+-------+-----------------------+
|id |reading|timestamp |
+---+-------+-----------------------+
|1 |null |2018-03-22 09:00:00.000|
|1 |13015 |2018-03-22 08:00:00.000|
|2 |18000 |2018-03-22 10:00:00.000|
|2 |15700 |2018-03-22 08:00:00.000|
|2 |16700 |2018-03-22 09:00:00.000|
+---+-------+-----------------------+