withColumn 没有在 pyspark 中使用 groupby 给出预期结果

withColumn is not giving expected result with groupby in pyspark

我有一个如下所示的数据框

 +----------+---------------+---------+-----------------+----------------------------+
|CustomerNo|TransactionDate|SKUItemID|one_day_back_date|last_12month_date_from_trans|
+----------+---------------+---------+-----------------+----------------------------+
|   10080.0|     2020-08-04|  1297636|       2020-08-03|                  2019-08-04|
|   10080.0|     2020-08-04|  1297637|       2020-08-03|                  2019-08-04|
|   10080.0|     2020-08-04|  1297638|       2020-08-03|                  2019-08-04|
|   10080.0|     2020-08-04|  1297639|       2020-08-03|                  2019-08-04|
|   10080.0|     2020-08-04|  1297640|       2020-08-03|                  2019-08-04|
|   10080.0|     2020-08-04|  1297642|       2020-08-03|                  2019-08-04|
|   10080.0|     2020-08-04|  1297643|       2020-08-03|                  2019-08-04|
|   10080.0|     2020-08-04|  1297644|       2020-08-03|                  2019-08-04|
|   10080.0|     2020-08-04|  1297645|       2020-08-03|                  2019-08-04|
|   10080.0|     2018-06-26|    33559|       2020-08-03|                  2017-06-26|
|   10080.0|     2018-07-03|    36725|       2020-08-03|                  2017-07-03|
|   10080.0|     2018-07-03|    36726|       2020-08-03|                  2017-07-03|
|   10080.0|     2018-07-03|    36727|       2020-08-03|                  2017-07-03|
|   10080.0|     2018-07-03|    36728|       2020-08-03|                  2017-07-03|
|  216988.0|     2019-12-24|   812294|       2019-12-23|                  2018-12-24|
|  216988.0|     2019-12-24|   812298|       2019-12-23|                  2018-12-24|
+----------+---------------+---------+-----------------+----------------------------+

我需要根据我在以下代码中使用的日期条件获取总项目数

c=x.withColumn('total_items',(F.col('TransactionDate')<F.col('one_day_back_date')) & (F.col('TransactionDate') >= F.col('last_12month_date_from_trans'))) \
.groupBy('CustomerNo').agg(F.count('SKUItemID').alias('total_items'))

我需要为客户 10080.0 得到“5”作为 total_items,但是当我应用上面的代码时,我得到“14”作为 total_items。我也使用了过滤器,但这在应用于大型数据集时不会给出所有行

我得到的输出

+----------+-----------+
|CustomerNo|total_items|
+----------+-----------+
|   10080.0|         14|
+----------+-----------+

期望的输出:

+----------+-----------+
|CustomerNo|total_items|
+----------+-----------+
|   10080.0|         5 
   216988.0          0
+----------+-----------+

谁能告诉我哪里做错了?

您正在创建一个新列 total_items,但您没有将其用于任何用途。我认为您应该改为应用过滤器。类似

c=x.where((F.col('TransactionDate')<F.col('one_day_back_date')) & (F.col('TransactionDate') >= F.col('last_12month_date_from_trans'))) \
.groupBy('CustomerNo').agg(F.count('SKUItemID').alias('total_items'))

这将取满足您条件的行,然后计算每组中的记录数。

要同时获得零计数,您可以执行以下操作:

c=(x.withColumn('toCount', 
                F.when( (F.col('TransactionDate')<F.col('one_day_back_date')) 
                       & 
                        (F.col('TransactionDate') >= F.col('last_12month_date_from_trans'))
                       , 1)
                .otherwise(0) 
               )
   .groupBy('CustomerNo')
   .agg(F.sum('toCount').alias('total_items')))

它将在 toCount 列中用 1 标记要计数的项目,并对 toCount 列 pr 求和。组.