在 spark 数据帧中聚合期间过滤数组值
Filter array values during aggregation in spark dataframe
我正在对以下数据框执行聚合以获取具有一系列品牌的广告商列表
+------------+------+
|advertiser |brand |
+------------+------+
|Advertiser 1|Brand1|
|Advertiser 1|Brand2|
|Advertiser 2|Brand3|
|Advertiser 2|Brand4|
|Advertiser 3|Brand5|
|Advertiser 3|Brand6|
+------------+------+
这是我的代码:
import org.apache.spark.sql.functions.collect_list
df2
.groupBy("advertiser")
.agg(collect_list("brand").as("brands"))
这给了我以下数据框:
+------------+----------------+
|advertiser |brands |
+------------+----------------+
|Advertiser 1|[Brand1, Brand2]|
|Advertiser 2|[Brand3, Brand4]|
|Advertiser 3|[Brand5, Brand6]|
+------------+----------------+
在汇总期间,我想用以下 table 个品牌过滤品牌列表:
+------+------------+
|brand |brand name |
+------+------------+
|Brand1|Brand_name_1|
|Brand3|Brand_name_3|
+------+------------+
为了实现:
+------------+--------+
|advertiser |brands |
+------------+--------+
|Advertiser 1|[Brand1]|
|Advertiser 2|[Brand3]|
|Advertiser 3|null |
+------------+--------+
针对您的问题,我看到了两个解决方案,我将调用 收集解决方案 和 加入解决方案
收集解决方案
如果您可以收集 brands
数据框,则可以使用此收集的集合在执行 collect_list
时仅保留正确的品牌,然后 flatten
您的数组并将空数组替换为 null
如下:
import org.apache.spark.sql.functions.{array, col, collect_list, flatten, size, when}
val filteredBrands = brands.select("brand").collect().map(_.getString(0))
val finalDataframe = df2
.groupBy("advertiser")
.agg(collect_list(when(col("brand").isin(filteredBrands: _*), array(col("brand"))).otherwise(array())).as("brands"))
.withColumn("brands", flatten(col("brands")))
.withColumn("brands", when(size(col("brands")).equalTo(0), null).otherwise(col("brands")))
加入解决方案
如果您的 brands
数据框不适合内存,您可以先将 df2
与 brands
左连接,如果品牌在 [=17] 中,则有一个包含品牌的列=] dataframe,否则 null
,然后进行分组,最后替换空数组,因为广告商没有您要按 null
:
过滤的品牌
import org.apache.spark.sql.functions.{col, collect_list}
val finalDataframe = df2
.join(brands.select(col("brand").as("filtered_brand")), col("filtered_brand") === col("brand"), "left_outer")
.groupBy("advertiser").agg(collect_list(col("filtered_brand")).as("brands"))
.withColumn("brands", when(size(col("brands")).equalTo(0), null).otherwise(col("brands")))
详情
因此,如果我们从 df2
数据框开始,如下所示:
+------------+------+
|advertiser |brand |
+------------+------+
|Advertiser 1|Brand1|
|Advertiser 1|Brand2|
|Advertiser 2|Brand3|
|Advertiser 2|Brand4|
|Advertiser 3|Brand5|
|Advertiser 3|Brand6|
+------------+------+
和一个 brands
数据框如下:
+------+------------+
|brand |brand name |
+------+------------+
|Brand1|Brand_name_1|
|Brand3|Brand_name_3|
+------+------------+
在 df2
和 brands
数据帧(第一行)之间的第一个左外连接之后,您将获得以下数据帧:
+------------+------+--------------+
|advertiser |brand |filtered_brand|
+------------+------+--------------+
|Advertiser 1|Brand1|Brand1 |
|Advertiser 1|Brand2|null |
|Advertiser 2|Brand3|Brand3 |
|Advertiser 2|Brand4|null |
|Advertiser 3|Brand5|null |
|Advertiser 3|Brand6|null |
+------------+------+--------------+
当您按广告商对这个数据框进行分组时,收集过滤后的品牌列表,您会得到以下数据框:
+------------+--------+
|advertiser |brands |
+------------+--------+
|Advertiser 2|[Brand3]|
|Advertiser 3|[] |
|Advertiser 1|[Brand1]|
+------------+--------+
最后,当您应用最后一行将空数组替换为 null 时,您会得到预期的结果:
+------------+--------+
|advertiser |brands |
+------------+--------+
|Advertiser 2|[Brand3]|
|Advertiser 3|null |
|Advertiser 1|[Brand1]|
+------------+--------+
结论
Collect Solution 仅创建一个昂贵的 suffle 步骤(在 groupBy 期间),如果您的 brands
数据框较小,应优先选择。 Join 解决方案 如果您的 brands
数据框很大,但它会产生很多昂贵的 suffle 步骤,一个 groupBy 和一个 join。
我正在对以下数据框执行聚合以获取具有一系列品牌的广告商列表
+------------+------+
|advertiser |brand |
+------------+------+
|Advertiser 1|Brand1|
|Advertiser 1|Brand2|
|Advertiser 2|Brand3|
|Advertiser 2|Brand4|
|Advertiser 3|Brand5|
|Advertiser 3|Brand6|
+------------+------+
这是我的代码:
import org.apache.spark.sql.functions.collect_list
df2
.groupBy("advertiser")
.agg(collect_list("brand").as("brands"))
这给了我以下数据框:
+------------+----------------+
|advertiser |brands |
+------------+----------------+
|Advertiser 1|[Brand1, Brand2]|
|Advertiser 2|[Brand3, Brand4]|
|Advertiser 3|[Brand5, Brand6]|
+------------+----------------+
在汇总期间,我想用以下 table 个品牌过滤品牌列表:
+------+------------+
|brand |brand name |
+------+------------+
|Brand1|Brand_name_1|
|Brand3|Brand_name_3|
+------+------------+
为了实现:
+------------+--------+
|advertiser |brands |
+------------+--------+
|Advertiser 1|[Brand1]|
|Advertiser 2|[Brand3]|
|Advertiser 3|null |
+------------+--------+
针对您的问题,我看到了两个解决方案,我将调用 收集解决方案 和 加入解决方案
收集解决方案
如果您可以收集 brands
数据框,则可以使用此收集的集合在执行 collect_list
时仅保留正确的品牌,然后 flatten
您的数组并将空数组替换为 null
如下:
import org.apache.spark.sql.functions.{array, col, collect_list, flatten, size, when}
val filteredBrands = brands.select("brand").collect().map(_.getString(0))
val finalDataframe = df2
.groupBy("advertiser")
.agg(collect_list(when(col("brand").isin(filteredBrands: _*), array(col("brand"))).otherwise(array())).as("brands"))
.withColumn("brands", flatten(col("brands")))
.withColumn("brands", when(size(col("brands")).equalTo(0), null).otherwise(col("brands")))
加入解决方案
如果您的 brands
数据框不适合内存,您可以先将 df2
与 brands
左连接,如果品牌在 [=17] 中,则有一个包含品牌的列=] dataframe,否则 null
,然后进行分组,最后替换空数组,因为广告商没有您要按 null
:
import org.apache.spark.sql.functions.{col, collect_list}
val finalDataframe = df2
.join(brands.select(col("brand").as("filtered_brand")), col("filtered_brand") === col("brand"), "left_outer")
.groupBy("advertiser").agg(collect_list(col("filtered_brand")).as("brands"))
.withColumn("brands", when(size(col("brands")).equalTo(0), null).otherwise(col("brands")))
详情
因此,如果我们从 df2
数据框开始,如下所示:
+------------+------+
|advertiser |brand |
+------------+------+
|Advertiser 1|Brand1|
|Advertiser 1|Brand2|
|Advertiser 2|Brand3|
|Advertiser 2|Brand4|
|Advertiser 3|Brand5|
|Advertiser 3|Brand6|
+------------+------+
和一个 brands
数据框如下:
+------+------------+
|brand |brand name |
+------+------------+
|Brand1|Brand_name_1|
|Brand3|Brand_name_3|
+------+------------+
在 df2
和 brands
数据帧(第一行)之间的第一个左外连接之后,您将获得以下数据帧:
+------------+------+--------------+
|advertiser |brand |filtered_brand|
+------------+------+--------------+
|Advertiser 1|Brand1|Brand1 |
|Advertiser 1|Brand2|null |
|Advertiser 2|Brand3|Brand3 |
|Advertiser 2|Brand4|null |
|Advertiser 3|Brand5|null |
|Advertiser 3|Brand6|null |
+------------+------+--------------+
当您按广告商对这个数据框进行分组时,收集过滤后的品牌列表,您会得到以下数据框:
+------------+--------+
|advertiser |brands |
+------------+--------+
|Advertiser 2|[Brand3]|
|Advertiser 3|[] |
|Advertiser 1|[Brand1]|
+------------+--------+
最后,当您应用最后一行将空数组替换为 null 时,您会得到预期的结果:
+------------+--------+
|advertiser |brands |
+------------+--------+
|Advertiser 2|[Brand3]|
|Advertiser 3|null |
|Advertiser 1|[Brand1]|
+------------+--------+
结论
Collect Solution 仅创建一个昂贵的 suffle 步骤(在 groupBy 期间),如果您的 brands
数据框较小,应优先选择。 Join 解决方案 如果您的 brands
数据框很大,但它会产生很多昂贵的 suffle 步骤,一个 groupBy 和一个 join。