Create new rows based on the aggregate of child transactions in PySpark
I have a dataframe in PySpark, as shown below:
df = spark.createDataFrame(
    [
        ("ABC", 1809, '100.00'),
        ("ABC", 851, '89.00'),
        ("ABC", 852, '10.00'),
        ("BBC", 1810, '10.00'),
        ("BBC", 951, '8.90'),
        ("BBC", 852, '1.00'),
        ("XXX", 1810, '10.00'),
        ("XXX", 951, '9.00'),
        ("XXX", 852, '1.00'),
    ],
    ("EMC", "TRAN", "amount"),  # column named TRAN to match the output below
)
df.show()
+---+----+------+
|EMC|TRAN|amount|
+---+----+------+
|ABC|1809|100.00|
|ABC| 851| 89.00|
|ABC| 852| 10.00|
|BBC|1810| 10.00|
|BBC| 951|  8.90|
|BBC| 852|  1.00|
|XXX|1810| 10.00|
|XXX| 951|  9.00|
|XXX| 852|  1.00|
+---+----+------+
Rules:
1) 1809 and 1810 are parent codes.
2) 851 and 852 are child codes associated with 1809.
3) 951 and 852 are child codes associated with 1810.
A child code can be associated with many parent codes.
This is what I want to do:
1) Find the aggregate amount of the child codes for each `EMC`.
2) Check whether the aggregate child amount equals the parent code's amount.
3) If it is equal, do nothing.
4) If it is not equal, add the difference as a new record with `TRAN` set to `999`.
Expected final dataframe:
+---+----+------+
|EMC|TRAN|amount|
+---+----+------+
|ABC|1809|100.00|
|ABC| 851| 89.00|
|ABC| 852| 10.00|
|BBC|1810| 10.00|
|BBC| 951|  8.90|
|BBC| 852|  1.00|
|XXX|1810| 10.00|
|XXX| 951|  9.00|
|XXX| 852|  1.00|
|ABC| 999|  1.00|
|BBC| 999|  0.10|
+---+----+------+
I am not sure how to approach this.
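from pyspark.sql import functions as F

# amount is stored as a string; cast to decimal so 10.00 - 8.90 - 1.00 is exactly 0.10
amount = F.col("amount").cast("decimal(18,2)")
differences = (
    df.groupBy("EMC")
    # parent amount counts as positive, child amounts as negative
    .agg(F.sum(F.when(F.col("TRAN") > 1000, amount).otherwise(-amount)).alias("amount"))
    # keep only the groups where children do not add up to the parent
    .filter(F.col("amount") != 0)
    .select("EMC", F.lit(999).alias("TRAN"), F.col("amount").cast("string").alias("amount"))
)
# append the 999 rows to the original data
result = df.unionByName(differences)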
I used the condition F.col("TRAN") > 1000 to decide whether a row is a parent or a child; change it if you need different logic.
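If the "greater than 1000" threshold does not hold for your real codes, one alternative is to list the parent codes explicitly. The sketch below reproduces the same per-EMC arithmetic in plain Python (no Spark session needed) so the expected 999 rows can be checked by hand. The names `PARENT_CODES` and `balancing_rows` are made up for this illustration, and the parent set is an assumption taken from the rules in the question.

```python
from collections import defaultdict
from decimal import Decimal

# Same sample rows as the question: (EMC, TRAN, amount).
rows = [
    ("ABC", 1809, "100.00"), ("ABC", 851, "89.00"), ("ABC", 852, "10.00"),
    ("BBC", 1810, "10.00"), ("BBC", 951, "8.90"), ("BBC", 852, "1.00"),
    ("XXX", 1810, "10.00"), ("XXX", 951, "9.00"), ("XXX", 852, "1.00"),
]

# Assumption: parents are listed explicitly instead of using TRAN > 1000.
PARENT_CODES = {1809, 1810}


def balancing_rows(rows, parent_codes=PARENT_CODES, balance_code=999):
    """Return one (EMC, balance_code, amount) row for every EMC whose
    child amounts do not add up to the parent amount."""
    diff = defaultdict(Decimal)  # EMC -> parent amount minus child amounts
    for emc, tran, amount in rows:
        value = Decimal(amount)  # Decimal avoids float rounding noise
        diff[emc] += value if tran in parent_codes else -value
    # Groups that already balance (difference of zero) get no extra row.
    return [(emc, balance_code, str(d)) for emc, d in diff.items() if d != 0]


print(balancing_rows(rows))
# [('ABC', 999, '1.00'), ('BBC', 999, '0.10')]
```

In the PySpark version, the same change would amount to swapping the condition for something like F.col("TRAN").isin(1809, 1810).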