How do I use a dataframe's data to create an aggregated column and then expand rows using another dataframe in PySpark?

I have a dataframe that holds funding for various products at different levels. It is a wide dataframe showing funding from 1st Jan 2021 to 31st Dec 2021 (Funding_Start_Date and Funding_End_Date are in yyyyMMdd format).
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, FloatType
from pyspark.sql.functions import col

funding_data = [
(20210101,20211231,"Family","Cars","Audi","A4", 420.0, 12345, "Lump_Sum", 50000)
]
funding_schema = StructType([ \
StructField("Funding_Start_Date",IntegerType(),True), \
StructField("Funding_End_Date",IntegerType(),True), \
StructField("Funding_Level",StringType(),True), \
StructField("Type", StringType(), True), \
StructField("Brand", StringType(), True), \
StructField("Brand_Low", StringType(), True), \
StructField("Family", FloatType(), True), \
StructField("SKU_ID", IntegerType(), True), \
StructField("Allocation_Basis", StringType(), True), \
StructField("Amount", IntegerType(), True) \
])
funding_df = spark.createDataFrame(data=funding_data,schema=funding_schema)
funding_df.show()
+------------------+----------------+-------------+----+-----+---------+------+------+----------------+------+
|Funding_Start_Date|Funding_End_Date|Funding_Level|Type|Brand|Brand_Low|Family|SKU_ID|Allocation_Basis|Amount|
+------------------+----------------+-------------+----+-----+---------+------+------+----------------+------+
|          20210101|        20211231|       Family|Cars| Audi|       A4| 420.0| 12345|        Lump_Sum| 50000|
+------------------+----------------+-------------+----+-----+---------+------+------+----------------+------+
I want one row per day of the funding, where the Amount for each day depends on whether a sale was made on that day at that Funding_Level.

I have a sales table at the date and SKU level.
sales_data = [
(20210105,352210,"Cars","Audi","A4", 420.0, 1),
(20210106,352207,"Cars","Audi","A4", 420.0, 5),
(20210106,352196,"Cars","Audi","A4", 420.0, 2),
(20210109,352212,"Cars","Audi","A4", 420.0, 3),
(20210112,352212,"Cars","Audi","A4", 420.0, 1),
(20210112,352212,"Cars","Audi","A4", 420.0, 2),
(20210112,352212,"Cars","BMW","X6", 325.0, 2),
(20210126,352196,"Cars","Audi","A4", 420.0, 1),
]
sales_schema = StructType([ \
StructField("DATE_ID",IntegerType(),True), \
StructField("SKU_ID",IntegerType(),True), \
StructField("Type",StringType(),True), \
StructField("Brand", StringType(), True), \
StructField("Brand_Low", StringType(), True), \
StructField("Family", FloatType(), True),
StructField("Quantity", IntegerType(), True)
])
sales_df = spark.createDataFrame(data=sales_data,schema=sales_schema)
sales_df.show()
+--------+------+----+-----+---------+------+--------+
| DATE_ID|SKU_ID|Type|Brand|Brand_Low|Family|Quantity|
+--------+------+----+-----+---------+------+--------+
|20210105|352210|Cars| Audi|       A4| 420.0|       1|
|20210106|352207|Cars| Audi|       A4| 420.0|       5|
|20210106|352196|Cars| Audi|       A4| 420.0|       2|
|20210109|352212|Cars| Audi|       A4| 420.0|       3|
|20210112|352212|Cars| Audi|       A4| 420.0|       1|
|20210112|352212|Cars| Audi|       A4| 420.0|       2|
|20210112|352212|Cars|  BMW|       X6| 325.0|       2|
|20210126|352196|Cars| Audi|       A4| 420.0|       1|
+--------+------+----+-----+---------+------+--------+
This tells me there are 5 distinct days on which a product with Family 420.0 was sold.
sales_df.filter(col('Family') == 420.0).select('DATE_ID').distinct().show()
+--------+
| DATE_ID|
+--------+
|20210112|
|20210109|
|20210105|
|20210106|
|20210126|
+--------+
So Lumpsum/Day would be 50000 / 5 = 10000.
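For reference, a minimal sketch of that calculation using only the dataframes defined above (it just counts the distinct sale days for the Family and splits the lump sum evenly):

# Count the distinct sale days for Family 420.0 and divide the lump sum by it
n_days = sales_df.filter(col('Family') == 420.0).select('DATE_ID').distinct().count()  # 5
lumpsum_per_day = 50000 / n_days  # 10000.0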
So I am trying to get a final dataframe like this:
+--------+------------------+----------------+-------------+----+-----+---------+------+------+----------------+------+-----------+
| DATE_ID|Funding_Start_Date|Funding_End_Date|Funding_Level|Type|Brand|Brand_Low|Family|SKU_ID|Allocation_Basis|Amount|Lumpsum/Day|
+--------+------------------+----------------+-------------+----+-----+---------+------+------+----------------+------+-----------+
|20210105|          20210101|        20211231|       Family|Cars| Audi|       A4| 420.0| 12345|        Lump_Sum| 50000|      10000|
|20210106|          20210101|        20211231|       Family|Cars| Audi|       A4| 420.0| 12345|        Lump_Sum| 50000|      10000|
|20210109|          20210101|        20211231|       Family|Cars| Audi|       A4| 420.0| 12345|        Lump_Sum| 50000|      10000|
|20210112|          20210101|        20211231|       Family|Cars| Audi|       A4| 420.0| 12345|        Lump_Sum| 50000|      10000|
|20210126|          20210101|        20211231|       Family|Cars| Audi|       A4| 420.0| 12345|        Lump_Sum| 50000|      10000|
+--------+------------------+----------------+-------------+----+-----+---------+------+------+----------------+------+-----------+
I have tried a UDF, but I cannot pass sales_df into it to count the days and divide the Lump_Sum amount by that count, because UDFs do not accept dataframes.

How do I get this final dataframe from the two dataframes above?
Find Lumpsum/Day based on Family, Funding_Start_Date and Funding_End_Date:

1. Convert Funding_Start_Date, Funding_End_Date and DATE_ID to DateType.
2. Select the distinct DATE_ID and Family from sales_df.
3. Join funding_df and sales_df such that DATE_ID falls between Funding_Start_Date and Funding_End_Date and the Family values match.
4. Apply a count window aggregation over Funding_Start_Date, Funding_End_Date and Family to find the number of days with sales.
5. Divide Amount by the result of step 4 to get Lumpsum/Day.
from pyspark.sql.types import *
from pyspark.sql import functions as F
from pyspark.sql import Window
funding_data = [
(20210101,20211231,"Family","Cars","Audi","A4", 420.0, 12345, "Lump_Sum", 50000)
]
funding_schema = StructType([ \
StructField("Funding_Start_Date",IntegerType(),True), \
StructField("Funding_End_Date",IntegerType(),True), \
StructField("Funding_Level",StringType(),True), \
StructField("Type", StringType(), True), \
StructField("Brand", StringType(), True), \
StructField("Brand_Low", StringType(), True), \
StructField("Family", FloatType(), True), \
StructField("SKU_ID", IntegerType(), True), \
StructField("Allocation_Basis", StringType(), True), \
StructField("Amount", IntegerType(), True) \
])
funding_df = spark.createDataFrame(data=funding_data,schema=funding_schema)
# STEP 1
funding_df = (funding_df.withColumn("Funding_Start_Date", F.to_date(F.col("Funding_Start_Date").cast("string"), "yyyyMMdd"))
.withColumn("Funding_End_Date", F.to_date(F.col("Funding_End_Date").cast("string"), "yyyyMMdd")))
sales_data = [
(20210105,352210,"Cars","Audi","A4", 420.0, 1),
(20210106,352207,"Cars","Audi","A4", 420.0, 5),
(20210106,352196,"Cars","Audi","A4", 420.0, 2),
(20210109,352212,"Cars","Audi","A4", 420.0, 3),
(20210112,352212,"Cars","Audi","A4", 420.0, 1),
(20210112,352212,"Cars","Audi","A4", 420.0, 2),
(20210112,352212,"Cars","BMW","X6", 325.0, 2),
(20210126,352196,"Cars","Audi","A4", 420.0, 1),
]
sales_schema = StructType([ \
StructField("DATE_ID",IntegerType(),True), \
StructField("SKU_ID",IntegerType(),True), \
StructField("Type",StringType(),True), \
StructField("Brand", StringType(), True), \
StructField("Brand_Low", StringType(), True), \
StructField("Family", FloatType(), True),
StructField("Quantity", IntegerType(), True)
])
sales_df = spark.createDataFrame(data=sales_data,schema=sales_schema)
# STEP 1
sales_df = sales_df.withColumn("DATE_ID", F.to_date(F.col("DATE_ID").cast("string"), "yyyyMMdd"))
# STEP 2
sales_df = sales_df.select("DATE_ID", "Family").distinct()
# STEP 3
joined_df = funding_df.join(sales_df, (sales_df["DATE_ID"].between(funding_df["Funding_Start_Date"], funding_df["Funding_End_Date"]) & (funding_df["Family"] == sales_df["Family"])))
joined_df = joined_df.select(*[funding_df[c] for c in funding_df.columns], "DATE_ID")
# STEP 4 and 5
ws = Window.partitionBy("Funding_Start_Date", "Funding_End_Date", "Family").rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
(joined_df.withColumn("Lumpsum/Day", F.col("Amount") / F.count("DATE_ID").over(ws))
.withColumn("Funding_Start_Date", F.date_format("Funding_Start_Date", "yyyyMMdd").cast("int"))
.withColumn("Funding_End_Date", F.date_format("Funding_End_Date", "yyyyMMdd").cast("int"))
.withColumn("DATE_ID", F.date_format("DATE_ID", "yyyyMMdd").cast("int"))
).show()
Output
+------------------+----------------+-------------+----+-----+---------+------+------+----------------+------+--------+-----------+
|Funding_Start_Date|Funding_End_Date|Funding_Level|Type|Brand|Brand_Low|Family|SKU_ID|Allocation_Basis|Amount| DATE_ID|Lumpsum/Day|
+------------------+----------------+-------------+----+-----+---------+------+------+----------------+------+--------+-----------+
|          20210101|        20211231|       Family|Cars| Audi|       A4| 420.0| 12345|        Lump_Sum| 50000|20210106|    10000.0|
|          20210101|        20211231|       Family|Cars| Audi|       A4| 420.0| 12345|        Lump_Sum| 50000|20210112|    10000.0|
|          20210101|        20211231|       Family|Cars| Audi|       A4| 420.0| 12345|        Lump_Sum| 50000|20210126|    10000.0|
|          20210101|        20211231|       Family|Cars| Audi|       A4| 420.0| 12345|        Lump_Sum| 50000|20210105|    10000.0|
|          20210101|        20211231|       Family|Cars| Audi|       A4| 420.0| 12345|        Lump_Sum| 50000|20210109|    10000.0|
+------------------+----------------+-------------+----+-----+---------+------+------+----------------+------+--------+-----------+
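If the column order of the desired output matters (DATE_ID first, Lumpsum/Day last), a final select can rearrange the columns. A minimal sketch, assuming the chained expression above is assigned to a variable named result_df instead of being shown directly:

# Hypothetical: assumes `result_df` holds the dataframe built above instead of calling .show()
ordered_cols = ["DATE_ID"] + [c for c in result_df.columns if c not in ("DATE_ID", "Lumpsum/Day")] + ["Lumpsum/Day"]
result_df.select(*ordered_cols).show()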