Pyspark: how to solve complicated dataframe logic plus join
I have two dataframes to work with. The first one, df1, looks like this:
from pyspark.sql import functions as F
from pyspark.sql.functions import col, expr, to_date
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

df1_schema = StructType([StructField("Date", StringType(), True),
                         StructField("store_id", StringType(), True),
                         StructField("warehouse_id", StringType(), True),
                         StructField("class_id", StringType(), True),
                         StructField("total_time", IntegerType(), True)])
df_data = [('2020-08-01','110','1','11010',3),('2020-08-02','110','1','11010',2),
           ('2020-08-03','110','1','11010',3),('2020-08-04','110','1','11010',3),
           ('2020-08-05','111','1','11010',1),('2020-08-06','111','1','11010',-1)]
rdd = sc.parallelize(df_data)
df1 = sqlContext.createDataFrame(df_data, df1_schema)
df1 = df1.withColumn("Date", to_date("Date", 'yyyy-MM-dd'))
df1.show()
+----------+--------+------------+--------+----------+
| Date|store_id|warehouse_id|class_id|total_time|
+----------+--------+------------+--------+----------+
|2020-08-01| 110| 1| 11010| 3|
|2020-08-02| 110| 1| 11010| 2|
|2020-08-03| 110| 1| 11010| 3|
|2020-08-04| 110| 1| 11010| 3|
|2020-08-05| 111| 1| 11010| 1|
|2020-08-06| 111| 1| 11010| -1|
+----------+--------+------------+--------+----------+
I computed a column called arrival_date:
#To calculate the arrival_date
#logic : add the Date + total_time so in first row, 2020-08-01 +3 would give me 2020-08-04
#if total_time is -1 then return blank
df1 = df1.withColumn('arrival_date', F.when(col('total_time') != -1, expr("date_add(date, total_time)"))
                                      .otherwise(''))
+----------+--------+------------+--------+----------+------------+
| Date|store_id|warehouse_id|class_id|total_time|arrival_date|
+----------+--------+------------+--------+----------+------------+
|2020-08-01| 110| 1| 11010| 3| 2020-08-04|
|2020-08-02| 110| 1| 11010| 2| 2020-08-04|
|2020-08-03| 110| 1| 11010| 3| 2020-08-06|
|2020-08-04| 110| 1| 11010| 3| 2020-08-07|
|2020-08-05| 111| 1| 11010| 1| 2020-08-06|
|2020-08-06| 111| 1| 11010| -1| |
+----------+--------+------------+--------+----------+------------+
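Note that because the otherwise branch returns the string '', arrival_date ends up as a string column rather than a date. If you would rather keep it as a nullable date, one small variant (not required for anything below) would be:
# keep arrival_date as a DateType column, null instead of '' when total_time is -1
df1 = df1.withColumn('arrival_date',
                     F.when(col('total_time') != -1, expr("date_add(Date, total_time)"))
                      .otherwise(F.lit(None).cast('date')))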
And what I want to compute is this:
#to calculate the transit_date:
#if the same arrival_date is repeated 2 or more times, e.g. 2020-08-04, then take min("Date"),
#which will be 2020-08-01; otherwise just return the Date, e.g. 2020-08-07 would just return 2020-08-04
#we need to care about cloth_id too: arrival_date = 2020-08-06 is repeated 2 times as well, but
#if one of store_id or warehouse_id is different we treat the rows separately, so for arrival_date = 2020-08-06 at Date = 2020-08-03
#we must return 2020-08-03
#so rows are treated separately whenever one of (store_id, warehouse_id) is different.
#*Note* we don't care about class_id, it has no effect.
#if arrival_date is blank then leave it blank..
#so our df would look something like this (a rough sketch of this rule follows the table below):
+----------+--------+------------+--------+----------+------------+------------+
| Date|store_id|warehouse_id|class_id|total_time|arrival_date|transit_date|
+----------+--------+------------+--------+----------+------------+------------+
|2020-08-01| 110| 1| 11010| 3| 2020-08-04| 2020-08-01|
|2020-08-02| 110| 1| 11010| 2| 2020-08-04| 2020-08-01|
|2020-08-03| 110| 1| 11010| 3| 2020-08-06| 2020-08-03|
|2020-08-04| 110| 1| 11010| 3| 2020-08-07| 2020-08-04|
|2020-08-05| 111| 1| 11010| 1| 2020-08-06| 2020-08-05|
|2020-08-06| 111| 1| 11010| -1| | |
+----------+--------+------------+--------+----------+------------+------------+
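A rough sketch of that rule (my own wording, assuming df1 already has the arrival_date column from above; it is essentially what both answers below do), using min('Date') over each (arrival_date, store_id, warehouse_id) group:
from pyspark.sql import Window
from pyspark.sql import functions as F

# every row sharing the same (arrival_date, store_id, warehouse_id)
# gets the earliest Date in that group as its transit_date
w = Window.partitionBy('arrival_date', 'store_id', 'warehouse_id')
df1 = df1.withColumn(
    'transit_date',
    F.when(F.col('total_time') != -1, F.min('Date').over(w)).otherwise('')
)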
Next, I have df2, which looks like this:
#we have another dataframe call it df2
df2_schema = StructType([StructField("Date", StringType(), True),
                         StructField("store_id", StringType(), True),
                         StructField("warehouse_id", StringType(), True),
                         StructField("cloth_id", StringType(), True),
                         StructField("class_id", StringType(), True),
                         StructField("type", StringType(), True),
                         StructField("quantity", IntegerType(), True)])
df_data = [('2020-08-01','110','1','M_1','11010','R',5),('2020-08-01','110','1','M_1','11010','R',2),
           ('2020-08-02','110','1','M_1','11010','C',3),('2020-08-03','110','1','M_1','11010','R',1),
           ('2020-08-04','110','1','M_1','11010','R',3),('2020-08-05','111','1','M_2','11010','R',5)]
rdd = sc.parallelize(df_data)
df2 = sqlContext.createDataFrame(df_data, df2_schema)
df2 = df2.withColumn("Date", to_date("Date", 'yyyy-MM-dd'))
df2.show()
+----------+--------+------------+--------+--------+----+--------+
| Date|store_id|warehouse_id|cloth_id|class_id|type|quantity|
+----------+--------+------------+--------+--------+----+--------+
|2020-08-01| 110| 1| M_1| 11010| R| 5|
|2020-08-01| 110| 1| M_1| 11010| R| 2|
|2020-08-02| 110| 1| M_1| 11010| C| 3|
|2020-08-03| 110| 1| M_1| 11010| R| 1|
|2020-08-04| 110| 1| M_1| 11010| R| 3|
|2020-08-05| 111| 1| M_2| 11010| R| 5|
+----------+--------+------------+--------+--------+----+--------+
I computed quantity2, which is just the sum of quantity where type = R:
df2 = df2.groupBy('Date','store_id','warehouse_id','cloth_id','class_id') \
    .agg(F.sum(F.when(col('type')=='R', col('quantity'))
                .otherwise(col('quantity'))).alias('quantity2')).orderBy('Date')
+----------+--------+------------+--------+--------+---------+
| Date|store_id|warehouse_id|cloth_id|class_id|quantity2|
+----------+--------+------------+--------+--------+---------+
|2020-08-01| 110| 1| M_1| 11010| 7|
|2020-08-02| 110| 1| M_1| 11010| 3|
|2020-08-03| 110| 1| M_1| 11010| 1|
|2020-08-04| 110| 1| M_1| 11010| 3|
|2020-08-05| 111| 1| M_2| 11010| 5|
+----------+--------+------------+--------+--------+---------+
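One thing worth flagging: the text says quantity2 should be the sum where type = R, but the otherwise(col('quantity')) branch also adds the non-R rows, which is why 2020-08-02 shows 3. If only type R should count, a variant of the aggregation above (the form one of the answers below uses) would be:
df2 = df2.groupBy('Date', 'store_id', 'warehouse_id', 'cloth_id', 'class_id') \
    .agg(F.sum(F.when(col('type') == 'R', col('quantity')).otherwise(0))
          .alias('quantity2')) \
    .orderBy('Date')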
Now I have df1 and df2. I want to join them so that it looks like this...
I tried something like this:
df4 = df1.select('store_id','warehouse_id','class_id','arrival_date','transit_date')
df4 = df4.filter(" transit_date != '' ")
df4 = df4.withColumnRenamed('arrival_date', 'date')
df3 = df2.join(df1, on=['Date','store_id','warehouse_id','class_id'], how='inner').orderBy('Date')
df5 = df3.join(df4, on=['Date','store_id','warehouse_id','class_id'], how='left').orderBy('Date')
But I don't think this is the right approach... The resulting df should look like this:
+----------+--------+------------+--------+--------+---------+----------+------------+------------+
| Date|store_id|warehouse_id|class_id|cloth_id|quantity2|total_time|arrival_date|transit_date|
+----------+--------+------------+--------+--------+---------+----------+------------+------------+
|2020-08-01| 110| 1| 11010| M_1| 7| 3| 2020-08-04| null|
|2020-08-02| 110| 1| 11010| M_1| 3| 2| 2020-08-04| null|
|2020-08-03| 110| 1| 11010| M_1| 1| 3| 2020-08-06| null|
|2020-08-04| 110| 1| 11010| M_1| 3| 3| 2020-08-07| 2020-08-01|
|2020-08-05| 111| 1| 11010| M_2| 5| 1| 2020-08-06| null|
+----------+--------+------------+--------+--------+---------+----------+------------+------------+
Note that transit_date has moved to the rows where Date = arrival_date, and of course the blanks show up as null.
Finally, if today is 2020-08-04, then look at the rows where arrival_date == 2020-08-04, sum up the quantities, and put that sum on today's row. So... it would look something like this... For store_id = 111 there will be separate dates (not shown here), so the logic also needs to hold when store_id = 111; I have only shown the example for store_id = 110.
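A rough sketch of that last aggregation (my wording; both answers below work it out in full), assuming df3 is the joined frame from the attempt above: sum quantity2 per arrival_date and join it back on Date:
# total quantity2 of everything arriving on a given date, per store/warehouse/class
arrived = df3.groupBy('arrival_date', 'store_id', 'warehouse_id', 'class_id') \
    .agg(F.sum('quantity2').alias('want')) \
    .withColumnRenamed('arrival_date', 'Date')

# rows whose Date matches no arrival_date simply get a null 'want'
result = df3.join(arrived, on=['Date', 'store_id', 'warehouse_id', 'class_id'], how='left')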
Here is df1:
from pyspark.sql import Window
from pyspark.sql.functions import *
from pyspark.sql.types import *
import builtins as p
df1_schema = StructType(
    [
        StructField('Date', StringType(), True),
        StructField('store_id', StringType(), True),
        StructField('warehouse_id', StringType(), True),
        StructField('class_id', StringType(), True),
        StructField('total_time', IntegerType(), True)
    ]
)
df1_data = [
    ('2020-08-01','110','1','11010',3),
    ('2020-08-02','110','1','11010',2),
    ('2020-08-03','110','1','11010',3),
    ('2020-08-04','110','1','11010',3),
    ('2020-08-05','111','1','11010',1),
    ('2020-08-06','111','1','11010',-1)
]
df1 = spark.createDataFrame(df1_data, df1_schema)
df1 = df1.withColumn('Date', to_date('Date'))

# arrival_date = Date + total_time; blank when total_time is -1
df1 = df1.withColumn('arrival_date', when(col('total_time') != -1, expr("date_add(date, total_time)")).otherwise(''))

# rows sharing (arrival_date, store_id, warehouse_id) take the earliest Date as transit_date
w = Window.partitionBy('arrival_date', 'store_id', 'warehouse_id').orderBy('Date')
df1 = df1.withColumn('transit_date', when(col('total_time') != -1, first('Date').over(w)).otherwise('')).orderBy('Date')
df1.show()
+----------+--------+------------+--------+----------+------------+------------+
| Date|store_id|warehouse_id|class_id|total_time|arrival_date|transit_date|
+----------+--------+------------+--------+----------+------------+------------+
|2020-08-01| 110| 1| 11010| 3| 2020-08-04| 2020-08-01|
|2020-08-02| 110| 1| 11010| 2| 2020-08-04| 2020-08-01|
|2020-08-03| 110| 1| 11010| 3| 2020-08-06| 2020-08-03|
|2020-08-04| 110| 1| 11010| 3| 2020-08-07| 2020-08-04|
|2020-08-05| 111| 1| 11010| 1| 2020-08-06| 2020-08-05|
|2020-08-06| 111| 1| 11010| -1| | |
+----------+--------+------------+--------+----------+------------+------------+
And likewise df2:
df2_schema = StructType(
    [
        StructField('Date', StringType(), True),
        StructField('store_id', StringType(), True),
        StructField('warehouse_id', StringType(), True),
        StructField('cloth_id', StringType(), True),
        StructField('class_id', StringType(), True),
        StructField('type', StringType(), True),
        StructField('quantity', IntegerType(), True)
    ]
)
df2_data = [
    ('2020-08-01','110','1','M_1','11010','R',5),
    ('2020-08-01','110','1','M_1','11010','R',2),
    ('2020-08-02','110','1','M_1','11010','C',3),
    ('2020-08-03','110','1','M_1','11010','R',1),
    ('2020-08-04','110','1','M_1','11010','R',3),
    ('2020-08-05','111','1','M_2','11010','R',5)
]
df2 = spark.createDataFrame(df2_data, df2_schema)
df2 = df2.withColumn('Date', to_date('Date'))

# only rows with type == 'R' count toward quantity2
df2 = df2.groupBy('Date', 'store_id', 'warehouse_id', 'cloth_id', 'class_id') \
    .agg(
        sum(
            when(col('type') == 'R', col('quantity')).otherwise(0)
        ).alias('quantity2')
    ).orderBy('Date')
df2.show()
+----------+--------+------------+--------+--------+---------+
| Date|store_id|warehouse_id|cloth_id|class_id|quantity2|
+----------+--------+------------+--------+--------+---------+
|2020-08-01| 110| 1| M_1| 11010| 7|
|2020-08-02| 110| 1| M_1| 11010| 0|
|2020-08-03| 110| 1| M_1| 11010| 1|
|2020-08-04| 110| 1| M_1| 11010| 3|
|2020-08-05| 111| 1| M_2| 11010| 5|
+----------+--------+------------+--------+--------+---------+
Finally, the joins to get the result. Renaming arrival_date to Date in df3 means the second left join lines each row up with the quantities that arrive on that date.
# attach quantity2 to each df1 row, then re-key the rows by their arrival_date
df3 = df1.filter('total_time != -1') \
    .join(df2, on=['Date', 'store_id', 'warehouse_id', 'class_id'], how='left') \
    .drop('Date', 'total_time', 'cloth_id') \
    .withColumnRenamed('arrival_date', 'Date')

# left-join back on Date, so each row picks up the quantities arriving that day as 'want'
df4 = df1.drop('transit_date') \
    .join(df3, on=['Date', 'store_id', 'warehouse_id', 'class_id'], how='left') \
    .groupBy('Date', 'store_id', 'warehouse_id', 'class_id', 'arrival_date', 'transit_date') \
    .agg(sum('quantity2').alias('want')) \
    .orderBy('Date')
df4.show()
+----------+--------+------------+--------+------------+------------+----+
| Date|store_id|warehouse_id|class_id|arrival_date|transit_date|want|
+----------+--------+------------+--------+------------+------------+----+
|2020-08-01| 110| 1| 11010| 2020-08-04| null|null|
|2020-08-02| 110| 1| 11010| 2020-08-04| null|null|
|2020-08-03| 110| 1| 11010| 2020-08-06| null|null|
|2020-08-04| 110| 1| 11010| 2020-08-07| 2020-08-01| 7|
|2020-08-05| 111| 1| 11010| 2020-08-06| null|null|
|2020-08-06| 111| 1| 11010| | 2020-08-05| 5|
+----------+--------+------------+--------+------------+------------+----+
Based on my understanding of your question, and given that you already have the following df1 and df2:
df1.orderBy('Date').show() df2.orderBy('Date').show()
+----------+--------+------------+--------+----------+------------+ +----------+--------+------------+--------+--------+---------+
| Date|store_id|warehouse_id|class_id|total_time|arrival_date| | Date|store_id|warehouse_id|cloth_id|class_id|quantity2|
+----------+--------+------------+--------+----------+------------+ +----------+--------+------------+--------+--------+---------+
|2020-08-01| 110| 1| 11010| 3| 2020-08-04| |2020-08-01| 110| 1| M_1| 11010| 7|
|2020-08-02| 110| 1| 11010| 2| 2020-08-04| |2020-08-02| 110| 1| M_1| 11010| 3|
|2020-08-03| 110| 1| 11010| 3| 2020-08-06| |2020-08-03| 110| 1| M_1| 11010| 1|
|2020-08-04| 110| 1| 11010| 3| 2020-08-07| |2020-08-04| 110| 1| M_1| 11010| 3|
|2020-08-05| 111| 1| 11010| 1| 2020-08-06| |2020-08-05| 111| 1| M_2| 11010| 5|
|2020-08-06| 111| 1| 11010| -1| | +----------+--------+------------+--------+--------+---------+
+----------+--------+------------+--------+----------+------------+
You can try the following 5 steps:
Step 1: Set up the list of column names grp_cols to join on:
from pyspark.sql import functions as F
grp_cols = ["Date", "store_id", "warehouse_id", "class_id"]
Step 2: Create df3 containing transit_date, which is the minimum Date for each combination of arrival_date, store_id, warehouse_id and class_id:
df3 = df1.filter('total_time != -1') \
    .groupby("arrival_date", "store_id", "warehouse_id", "class_id") \
    .agg(F.min('Date').alias('transit_date')) \
    .withColumnRenamed("arrival_date", "Date")
df3.orderBy('Date').show()
+----------+--------+------------+--------+------------+
| Date|store_id|warehouse_id|class_id|transit_date|
+----------+--------+------------+--------+------------+
|2020-08-04| 110| 1| 11010| 2020-08-01|
|2020-08-06| 111| 1| 11010| 2020-08-05|
|2020-08-06| 110| 1| 11010| 2020-08-03|
|2020-08-07| 110| 1| 11010| 2020-08-04|
+----------+--------+------------+--------+------------+
Step 3: Set up df4 by joining df2 with df1 and then left-joining df3 on grp_cols, and persist df4:
df4 = df2.join(df1, grp_cols).join(df3, grp_cols, "left") \
    .withColumn('transit_date', F.when(F.col('total_time') != -1, F.col("transit_date")).otherwise('')) \
    .persist()
_ = df4.count()
df4.orderBy('Date').show()
+----------+--------+------------+--------+--------+---------+----------+------------+------------+
| Date|store_id|warehouse_id|class_id|cloth_id|quantity2|total_time|arrival_date|transit_date|
+----------+--------+------------+--------+--------+---------+----------+------------+------------+
|2020-08-01| 110| 1| 11010| M_1| 7| 3| 2020-08-04| null|
|2020-08-02| 110| 1| 11010| M_1| 3| 2| 2020-08-04| null|
|2020-08-03| 110| 1| 11010| M_1| 1| 3| 2020-08-06| null|
|2020-08-04| 110| 1| 11010| M_1| 3| 3| 2020-08-07| 2020-08-01|
|2020-08-05| 111| 1| 11010| M_2| 5| 1| 2020-08-06| null|
+----------+--------+------------+--------+--------+---------+----------+------------+------------+
Step 4: From df4, calculate sum(quantity2) as want for each combination of arrival_date + store_id + warehouse_id + class_id + cloth_id:
df5 = df4 \
    .groupby("arrival_date", "store_id", "warehouse_id", "class_id", "cloth_id") \
    .agg(F.sum("quantity2").alias("want")) \
    .withColumnRenamed("arrival_date", "Date")
df5.orderBy('Date').show()
+----------+--------+------------+--------+--------+----+
| Date|store_id|warehouse_id|class_id|cloth_id|want|
+----------+--------+------------+--------+--------+----+
|2020-08-04| 110| 1| 11010| M_1| 10|
|2020-08-06| 111| 1| 11010| M_2| 5|
|2020-08-06| 110| 1| 11010| M_1| 1|
|2020-08-07| 110| 1| 11010| M_1| 3|
+----------+--------+------------+--------+--------+----+
Step 5: Create the final dataframe by left-joining df4 with df5:
df_new = df4.join(df5, grp_cols+["cloth_id"], "left").fillna(0, subset=['want'])
df_new.orderBy("Date").show()
+----------+--------+------------+--------+--------+---------+----------+------------+------------+----+
| Date|store_id|warehouse_id|class_id|cloth_id|quantity2|total_time|arrival_date|transit_date|want|
+----------+--------+------------+--------+--------+---------+----------+------------+------------+----+
|2020-08-01| 110| 1| 11010| M_1| 7| 3| 2020-08-04| null| 0|
|2020-08-02| 110| 1| 11010| M_1| 3| 2| 2020-08-04| null| 0|
|2020-08-03| 110| 1| 11010| M_1| 1| 3| 2020-08-06| null| 0|
|2020-08-04| 110| 1| 11010| M_1| 3| 3| 2020-08-07| 2020-08-01| 10|
|2020-08-05| 111| 1| 11010| M_2| 5| 1| 2020-08-06| null| 0|
+----------+--------+------------+--------+--------+---------+----------+------------+------------+----+
df4.unpersist()
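Optionally, purely for presentation, the columns can be reordered to match the layout shown in the question (with want appended at the end):
df_new.select('Date', 'store_id', 'warehouse_id', 'class_id', 'cloth_id', 'quantity2',
              'total_time', 'arrival_date', 'transit_date', 'want') \
      .orderBy('Date').show()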