PySpark: replacing null values with a calculation based on the last non-null value
Hi, my question is related to this one (), but the requirements are slightly different:
data: expected output:
+------+-----+---------+---------+-----+ +------+-----+---------+---------+-----+
| item|store|timestamp|sales_qty|stock| | item|store|timestamp|sales_qty|stock|
+------+-----+---------+---------+-----+ +------+-----+---------+---------+-----+
|673895|35578| 20180101| 1| null| |673895|35578| 20180101| 1| null|
|673895|35578| 20180102| 0| 110| |673895|35578| 20180102| 0| 110|
|673895|35578| 20180103| 1| null| |673895|35578| 20180103| 1| 109|
|673895|35578| 20180104| 0| null| |673895|35578| 20180104| 0| 109|
|673895|35578| 20180105| 0| 109| => |673895|35578| 20180105| 0| 109|
|673895|35578| 20180106| 1| null| |673895|35578| 20180106| 1| 108|
|673895|35578| 20180107| 0| 108| |673895|35578| 20180107| 0| 108|
|673895|35578| 20180108| 0| null| |673895|35578| 20180108| 0| 108|
|673895|35578| 20180109| 0| null| |673895|35578| 20180109| 0| 108|
|673895|35578| 20180110| 1| null| |673895|35578| 20180110| 1| 107|
+------+-----+---------+---------+-----+ +------+-----+---------+---------+-----+
My expected output is based on the last known non-null stock value and sales_qty: wherever a sales_qty is present, the stock should be adjusted by that amount.
I tried the following logic:
my_window = Window.partitionBy('item', 'store').orderBy('timestamp')
df = df.withColumn("stock", F.when(F.isnull(F.col('stock')),
        F.lag(df.stock).over(my_window) - F.col('sales_qty')).otherwise(F.col('stock')))
But it only fills a single null value. Can someone help me achieve the expected result?
Note: the quantity does not keep decreasing consistently, so the last non-null value has to be taken into account when calculating the new one.
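For reproducibility, the sample data above can be built like this for testing (a minimal sketch, assuming a local SparkSession; spark and df are the names used in the snippets below):
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# sample rows from the "data" table above; None marks the null stock entries
rows = [
    (673895, 35578, 20180101, 1, None),
    (673895, 35578, 20180102, 0, 110),
    (673895, 35578, 20180103, 1, None),
    (673895, 35578, 20180104, 0, None),
    (673895, 35578, 20180105, 0, 109),
    (673895, 35578, 20180106, 1, None),
    (673895, 35578, 20180107, 0, 108),
    (673895, 35578, 20180108, 0, None),
    (673895, 35578, 20180109, 0, None),
    (673895, 35578, 20180110, 1, None),
]
df = spark.createDataFrame(rows, ["item", "store", "timestamp", "sales_qty", "stock"])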
You can try this. I basically first generate two columns, first (the first non-null stock value, 110 here) and stock2, which is essentially a running sum of sales_qty, and then subtract one from the other to get the desired stock.
from pyspark.sql.window import Window
from pyspark.sql import functions as F
# w: running window from the start of the partition up to the current row
w=Window().partitionBy("item","store").orderBy("timestamp")
# w2: window over the whole partition, used to pick the first non-null stock
w2=Window().partitionBy("item","store").orderBy("timestamp").rowsBetween(Window.unboundedPreceding,Window.unboundedFollowing)
# stock2 = running sum of sales_qty minus 1; first = first non-null stock (110);
# stock  = first - stock2 (stock1 is only an intermediate column and is dropped)
df.withColumn("stock1", F.when(F.col("stock").isNull(), F.lit(0)).otherwise(F.col("stock")))\
.withColumn("stock2", F.sum("sales_qty").over(w)- F.lit(1))\
.withColumn("first", F.first("stock", True).over(w2))\
.withColumn("stock", F.col("first")-F.col("stock2"))\
.drop("stock1","stock2","first")\
.show()
+------+-----+---------+---------+-----+
| item|store|timestamp|sales_qty|stock|
+------+-----+---------+---------+-----+
|673895|35578| 20180101| 1| 110|
|673895|35578| 20180102| 0| 110|
|673895|35578| 20180103| 1| 109|
|673895|35578| 20180104| 0| 109|
|673895|35578| 20180105| 0| 109|
|673895|35578| 20180106| 1| 108|
|673895|35578| 20180107| 0| 108|
|673895|35578| 20180108| 0| 108|
|673895|35578| 20180109| 0| 108|
|673895|35578| 20180110| 1| 107|
+------+-----+---------+---------+-----+
If you want to force your first value to be null instead of 110 (as shown in your desired output), you can use this (it basically uses row_number to replace that first 110 with null):
from pyspark.sql.window import Window
from pyspark.sql import functions as F
w=Window().partitionBy("item","store").orderBy("timestamp")
w2=Window().partitionBy("item","store").orderBy("timestamp").rowsBetween(Window.unboundedPreceding,Window.unboundedFollowing)
# same as above, plus num = row_number to blank out the first row of each partition
df.withColumn("stock1", F.when(F.col("stock").isNull(), F.lit(0)).otherwise(F.col("stock")))\
.withColumn("stock2", F.sum("sales_qty").over(w)- F.lit(1))\
.withColumn("first", F.first("stock", True).over(w2))\
.withColumn("stock", F.col("first")-F.col("stock2"))\
.withColumn("num", F.row_number().over(w))\
.withColumn("stock", F.when(F.col("num")==1, F.lit(None)).otherwise(F.col("stock")))\
.drop("stock1","stock2","first","num")\
.show()
+------+-----+---------+---------+-----+
| item|store|timestamp|sales_qty|stock|
+------+-----+---------+---------+-----+
|673895|35578| 20180101| 1| null|
|673895|35578| 20180102| 0| 110|
|673895|35578| 20180103| 1| 109|
|673895|35578| 20180104| 0| 109|
|673895|35578| 20180105| 0| 109|
|673895|35578| 20180106| 1| 108|
|673895|35578| 20180107| 0| 108|
|673895|35578| 20180108| 0| 108|
|673895|35578| 20180109| 0| 108|
|673895|35578| 20180110| 1| 107|
+------+-----+---------+---------+-----+
Additional data inputs and outputs:
#input1
+------+-----+---------+---------+-----+
| item|store|timestamp|sales_qty|stock|
+------+-----+---------+---------+-----+
|673895|35578| 20180101| 1| null|
|673895|35578| 20180102| 0| 110|
|673895|35578| 20180103| 1| null|
|673895|35578| 20180104| 3| null|
|673895|35578| 20180105| 0| 109|
|673895|35578| 20180106| 1| null|
|673895|35578| 20180107| 0| 108|
|673895|35578| 20180108| 4| null|
|673895|35578| 20180109| 0| null|
|673895|35578| 20180110| 1| null|
+------+-----+---------+---------+-----+
#output1
+------+-----+---------+---------+-----+
| item|store|timestamp|sales_qty|stock|
+------+-----+---------+---------+-----+
|673895|35578| 20180101| 1| null|
|673895|35578| 20180102| 0| 110|
|673895|35578| 20180103| 1| 109|
|673895|35578| 20180104| 3| 106|
|673895|35578| 20180105| 0| 106|
|673895|35578| 20180106| 1| 105|
|673895|35578| 20180107| 0| 105|
|673895|35578| 20180108| 4| 101|
|673895|35578| 20180109| 0| 101|
|673895|35578| 20180110| 1| 100|
+------+-----+---------+---------+-----+
#input2
+------+-----+---------+---------+-----+
| item|store|timestamp|sales_qty|stock|
+------+-----+---------+---------+-----+
|673895|35578| 20180101| 1| null|
|673895|35578| 20180102| 0| 110|
|673895|35578| 20180103| 1| null|
|673895|35578| 20180104| 7| null|
|673895|35578| 20180105| 0| 102|
|673895|35578| 20180106| 0| null|
|673895|35578| 20180107| 4| 98|
|673895|35578| 20180108| 0| null|
|673895|35578| 20180109| 0| null|
|673895|35578| 20180110| 1| null|
+------+-----+---------+---------+-----+
#output2
+------+-----+---------+---------+-----+
| item|store|timestamp|sales_qty|stock|
+------+-----+---------+---------+-----+
|673895|35578| 20180101| 1| null|
|673895|35578| 20180102| 0| 110|
|673895|35578| 20180103| 1| 109|
|673895|35578| 20180104| 7| 102|
|673895|35578| 20180105| 0| 102|
|673895|35578| 20180106| 0| 102|
|673895|35578| 20180107| 4| 98|
|673895|35578| 20180108| 0| 98|
|673895|35578| 20180109| 0| 98|
|673895|35578| 20180110| 1| 97|
+------+-----+---------+---------+-----+
If the known stock values are not consistent with the sales (for example, the stock goes back up), like this:
df.show()
+------+-----+---------+---------+-----+
| item|store|timestamp|sales_qty|stock|
+------+-----+---------+---------+-----+
|673895|35578| 20180101| 1| null|
|673895|35578| 20180102| 0| 110|
|673895|35578| 20180103| 1| null|
|673895|35578| 20180104| 7| null|
|673895|35578| 20180105| 0| 112|
|673895|35578| 20180106| 2| null|
|673895|35578| 20180107| 0| 107|
|673895|35578| 20180108| 0| null|
|673895|35578| 20180109| 0| null|
|673895|35578| 20180110| 1| null|
+------+-----+---------+---------+-----+
You can use this (I basically compute a dynamic window for each last non-null value):
from pyspark.sql.window import Window
from pyspark.sql import functions as F
w=Window().partitionBy("item","store").orderBy("timestamp")
# w3 additionally partitions by stock5, i.e. one dynamic window per last known stock value
w3=Window().partitionBy("item","store","stock5").orderBy("timestamp")
# stock1: stock with nulls replaced by 0
# stock4: the row's rank where stock is known, 0 elsewhere
# stock5: running sum of stock4 -> a group id that changes at every known stock row
# stock6: running sum of stock1 within the group -> forward-filled last known stock
# sum:    sales_qty accumulated since the last known stock (that row itself excluded)
df.withColumn("stock1", F.when(F.col("stock").isNull(), F.lit(0)).otherwise(F.col("stock")))\
.withColumn("stock4", F.when(F.col("stock1")!=0, F.rank().over(w)).otherwise(F.col("stock1")))\
.withColumn("stock5", F.sum("stock4").over(w))\
.withColumn("stock6", F.sum("stock1").over(w3))\
.withColumn("sum", F.sum(F.when(F.col("stock1")!=F.col("stock6"),F.col("sales_qty")).otherwise(F.lit(0))).over(w3))\
.withColumn("stock2", F.when(F.col("sales_qty")!=0, F.col("stock6")-F.col("sum")).otherwise(F.col("stock")))\
.withColumn("stock", F.when((F.col("stock2").isNull())&(F.col("sales_qty")==0),F.col("stock6")-F.col("sum")).otherwise(F.col("stock2")))\
.drop("stock1","stock4","stock5","stock6","sum","stock2")\
.show()
+------+-----+---------+---------+-----+
| item|store|timestamp|sales_qty|stock|
+------+-----+---------+---------+-----+
|673895|35578| 20180101| 1| 0|
|673895|35578| 20180102| 0| 110|
|673895|35578| 20180103| 1| 109|
|673895|35578| 20180104| 7| 102|
|673895|35578| 20180105| 0| 112|
|673895|35578| 20180106| 2| 110|
|673895|35578| 20180107| 0| 107|
|673895|35578| 20180108| 0| 107|
|673895|35578| 20180109| 0| 107|
|673895|35578| 20180110| 1| 106|
+------+-----+---------+---------+-----+
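If you prefer an explicit grouping column over the stock4/stock5 trick, the same idea (one dynamic window per last known stock value) can be sketched like this; grp, base and sold_since are just helper names, and unlike the output above the rows before the first known stock stay null rather than 0:
from pyspark.sql.window import Window
from pyspark.sql import functions as F

w = Window.partitionBy("item", "store").orderBy("timestamp")

# grp increases by 1 at every row where stock is known, so each group
# starts with exactly one non-null stock value; rows before the first
# known stock fall into group 0, where base stays null
grp = F.sum(F.when(F.col("stock").isNotNull(), 1).otherwise(0)).over(w)

wg = Window.partitionBy("item", "store", "grp").orderBy("timestamp")

df.withColumn("grp", grp)\
  .withColumn("base", F.first("stock", ignorenulls=True).over(wg))\
  .withColumn("sold_since", F.sum("sales_qty").over(wg) - F.first("sales_qty").over(wg))\
  .withColumn("stock", F.coalesce(F.col("stock"), F.col("base") - F.col("sold_since")))\
  .drop("grp", "base", "sold_since")\
  .show()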