How to get unique keys from a Spark Dataset
My dataset is small:
+-------------------+-------------+--------------+-------+-------------+
| session_id| insert_dttm| key| value| process_name|
+-------------------+-------------+--------------+-------+-------------+
|local-1641922005078|1641922023703|test_file1.csv|Success|ProcessResult|
|local-1641922005078|1641922023704|test_file1.csv|Success|ProcessResult|
|local-1641922005078|1641922023705|test_file2.csv|Success|ProcessResult|
|local-1641922005078|1641922023706|test_file2.csv|Success|ProcessResult|
|local-1641922005078|1641922023707|test_file3.csv|Success|ProcessResult|
|local-1641922005078|1641922023708|test_file3.csv|Success|ProcessResult|
+-------------------+-------------+--------------+-------+-------------+
I want to get a new dataset containing only the latest row for each unique key.
Example output dataset:
+-------------------+-------------+--------------+-------+-------------+
| session_id| insert_dttm| key| value| process_name|
+-------------------+-------------+--------------+-------+-------------+
|local-1641922005078|1641922023704|test_file1.csv|Success|ProcessResult|
|local-1641922005078|1641922023706|test_file2.csv|Success|ProcessResult|
|local-1641922005078|1641922023708|test_file3.csv|Success|ProcessResult|
+-------------------+-------------+--------------+-------+-------------+
How can I get such a dataset with the Spark API, without using SQL?
You can use this Scala snippet to remove the duplicate rows:
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, row_number}

val dataframe = (... your dataframe ...)
val rankColumn = "rank"
// Rank rows within each logical key, newest insert_dttm first
val window = Window.partitionBy(col("session_id"), col("key"), col("value"), col("process_name")).orderBy(col("insert_dttm").desc)
val deduplicatedDf = dataframe.withColumn(rankColumn, row_number().over(window)).filter(col(rankColumn) === 1).drop(rankColumn)
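For reference, here is a minimal self-contained sketch of this approach; the SparkSession setup and the sample-data construction are my additions, built from the table in the question:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, row_number}

val spark = SparkSession.builder().master("local[*]").appName("dedup").getOrCreate()
import spark.implicits._

// Sample data taken from the question
val df = Seq(
  ("local-1641922005078", 1641922023703L, "test_file1.csv", "Success", "ProcessResult"),
  ("local-1641922005078", 1641922023704L, "test_file1.csv", "Success", "ProcessResult"),
  ("local-1641922005078", 1641922023705L, "test_file2.csv", "Success", "ProcessResult"),
  ("local-1641922005078", 1641922023706L, "test_file2.csv", "Success", "ProcessResult"),
  ("local-1641922005078", 1641922023707L, "test_file3.csv", "Success", "ProcessResult"),
  ("local-1641922005078", 1641922023708L, "test_file3.csv", "Success", "ProcessResult")
).toDF("session_id", "insert_dttm", "key", "value", "process_name")

// Keep only the newest row per (session_id, key, value, process_name)
val w = Window.partitionBy(col("session_id"), col("key"), col("value"), col("process_name"))
  .orderBy(col("insert_dttm").desc)
df.withColumn("rank", row_number().over(w))
  .filter(col("rank") === 1)
  .drop("rank")
  .show(false)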
This might work:
import org.apache.spark.sql.functions.{col, max}

df.groupBy(
    df.columns
      .filterNot(z => z == "insert_dttm" || z == "session_id")
      .map(col(_)): _*)
  .agg(
    max(df("insert_dttm")).as("insert_dttm"),
    max(df("session_id")).as("session_id"))
This is essentially the same as doing it in SQL:
SELECT
    MAX(insert_dttm),
    MAX(session_id),
    <all the other columns>
FROM
    <your table>
GROUP BY
    <all the other columns>
No window function is needed, and it's best to avoid them when possible.
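For reference, a minimal sketch of this groupBy approach, assuming the sample df built in the earlier sketch. The final select (my addition) just restores the original column order; also note that taking MAX(session_id) separately is only safe because session_id is constant within each group in this data:

import org.apache.spark.sql.functions.{col, max}

// Group by every column except the two we aggregate
val keyCols = df.columns
  .filterNot(c => c == "insert_dttm" || c == "session_id")
  .map(col(_))

df.groupBy(keyCols: _*)
  .agg(
    max(col("insert_dttm")).as("insert_dttm"),
    max(col("session_id")).as("session_id"))
  .select(df.columns.map(col(_)): _*) // restore the original column order
  .show(false)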