Scala Spark: add a column with the percentage of a number over the sum

I have a dataset with this schema:

root
 |-- id: long (nullable = true)
 |-- dist: map (nullable = true)
 |    |-- key: string
 |    |-- value: long (valueContainsNull = true)

Suppose dist is [(A, 10), (B, 5), (C, 3)]. I want to add a column [10/18, 5/18, 3/18], i.e. each value divided by the row's total (10 + 5 + 3 = 18).

How can I do that? Thanks, everyone.

Assuming you start with a dataset like this one (updated to handle Option[Map[String, Long]]):

import org.apache.spark.sql.Dataset
import spark.implicits._ // needed for .toDS(); assumes the usual SparkSession named `spark`, as in spark-shell

case class Model(id: Long, dist: Option[Map[String, Long]])

val ds: Dataset[Model] = List(
  Model(1, Some(Map("a" -> 10, "b" -> 5, "c" -> 3))),
  Model(2, Some(Map("x" -> 100, "y" -> 50, "z" -> 30))),
  Model(3, None)
).toDS()

and you want your new column percentage as a String column, so that it can display {x}/{y}:

Spark >= 3.0.0

import org.apache.spark.sql.functions.{aggregate, col, format_string, lit, map_values, transform}
import org.apache.spark.sql.Column
import org.apache.spark.sql.types.LongType

ds
  // take just the values of the map `dist`
  .withColumn("values", map_values(col("dist")))
  // calculate the total per row; the zero is cast to Long so the
  // accumulator type matches the map's Long values
  .withColumn("total", aggregate(col("values"), lit(0).cast(LongType), (acc, x) => acc + x))
  // format column with total
  .withColumn("percentage", transform(col("values"), (c: Column) => format_string("%s/%s", c, col("total"))))
  .drop("total", "values")
  .show(false)

which gives:

+---+----------------------------+-------------------------+
|id |dist                        |percentage               |
+---+----------------------------+-------------------------+
|1  |{a -> 10, b -> 5, c -> 3}   |[10/18, 5/18, 3/18]      |
|2  |{x -> 100, y -> 50, z -> 30}|[100/180, 50/180, 30/180]|
|3  |null                        |null                     |
+---+----------------------------+-------------------------+

with this schema:

root
 |-- id: long (nullable = false)
 |-- dist: map (nullable = true)
 |    |-- key: string
 |    |-- value: long (valueContainsNull = false)
 |-- percentage: array (nullable = true)
 |    |-- element: string (containsNull = false)
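
If you need the actual numeric ratios rather than the {x}/{y} strings, the same pipeline can divide each value by the total instead of formatting a string. A minimal sketch, assuming the same ds and a hypothetical ratio column name:

import org.apache.spark.sql.functions.{aggregate, col, lit, map_values, transform}
import org.apache.spark.sql.Column
import org.apache.spark.sql.types.{DoubleType, LongType}

ds
  .withColumn("values", map_values(col("dist")))
  .withColumn("total", aggregate(col("values"), lit(0).cast(LongType), (acc, x) => acc + x))
  // divide each value by the row total, producing an array<double> column
  .withColumn("ratio", transform(col("values"), (c: Column) => c.cast(DoubleType) / col("total")))
  .drop("total", "values")
  .show(false)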

Spark < 3.0.0 (tested with Spark 2.4.5)

import org.apache.spark.sql.functions.{col, expr, map_values, udf}

// builds the "[x/total, y/total, ...]" display string; Spark coerces the
// array<bigint> values and the double total to the declared input types
val toDivisionUDF = udf(
  (values: Seq[String], total: Long) =>
    "[" + values.map(v => s"$v/$total").mkString(", ") + "]"
)

ds
  // take just the values of the map `dist`
  .withColumn("values", map_values(col("dist")))
  // calculate the total per row; 0D seeds the accumulator as a double,
  // which the SQL aggregate can combine with the map's bigint values
  .withColumn("total", expr("aggregate(values, 0D, (acc, v) -> acc + v)"))
  // format column with total
  .withColumn("percentage", toDivisionUDF(col("values"), col("total")))
  .drop("total", "values")
  .show(false)

which gives:

+---+----------------------------+-------------------------+
|id |dist                        |percentage               |
+---+----------------------------+-------------------------+
|1  |[a -> 10, b -> 5, c -> 3]   |[10/18, 5/18, 3/18]      |
|2  |[x -> 100, y -> 50, z -> 30]|[100/180, 50/180, 30/180]|
|3  |null                        |null                     |
+---+----------------------------+-------------------------+

with this schema:

root
 |-- id: long (nullable = false)
 |-- dist: map (nullable = true)
 |    |-- key: string
 |    |-- value: long (valueContainsNull = false)
 |-- percentage: string (nullable = true)
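
Note that percentage comes out as a single String here, not an array<string> as in the Spark 3 version, because the UDF returns one formatted string per row. If you prefer an array column, a sketch of a variant (with a hypothetical toDivisionsUDF name) could return a Seq[String] instead:

val toDivisionsUDF = udf(
  // returning Seq[String] makes Spark type the column as array<string>
  (values: Seq[String], total: Long) => values.map(v => s"$v/$total")
)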

Note that introducing the Option did not change the code at all, because Spark handles options by treating the field inside the Option as nullable.
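
For instance, the same map field without the Option still comes out nullable, since Spark's encoders treat every non-primitive case-class field as nullable. A quick sketch with a hypothetical Plain case class:

case class Plain(id: Long, dist: Map[String, Long])

List(Plain(1, Map("a" -> 1L))).toDS().printSchema()
// root
//  |-- id: long (nullable = false)
//  |-- dist: map (nullable = true)
//  |    |-- key: string
//  |    |-- value: long (valueContainsNull = false)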