根据 pyspark 中子事务的聚合创建新行

Create new rows based on aggregate of child transactions in pyspark

我在 pyspark 中有一个数据框,如下所示

df = spark.createDataFrame(
[
("ABC",1809,'100.00'),
("ABC",851,'89.00'),
("ABC",852, '10.00'),
("BBC",1810,'10.00'),
("BBC",951,'8.90'),
("BBC",852, '1.00'),
("XXX",1810,'10.00'),
("XXX",951,'9.00'),
("XXX",852, '1.00')], 
("EMC","TRAN_CODES", "amount"))

df.show()

+---+----+------+
|EMC|TRAN|amount|
+---+----+------+
|ABC|1809|100.00|
|ABC| 851| 89.00|
|ABC| 852| 10.00|
|BBC|1810| 10.00|
|BBC| 951|  8.90|
|BBC| 852|  1.00|
|XXX|1810| 10.00|
|XXX| 951|  9.00|
|XXX| 852|  1.00|
+---+----+------+

规则:
1) 1809 和 1810 是父代码 2) 851 和 852 是与 1809 关联的子代码 3) 951 和 852 是与 1810

关联的子代码
A Child code can be assoiciated with many parent codes.

下面我想做

1) Find agg amount of child codes based on `EMC`.
2) Check if the agg child amount is equal to parent code value
3) if it is equal then do nothing
4) if it is not equal then add the difference as new records with Tran_codes as `999`

预期的最终数据帧

+---+----+------+
|EMC|TRAN|amount|
+---+----+------+
|ABC|1809|100.00|
|ABC| 851| 89.00|
|ABC| 852| 10.00|
|BBC|1810| 10.00|
|BBC| 951|  8.90|
|BBC| 852|  1.00|
|XXX|1810| 10.00|
|XXX| 951|  9.00|
|XXX| 852|  1.00|
|ABC| 999|  1.00|
|BBC| 999|  0.10|
+---+----+------+

我不知道我应该做什么

differences = (
    df.groupBy("EMC")
      .agg(
         F.sum(F.when(F.col("TRAN") > 1000, F.col("amount"))
             .otherwise(-F.col("amount")).alias("amount"))
      .select(F.col("EMC"), F.lit(999).alias("TRAN"), F.col("amount"))
    )

result = df.unionByName(differences)

我已经使用条件 F.col("TRAN") > 1000 来判断它是 parent 还是 child,如果您需要其他逻辑,请更改它。