如何为每个对应的唯一列值分配一个唯一 ID?

How to assign a unique ID to each corresponding unique column value?

想象一个如下所示的数据框:

+-------+--------+---------+---------+--------+-----------------+---+
|address|lastname|firstname|patientid|policyno|visitid          |id |
+-------+--------+---------+---------+--------+-----------------+---+
|addr1  |Pits    |Rodney   |patid1   |policy1 |visituid_rodney_1| 1 |
|addr1  |Pits    |Rodney   |patid1   |policy1 |visituid_rodney_2| 1 |
|addr1  |Pits    |Rodney   |patid1   |policy1 |visituid_rodney_3| 1 |
|addr2  |Dobs    |Aliya    |patid2   |policy2 |visituid_aliya_1 | 2 |
|addr2  |Dobs    |Aliya    |patid2   |policy2 |visituid_aliya_2 | 2 |
|addr2  |Dobs    |OP       |patid3   |policy3 |visituid_OP_1    | 3 |
+-------+--------+---------+---------+--------+-----------------+---+

当名称 (firstname + lastname) 相同时,“id”的列值保持不变,当名称变得不同时 - 我想分配一个新的 id。

我需要这个,因为我想取消识别某个数据集(其中包含敏感详细信息),以便我可以在我的应用程序中使用相同的数据但更改了值。字段 id 将用作与其他数据帧连接的索引键。

关于 id 的另一部分是其他虚拟数据框也将包含类似的 id 列,可能会使用 monotonically_increasing_id() 填充,因此最好id 从 0 或 1 开始,以不断增加的方式。

我怎样才能用 scala 在 spark 中实现这个目标?

import org.apache.spark.sql.functions._
val df = sc.parallelize(Seq(
  ("Xxx", "yyy"),
  ("xxx", "yyy"),
  ("aaa", "yyy")
)).toDF("c1", "c2")

df.withColumn("hashName", hash(concat($"c1", $"c2")))//.show(false)

那么明显的改进是什么?散列前两者和大写或小写之间的分隔符。 如果不清理,用分隔符替换所有空格和奇数字符。

更好的是:

df.withColumn("preHashName", lower(trim(concat($"c1", lit("|"), $"c2"))))
  .withColumn("hashName", hash(lower(concat($"c1", lit("|"), $"c2")))).show(false)

不这样做意味着在另一个答案中,AN OTHER 在散列方面可以与 ANO THER 相同。

输出:

+---+------+-----------+-----------+
|c1 |c2    |preHashName|hashName   |
+---+------+-----------+-----------+
|Xxx|y yy  |xxx|y yy   |907198499  |
|xxx|yyy   |xxx|yyy    |-1167597858|
|aaa|yyy   |aaa|yyy    |495090835  |
+---+------+-----------+-----------+

使用 window 函数。

scala> df.show(false)
+-------+--------+---------+---------+--------+-----------------+
|address|lastname|firstname|patientid|policyno|visitid          |
+-------+--------+---------+---------+--------+-----------------+
|addr1  |Pits    |Rodney   |patid1   |policy1 |visituid_rodney_1|
|addr1  |Pits    |Rodney   |patid1   |policy1 |visituid_rodney_2|
|addr1  |Pits    |Rodney   |patid1   |policy1 |visituid_rodney_3|
|addr2  |Dobs    |Aliya    |patid2   |policy2 |visituid_aliya_1 |
|addr2  |Dobs    |Aliya    |patid2   |policy2 |visituid_aliya_2 |
|addr2  |Dobs    |OP       |patid3   |policy3 |visituid_OP_1    |
|addr4  |AN      |OTHER    |patid4   |policy4 |visituid_OP_1    |
|addr2  |ANO     |THER     |patid5   |policy5 |visituid_OP_1    |
+-------+--------+---------+---------+--------+-----------------+
val expr = Seq("lastname","firstname")
            .map(c => hash(col(c)).as(c))
            .reduce(concat(_,_).asc)
val winSpec = dense_rank()
                .over(Window.orderBy(expr))

输出

scala> df.withColumn("id",winSpec).show(false)

+-------+--------+---------+---------+--------+-----------------+---+
|address|lastname|firstname|patientid|policyno|visitid          |id |
+-------+--------+---------+---------+--------+-----------------+---+
|addr1  |Pits    |Rodney   |patid1   |policy1 |visituid_rodney_1|1  |
|addr1  |Pits    |Rodney   |patid1   |policy1 |visituid_rodney_2|1  |
|addr1  |Pits    |Rodney   |patid1   |policy1 |visituid_rodney_3|1  |
|addr4  |AN      |OTHER    |patid4   |policy4 |visituid_OP_1    |2  |
|addr2  |Dobs    |OP       |patid3   |policy3 |visituid_OP_1    |3  |
|addr2  |Dobs    |Aliya    |patid2   |policy2 |visituid_aliya_1 |4  |
|addr2  |Dobs    |Aliya    |patid2   |policy2 |visituid_aliya_2 |4  |
|addr2  |ANO     |THER     |patid5   |policy5 |visituid_OP_1    |5  |
+-------+--------+---------+---------+--------+-----------------+---+