How to compute the total price in a DataFrame

I have retail data, from which I created a retail DataFrame:

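from pyspark import SparkFiles
from pyspark.sql.functions import col, collect_list, struct, transform

spark.sparkContext.addFile('https://raw.githubusercontent.com/databricks/Spark-The-Definitive-Guide/master/data/retail-data/all/online-retail-dataset.csv')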

retail_df = spark.read.csv(SparkFiles.get('online-retail-dataset.csv'), header=True, inferSchema=True)\
  .withColumn('OverallItems', struct('StockCode', 'Description', 'UnitPrice', 'Quantity', 'InvoiceDate','CustomerID', 'Country'))

Then I created retail_array, which has two columns, InvoiceNo and Items:

retail_array = retail_df.groupBy('InvoiceNo')\
  .agg(collect_list(col('OverallItems')).alias('Items'))

I want to compute the total price of each invoice's items and add it to the Items column in retail_array.

So far I have written this code:

transformer = lambda x: struct(x['UnitPrice'], x['Quantity'], x['UnitPrice'] * x['Quantity']).cast("struct<UnitPrice:double,Quantity:double,TotalPrice:double>")

TotalPrice_df = retail_array\
  .withColumn('TotalPrice', transform("items", transformer))

TotalPrice_df.show(truncate=False)

But with this code I'm adding a new column to retail_array, whereas I want the new value to be part of the Items column in retail_array. The output for one invoice looks like this:

+---------+-----+----------+
|InvoiceNo|Items|TotalPrice|
+---------+-----+----------+
|536366   |[{22633, HAND WARMER UNION JACK, 1.85, 6, 12/1/2010 8:28, 17850, United Kingdom}, {22632, HAND WARMER RED POLKA DOT, 1.85, 6, 12/1/2010 8:28, 17850, United Kingdom}]|[{1.85, 6.0, 11.100000000000001}, {1.85, 6.0, 11.100000000000001}]|
+---------+-----+----------+

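I want it to sum 11.100000000000001 + 11.100000000000001 and add the result to the Items column, without an extra column. Also, for other invoices there may be more than two total prices to add up.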

Use the aggregate function instead of transform to compute the total price, like this:

from pyspark.sql import functions as F

retail_array = retail_df.groupBy("InvoiceNo").agg(
    F.collect_list(F.col("OverallItems")).alias("Items")
).withColumn(
    "TotalPrice", 
    F.aggregate("items", F.lit(.0), lambda acc, x: acc + (x["Quantity"] * x["UnitPrice"]))
)
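
For intuition, aggregate behaves like a left fold over the array: it starts from the initial value and applies the lambda to each element in turn. Here is a minimal plain-Python sketch of the same computation, assuming the two items of invoice 536366 shown in the question. The dict layout and the invoice_total helper are illustrative names, not part of Spark's API:

```python
from functools import reduce

# Two items from invoice 536366, reduced to the fields the fold needs
# (illustrative layout, not a Spark Row).
items = [
    {"StockCode": "22633", "UnitPrice": 1.85, "Quantity": 6},
    {"StockCode": "22632", "UnitPrice": 1.85, "Quantity": 6},
]

def invoice_total(items):
    """Left fold: start at 0.0 and add Quantity * UnitPrice for each item."""
    return reduce(lambda acc, x: acc + x["Quantity"] * x["UnitPrice"], items, 0.0)

total = invoice_total(items)
print(total)
```

Because the fold starts from the initial value, an empty item list simply yields 0.0, and invoices with more than two items need no special handling.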

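Note, however, that since you are collecting the list of structs anyway, you can compute TotalPrice in the same aggregation, which avoids an extra pass over the array elements: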

retail_array = retail_df.groupBy("InvoiceNo").agg(
    F.collect_list(F.col("OverallItems")).alias("Items"),
    F.sum(F.col("Quantity") * F.col("UnitPrice")).alias("TotalPrice")
)

retail_array.show(1)
#+---------+--------------------+------------------+
#|InvoiceNo|               Items|        TotalPrice|
#+---------+--------------------+------------------+
#|   536366|[{22633, HAND WAR...|22.200000000000003|
#+---------+--------------------+------------------+
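
The trailing digits in 22.200000000000003 come from binary floating-point arithmetic, not from the data. The plain-Python sketch below reproduces the effect and shows exact decimal arithmetic as one way around it. Whether to round the sum or switch UnitPrice to a decimal type in Spark is an assumption about what you need, not something the question asks for:

```python
from decimal import Decimal

# Double-precision arithmetic, as Spark uses for a column inferred as double.
float_total = 1.85 * 6 + 1.85 * 6

# Exact decimal arithmetic on the same values.
exact_total = Decimal("1.85") * 6 + Decimal("1.85") * 6

print(float_total)            # may show trailing digits beyond 22.2
print(exact_total)            # exact decimal result
print(round(float_total, 2))  # rounding for display
```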

But with this code I'm adding to retail_array new column, but I want this new column to be part of items column in retail_array

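I'm not sure I understood this part correctly. The Items column is an array of structs, so replicating an invoice's total price in each of its items doesn't make much sense.

That said, if you really want to do this, you can use transform after computing the total price (the step above):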

result = retail_array.withColumn(
    "Items",
    F.transform("Items", lambda x: x.withField("TotalPrice", F.col("TotalPrice")))
).drop("TotalPrice")

result.show(1, False)

#+---------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
#|InvoiceNo|Items                                                                                                                                                                                                        |
#+---------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
#|536366   |[{22633, HAND WARMER UNION JACK, 1.85, 6, 12/1/2010 8:28, 17850, United Kingdom, 22.200000000000003}, {22632, HAND WARMER RED POLKA DOT, 1.85, 6, 12/1/2010 8:28, 17850, United Kingdom, 22.200000000000003}]|
#+---------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
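
In plain-Python terms, the transform step above amounts to copying one invoice-level value into every item. This is a rough sketch using illustrative dicts in place of Spark structs (the variable names are hypothetical):

```python
# Illustrative stand-ins for the structs in the Items array.
items = [
    {"StockCode": "22633", "UnitPrice": 1.85, "Quantity": 6},
    {"StockCode": "22632", "UnitPrice": 1.85, "Quantity": 6},
]

# Invoice-level total, computed once.
total_price = sum(x["Quantity"] * x["UnitPrice"] for x in items)

# Equivalent of withField: each item gets a new TotalPrice field
# holding the same invoice total; the original dicts stay unchanged.
items_with_total = [{**x, "TotalPrice": total_price} for x in items]
```

Every element ends up with the same TotalPrice, which is why keeping it as a separate column is usually the leaner representation.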