如何在 Scala 中使用 Spark 处理 redistribution/allocation 算法

How to handle redistribution/allocation algorithm using Spark in Scala

假设我在全国各地有一群企鹅,我需要为企鹅分配食物供应(也分布在全国各地)。

我试图将问题简化为解决:

输入

按区域划分的企鹅分布,按邻近程度分组并按优先顺序排列

+------------+------+-------+--------------------------------------+----------+
| PENGUIN ID | AERA | GROUP | PRIORITY (lower are allocated first) | QUANTITY |
+------------+------+-------+--------------------------------------+----------+
| P1         | A    | A1    |                                    1 |        5 |
| P2         | A    | A1    |                                    2 |        5 |
| P3         | A    | A2    |                                    1 |        5 |
| P4         | B    | B1    |                                    1 |        5 |
| P5         | B    | B2    |                                    1 |        5 |
+------------+------+-------+--------------------------------------+----------+

按地区分布的食物,也按接近程度分组并优先为

+---------+------+-------+--------------------------------------+----------+
| FOOD ID | AERA | GROUP | PRIORITY (lower are allocated first) | QUANTITY |
+---------+------+-------+--------------------------------------+----------+
| F1      | A    | A1    |                                    2 |        5 |
| F2      | A    | A1    |                                    1 |        2 |
| F3      | A    | A2    |                                    1 |        7 |
| F4      | B    | B1    |                                    1 |        7 |
+---------+------+-------+--------------------------------------+----------+

预期输出

挑战是先把食物分配给同组的企鹅,尊重食物和企鹅的优先顺序,然后把剩下的食物带到另一个区域。

因此,根据以上数据,我们将首先在同一区域和组内进行分配:

第一阶段:A1(同区同组)

+------+-------+---------+------------+--------------------+
| AREA | GROUP | FOOD ID | PINGUIN ID | ALLOCATED_QUANTITY |
+------+-------+---------+------------+--------------------+
| A    | A1    | F2      | P1         |                  2 |
| A    | A1    | F1      | P1         |                  3 |
| A    | A1    | F1      | P2         |                  2 |
| A    | A1    | X       | P2         |                  3 |
+------+-------+---------+------------+--------------------+

第一阶段:A2(同区同组)

+------+-------+---------+------------+--------------------+
| AREA | GROUP | FOOD ID | PINGUIN ID | ALLOCATED_QUANTITY |
+------+-------+---------+------------+--------------------+
| A    | A2    | F3      | P3         |                  5 |
| A    | A2    | F3      | X          |                  2 |
+------+-------+---------+------------+--------------------+

阶段 2:A(相同区域,阶段 1:A2 剩下的食物现在可以运送到阶段 1:A1 企鹅)

+------+---------+------------+--------------------+
| AREA | FOOD ID | PINGUIN ID | ALLOCATED_QUANTITY |
+------+---------+------------+--------------------+
| A    | F2      | P1         |                  2 |
| A    | F1      | P1         |                  3 |
| A    | F1      | P2         |                  2 |
| A    | F3      | P3         |                  5 |
| A    | F3      | P2         |                  2 |
| A    | X       | P2         |                  1 |
+------+---------+------------+--------------------+

然后我们继续对第 3 阶段(通过 AERA)、第 4 阶段(通过 AERA2(乘火车))执行相同的操作,这是与 AERA(通过卡车)不同的地理切割,所以我们不能只是重新合计), 5...

我试过的

我非常熟悉如何使用简单的 R 代码高效地完成它,使用一堆 For 循环、数组指针并为每个分配逐行创建输出。然而,对于 Spark/Scala,我最终只能得到大而 none 高效的代码来解决这样一个简单的问题,我想接触社区,因为它可能只是我错过了一个 spark 功能。

我可以使用大量的火花行转换来做到这一点,如 [withColumn,groupby,agg(sum),join,union,filters] 但是 DAG 创建最终变得如此之大以至于开始减慢 DAG 构建5/6 阶段后上升。我可以通过在每个阶段后将输出保存为文件来解决这个问题,但后来我遇到了 IO 问题,因为我每个阶段都有数百万条记录要保存。

我也可以做到这一点 运行 每个阶段的一个 UDAF(使用 .split() 缓冲区),分解结果然后连接回原始 table 以更新每个阶段的每个数量。它确实使 DAG 的构建变得更加简单和快速,但不幸的是,由于 UDAF 内部的字符串操作,它对于少数分区来说太慢了。

最后感觉上面两种方法都不对,因为它们更像是hack,必须有更简单的方法来解决这个问题。理想情况下,我更喜欢使用转换而不是放松惰性评估,因为这只是许多其他转换中的一个步骤

非常感谢您的宝贵时间。我很乐意讨论任何建议的方法。

这是psuedocode/description,但我对第 1 阶段的解决方案。这个问题很有趣,我觉得你描述得很好。

我的想法是使用 spark 的 window、struct、collect_list(可能还有 sortWithinPartitions)、累积和和滞后得到这样的结果:

C1   C2  C3  C4    C5   C6                C7   | C8
P1 | A | A1 | 5  | 0 | [(F1,2), (F2,7)] | [F2] | 2
P1 | A | A1 | 10 | 5 | [(F1,2), (F2,7)] | []   | -3

C4 = cumulative sum of quantity, grouped by area/group, ordered by priority
C5 = lag of C4 down a row, and null = 0
C6 = structure of food / quantity, with a cumulative sum of food quantity
C7/C8 = remaining food/food ids

现在您可以使用普通 udf 来 return 属于企鹅的食物组数组,因为您可以找到 C5 < C6.quantity 的第一个实例和 [=12 的第一个实例=].介于两者之间的所有内容都是 returned。如果 C4 永远不会大于 C6.quantity,那么您可以追加 X。展开这个数组的结果将得到所有的企鹅,如果企鹅没有食物。

要确定是否有额外的食物,你可以有一个 udf 来计算每一行的 "remaining food" 的数量,并使用 window 和 row_number 来获得最后一个被喂食的区域。如果remaining food > 0,那些食物id有剩余的食物,会反映到数组中,也可以做成struct映射到剩余食物的个数。

我想最后我仍然在做相当多的聚合,但希望将一些东西组合到数组中可以更快地对每个单独的项目进行比较。