聚合函数 Pyspark Dataframe 中的错误

Error in aggregate function Pyspark Dataframe

我需要有关 Pyspark Dataframe 中聚合函数的帮助。我需要根据 'buy' 或 'sell'.

来计算客户的费用

如果buy表示我应该从信用额度中扣除金额,如果sell表示我应该将金额添加到信用额度中

下面是我的table

+----------+-----------------+------+----------+----------------+
|account_id|credit_card_limit|amount|      date|transaction_code|
+----------+-----------------+------+----------+----------------+
|     12345|             1000|   400|01/06/2020|             buy|
|     12345|             1000|   100|02/06/2020|             buy|
|     12345|             1000|   500|02/06/2020|            sell|
|     12345|             1000|   200|03/06/2020|             buy|
|     22332|             2000|  1000|02/06/2020|             buy|
|     22332|             2000|   200|03/06/2020|             buy|
+----------+-----------------+------+----------+----------------+

我尝试了一个代码,但它没有给我正确的 results.Below 是我的代码

w = Window.partitionBy(f.lit(0)).orderBy('date')
finaldf=df.groupBy('account_id','credit_card_limit','date').agg(f.sum(f.when(f.col('transaction_code')=='buy',-f.col('amount')).\
              otherwise(f.col('amount'))).alias('expenses')).\
    select('*',(f.col('credit_card_limit')+f.sum(f.col('expenses')).over(w)).alias('credit_left'))

我得到的输出:

    +----------+-----------------+----------+--------+-----------+
    |account_id|credit_card_limit|      date|expenses|credit_left|
    +----------+-----------------+----------+--------+-----------+
    |     12345|             1000|01/06/2020|    -400|        600|
    |     12345|             1000|02/06/2020|     400|          0|
    |     12345|             1000|03/06/2020|    -200|       -400|
    |     22332|             2000|02/06/2020|   -1000|       1000|
    |     22332|             2000|03/06/2020|    -200|        800|
    +----------+-----------------+----------+--------+-----------+

在这里你可以看到 credit_left 列有错误的答案。

预期输出:

    +----------+-----------------+----------+--------+-----------+
    |account_id|credit_card_limit|      date|expenses|credit_left|
    +----------+-----------------+----------+--------+-----------+
    |     12345|             1000|01/06/2020|    -400|        600|
    |     12345|             1000|02/06/2020|     400|       1000|
    |     12345|             1000|03/06/2020|    -200|        800|
    |     22332|             2000|02/06/2020|   -1000|       1000|
    |     22332|             2000|03/06/2020|    -200|        800|
    +----------+-----------------+----------+--------+-----------+

我还需要credit_left到credit_card_limit,如果值超过credit_limit.Please帮助我解决这个问题。非常感谢!!

我假设根据逻辑和预期答案,对于日期为 03/06/2020 的帐户 22332,credicardlimit 是 1000。请试试这个,如果有效请告诉我。

df = spark.sql("""
select 12345 as account_id, 1000 as credit_card_limit, 400 as amount, '01/06/2020' as date, 'buy' as  transaction_code
union                                                                                                                                                                                                   
select 12345 as account_id, 1000 as credit_card_limit, 100 as amount, '02/06/2020' as date, 'buy' as  transaction_code
union                                                                                                                                                                                                   
select 12345 as account_id, 1000 as credit_card_limit, 500 as amount, '02/06/2020' as date, 'sell' as  transaction_code
union                                                                                                                                                                                                   
select 12345 as account_id, 1000 as credit_card_limit, 200 as amount, '03/06/2020' as date, 'buy' as  transaction_code
union                                                                                                                                                                                                   
select 22332 as account_id, 2000 as credit_card_limit, 1000 as amount, '02/06/2020' as date, 'buy' as  transaction_code
union
select 22332 as account_id, 1000 as credit_card_limit, 200 as amount, '03/06/2020' as date, 'buy' as  transaction_code
""").orderBy("account_id","date")

