如何从 Scala 方法创建 UDF(计算 md5)?
How to create UDF from Scala methods (to compute md5)?
我想从两个已经工作的函数构建一个 UDF。我正在尝试将 md5 哈希计算为现有 Spark Dataframe 的新列。
def md5(s: String): String = { toHex(MessageDigest.getInstance("MD5").digest(s.getBytes("UTF-8")))}
def toHex(bytes: Array[Byte]): String = bytes.map("%02x".format(_)).mkString("")
结构(我目前所拥有的)
val md5_hash: // UDF Implementation
val sqlfunc = udf(md5_hash)
val new_df = load_df.withColumn("New_MD5_Column", sqlfunc(col("Duration")))
不幸的是,我不知道如何将函数适当地实现为 UDF。
您可以使用以下 udf
函数命名为 md5
import org.apache.spark.sql.functions._
def toHex(bytes: Array[Byte]): String = bytes.map("%02x".format(_)).mkString("")
def md5 = udf((s: String) => toHex(MessageDigest.getInstance("MD5").digest(s.getBytes("UTF-8"))))
val new_df = load_df.withColumn("New_MD5_Column", md5(col("Duration")))
为什么不使用内置的 md5 功能?
md5(e: Column): Column Calculates the MD5 digest of a binary column and returns the value as a 32 character hex string.
然后您可以按如下方式使用它:
val new_df = load_df.withColumn("New_MD5_Column", md5($"Duration"))
您必须确保该列是二进制类型,所以如果它是 int,您可能会看到以下错误:
org.apache.spark.sql.AnalysisException: cannot resolve 'md5(Duration
)' due to data type mismatch: argument 1 requires binary type, however, 'Duration
' is of int type.;;
然后您应该使用 bin 函数将类型更改为 md5
兼容,即二进制类型。
bin(e: Column): Column An expression that returns the string representation of the binary value of the given long column. For example, bin("12")
returns "1100"
.
解决方法如下:
val solution = load_df.
withColumn("bin_duration", bin($"duration")).
withColumn("md5", md5($"bin_duration"))
scala> solution.show(false)
+--------+------------+--------------------------------+
|Duration|bin_duration|md5 |
+--------+------------+--------------------------------+
|1 |1 |c4ca4238a0b923820dcc509a6f75849b|
+--------+------------+--------------------------------+
您也可以将函数“链接”在一起,并在一个 withColumn
中进行转换和计算 MD5,但我更喜欢将步骤分开,以防出现需要解决的问题,中间步骤通常会有所帮助。
性能
您考虑使用内置函数 bin
和 md5
而不是自定义用户定义函数 (UDF) 的原因是您 可以 获得更好的性能,因为 Spark SQL 处于完全控制状态并且 不会 不会为内部行表示的序列化和反序列化添加额外的步骤。
这里不是这种情况,但仍然需要较少的导入和使用。
我想从两个已经工作的函数构建一个 UDF。我正在尝试将 md5 哈希计算为现有 Spark Dataframe 的新列。
def md5(s: String): String = { toHex(MessageDigest.getInstance("MD5").digest(s.getBytes("UTF-8")))}
def toHex(bytes: Array[Byte]): String = bytes.map("%02x".format(_)).mkString("")
结构(我目前所拥有的)
val md5_hash: // UDF Implementation
val sqlfunc = udf(md5_hash)
val new_df = load_df.withColumn("New_MD5_Column", sqlfunc(col("Duration")))
不幸的是,我不知道如何将函数适当地实现为 UDF。
您可以使用以下 udf
函数命名为 md5
import org.apache.spark.sql.functions._
def toHex(bytes: Array[Byte]): String = bytes.map("%02x".format(_)).mkString("")
def md5 = udf((s: String) => toHex(MessageDigest.getInstance("MD5").digest(s.getBytes("UTF-8"))))
val new_df = load_df.withColumn("New_MD5_Column", md5(col("Duration")))
为什么不使用内置的 md5 功能?
md5(e: Column): Column Calculates the MD5 digest of a binary column and returns the value as a 32 character hex string.
然后您可以按如下方式使用它:
val new_df = load_df.withColumn("New_MD5_Column", md5($"Duration"))
您必须确保该列是二进制类型,所以如果它是 int,您可能会看到以下错误:
org.apache.spark.sql.AnalysisException: cannot resolve 'md5(
Duration
)' due to data type mismatch: argument 1 requires binary type, however, 'Duration
' is of int type.;;
然后您应该使用 bin 函数将类型更改为 md5
兼容,即二进制类型。
bin(e: Column): Column An expression that returns the string representation of the binary value of the given long column. For example,
bin("12")
returns"1100"
.
解决方法如下:
val solution = load_df.
withColumn("bin_duration", bin($"duration")).
withColumn("md5", md5($"bin_duration"))
scala> solution.show(false)
+--------+------------+--------------------------------+
|Duration|bin_duration|md5 |
+--------+------------+--------------------------------+
|1 |1 |c4ca4238a0b923820dcc509a6f75849b|
+--------+------------+--------------------------------+
您也可以将函数“链接”在一起,并在一个 withColumn
中进行转换和计算 MD5,但我更喜欢将步骤分开,以防出现需要解决的问题,中间步骤通常会有所帮助。
性能
您考虑使用内置函数 bin
和 md5
而不是自定义用户定义函数 (UDF) 的原因是您 可以 获得更好的性能,因为 Spark SQL 处于完全控制状态并且 不会 不会为内部行表示的序列化和反序列化添加额外的步骤。
这里不是这种情况,但仍然需要较少的导入和使用。