Better/Efficient 过滤掉具有多个条件的 Spark Dataframe 行的方法

Better/Efficient way to filter out Spark Dataframe rows with multiple conditions

我的数据框如下所示

  id          pub_date   version         unique_id     c_id    p_id    type      source
lni001        20220301      1           64WP-UI-POLI    002     P02    org      internet
lni001        20220301      1           64WP-UI-POLI    002     P02    org      internet
lni001        20220301      1           64WP-UI-POLI    002     P02    org      internet
lni001        20220301      2           64WP-UI-CFGT    012     K21   location  internet
lni001        20220301      2           64WP-UI-CFGT    012     K21   location  internet
lni001        20220301      3           64WP-UI-CFGT    012     K21   location  internet
lni001        20220301      3           64WP-UI-POLI    002     P02    org      internet
lni001        20220301      85          64WP-UI-POLI    002     P02    org      internet
lni001        20220301      85          64WP-UI-POLI    002     P02    org      internet
lni001        20220301      5           64WP-UI-CFGT    012     K21   location  internet
lni002        20220301      1           64WP-UI-CFGT    012     K21   location  internet
 ::
 ::

我想按 id 列分组,只保留版本列中的最高数字,但这里有一个问题,我还需要考虑类型列(只有两种类型,组织或位置)。 最终数据框如下所示

  id          pub_date   version         unique_id     c_id    p_id    type      source
lni001        20220301      85          64WP-UI-POLI    002     P02    org      internet
lni001        20220301      85          64WP-UI-POLI    002     P02    org      internet
lni001        20220301      5           64WP-UI-CFGT    012     K21   location  internet
lni002        20220301      14          64WP-UI-CFGT    012     K21   location  internet
 ::
 ::

我目前的方法是将数据框分成两个不同的部分,第一个是类型列下的 org,另一个是类型 column.Then 下的位置我正在使用 groupby, withColumn 但我的数据框很大.我想知道是否有更有效的方法可以在一行代码中做到这一点?而不是需要将它们分成两个数据框然后将它们合并回一起?

谢谢!

dense_rank() 可用于根据 ID 和类型找出最高版本。这可用于仅保留每个组中的顶部记录。

input.withColumn("rank", dense_rank() over (Window.partitionBy($"id",$"type").orderBy($"version".desc)))
  .filter($"rank" === 1)
  .drop($"rank")

输出:

+------+--------+-------+------------+---+----+--------+--------+
|id    |pub_date|version|unique_id   |_id|p_id|type    |source  |
+------+--------+-------+------------+---+----+--------+--------+
|lni001|20220301|5      |64WP-UI-CFGT|012|K21 |location|internet|
|lni001|20220301|85     |64WP-UI-POLI|002|P02 |org     |internet|
|lni001|20220301|85     |64WP-UI-POLI|002|P02 |org     |internet|
|lni002|20220301|1      |64WP-UI-CFGT|012|K21 |location|internet|
+------+--------+-------+------------+---+----+--------+--------+