当密钥不同时如何在spark scala中执行分组

How to perform grouping in spark scala when key is not same

我想计算二级账户对应金额的总和,并与一级账户进行比较。在以下示例中,以“643”开头的帐号是主帐号,之后出现的帐号是其副帐号。再次出现另一个主帐户,它以“643”开头,后面是它的辅助帐户。我想对包含主要帐户和次要帐户的记录进行分组,并计算次要帐户的金额总和。

输入: 账户、金额 643100、10000----主账号 234100、4000----二级账户 231300、1000----二级账户 136400、5000----二级账户 643841、20000----下一组 562100, 10000 432176, 10000 643304、40000----下一组 124562, 20000 234567, 5000

输出: 账户,金额,sumofsecamounts 643100, 10000, 10000 643841, 20000, 20000 643304, 40000, 25000

您有一些问题需要解决。

  1. 您无法保证插入时的顺序。我在尝试模仿您的问题时遇到了这个问题,因此不得不添加一列以确保我的数据看起来像您的数据。
    1. 如果您的 table 确实已经有此订单,那么您可能没问题。
  2. 您需要一个列来对数据进行排序,window 才能正常工作。这里我们将使用monotonically_increasing_id,来帮助我们得到那个列。
  3. 您需要按主账户对数据进行分区
    1. 您可以通过对子帐户使用 0 的数学技巧来解决这个问题,这样如果您进行滚动求和,它们就不会更改总和,这有助于对数据进行分区。

.

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

spark.sql("create table accounts ( Account int, Amount int, order int)") // I had to add order so my data would land in the format you had.
spark.sql("insert into accounts values ( 643100, 10000,1 ), (234100, 4000,2),(231300, 1000,3),(136400, 5000,4),(643841, 20000,5),(562100, 10000,6),(432176, 10000,7),(643304, 40000,8 ),(124562, 20000,9),(234567, 5000,10)")
val increasing = spark.sql("select * from accounts order by order").withColumn("monotonically_increasing_id", monotonically_increasing_id()) // we need a column to order by a column for windows so this give us an ordered field to use. Link to documentation below.
val winowSpec = Window.partitionBy().orderBy("monotonically_increasing_id")
increasing
.withColumn("is_Primary", 
  when( col("Account") > 643000, col("Account") )
  .otherwise( 0 ) // This identifies primary & secondary accounts
).withColumn("partition", 
  sum("is_Primary").over(winowSpec)  // rolling sum Trick that partitions data by nature of the fact 0's denote secondary.
).groupBy( 
  col("partition"), //groups primary and secondary
  col("is_Primary") //splits primary vs secondary totals.
).agg( sum("Amount") ).show()
+---------+----------+-----------+
|partition|is_Primary|sum(Amount)|
+---------+----------+-----------+
|   643100|    643100|      10000|
|   643100|         0|      10000|
|  1286941|    643841|      20000|
|  1286941|         0|      20000|
|  1930245|    643304|      40000|
|  1930245|         0|      25000|
+---------+----------+-----------+

一旦你有了这个,如果你真的想要你描述的行,你可以轻松地自行加入 table。

http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.functions.monotonically_increasing_id