当密钥不同时如何在spark scala中执行分组
How to perform grouping in spark scala when key is not same
我想计算二级账户对应金额的总和,并与一级账户进行比较。在以下示例中,以“643”开头的帐号是主帐号,之后出现的帐号是其副帐号。再次出现另一个主帐户,它以“643”开头,后面是它的辅助帐户。我想对包含主要帐户和次要帐户的记录进行分组,并计算次要帐户的金额总和。
输入:
账户、金额
643100、10000----主账号
234100、4000----二级账户
231300、1000----二级账户
136400、5000----二级账户
643841、20000----下一组
562100, 10000
432176, 10000
643304、40000----下一组
124562, 20000
234567, 5000
输出:
账户,金额,sumofsecamounts
643100, 10000, 10000
643841, 20000, 20000
643304, 40000, 25000
您有一些问题需要解决。
- 您无法保证插入时的顺序。我在尝试模仿您的问题时遇到了这个问题,因此不得不添加一列以确保我的数据看起来像您的数据。
- 如果您的 table 确实已经有此订单,那么您可能没问题。
- 您需要一个列来对数据进行排序,window 才能正常工作。这里我们将使用monotonically_increasing_id,来帮助我们得到那个列。
- 您需要按主账户对数据进行分区
- 您可以通过对子帐户使用 0 的数学技巧来解决这个问题,这样如果您进行滚动求和,它们就不会更改总和,这有助于对数据进行分区。
.
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
spark.sql("create table accounts ( Account int, Amount int, order int)") // I had to add order so my data would land in the format you had.
spark.sql("insert into accounts values ( 643100, 10000,1 ), (234100, 4000,2),(231300, 1000,3),(136400, 5000,4),(643841, 20000,5),(562100, 10000,6),(432176, 10000,7),(643304, 40000,8 ),(124562, 20000,9),(234567, 5000,10)")
val increasing = spark.sql("select * from accounts order by order").withColumn("monotonically_increasing_id", monotonically_increasing_id()) // we need a column to order by a column for windows so this give us an ordered field to use. Link to documentation below.
val winowSpec = Window.partitionBy().orderBy("monotonically_increasing_id")
increasing
.withColumn("is_Primary",
when( col("Account") > 643000, col("Account") )
.otherwise( 0 ) // This identifies primary & secondary accounts
).withColumn("partition",
sum("is_Primary").over(winowSpec) // rolling sum Trick that partitions data by nature of the fact 0's denote secondary.
).groupBy(
col("partition"), //groups primary and secondary
col("is_Primary") //splits primary vs secondary totals.
).agg( sum("Amount") ).show()
+---------+----------+-----------+
|partition|is_Primary|sum(Amount)|
+---------+----------+-----------+
| 643100| 643100| 10000|
| 643100| 0| 10000|
| 1286941| 643841| 20000|
| 1286941| 0| 20000|
| 1930245| 643304| 40000|
| 1930245| 0| 25000|
+---------+----------+-----------+
一旦你有了这个,如果你真的想要你描述的行,你可以轻松地自行加入 table。
我想计算二级账户对应金额的总和,并与一级账户进行比较。在以下示例中,以“643”开头的帐号是主帐号,之后出现的帐号是其副帐号。再次出现另一个主帐户,它以“643”开头,后面是它的辅助帐户。我想对包含主要帐户和次要帐户的记录进行分组,并计算次要帐户的金额总和。
输入: 账户、金额 643100、10000----主账号 234100、4000----二级账户 231300、1000----二级账户 136400、5000----二级账户 643841、20000----下一组 562100, 10000 432176, 10000 643304、40000----下一组 124562, 20000 234567, 5000
输出: 账户,金额,sumofsecamounts 643100, 10000, 10000 643841, 20000, 20000 643304, 40000, 25000
您有一些问题需要解决。
- 您无法保证插入时的顺序。我在尝试模仿您的问题时遇到了这个问题,因此不得不添加一列以确保我的数据看起来像您的数据。
- 如果您的 table 确实已经有此订单,那么您可能没问题。
- 您需要一个列来对数据进行排序,window 才能正常工作。这里我们将使用monotonically_increasing_id,来帮助我们得到那个列。
- 您需要按主账户对数据进行分区
- 您可以通过对子帐户使用 0 的数学技巧来解决这个问题,这样如果您进行滚动求和,它们就不会更改总和,这有助于对数据进行分区。
.
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
spark.sql("create table accounts ( Account int, Amount int, order int)") // I had to add order so my data would land in the format you had.
spark.sql("insert into accounts values ( 643100, 10000,1 ), (234100, 4000,2),(231300, 1000,3),(136400, 5000,4),(643841, 20000,5),(562100, 10000,6),(432176, 10000,7),(643304, 40000,8 ),(124562, 20000,9),(234567, 5000,10)")
val increasing = spark.sql("select * from accounts order by order").withColumn("monotonically_increasing_id", monotonically_increasing_id()) // we need a column to order by a column for windows so this give us an ordered field to use. Link to documentation below.
val winowSpec = Window.partitionBy().orderBy("monotonically_increasing_id")
increasing
.withColumn("is_Primary",
when( col("Account") > 643000, col("Account") )
.otherwise( 0 ) // This identifies primary & secondary accounts
).withColumn("partition",
sum("is_Primary").over(winowSpec) // rolling sum Trick that partitions data by nature of the fact 0's denote secondary.
).groupBy(
col("partition"), //groups primary and secondary
col("is_Primary") //splits primary vs secondary totals.
).agg( sum("Amount") ).show()
+---------+----------+-----------+
|partition|is_Primary|sum(Amount)|
+---------+----------+-----------+
| 643100| 643100| 10000|
| 643100| 0| 10000|
| 1286941| 643841| 20000|
| 1286941| 0| 20000|
| 1930245| 643304| 40000|
| 1930245| 0| 25000|
+---------+----------+-----------+
一旦你有了这个,如果你真的想要你描述的行,你可以轻松地自行加入 table。