Add rows of data to each group in a Spark dataframe

I have this dataframe -

data = [(0,1,1,201505,3),
        (1,1,1,201506,5),
        (2,1,1,201507,7),
        (3,1,1,201508,2),
        (4,2,2,201750,3),
        (5,2,2,201751,0),
        (6,2,2,201752,1),
        (7,2,2,201753,1)
       ]
cols = ['id','item','store','week','sales']
data_df = spark.createDataFrame(data=data,schema=cols)
display(data_df)

I want this -

data_new = [(0,1,1,201505,3,0),
            (1,1,1,201506,5,0),
            (2,1,1,201507,7,0),
            (3,1,1,201508,2,0),
            (4,1,1,201509,0,0),
            (5,1,1,201510,0,0),
            (6,1,1,201511,0,0),
            (7,1,1,201512,0,0),
            (8,2,2,201750,3,0),
            (9,2,2,201751,0,0),
            (10,2,2,201752,1,0),
            (11,2,2,201753,1,0),
            (12,2,2,201801,0,0),
            (13,2,2,201802,0,0),
            (14,2,2,201803,0,0),
            (15,2,2,201804,0,0)]
cols_new = ['id','item','store','week','sales','flag']
data_df_new = spark.createDataFrame(data=data_new,schema=cols_new)
display(data_df_new)

So basically, I need 8 (this could also be 6 or 10) weeks of data for every item/store groupby combination. Wherever the 52/53 weeks of a year end, I need the week numbers to roll over into the next year, as shown in my sample. I need this in PySpark, thanks in advance!
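To be concrete about the rollover: the extra weeks are just 7-day steps from the last observed week's anchor date, re-encoded as week labels. A minimal sketch with plain Python dates (the anchor-date convention here is an assumption; any fixed weekday works):

from datetime import date, timedelta

def next_week_anchors(last_anchor: date, n: int) -> list:
    # last_anchor: the (assumed) anchor date of the last observed week in a group.
    # Re-encoding the returned dates as week-of-year labels is what turns
    # 201753 into 201801, 201802, ... across the year boundary.
    return [last_anchor + timedelta(weeks=k) for k in range(1, n + 1)]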

See my attempt below. I could have made it shorter, but I felt it should be as explicit as possible, so I spelled out every step. Code below:

from pyspark.sql import functions as F
from pyspark.sql.functions import col, expr, lit, row_number, sequence
from pyspark.sql.window import Window

spark.sql("set spark.sql.legacy.timeParserPolicy=LEGACY")


# Convert week of the year to date
s=data_df.withColumn("week", expr("cast (week as string)")).withColumn('date', F.to_date(F.concat("week",F.lit("6")), "yyyywwu"))


s = (s.groupby('item', 'store').agg(F.collect_list('sales').alias('sales'),F.collect_list('date').alias('date'))#Put sales and dates in an array
     .withColumn("id", sequence(lit(0), lit(6)))#Create sequence ids with the required expansion range per group
    )

#Explode dataframe back with each item/store combination in a row
s = s.selectExpr('item', 'store', 'inline(arrays_zip(date, id, sales))')

#Create partition window broadcasting from start to end for each item/store combination
w = Window.partitionBy('item','store').orderBy('id').rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)

#Create partition window per item/store/date combination; its purpose is to number the null-date rows within each group so each gets its own week increment
w1 = Window.partitionBy('item','store','date').orderBy('id').rowsBetween(Window.unboundedPreceding, Window.currentRow)

s = (s.withColumn('increment', F.when(col('date').isNull(), (row_number().over(w1)) * 7).otherwise(0))  #Number of days to add for each filler row in an item/store combination
     .withColumn('date1', F.when(col('date').isNull(), F.max('date').over(w)).otherwise(col('date')))  #Use the last observed date in each item/store combination as the base date for filler rows
    )



#Compute the week of year and drop the helper columns
s = s.withColumn("weekofyear", expr("weekofyear(date_add(date1, cast(increment as int)))")).drop('date', 'increment', 'date1').na.fill(0)
               


s.show(truncate=False)

Result

+----+-----+---+-----+----------+
|item|store|id |sales|weekofyear|
+----+-----+---+-----+----------+
|1   |1    |0  |3    |5         |
|1   |1    |1  |5    |6         |
|1   |1    |2  |7    |7         |
|1   |1    |3  |2    |8         |
|1   |1    |4  |0    |9         |
|1   |1    |5  |0    |10        |
|1   |1    |6  |0    |11        |
|2   |2    |0  |3    |50        |
|2   |2    |1  |0    |51        |
|2   |2    |2  |1    |52        |
|2   |2    |3  |1    |1         |
|2   |2    |4  |0    |2         |
|2   |2    |5  |0    |3         |
|2   |2    |6  |0    |4         |
+----+-----+---+-----+----------+
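
If the full yyyyww label is needed instead of only the week of year, the same date arithmetic can be re-encoded with date_format before the helper columns are dropped. A minimal sketch reusing date1 and increment from the steps above (run it before the .drop(...) call; the 'yyyyww' pattern relies on the legacy time parser policy set at the top):

# Sketch: recover the full yyyyww label rather than only week-of-year.
# Assumes 'date1' and 'increment' are still present, i.e. this runs before
# the .drop('date', 'increment', 'date1') step.
s = s.withColumn(
    "week",
    F.date_format(F.expr("date_add(date1, cast(increment as int))"), "yyyyww")
)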