将 null 替换为 spark 数据框中所有行的自动递增唯一值,所有重复行应具有相同的 unique_id 值

Replace null with auto incremented unique values for all rows in a spark dataframe and all duplicate rows should have same unique_id value

我是 pyspark 的新手,在以下情况下需要帮助。

对于数据框:

i)我想为 unique_id 列生成唯一值,其范围应从 3000000 开始。 ii) 所有重复行的 unique_id 值应该相同。

输入 -->

+--------+------------+-----+
|g_id | c_id |unique_id |
| 1 | 8 |空 |
| 1 | 8 |空 |
| 4 | 4 |空 |
| 5 | 6 |空 |
| 2 | 1 |空 |
+--------+--------+---------+

输出-->

global_id comp_id unique_id

+-----+---+----+------------+
| g_id | c_id | unique_id |
| 1 | 8| 3000000 |
| 1 | 8| 3000000 |
| 4 | 4| 4384994 |
| 5 | 6| 3748484 |
| 2 | 1| 3674849 |
+---+---+------+------------+

这是我到目前为止尝试做的事情:

get_gouped_df = Window.partitionBy("g_id","comp_id").orderBy("unique_id")

assign_unique_id = df.withColumn("unique_id", when(row_number().over(get_gouped_df) == 1, 
                                                               monotonically_increasing_id())
                                                              .otherwise(checkglobalDF.unique))

您可以使用 rank() 函数,然后将 3000000 添加到该列。

你可以试试这个:

case class A(g_id: String, c_id: String)

import sparkSession.implicits._
import org.apache.spark.sql.functions._

Seq(A("1","8"),A("1","8"),A("4","4"),A("5","6"),A("2","1")).toDF
      .withColumn("unique_id", lit(3000000)+rank().over(Window.orderBy($"g_id", $"c_id")))
      .show(false)

输出:

+----+----+---------+
|g_id|c_id|unique_id|
+----+----+---------+
|1   |8   |1        |
|1   |8   |1        |
|2   |1   |3        |
|4   |4   |4        |
|5   |6   |5        |
+----+----+---------+