一次处理多个 Spark DataFrame 列
Function over multiple Spark DataFrame columns at once
我需要一次计算多个数据帧列的 md5 哈希值。
函数
def md5 = udf((s: String) => toHex(MessageDigest.getInstance("MD5").digest(s.getBytes("UTF-8"))))
def toHex(bytes: Array[Byte]): String = bytes.map("%02x".format(_)).mkString("")
一列示例
var test_df = load_df.as('a).select($"a.attr1", md5($"a.attr2").as("hash_key"))
+-------------+--------------------+
| attr1 | hash_key|
+-------------+--------------------+
|9/1/2015 0:23|7a2f516dad8f13ae1...|
|9/1/2015 0:31|339c72b1870c3a6be...|
|9/1/2015 0:19|7065847af7abc6bce...|
|9/1/2015 1:32|38c7276958809893b...|
只有一列 (a.attr2) 的生成效果很好,但我找不到任何好的方法来 insert/concatenate 多列进入 md5() 函数。
您应该使用 concat_ws
如下:
md5(concat_ws(",",$"a.attr2",$"a.attr3",$"a.attr4"))
这是一个例子:
Seq(("a","b","c")).toDF("x","y","z").withColumn("foo", md5(concat_ws(",",$"x",$"y",$"z"))).show(false)
// +---+---+---+--------------------------------+
// |x |y |z |foo |
// +---+---+---+--------------------------------+
// |a |b |c |a44c56c8177e32d3613988f4dba7962e|
// +---+---+---+--------------------------------+
就我个人而言,我会在 UDF 中进行串联,这为您提供了更大的灵活性:
例如传递字符串数组:
val md5 = udf((arrs:Seq[String]) => {
val s = arrs.mkString(",")
// do something with s
s
})
df.withColumn("md5",md5(array($"x",$"y",$"z")))
或者甚至传递整行,如果您有混合类型的列,这也有效:
val md5 = udf((r:Row) => {
val s = r.mkString(",")
// do something with s
s
})
df.withColumn("md5",md5(struct($"x",$"y",$"z")))
如果您想使用自定义分隔符连接所有列,请使用:
df.withColumn('row_hash', md5(concat_ws('||', *df.columns)))
对计算行哈希很有用。
我需要一次计算多个数据帧列的 md5 哈希值。
函数
def md5 = udf((s: String) => toHex(MessageDigest.getInstance("MD5").digest(s.getBytes("UTF-8"))))
def toHex(bytes: Array[Byte]): String = bytes.map("%02x".format(_)).mkString("")
一列示例
var test_df = load_df.as('a).select($"a.attr1", md5($"a.attr2").as("hash_key"))
+-------------+--------------------+
| attr1 | hash_key|
+-------------+--------------------+
|9/1/2015 0:23|7a2f516dad8f13ae1...|
|9/1/2015 0:31|339c72b1870c3a6be...|
|9/1/2015 0:19|7065847af7abc6bce...|
|9/1/2015 1:32|38c7276958809893b...|
只有一列 (a.attr2) 的生成效果很好,但我找不到任何好的方法来 insert/concatenate 多列进入 md5() 函数。
您应该使用 concat_ws
如下:
md5(concat_ws(",",$"a.attr2",$"a.attr3",$"a.attr4"))
这是一个例子:
Seq(("a","b","c")).toDF("x","y","z").withColumn("foo", md5(concat_ws(",",$"x",$"y",$"z"))).show(false)
// +---+---+---+--------------------------------+
// |x |y |z |foo |
// +---+---+---+--------------------------------+
// |a |b |c |a44c56c8177e32d3613988f4dba7962e|
// +---+---+---+--------------------------------+
就我个人而言,我会在 UDF 中进行串联,这为您提供了更大的灵活性:
例如传递字符串数组:
val md5 = udf((arrs:Seq[String]) => {
val s = arrs.mkString(",")
// do something with s
s
})
df.withColumn("md5",md5(array($"x",$"y",$"z")))
或者甚至传递整行,如果您有混合类型的列,这也有效:
val md5 = udf((r:Row) => {
val s = r.mkString(",")
// do something with s
s
})
df.withColumn("md5",md5(struct($"x",$"y",$"z")))
如果您想使用自定义分隔符连接所有列,请使用:
df.withColumn('row_hash', md5(concat_ws('||', *df.columns)))
对计算行哈希很有用。