Spark:agg 内的多个过滤器和 concat 不是空值

Spark: Multiple filter inside agg and concat not null values

我正在尝试连接列表列中的非空值。 我知道这可以通过使用 UDF 轻松完成,但想知道如何通过在 agg 函数中使用多个过滤条件来处理这个问题。 不知道这里缺少什么?

val df = sc.parallelize(Seq(("foo", List(null,"bar",null)), 
                            ("bar", List("one","two",null)),
                            ("rio", List("Ria","","Kevin")))).toDF("key", "value")

+---+-----------------+
|key|            value|
+---+-----------------+
|foo|[null, bar, null]|
|bar| [one, two, null]|
|rio|   [Ria, , Kevin]|
+---+-----------------+                         

df.groupBy("key")
  .agg(concat_ws(",",first(when(($"value".isNotNull || $"value" =!= ""),$"value"))).as("RemovedNullSeq"))
  .show(false)

+---+--------------+
|key|RemovedNullSeq|
+---+--------------+
|bar|one,two       |
|rio|Ria,,Kevin    |
|foo|bar           |
+---+--------------+

我不需要第二条记录中的空白值。 谢谢

根据提供的示例,我不确定是否有必要使用聚合函数。

如果您只是想连接数组中的值,则可以使用以下方法:

val df = Seq(List(null,"abc", null),
    List(null, null, null), 
    List(null, "def", "ghi", "kjl"),
    List("mno", null, "pqr")).toDF("list")

df.withColumn("concat", concat_ws(",",$"list")).show(false)

+---------------------+-----------+
|list                 |concat     |
+---------------------+-----------+
|[null, abc, null]    |abc        |
|[null, null, null]   |           |
|[null, def, ghi, kjl]|def,ghi,kjl|
|[mno, null, pqr]     |mno,pqr    |
+---------------------+-----------+

如有需要先分组:

val df2 = Seq((123,List(null,"abc", null)),
    (123,List(null,"def", "hij"))).toDF("key","list")

df2.show(false)
+---+-----------------+
|key|list             |
+---+-----------------+
|123|[null, abc, null]|
|123|[null, def, hij] |
+---+-----------------+

你可能认为你可以做类似的事情

val grouped = df2.groupBy($"key").agg(collect_list($"list").as("collected"))

然后将一些函数应用于数组数组以获得串联结果。但是,如果不求助于 UDF,我一直无法找到执行此操作的方法。

在这种情况下,在分组之前爆炸就可以了:

val grouped = df2.groupBy($"key").agg(collect_list($"list").as("collected"))
    .groupBy($"key").agg(collect_list($"listItem").as("collected"))
    .withColumn("concat", concat_ws(",",$"collected")).show(false)

+---+---------------+-----------+
|key|collected      |concat     |
+---+---------------+-----------+
|123|[abc, def, hij]|abc,def,hij|
+---+---------------+-----------+

但请注意,无法保证列表的收集顺序。

希望对您有所帮助

import org.apache.spark.sql.functions._

val df = sc.parallelize(Seq(("foo", List(null,"bar",null)), 
                            ("bar", List("one","two",null)),
                            ("rio", List("Ria","","Kevin")))).toDF("key", "value")

val filtd = df.select($"key" as "key", explode($"value") as "val").where (length($"val") > 0)
val rsult = filtd.select($"*").groupBy($"key").agg(collect_list("val"))
rsult.show(5)

您可以像这样添加多个条件

val filtd = df.select($"key" as "key", explode($"value") as "val").where (length($"val") > 0 && $"val".isNotNull)

输出

+---+-----------------+
|key|collect_list(val)|
+---+-----------------+
|bar|       [one, two]|
|rio|     [Ria, Kevin]|
|foo|            [bar]|
+---+-----------------+