spark sql:如何在组级别实现数据帧的并行处理,但在每个组中,我们需要顺序处理行
spark sql : How to achieve parallel processing of dataframe at group level but with in each group, we require sequential processing of rows
- 对数据框应用分组。假设它产生了 100 个组,每个组有 10 行。
- 我有一个函数必须应用于每个组。它可以并行方式和以任何顺序发生(即,spark 可以自行决定以任何顺序选择任何组执行)。
- 但是对于in group,我需要保证对行的顺序处理。因为在处理组中的每一行之后,我在处理组中剩余的任何行时使用输出。
我们采用了以下方法,其中一切都在驱动程序上 运行 并且无法利用 spark 集群的跨节点并行处理(正如预期的那样,性能非常糟糕)
1) 将主DF分解成多个dataframes,放在一个数组中:
val securityIds = allocStage1DF.select("ALLOCONEGROUP").distinct.collect.flatMap(_.toSeq)
val bySecurityArray = securityIds.map(securityIds => allocStage1DF.where($"ALLOCONEGROUP" <=> securityIds))
2) 循环遍历数据帧并传递给方法进行处理,从上面的数据帧逐行处理:
df.coalesce(1).sort($"PRIORITY" asc).collect().foreach({
row => AllocOneOutput.allocOneOutput(row)}
)
我们正在寻找的是并行和顺序处理的结合。
组级别的并行处理。因为,这些都是独立的组,可以并行化。
在每个组中,行必须按顺序逐行处理,这对我们的用例非常重要。
sample Data
在 SECURITY_ID、CC、BU、MPU 上应用分组,从上面给我们 2 个组(SECID_1、CC_A、BU_A、MPU_A和 SECID_2,CC_A,BU_A,MPU_A).
在优先级矩阵的帮助下(只有一个 ref table 用于将排名分配给行),我们将每个组转置到下面:
Transposed Data
以上组中的每一行都有一个优先级,并按该顺序排序。现在,我想通过将它们传递给函数来逐行处理每一行并获得如下输出:
output
用例的详细说明:
- 基础数据框包含一家金融公司的所有交易头寸数据。一些客户买入(多头)给定的金融产品(由 securityId 唯一标识),一些客户卖出(空头)它们。
- 我们应用的思路是identify/pair给定securityId中的多头头寸和空头头寸。
- 由于这种配对发生在一个 securityId 中,我们说基础数据帧根据这个 securityId 分成组,每个组可以独立处理。
- 为什么我们要在组内寻找顺序处理?这是因为,当给定组中有许多多头头寸和许多空头头寸时(如示例数据),参考 table(优先级矩阵)决定哪个多头头寸必须与哪个空头头寸配对。基本上,它给出了处理顺序。
- 第二个原因是,当给定多头数量和空头数量
不相等则剩余数量符合配对条件。
即,如果剩余数量多,则可以与下一个配对
按优先级或副级在组中可用的短缺数量
相反。
- 由于 4 和 5 中提到的原因,我们希望在一个组中逐行处理。
以上几点使用下面的数据集进行描述。
基础数据帧
+-------------+----------+----------+------
ROW_NO|SECURITY_ID| ACCOUNT|POSITION|
+-------------+----------+----------+------
1| secId| Acc1| +100|
2| secId| Acc2| -150|
3| secId| Acc3| -25|
4| secId2| Acc3| -25|
5| secId2| Acc3| -25|
基础数据帧按securityID分组划分。让我们使用 secId 组如下
+-------------+----------+----------+------
ROW_NO|SECURITY_ID| ACCOUNT|POSITION|
+-------------+----------+----------+------
1| secId| Acc1| +100|
2| secId| Acc2| -150|
3| secId| Acc3| -25|
在上述情况下,正数 100 可以与 -50 或 -25 配对。为了打破平局,以下参考 table 称为优先级矩阵有助于定义顺序。
+-------------+----------+----------+------
+vePositionAccount|-vePositionAccount| RANK
+-------------+----------+----------+------
Acc1| Acc3| 1|
Acc1| Acc2| 2|
所以,从上面的矩阵我们知道第 1 行和第 3 行将首先配对,然后第 1 行和第 2 行。这就是我们正在谈论的顺序(顺序处理)。让我们现在将它们配对如下:
+-------------+----------+----------+------+-------------+----------+----------+------
+veROW_NO|+veSECURITY_ID| +veACCOUNT|+vePOSITION|+veROW_NO|+veSECURITY_ID| +veACCOUNT|+vePOSITION|
+-------------+----------+----------+------+-------------+----------+----------+------
1| secId| Acc1| +100| 3| secId| Acc3| -25|
1| secId| Acc1| +100| 2| secId| Acc2| -150|
在第 2 行之后处理第 1 行会发生什么情况? (这就是我们需要的)
1.After 处理第 1 行 - Acc1 中的位置将为 (100 - 25) = 75 Acc3 中的位置将为 0。Acc1 中更新的位置 75 现在将用于处理第二行。
2.After 处理第 2 行 - Acc1 中的位置将为 0 。 Acc2 中的位置将为 (75-150) -75.
结果数据框:
+-------------+----------+----------+------
ROW_NO|SECURITY_ID| ACCOUNT|POSITION|
+-------------+----------+----------+------
1| secId| Acc1| 0|
2| secId| Acc2| -75|
3| secId| Acc3| 0|
在第 1 行之后处理第 2 行会发生什么情况? (我们不想要这个)
- 处理第 2 行后 - Acc1 中的位置将为 0 Acc2 中的位置将为 (100-150) -50。 Acc1 中的更新位置为 0 现在将用于处理第一行。
- 处理第 1 行后 - Acc1 中的位置将为 0。Acc3 中的位置将保持不变,为 -25。
结果数据框:
+-------------+----------+----------+------
ROW_NO|SECURITY_ID| ACCOUNT|POSITION|
+-------------+----------+----------+------
1| secId| Acc1| 0|
2| secId| Acc2| -50|
3| secId| Acc3| -25|
正如你在上面看到的,一组中的处理顺序决定了我们的输出
我也想问 - 为什么 spark 不支持在一段数据帧中进行顺序处理?我们是说我们需要集群的并行处理能力。这就是为什么我们将数据框分成几组,并要求集群在这些组上并行应用逻辑。我们要说的是,如果该组有 100 行,那么让这 100 行按顺序依次处理。 spark 不支持这个吗?
如果不是,那么大数据中还有哪些其他技术可以帮助实现这一目标?
替代实现:
- 将数据帧分成与组数一样多的分区(在我们的例子中是 50000。组更多,但任何组中的行不超过几百)。
- 运行 'ForeachPartition' 对数据帧的操作,其中逻辑独立地跨分区执行。
- 将每个分区的处理输出写入集群。
- 处理完整个数据帧后,一个单独的作业将从步骤 3 中读取这些单独的文件并写入单个 file/dataframe。
我怀疑数千个分区是否有用,但我想知道这种方法听起来是否不错。
这个概念在这个规则之前已经足够好了:
- Second reason is that, when a given long quantity and short quantity are not equal then the residual quantity is eligible for
pairing. i.e., if long quantity is left, then it can be paired with
the next short quantity available in the group as per the priority or
vice versa.
这是因为您需要迭代,使用依赖逻辑循环,这很难用更面向流的 Spark 进行编码。
I also worked on a project where everything was stated - do it in Big
Data with Spark, scala or pyspark. Being an architect as well as coder,
I looked at the algorithm for something similar to your area, but not
quite, in which for commodities all the periods for a set of data
points needed to be classified as bull, bear, or not. Like your
algorithm, but still different, I did not know what amount of looping
to do up-front. In fact I needed to do something, then decide to
repeat that something to the left and to the right of a period I had
marked as either bull or bear or nothing, potentially. Termination
conditions were required. See picture below. Sort of like a 'flat' binary tree
traversal until all paths exhausted. Not that Spark-ish.
I actually - for academic purposes - solved my specific situation in
Spark, but it was an academic exercise. The point to the matter is
that this type of processing - my example and your example are a poor
fit for Spark. We did these calculation in ORACLE and simply sqooped
the results to Hadoop datastore.
因此,我的建议是您不要在 Spark 中尝试这样做,因为它不够适合用例。相信我,它会变得混乱。老实说,我很快就意识到这种处理方式是个问题。但是刚开始的时候,查询是一个常见的方面。
- 对数据框应用分组。假设它产生了 100 个组,每个组有 10 行。
- 我有一个函数必须应用于每个组。它可以并行方式和以任何顺序发生(即,spark 可以自行决定以任何顺序选择任何组执行)。
- 但是对于in group,我需要保证对行的顺序处理。因为在处理组中的每一行之后,我在处理组中剩余的任何行时使用输出。
我们采用了以下方法,其中一切都在驱动程序上 运行 并且无法利用 spark 集群的跨节点并行处理(正如预期的那样,性能非常糟糕)
1) 将主DF分解成多个dataframes,放在一个数组中:
val securityIds = allocStage1DF.select("ALLOCONEGROUP").distinct.collect.flatMap(_.toSeq)
val bySecurityArray = securityIds.map(securityIds => allocStage1DF.where($"ALLOCONEGROUP" <=> securityIds))
2) 循环遍历数据帧并传递给方法进行处理,从上面的数据帧逐行处理:
df.coalesce(1).sort($"PRIORITY" asc).collect().foreach({
row => AllocOneOutput.allocOneOutput(row)}
)
我们正在寻找的是并行和顺序处理的结合。
组级别的并行处理。因为,这些都是独立的组,可以并行化。
在每个组中,行必须按顺序逐行处理,这对我们的用例非常重要。
sample Data
在 SECURITY_ID、CC、BU、MPU 上应用分组,从上面给我们 2 个组(SECID_1、CC_A、BU_A、MPU_A和 SECID_2,CC_A,BU_A,MPU_A).
在优先级矩阵的帮助下(只有一个 ref table 用于将排名分配给行),我们将每个组转置到下面:
Transposed Data
以上组中的每一行都有一个优先级,并按该顺序排序。现在,我想通过将它们传递给函数来逐行处理每一行并获得如下输出:
output
用例的详细说明:
- 基础数据框包含一家金融公司的所有交易头寸数据。一些客户买入(多头)给定的金融产品(由 securityId 唯一标识),一些客户卖出(空头)它们。
- 我们应用的思路是identify/pair给定securityId中的多头头寸和空头头寸。
- 由于这种配对发生在一个 securityId 中,我们说基础数据帧根据这个 securityId 分成组,每个组可以独立处理。
- 为什么我们要在组内寻找顺序处理?这是因为,当给定组中有许多多头头寸和许多空头头寸时(如示例数据),参考 table(优先级矩阵)决定哪个多头头寸必须与哪个空头头寸配对。基本上,它给出了处理顺序。
- 第二个原因是,当给定多头数量和空头数量 不相等则剩余数量符合配对条件。 即,如果剩余数量多,则可以与下一个配对 按优先级或副级在组中可用的短缺数量 相反。
- 由于 4 和 5 中提到的原因,我们希望在一个组中逐行处理。
以上几点使用下面的数据集进行描述。
基础数据帧
+-------------+----------+----------+------
ROW_NO|SECURITY_ID| ACCOUNT|POSITION|
+-------------+----------+----------+------
1| secId| Acc1| +100|
2| secId| Acc2| -150|
3| secId| Acc3| -25|
4| secId2| Acc3| -25|
5| secId2| Acc3| -25|
基础数据帧按securityID分组划分。让我们使用 secId 组如下
+-------------+----------+----------+------
ROW_NO|SECURITY_ID| ACCOUNT|POSITION|
+-------------+----------+----------+------
1| secId| Acc1| +100|
2| secId| Acc2| -150|
3| secId| Acc3| -25|
在上述情况下,正数 100 可以与 -50 或 -25 配对。为了打破平局,以下参考 table 称为优先级矩阵有助于定义顺序。
+-------------+----------+----------+------
+vePositionAccount|-vePositionAccount| RANK
+-------------+----------+----------+------
Acc1| Acc3| 1|
Acc1| Acc2| 2|
所以,从上面的矩阵我们知道第 1 行和第 3 行将首先配对,然后第 1 行和第 2 行。这就是我们正在谈论的顺序(顺序处理)。让我们现在将它们配对如下:
+-------------+----------+----------+------+-------------+----------+----------+------
+veROW_NO|+veSECURITY_ID| +veACCOUNT|+vePOSITION|+veROW_NO|+veSECURITY_ID| +veACCOUNT|+vePOSITION|
+-------------+----------+----------+------+-------------+----------+----------+------
1| secId| Acc1| +100| 3| secId| Acc3| -25|
1| secId| Acc1| +100| 2| secId| Acc2| -150|
在第 2 行之后处理第 1 行会发生什么情况? (这就是我们需要的)
1.After 处理第 1 行 - Acc1 中的位置将为 (100 - 25) = 75 Acc3 中的位置将为 0。Acc1 中更新的位置 75 现在将用于处理第二行。
2.After 处理第 2 行 - Acc1 中的位置将为 0 。 Acc2 中的位置将为 (75-150) -75.
结果数据框:
+-------------+----------+----------+------
ROW_NO|SECURITY_ID| ACCOUNT|POSITION|
+-------------+----------+----------+------
1| secId| Acc1| 0|
2| secId| Acc2| -75|
3| secId| Acc3| 0|
在第 1 行之后处理第 2 行会发生什么情况? (我们不想要这个)
- 处理第 2 行后 - Acc1 中的位置将为 0 Acc2 中的位置将为 (100-150) -50。 Acc1 中的更新位置为 0 现在将用于处理第一行。
- 处理第 1 行后 - Acc1 中的位置将为 0。Acc3 中的位置将保持不变,为 -25。
结果数据框:
+-------------+----------+----------+------
ROW_NO|SECURITY_ID| ACCOUNT|POSITION|
+-------------+----------+----------+------
1| secId| Acc1| 0|
2| secId| Acc2| -50|
3| secId| Acc3| -25|
正如你在上面看到的,一组中的处理顺序决定了我们的输出
我也想问 - 为什么 spark 不支持在一段数据帧中进行顺序处理?我们是说我们需要集群的并行处理能力。这就是为什么我们将数据框分成几组,并要求集群在这些组上并行应用逻辑。我们要说的是,如果该组有 100 行,那么让这 100 行按顺序依次处理。 spark 不支持这个吗?
如果不是,那么大数据中还有哪些其他技术可以帮助实现这一目标?
替代实现:
- 将数据帧分成与组数一样多的分区(在我们的例子中是 50000。组更多,但任何组中的行不超过几百)。
- 运行 'ForeachPartition' 对数据帧的操作,其中逻辑独立地跨分区执行。
- 将每个分区的处理输出写入集群。
- 处理完整个数据帧后,一个单独的作业将从步骤 3 中读取这些单独的文件并写入单个 file/dataframe。
我怀疑数千个分区是否有用,但我想知道这种方法听起来是否不错。
这个概念在这个规则之前已经足够好了:
- Second reason is that, when a given long quantity and short quantity are not equal then the residual quantity is eligible for pairing. i.e., if long quantity is left, then it can be paired with the next short quantity available in the group as per the priority or vice versa.
这是因为您需要迭代,使用依赖逻辑循环,这很难用更面向流的 Spark 进行编码。
I also worked on a project where everything was stated - do it in Big Data with Spark, scala or pyspark. Being an architect as well as coder, I looked at the algorithm for something similar to your area, but not quite, in which for commodities all the periods for a set of data points needed to be classified as bull, bear, or not. Like your algorithm, but still different, I did not know what amount of looping to do up-front. In fact I needed to do something, then decide to repeat that something to the left and to the right of a period I had marked as either bull or bear or nothing, potentially. Termination conditions were required. See picture below. Sort of like a 'flat' binary tree traversal until all paths exhausted. Not that Spark-ish.
I actually - for academic purposes - solved my specific situation in Spark, but it was an academic exercise. The point to the matter is that this type of processing - my example and your example are a poor fit for Spark. We did these calculation in ORACLE and simply sqooped the results to Hadoop datastore.
因此,我的建议是您不要在 Spark 中尝试这样做,因为它不够适合用例。相信我,它会变得混乱。老实说,我很快就意识到这种处理方式是个问题。但是刚开始的时候,查询是一个常见的方面。