Spark Scala:如何将多个选定的列传递给一个函数?

Spark Scala: How to pass multiple selected columns to a function?

我需要 select 数据框中的几列(动态地)以从中创建一个 id(可能使用 sha256)。

我不知道如何 select 几列,将其转换为列 NameValue 的映射以发送给函数。

这是不起作用的代码

import org.apache.spark.sql._
val df = Seq(
    ("Java", "20000", "Compiled"), 
    ("Python", "100000", "Interpreted"), 
    ("Scala", "3000", "Compiled")).
    toDF("language","users_count","type")

val neededColumns = Array("language","type")

def GenerateId(map:Map[String,String]) =
{
    val builder = StringBuilder.newBuilder
    for((k,v) <- map) {
        builder.append(k)
        builder.append("=")
        builder.append(value)
    }
    sha2(builder.toString(), 256)
}

df.withColumn("id", GenerateId(df.columns.map(col)));

通过 select 从数据帧中提取 neededColumns 并发送到一个函数,所需的输出是

| language | users_count | type       | id    |
| -------- | ----------- |------------|-------|
| Java     | 20000       |Compiled    | hexid |
| Python   | 100000      |Interpreted | hexid |
| Scala    | 3000        |Compiled    | hexid |

您可以使用 contact 函数构建要传递给 sha2 的字符串列,如下所示:

def GenerateId(cols: Seq[String]) = {
  val concatExpr = concat_ws(",", cols.map(c => concat(lit(s"$c="), col(c))): _*)
  sha2(concatExpr, 256)
}

df.withColumn("id", GenerateId(neededColumns)).show(false)

//+--------+-----------+-----------+----------------------------------------------------------------+
//|language|users_count|type       |id                                                              |
//+--------+-----------+-----------+----------------------------------------------------------------+
//|Java    |20000      |Compiled   |5322d4f18b5f6b1330e2f2096bdfbdfdcc1dc7ea678982c35a424d3be3077e58|
//|Python  |100000     |Interpreted|cabdf1501324922d11f4281628c77cf983c219e5f123764598793baf1a96488a|
//|Scala   |3000       |Compiled   |4cc0da85338aa336c42dcd101daca71c6571aa12b1f752799fc7db1652a38d63|
//+--------+-----------+-----------+----------------------------------------------------------------+

幕后花絮:

val concatExpr = concat_ws(",", neededColumns.map(c => concat(lit(s"$c="), col(c))): _*)
df.withColumn("map", concatExpr).withColumn("id", sha2(col("map"), 256)).show(false)

//+--------+-----------+-----------+--------------------------------+----------------------------------------------------------------+
//|language|users_count|type       |map                             |id                                                              |
//+--------+-----------+-----------+--------------------------------+----------------------------------------------------------------+
//|Java    |20000      |Compiled   |language=Java,type=Compiled     |5322d4f18b5f6b1330e2f2096bdfbdfdcc1dc7ea678982c35a424d3be3077e58|
//|Python  |100000     |Interpreted|language=Python,type=Interpreted|cabdf1501324922d11f4281628c77cf983c219e5f123764598793baf1a96488a|
//|Scala   |3000       |Compiled   |language=Scala,type=Compiled    |4cc0da85338aa336c42dcd101daca71c6571aa12b1f752799fc7db1652a38d63|
//+--------+-----------+-----------+--------------------------------+----------------------------------------------------------------+