如何从 Scala 方法创建 UDF(计算 md5)?

How to create UDF from Scala methods (to compute md5)?

我想从两个已经工作的函数构建一个 UDF。我正在尝试将 md5 哈希计算为现有 Spark Dataframe 的新列。

def md5(s: String): String = { toHex(MessageDigest.getInstance("MD5").digest(s.getBytes("UTF-8")))}
def toHex(bytes: Array[Byte]): String = bytes.map("%02x".format(_)).mkString("")

结构(我目前所拥有的)

val md5_hash: // UDF Implementation
val sqlfunc = udf(md5_hash)
val new_df = load_df.withColumn("New_MD5_Column", sqlfunc(col("Duration")))

不幸的是,我不知道如何将函数适当地实现为 UDF。

您可以使用以下 udf 函数命名为 md5

import org.apache.spark.sql.functions._
def toHex(bytes: Array[Byte]): String = bytes.map("%02x".format(_)).mkString("")
def md5 = udf((s: String) => toHex(MessageDigest.getInstance("MD5").digest(s.getBytes("UTF-8"))))

val new_df = load_df.withColumn("New_MD5_Column", md5(col("Duration")))

为什么不使用内置的 md5 功能?

md5(e: Column): Column Calculates the MD5 digest of a binary column and returns the value as a 32 character hex string.

然后您可以按如下方式使用它:

val new_df = load_df.withColumn("New_MD5_Column", md5($"Duration"))

您必须确保该列是二进制类型,所以如果它是 int,您可能会看到以下错误:

org.apache.spark.sql.AnalysisException: cannot resolve 'md5(Duration)' due to data type mismatch: argument 1 requires binary type, however, 'Duration' is of int type.;;

然后您应该使用 bin 函数将类型更改为 md5 兼容,即二进制类型。

bin(e: Column): Column An expression that returns the string representation of the binary value of the given long column. For example, bin("12") returns "1100".

解决方法如下:

val solution = load_df.
  withColumn("bin_duration", bin($"duration")).
  withColumn("md5", md5($"bin_duration"))
scala> solution.show(false)
+--------+------------+--------------------------------+
|Duration|bin_duration|md5                             |
+--------+------------+--------------------------------+
|1       |1           |c4ca4238a0b923820dcc509a6f75849b|
+--------+------------+--------------------------------+

您也可以将函数“链接”在一起,并在一个 withColumn 中进行转换和计算 MD5,但我更喜欢将步骤分开,以防出现需要解决的问题,中间步骤通常会有所帮助。

性能

您考虑使用内置函数 binmd5 而不是自定义用户定义函数 (UDF) 的原因是您 可以 获得更好的性能,因为 Spark SQL 处于完全控制状态并且 不会 不会为内部行表示的序列化和反序列化添加额外的步骤。

这里不是这种情况,但仍然需要较少的导入和使用。