在 spark 数据帧中生成哈希键(数据帧中的唯一标识符列)

generate hash key (unique identifier column in dataframe) in spark dataframe

我有 table 超过 100k 行。我需要从连接的列中生成唯一的 id,这将是唯一的。我已经尝试过 md5 函数,它适用于较少的数据,但对于大量数据,值会重复。 您能否提供任何替代解决方案或我如何使其适合 100k 数据行。

val df = Seq(
              ("Veg", "tomato", 1.99),
              ("Veg", "potato", 0.45),
              ("Fruit", "apple", 0.99),
              ("Fruit", "pineapple", 2.59)
               ).toDF("Type", "Item", "Price")

df.withColumn("hash_value",md5(concat($"Type",$"Item",$"Price"))).show(false)

+-----+---------+-----+--------------------------------+
|Type |Item     |Price|hash_value                      |
+-----+---------+-----+--------------------------------+
|Veg  |tomato   |1.99 |82215bc9c2078d2f1e773ad62b4f88c6|
|Veg  |potato   |0.45 |5c68bcadcbfdedf8b8c6edca20fd5126|
|Fruit|apple    |0.99 |830a70f1c16f015aa54ca24d9ea6ce0b|
|Fruit|pineapple|2.59 |1f0974817391905d41224e76735fc5d4|
+-----+---------+-----+--------------------------------+

如何为连接列创建唯一标识符?

我建议你使用 sha 256 或 512 以避免冲突:

df.withColumn(
    "hash_value",
    F.sha2(
        F.concat(*(
            F.col(col).cast("string")
            for col 
            in df.columns
        )),
        256
    )
).show()

Scala 语法:

如有必要,将字符串标准化为大写 trim:

'trim()' 和 'upper()'

...
sha2(upper(
          trim(
              concat(...)
              )
           ),256
     )
...

scala中的解决方案是这样的:

import org.apache.spark.sql.functions._

val df = Seq(
              ("Veg", "tomato", 1.99),
              ("Veg", "potato", 0.45),
              ("Fruit", "apple", 0.99),
              ("Fruit", "pineapple", 2.59)
               ).toDF("Type", "Item", "Price")

df.withColumn("hash_value",sha2(concat($"Type",$"Item",$"Price"),256)).show(false)


+-----+---------+-----+----------------------------------------------------------------+
|Type |Item     |Price|hash_value                                                      |
+-----+---------+-----+----------------------------------------------------------------+
|Veg  |tomato   |1.99 |a9770f4457e613bcd20ebb47d50e374f9b7de6d0ff0d19e4f246b09a9505af67|
|Veg  |potato   |0.45 |367e976594aa71d73d69cc5ca2e4ab523ec64be88210f5b61da76f92142770c3|
|Fruit|apple    |0.99 |da52a8e67132cf43a8fe39e3e97ed5759e9aa3f16140320a0f82664e10eea136|
|Fruit|pineapple|2.59 |df0353ab978049e27e8ff809a804b452b0f3b06321b494b903aea432fa4203c2|
+-----+---------+-----+----------------------------------------------------------------+

如果要生成散列键,同时处理包含null值的列,请执行以下操作:使用concat_ws

import pyspark.sql.functions as F
df = df.withColumn(
    "ID",
    F.sha2(
        F.concat_ws("", *(
            F.col(c).cast("string")
            for c
                in df.columns
        )),
        256
    )
)