如何创建一个映射列来计算没有 udaf 的出现次数

How to create a map column to count occurrences without udaf

我想创建一个 Map 列来计算出现的次数。

例如:

+---+----+
|  b|   a|
+---+----+
|  1|   b|
|  2|null|
|  1|   a|
|  1|   a|
+---+----+

会导致

+---+--------------------+
|  b|                 res|
+---+--------------------+
|  1|[a -> 2.0, b -> 1.0]|
|  2|                  []|
+---+--------------------+

目前,在 Spark 2.4.6 中,我能够使用 udaf 来实现它。

在转向 Spark3 时,我想知道我是否可以摆脱这个 udaf(我尝试使用新方法 aggregate 但没有成功)

有什么有效的方法吗? (效率部分,我可以轻松测试)

我们可以实现这个是spark 2.4

//GET THE COUNTS
val groupedCountDf = originalDf.groupBy("b","a").count

//CREATE MAPS FOR EVERY COUNT | EMPTY MAP FOR NULL KEY
//AGGREGATE THEM AS ARRAY 

val dfWithArrayOfMaps =  groupedCountDf
.withColumn("newMap",  when($"a".isNotNull, map($"a",$"count")).otherwise(map()))
.groupBy("b").agg(collect_list($"newMap") as "multimap")

//EXPRESSION TO CONVERT ARRAY[MAP] -> MAP

val mapConcatExpr = expr("aggregate(multimap, map(), (k, v) -> map_concat(k, v))")

val finalDf = dfWithArrayOfMaps.select($"b", mapConcatExpr.as("merged_data"))

这里的解决方案只有一个 groupBy 和一个稍微复杂的 sql 表达式。此解决方案适用于 Spark 2.4+

df.groupBy("b")
  .agg(expr("sort_array(collect_set(a)) as set"),
       expr("sort_array(collect_list(a)) as list"))
  .withColumn("res",
       expr("map_from_arrays(set,transform(set, x -> size(filter(list, y -> y=x))))"))
  .show()

输出:

+---+------+---------+----------------+
|  b|   set|     list|             res|
+---+------+---------+----------------+
|  1|[a, b]|[a, a, b]|[a -> 2, b -> 1]|
|  2|    []|       []|              []|
+---+------+---------+----------------+

我们的想法是从列 a 中收集数据两次:一次收集到集合中,一次收集到列表中。然后在 transform for each element of the set the number of occurences of the particular element in the list is counted. Finally, the set and the number of elements are combined with map_from_arrays.

的帮助下

但是我不能说这种方法是否真的比 UDAF 更快。

这里是 Spark 3 解决方案:

import org.apache.spark.sql.functions._

df.groupBy($"b",$"a").count()
  .groupBy($"b")
  .agg(
    map_from_entries(
      collect_list(
        when($"a".isNotNull,struct($"a",$"count"))
      )
    ).as("res")
  )
  .show()

给出:

+---+----------------+
|  b|             res|
+---+----------------+
|  1|[b -> 1, a -> 2]|
|  2|              []|
+---+----------------+

这里使用Aggregator的解决方案:

import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.functions._
import org.apache.spark.sql.Encoder

val countOcc = new Aggregator[String, Map[String,Int], Map[String,Int]] with Serializable {
    def zero: Map[String,Int] = Map.empty.withDefaultValue(0)
    def reduce(b: Map[String,Int], a: String) = if(a!=null) b + (a -> (b(a) + 1)) else b
    def merge(b1: Map[String,Int], b2: Map[String,Int]) = {
      val keys = b1.keys.toSet.union(b2.keys.toSet)
      keys.map{ k => (k -> (b1(k) + b2(k))) }.toMap
    }
    def finish(b: Map[String,Int]) = b
    def bufferEncoder: Encoder[Map[String,Int]] = implicitly(ExpressionEncoder[Map[String,Int]])
    def outputEncoder: Encoder[Map[String, Int]] = implicitly(ExpressionEncoder[Map[String, Int]])
}

val countOccUDAF = udaf(countOcc)

df
  .groupBy($"b")
  .agg(countOccUDAF($"a").as("res"))
  .show()

给出:

+---+----------------+
|  b|             res|
+---+----------------+
|  1|[b -> 1, a -> 2]|
|  2|              []|
+---+----------------+

您始终可以将 collect_list 与 UDF 一起使用,但前提是您的分组不太长:

val udf_histo = udf((x:Seq[String]) => x.groupBy(identity).mapValues(_.size))

df.groupBy($"b")
  .agg(
    collect_list($"a").as("as")
  )
  .select($"b",udf_histo($"as").as("res"))
  .show()

给出:

+---+----------------+
|  b|             res|
+---+----------------+
|  1|[b -> 1, a -> 2]|
|  2|              []|
+---+----------------+

这应该比 UDAF 更快:Spark custom aggregation : collect_list+UDF vs UDAF