df.show()
# source data
# +----------+-----------------+------+----------+----------------+
# |account_id|credit_card_limit|amount|      date|transaction_code|
# +----------+-----------------+------+----------+----------------+
# |     12345|             1000|   400|01/06/2020|             buy|
# |     12345|             1000|   100|02/06/2020|             buy|
# |     12345|             1000|   500|02/06/2020|            sell|
# |     12345|             1000|   200|03/06/2020|             buy|
# |     22332|             2000|  1000|02/06/2020|             buy|
# |     22332|             1000|   200|03/06/2020|             buy|
# +----------+-----------------+------+----------+----------------+

df.createOrReplaceTempView("tmp1")

data1 = spark.sql("""select  account_id,
        credit_card_limit,
        amount, 
        date,
        transaction_code,
        lead(amount) over(partition by account_id order by date) as lead_amt,
        case when transaction_code = 'buy' then -1 * amount else amount end as amount_modified 
from tmp1
order by account_id,date
""")
data1.show()
# +----------+-----------------+------+----------+----------------+--------+---------------+
# |account_id|credit_card_limit|amount|      date|transaction_code|lead_amt|amount_modified|
# +----------+-----------------+------+----------+----------------+--------+---------------+
# |     12345|             1000|   400|01/06/2020|             buy|     100|           -400|
# |     12345|             1000|   100|02/06/2020|             buy|     500|           -100|
# |     12345|             1000|   500|02/06/2020|            sell|     200|            500|
# |     12345|             1000|   200|03/06/2020|             buy|    null|           -200|
# |     22332|             2000|  1000|02/06/2020|             buy|     200|          -1000|
# |     22332|             1000|   200|03/06/2020|             buy|    null|           -200|
# +----------+-----------------+------+----------+----------------+--------+---------------+

data1.createOrReplaceTempView("tmp2")

data2 = spark.sql("""
select account_id,
        credit_card_limit,
        date,
        sum(amount_modified) as expenses,
        case when (credit_card_limit + sum(amount_modified)) > credit_card_limit 
             then credit_card_limit else (credit_card_limit + sum(amount_modified)) 
        end as credit_left
from tmp2
group by account_id, credit_card_limit, date 
order by account_id, date
""")

data2.show()

# +----------+-----------------+----------+--------+-----------+
# |account_id|credit_card_limit|      date|expenses|credit_left|
# +----------+-----------------+----------+--------+-----------+
# |     12345|             1000|01/06/2020|    -400|        600|
# |     12345|             1000|02/06/2020|     400|       1000|
# |     12345|             1000|03/06/2020|    -200|        800|
# |     22332|             2000|02/06/2020|   -1000|       1000|
# |     22332|             1000|03/06/2020|    -200|        800|
# +----------+-----------------+----------+--------+-----------+

我认为您需要将 window 更改为 :

w = Window.partitionBy(f.col("account_id")).orderBy('date')

那么你的代码就可以工作了:

w = Window.partitionBy(f.col("account_id")).orderBy('date')

finaldf = (df.groupBy('account_id','credit_card_limit','date')
                .agg(f.sum(f.when(f.col('transaction_code')=='buy',-f.col('amount'))
              .otherwise(f.col('amount'))).alias('expenses')).
    select('*',(f.col('credit_card_limit')+f.sum(f.col('expenses')).over(w))
                                                      .alias('credit_left')))
finaldf.show()

finaldf.show()

+----------+-----------------+----------+--------+-----------+
|account_id|credit_card_limit|      date|expenses|credit_left|
+----------+-----------------+----------+--------+-----------+
|     12345|             1000|01/06/2020|    -400|        600|
|     12345|             1000|02/06/2020|     400|       1000|
|     12345|             1000|03/06/2020|    -200|        800|
|     22332|             2000|02/06/2020|   -1000|       1000|
|     22332|             2000|03/06/2020|    -200|        800|
+----------+-----------------+----------+--------+-----------+