window functions (lag, lead) implementation in pyspark?
The T-SQL code is attached below. I tried converting it to pyspark using window functions; that attempt is also attached.
case
  when eventaction = 'IN'
       and lead(eventaction, 1) over (partition by barcode order by barcode, eventdate, transactionid) in ('IN', 'OUT')
  then lead(eventaction, 1) over (partition by barcode order by barcode, eventdate, transactionid)
  else ''
end as next_action
The following pyspark code using the window function throws an error:
Tgt_df = Tgt_df.withColumn((('Lead', lead('eventaction').over(Window.partitionBy("barcode").orderBy("barcode","transactionid", "eventdate")) == 'IN' )|
('1', lead('eventaction').over(Window.partitionBy("barcode").orderBy("barcode","transactionid", "eventdate")) == 'OUT')
, (lead('eventaction').over(Window.partitionBy("barcode").orderBy("barcode","transactionid", "eventdate"))).otherwise('').alias("next_action")))
But it doesn't work. What should I do?
The withColumn method should be called as df.withColumn('name_of_col', value_of_column) — the column name first, then a single column expression. That is why you get an error.
Based on your T-SQL, the corresponding pyspark code should be:
import pyspark.sql.functions as F
from pyspark.sql.window import Window

# Order by eventdate then transactionid, as in the T-SQL OVER clause
w = Window.partitionBy("barcode").orderBy("barcode", "eventdate", "transactionid")

Tgt_df = Tgt_df.withColumn(
    'next_action',
    F.when(
        (F.col('eventaction') == 'IN')
        & (F.lead('eventaction', 1).over(w).isin(['IN', 'OUT'])),
        F.lead('eventaction', 1).over(w)
    ).otherwise('')
)