Spark:agg 内的多个过滤器和 concat 不是空值
Spark: Multiple filter inside agg and concat not null values
我正在尝试连接列表列中的非空值。
我知道这可以通过使用 UDF 轻松完成,但想知道如何通过在 agg 函数中使用多个过滤条件来处理这个问题。
不知道这里缺少什么?
val df = sc.parallelize(Seq(("foo", List(null,"bar",null)),
("bar", List("one","two",null)),
("rio", List("Ria","","Kevin")))).toDF("key", "value")
+---+-----------------+
|key| value|
+---+-----------------+
|foo|[null, bar, null]|
|bar| [one, two, null]|
|rio| [Ria, , Kevin]|
+---+-----------------+
df.groupBy("key")
.agg(concat_ws(",",first(when(($"value".isNotNull || $"value" =!= ""),$"value"))).as("RemovedNullSeq"))
.show(false)
+---+--------------+
|key|RemovedNullSeq|
+---+--------------+
|bar|one,two |
|rio|Ria,,Kevin |
|foo|bar |
+---+--------------+
我不需要第二条记录中的空白值。
谢谢
根据提供的示例,我不确定是否有必要使用聚合函数。
如果您只是想连接数组中的值,则可以使用以下方法:
val df = Seq(List(null,"abc", null),
List(null, null, null),
List(null, "def", "ghi", "kjl"),
List("mno", null, "pqr")).toDF("list")
df.withColumn("concat", concat_ws(",",$"list")).show(false)
+---------------------+-----------+
|list |concat |
+---------------------+-----------+
|[null, abc, null] |abc |
|[null, null, null] | |
|[null, def, ghi, kjl]|def,ghi,kjl|
|[mno, null, pqr] |mno,pqr |
+---------------------+-----------+
如有需要先分组:
val df2 = Seq((123,List(null,"abc", null)),
(123,List(null,"def", "hij"))).toDF("key","list")
df2.show(false)
+---+-----------------+
|key|list |
+---+-----------------+
|123|[null, abc, null]|
|123|[null, def, hij] |
+---+-----------------+
你可能认为你可以做类似的事情
val grouped = df2.groupBy($"key").agg(collect_list($"list").as("collected"))
然后将一些函数应用于数组数组以获得串联结果。但是,如果不求助于 UDF,我一直无法找到执行此操作的方法。
在这种情况下,在分组之前爆炸就可以了:
val grouped = df2.groupBy($"key").agg(collect_list($"list").as("collected"))
.groupBy($"key").agg(collect_list($"listItem").as("collected"))
.withColumn("concat", concat_ws(",",$"collected")).show(false)
+---+---------------+-----------+
|key|collected |concat |
+---+---------------+-----------+
|123|[abc, def, hij]|abc,def,hij|
+---+---------------+-----------+
但请注意,无法保证列表的收集顺序。
希望对您有所帮助
import org.apache.spark.sql.functions._
val df = sc.parallelize(Seq(("foo", List(null,"bar",null)),
("bar", List("one","two",null)),
("rio", List("Ria","","Kevin")))).toDF("key", "value")
val filtd = df.select($"key" as "key", explode($"value") as "val").where (length($"val") > 0)
val rsult = filtd.select($"*").groupBy($"key").agg(collect_list("val"))
rsult.show(5)
您可以像这样添加多个条件
val filtd = df.select($"key" as "key", explode($"value") as "val").where (length($"val") > 0 && $"val".isNotNull)
输出
+---+-----------------+
|key|collect_list(val)|
+---+-----------------+
|bar| [one, two]|
|rio| [Ria, Kevin]|
|foo| [bar]|
+---+-----------------+
我正在尝试连接列表列中的非空值。 我知道这可以通过使用 UDF 轻松完成,但想知道如何通过在 agg 函数中使用多个过滤条件来处理这个问题。 不知道这里缺少什么?
val df = sc.parallelize(Seq(("foo", List(null,"bar",null)),
("bar", List("one","two",null)),
("rio", List("Ria","","Kevin")))).toDF("key", "value")
+---+-----------------+
|key| value|
+---+-----------------+
|foo|[null, bar, null]|
|bar| [one, two, null]|
|rio| [Ria, , Kevin]|
+---+-----------------+
df.groupBy("key")
.agg(concat_ws(",",first(when(($"value".isNotNull || $"value" =!= ""),$"value"))).as("RemovedNullSeq"))
.show(false)
+---+--------------+
|key|RemovedNullSeq|
+---+--------------+
|bar|one,two |
|rio|Ria,,Kevin |
|foo|bar |
+---+--------------+
我不需要第二条记录中的空白值。 谢谢
根据提供的示例,我不确定是否有必要使用聚合函数。
如果您只是想连接数组中的值,则可以使用以下方法:
val df = Seq(List(null,"abc", null),
List(null, null, null),
List(null, "def", "ghi", "kjl"),
List("mno", null, "pqr")).toDF("list")
df.withColumn("concat", concat_ws(",",$"list")).show(false)
+---------------------+-----------+
|list |concat |
+---------------------+-----------+
|[null, abc, null] |abc |
|[null, null, null] | |
|[null, def, ghi, kjl]|def,ghi,kjl|
|[mno, null, pqr] |mno,pqr |
+---------------------+-----------+
如有需要先分组:
val df2 = Seq((123,List(null,"abc", null)),
(123,List(null,"def", "hij"))).toDF("key","list")
df2.show(false)
+---+-----------------+
|key|list |
+---+-----------------+
|123|[null, abc, null]|
|123|[null, def, hij] |
+---+-----------------+
你可能认为你可以做类似的事情
val grouped = df2.groupBy($"key").agg(collect_list($"list").as("collected"))
然后将一些函数应用于数组数组以获得串联结果。但是,如果不求助于 UDF,我一直无法找到执行此操作的方法。
在这种情况下,在分组之前爆炸就可以了:
val grouped = df2.groupBy($"key").agg(collect_list($"list").as("collected"))
.groupBy($"key").agg(collect_list($"listItem").as("collected"))
.withColumn("concat", concat_ws(",",$"collected")).show(false)
+---+---------------+-----------+
|key|collected |concat |
+---+---------------+-----------+
|123|[abc, def, hij]|abc,def,hij|
+---+---------------+-----------+
但请注意,无法保证列表的收集顺序。
希望对您有所帮助
import org.apache.spark.sql.functions._
val df = sc.parallelize(Seq(("foo", List(null,"bar",null)),
("bar", List("one","two",null)),
("rio", List("Ria","","Kevin")))).toDF("key", "value")
val filtd = df.select($"key" as "key", explode($"value") as "val").where (length($"val") > 0)
val rsult = filtd.select($"*").groupBy($"key").agg(collect_list("val"))
rsult.show(5)
您可以像这样添加多个条件
val filtd = df.select($"key" as "key", explode($"value") as "val").where (length($"val") > 0 && $"val".isNotNull)
输出
+---+-----------------+
|key|collect_list(val)|
+---+-----------------+
|bar| [one, two]|
|rio| [Ria, Kevin]|
|foo| [bar]|
+---+-----------------+