Spark Scala: How to pass multiple selected columns to a function?
I need to select several columns of a dataframe (dynamically) and create an id from them (probably using sha256). I don't know how to select those columns and turn them into a map of column Name and Value to send to a function.
Here is the code that doesn't work:
import org.apache.spark.sql._
import org.apache.spark.sql.functions._

val df = Seq(
  ("Java", "20000", "Compiled"),
  ("Python", "100000", "Interpreted"),
  ("Scala", "3000", "Compiled")
).toDF("language", "users_count", "type")

val neededColumns = Array("language", "type")

def GenerateId(map: Map[String, String]) = {
  val builder = StringBuilder.newBuilder
  for ((k, v) <- map) {
    builder.append(k)
    builder.append("=")
    builder.append(v)
  }
  sha2(builder.toString(), 256) // doesn't compile: sha2 expects a Column, not a String
}

df.withColumn("id", GenerateId(df.columns.map(col))) // type mismatch: Array[Column] is not a Map
The desired output, selecting neededColumns from the dataframe and sending them to a function, is:
| language | users_count | type | id |
| -------- | ----------- |------------|-------|
| Java | 20000 |Compiled | hexid |
| Python | 100000 |Interpreted | hexid |
| Scala | 3000 |Compiled | hexid |
You can use the concat function to build the string column to pass to sha2, like this:
import org.apache.spark.sql.functions.{col, concat, concat_ws, lit, sha2}

def GenerateId(cols: Seq[String]) = {
  // Build a "name=value" expression per column, join them with commas,
  // then hash the resulting string column with SHA-256.
  val concatExpr = concat_ws(",", cols.map(c => concat(lit(s"$c="), col(c))): _*)
  sha2(concatExpr, 256)
}
df.withColumn("id", GenerateId(neededColumns)).show(false)
//+--------+-----------+-----------+----------------------------------------------------------------+
//|language|users_count|type |id |
//+--------+-----------+-----------+----------------------------------------------------------------+
//|Java |20000 |Compiled |5322d4f18b5f6b1330e2f2096bdfbdfdcc1dc7ea678982c35a424d3be3077e58|
//|Python |100000 |Interpreted|cabdf1501324922d11f4281628c77cf983c219e5f123764598793baf1a96488a|
//|Scala |3000 |Compiled |4cc0da85338aa336c42dcd101daca71c6571aa12b1f752799fc7db1652a38d63|
//+--------+-----------+-----------+----------------------------------------------------------------+
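One caveat worth noting (my addition, not part of the original answer): `concat` returns null if any of its inputs is null, which would make the whole `id` column null for rows with a missing value. A null-safe variant, sketched here with `coalesce`, substitutes an empty string instead:

```scala
import org.apache.spark.sql.functions.{coalesce, col, concat, concat_ws, lit, sha2}

// Sketch: same idea as GenerateId above, but coalescing each column to ""
// so a single null value does not null out the entire hash.
def GenerateIdNullSafe(cols: Seq[String]) = {
  val parts = cols.map(c => concat(lit(s"$c="), coalesce(col(c), lit(""))))
  sha2(concat_ws(",", parts: _*), 256)
}

df.withColumn("id", GenerateIdNullSafe(neededColumns)).show(false)
```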
Behind the scenes:
val concatExpr = concat_ws(",", neededColumns.map(c => concat(lit(s"$c="), col(c))): _*)
df.withColumn("map", concatExpr).withColumn("id", sha2(col("map"), 256)).show(false)
//+--------+-----------+-----------+--------------------------------+----------------------------------------------------------------+
//|language|users_count|type |map |id |
//+--------+-----------+-----------+--------------------------------+----------------------------------------------------------------+
//|Java |20000 |Compiled |language=Java,type=Compiled |5322d4f18b5f6b1330e2f2096bdfbdfdcc1dc7ea678982c35a424d3be3077e58|
//|Python |100000 |Interpreted|language=Python,type=Interpreted|cabdf1501324922d11f4281628c77cf983c219e5f123764598793baf1a96488a|
//|Scala |3000 |Compiled |language=Scala,type=Compiled |4cc0da85338aa336c42dcd101daca71c6571aa12b1f752799fc7db1652a38d63|
//+--------+-----------+-----------+--------------------------------+----------------------------------------------------------------+
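An alternative sketch (an assumption on my part, not from the original answer) is to serialize the selected columns as a JSON object with Spark's built-in `struct` and `to_json` before hashing. This keeps the field names in the hashed payload and handles non-string columns without explicit casts:

```scala
import org.apache.spark.sql.functions.{col, sha2, struct, to_json}

// Sketch: pack the selected columns into a struct, render it as JSON
// (e.g. {"language":"Java","type":"Compiled"}), then hash that string.
def GenerateIdJson(cols: Seq[String]) = {
  sha2(to_json(struct(cols.map(col): _*)), 256)
}

df.withColumn("id", GenerateIdJson(neededColumns)).show(false)
```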