Scala Spark: add a column with the percentage of each number over the sum
I have a dataset with this schema:
root
|-- id: long (nullable = true)
|-- dist: map (nullable = true)
| |-- key: string
| |-- value: long (valueContainsNull = true)
Suppose dist is [(A, 10), (B, 5), (C, 3)]; I want to add a column [10/18, 5/18, 3/18], i.e. each value divided by the row's total.
How can I do that? Thanks, everyone.
Assuming you start with a dataset like this (updated to handle Option[Map[String, Long]]):
import org.apache.spark.sql.Dataset
case class Model(id: Long, dist: Option[Map[String, Long]])
val ds: Dataset[Model] = List(
Model(1, Some(Map("a" -> 10, "b" -> 5, "c" -> 3))),
Model(2, Some(Map("x" -> 100, "y" -> 50, "z" -> 30))),
Model(3, None)
).toDS()
and that your new percentage column needs to be a String column in order to display {x}/{y}:
Spark >= 3.0.0
import org.apache.spark.sql.functions.{aggregate, col, format_string, lit, map_values, transform}
import org.apache.spark.sql.Column
import org.apache.spark.sql.types.LongType
ds
// take just the values of the map `dist`
.withColumn("values", map_values(col("dist")))
// calculate the total per row
.withColumn("total", aggregate(col("values"), lit(0).cast(LongType), (acc, x) => acc + x))
// format column with total
.withColumn("percentage", transform(col("values"), (c: Column) => format_string("%s/%s", c, col("total"))))
.drop("total", "values")
.show(false)
this gives:
+---+----------------------------+-------------------------+
|id |dist |percentage |
+---+----------------------------+-------------------------+
|1 |{a -> 10, b -> 5, c -> 3} |[10/18, 5/18, 3/18] |
|2 |{x -> 100, y -> 50, z -> 30}|[100/180, 50/180, 30/180]|
|3 |null |null |
+---+----------------------------+-------------------------+
with the schema:
root
|-- id: long (nullable = false)
|-- dist: map (nullable = true)
| |-- key: string
| |-- value: long (valueContainsNull = false)
|-- percentage: array (nullable = true)
| |-- element: string (containsNull = false)
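As an aside, if you want the numeric ratio rather than the formatted string, a minimal sketch of the same pipeline could be (the ratio column name is just illustrative):
import org.apache.spark.sql.Column
import org.apache.spark.sql.functions.{aggregate, col, lit, map_values, transform}
import org.apache.spark.sql.types.{DoubleType, LongType}
ds
// take just the values of the map `dist`
.withColumn("values", map_values(col("dist")))
// calculate the total per row
.withColumn("total", aggregate(col("values"), lit(0).cast(LongType), (acc, x) => acc + x))
// divide each value by the row total, yielding array<double> instead of array<string>
.withColumn("ratio", transform(col("values"), (c: Column) => c.cast(DoubleType) / col("total")))
.drop("total", "values")
.show(false)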
Spark < 3.0.0 (tested with Spark 2.4.5)
import org.apache.spark.sql.functions.{col, expr, map_values, udf}
// builds a single string "[v1/total, v2/total, ...]" from the values and the total
val toDivisionUDF = udf(
(values: Seq[String], total: Long) =>
"[" + values.map(v => s"$v/$total").mkString(", ") + "]"
)
ds
// take just the values of the map `dist`
.withColumn("values", map_values(col("dist")))
// calculate the total per row (0L keeps the accumulator a bigint, matching the UDF's Long parameter)
.withColumn("total", expr("aggregate(values, 0L, (acc, v) -> acc + v)"))
// format column with total
.withColumn("percentage", toDivisionUDF(col("values"), col("total")))
.drop("total", "values")
.show(false)
this gives:
+---+----------------------------+-------------------------+
|id |dist |percentage |
+---+----------------------------+-------------------------+
|1 |[a -> 10, b -> 5, c -> 3] |[10/18, 5/18, 3/18] |
|2 |[x -> 100, y -> 50, z -> 30]|[100/180, 50/180, 30/180]|
|3 |null |null |
+---+----------------------------+-------------------------+
with the schema:
root
|-- id: long (nullable = false)
|-- dist: map (nullable = true)
| |-- key: string
| |-- value: long (valueContainsNull = false)
|-- percentage: string (nullable = true)
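On Spark 2.4 you can likely skip the UDF entirely, since the SQL higher-order functions transform and aggregate are already available there; an untested sketch that produces an array<string> like the Spark 3 version:
import org.apache.spark.sql.functions.{col, expr, map_values}
ds
// take just the values of the map `dist`
.withColumn("values", map_values(col("dist")))
// calculate the total per row
.withColumn("total", expr("aggregate(values, 0L, (acc, v) -> acc + v)"))
// format each value as "value/total" entirely in SQL, no UDF needed
.withColumn("percentage", expr("transform(values, v -> concat(cast(v as string), '/', cast(total as string)))"))
.drop("total", "values")
.show(false)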
Note that introducing Option did not change the code at all: Spark handles options by treating the wrapped field as nullable.
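You can check that claim directly by comparing the encoded schemas with and without Option (ModelNoOption is a hypothetical name, used only for this comparison):
import org.apache.spark.sql.Dataset
case class ModelNoOption(id: Long, dist: Map[String, Long])
val dsNoOption: Dataset[ModelNoOption] = List(
ModelNoOption(1, Map("a" -> 10L))
).toDS()
// `dist` comes out nullable in both encodings: Map is a reference type,
// so the encoder marks the field nullable whether or not it is wrapped in Option
dsNoOption.printSchema()
ds.printSchema()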