在 apache spark sql 的数据框中使用具有相同域的多个列的特定情况下避免多重连接
Avoiding multiple joins in a specific case with multiple columns with the same domain in dataframes of apache spark sql
我被要求在 apache spark sql (java api) 中通过数据帧做一些事情,我认为如果按照天真的方法执行的话会花费很多 (我仍在使用天真的方法,但我认为这会花费很多,因为它至少需要 4 种连接)。
我得到了以下数据框:
+----+----+----+----+----+----------+------+
| C1| C2| C3| C4| C5|UNIQUE KEY|points|
+----+----+----+----+----+----------+------+
| A| A|null|null|null| 1234| 2|
| A|null|null| H|null| 1235| 3|
| A| B|null|null|null| 1236| 3|
| B|null|null|null| E| 1237| 1|
| C|null|null| G|null| 1238| 1|
| F|null| C| E|null| 1239| 2|
|null|null| D| E| G| 1240| 1|
+----+----+----+----+----+----------+------+
C1、C2、C3、C4 和 C5 具有相同的域值,唯一键是唯一键,points 是一个整数,对于其对应的 C 列的每个不同值(例如,对于第一行 A,A,null,null,null,key,2 与 A,null,null,null,null,key,2 或 A,A,A,A,null,key,2)[=16 相同=]
我被要求 "for each existing C value get the total number of points"。
所以输出应该是:
+----+------+
| C1|points|
+----+------+
| A| 8|
| B| 4|
| C| 3|
| D| 1|
| E| 4|
| F| 2|
| G| 2|
| H| 3|
+----+------+
我打算通过简单的 .select("C1","point")
、.select("C2","point")
等将数据框分成多个小的数据框(1 列用于 C 列,1 列用于点)。但我相信如果数据量真的很大,那真的会花费很多,我相信应该有一些通过 map reduce 的技巧,但我自己找不到,因为我对这一切还很陌生世界。我想我遗漏了一些关于如何应用 map reduce 的概念。
我还考虑过使用函数 explode
,我想将 [C1、C2、C3、C4、C5] 放在一个列中,然后使用 explode,这样我每行得到 5 行,然后我只是按键分组......但我相信这会在某个时候增加数据量,如果我们谈论的是 GB,这可能不可行......我希望你能找到我正在寻找的技巧.
感谢您的宝贵时间。
使用 explode
可能是解决此问题的方法。它不会增加数据量,并且与使用多个 join
相比计算效率更高(请注意,单个 join
本身就是一项昂贵的操作)。
在这种情况下,您可以将列转换为数组,只保留每个单独行的唯一值。然后可以分解此数组并过滤掉所有空值。在这一点上,一个简单的 groupBy
和 sum 会给你想要的结果。
在 Scala 中:
df.select(explode(array_distinct(array("C1", "C2", "C3", "C4", "C5"))).as("C1"), $"points")
.filter($"C1".isNotNull)
.groupBy($"C1)
.agg(sum($"points").as("points"))
.sort($"C1") // not really necessary
这会给你想要的结果:
+----+------+
| C1|points|
+----+------+
| A| 8|
| B| 4|
| C| 3|
| D| 1|
| E| 4|
| F| 2|
| G| 2|
| H| 3|
+----+------+
我被要求在 apache spark sql (java api) 中通过数据帧做一些事情,我认为如果按照天真的方法执行的话会花费很多 (我仍在使用天真的方法,但我认为这会花费很多,因为它至少需要 4 种连接)。
我得到了以下数据框:
+----+----+----+----+----+----------+------+
| C1| C2| C3| C4| C5|UNIQUE KEY|points|
+----+----+----+----+----+----------+------+
| A| A|null|null|null| 1234| 2|
| A|null|null| H|null| 1235| 3|
| A| B|null|null|null| 1236| 3|
| B|null|null|null| E| 1237| 1|
| C|null|null| G|null| 1238| 1|
| F|null| C| E|null| 1239| 2|
|null|null| D| E| G| 1240| 1|
+----+----+----+----+----+----------+------+
C1、C2、C3、C4 和 C5 具有相同的域值,唯一键是唯一键,points 是一个整数,对于其对应的 C 列的每个不同值(例如,对于第一行 A,A,null,null,null,key,2 与 A,null,null,null,null,key,2 或 A,A,A,A,null,key,2)[=16 相同=]
我被要求 "for each existing C value get the total number of points"。
所以输出应该是:
+----+------+
| C1|points|
+----+------+
| A| 8|
| B| 4|
| C| 3|
| D| 1|
| E| 4|
| F| 2|
| G| 2|
| H| 3|
+----+------+
我打算通过简单的 .select("C1","point")
、.select("C2","point")
等将数据框分成多个小的数据框(1 列用于 C 列,1 列用于点)。但我相信如果数据量真的很大,那真的会花费很多,我相信应该有一些通过 map reduce 的技巧,但我自己找不到,因为我对这一切还很陌生世界。我想我遗漏了一些关于如何应用 map reduce 的概念。
我还考虑过使用函数 explode
,我想将 [C1、C2、C3、C4、C5] 放在一个列中,然后使用 explode,这样我每行得到 5 行,然后我只是按键分组......但我相信这会在某个时候增加数据量,如果我们谈论的是 GB,这可能不可行......我希望你能找到我正在寻找的技巧.
感谢您的宝贵时间。
使用 explode
可能是解决此问题的方法。它不会增加数据量,并且与使用多个 join
相比计算效率更高(请注意,单个 join
本身就是一项昂贵的操作)。
在这种情况下,您可以将列转换为数组,只保留每个单独行的唯一值。然后可以分解此数组并过滤掉所有空值。在这一点上,一个简单的 groupBy
和 sum 会给你想要的结果。
在 Scala 中:
df.select(explode(array_distinct(array("C1", "C2", "C3", "C4", "C5"))).as("C1"), $"points")
.filter($"C1".isNotNull)
.groupBy($"C1)
.agg(sum($"points").as("points"))
.sort($"C1") // not really necessary
这会给你想要的结果:
+----+------+
| C1|points|
+----+------+
| A| 8|
| B| 4|
| C| 3|
| D| 1|
| E| 4|
| F| 2|
| G| 2|
| H| 3|
+----+------+