Pyspark window lead function with offset value from value of different column
I am trying to derive a new column using the window lead function, but the offset I need for lead varies with the value of another column. Here is my sample data:
inputdata = (("James", "Sales", 3000),
             ("James", "Sales", 4600),
             ("James", "Sales", 4100),
             ("James", "Finance", 3000),
             ("James", "Sales", 3000),
             ("James", "Finance", 3300),
             ("James", "Finance", 3900),
             ("Kumar", "Marketing", 3000),
             ("Kumar", "Marketing", 2000),
             ("James", "Sales", 4100))
columns = ["employee_name", "department", "salary"]
df = spark.createDataFrame(data=inputdata, schema=columns)
Input:
+-------------+----------+------+
|employee_name|department|salary|
+-------------+----------+------+
|James |Sales |3000 |
|James |Sales |3000 |
|James |Sales |4100 |
|James |Sales |4100 |
|James |Sales |4600 |
|James |Finance |3000 |
|James |Finance |3300 |
|James |Finance |3900 |
|Kumar |Marketing |2000 |
|Kumar |Marketing |3000 |
+-------------+----------+------+
Expected output:
Here I need an additional column, expected_value, derived from the salary column: its value should be the next bigger salary.
If the next salary is the same as the current one, it should keep looking at the following values until it finds a different one.
In the sample above, the expected value for the first two (Sales, 3000) rows is 4100, which is the next bigger value; 9999 is the default when no bigger value exists.
I used the window lead function, but the problem is that the offset has to stay constant. I mean, lead can only jump ahead a fixed number of records. Is there any way to solve this?
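For reference, this is roughly what the fixed-offset version looks like (a minimal sketch against the df above); with an offset of 1, duplicate salaries simply return each other instead of the next bigger value:

import pyspark.sql.functions as F
from pyspark.sql.window import Window

# offset 1 returns the very next row's salary, so the first
# (Sales, 3000) row gets 3000 back instead of 4100
w = Window.partitionBy('employee_name', 'department').orderBy('salary')
df.withColumn('next_salary', F.lead('salary', 1, 9999).over(w)).show()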
Here is what I tried:
I created an offset column that says how much further the lead function needs to jump:
from pyspark.sql.functions import col, row_number, max, lead
from pyspark.sql.window import Window

windowSpec = Window.partitionBy("employee_name", "department", "salary").orderBy('employee_name', 'department', 'salary')
dfk = df.withColumn("row_number", row_number().over(windowSpec))
# every row in this window is a peer (the order keys equal the partition keys),
# so the default RANGE frame spans the whole partition and max() sees all rows
dfk = dfk.withColumn("max_row_number", max(col("row_number") + 1).over(windowSpec))
dfk = dfk.orderBy("employee_name", "department", "salary").withColumn('offset', dfk['max_row_number'] - dfk['row_number'])
+-------------+----------+------+----------+--------------+------+
|employee_name|department|salary|row_number|max_row_number|offset|
+-------------+----------+------+----------+--------------+------+
| James| Finance| 3000| 1| 2| 1|
| James| Finance| 3300| 2| 3| 1|
| James| Finance| 3900| 3| 4| 1|
| James| Sales| 3000| 1| 3| 2|
| James| Sales| 3000| 2| 3| 1|
| James| Sales| 4100| 3| 5| 2|
| James| Sales| 4100| 4| 5| 1|
| James| Sales| 4600| 5| 6| 1|
| Kumar| Marketing| 2000| 1| 2| 1|
| Kumar| Marketing| 3000| 2| 3| 1|
+-------------+----------+------+----------+--------------+------+
But I cannot pass the offset column as the offset argument to the lead function, or anything like that:
windowSpec_2 = Window.partitionBy("employee_name", "department").orderBy('employee_name', 'department', 'salary', 'row_number')
dfk.withColumn("expected_value", lead('salary', dfk['offset'], 9999).over(windowSpec_2)).show()
I have also tried expr/eval, but I still see the same error:

raise TypeError("Column is not iterable")
TypeError: Column is not iterable
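That error is expected: in pyspark.sql.functions the offset parameter of lead must be a plain Python int (it is passed to the JVM as a literal), so a per-row Column offset cannot be expressed. A minimal sketch reproducing the failure, assuming the dfk built above:

from pyspark.sql.functions import col, lead
from pyspark.sql.window import Window

w2 = Window.partitionBy('employee_name', 'department').orderBy('salary', 'row_number')
try:
    # a Column where an int is expected trips Column.__iter__ during conversion
    dfk.withColumn('bad', lead('salary', col('offset'), 9999).over(w2))
except TypeError as e:
    print(e)  # Column is not iterable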
Is there any way to achieve the expected output above?
Thanks in advance.
Your use case is a bit tricky, so I don't think lead is suitable. You are better off using a rangeBetween window and taking the min value over that window: ordered by salary, the frame rangeBetween(1, Window.unboundedFollowing) contains only rows whose salary is strictly greater than the current row's, so the min over that frame is exactly the next bigger salary.
import pyspark.sql.functions as F
from pyspark.sql.window import Window

df2 = df.withColumn(
    'expected_value',
    F.coalesce(
        # min salary among the rows whose salary is strictly greater than
        # the current row's (the RANGE frame starts at current salary + 1)
        F.min('salary').over(
            Window.partitionBy('employee_name', 'department')
                  .orderBy('salary')
                  .rangeBetween(1, Window.unboundedFollowing)
        ),
        F.lit(9999)  # default when there is no bigger salary
    )
)
df2.show()
+-------------+----------+------+--------------+
|employee_name|department|salary|expected_value|
+-------------+----------+------+--------------+
| James| Sales| 3000| 4100|
| James| Sales| 3000| 4100|
| James| Sales| 4100| 4600|
| James| Sales| 4100| 4600|
| James| Sales| 4600| 9999|
| Kumar| Marketing| 2000| 3000|
| Kumar| Marketing| 3000| 9999|
| James| Finance| 3000| 3300|
| James| Finance| 3300| 3900|
| James| Finance| 3900| 9999|
+-------------+----------+------+--------------+
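For completeness, a different way to keep using lead with a fixed offset (my own sketch, not part of the answer above, reusing df and the 9999 default): apply lead(salary, 1) to the distinct salaries of each partition, where the next row is by construction the next bigger value, then join the result back onto the full DataFrame:

import pyspark.sql.functions as F
from pyspark.sql.window import Window

w = Window.partitionBy('employee_name', 'department').orderBy('salary')

# one row per distinct (employee, department, salary), so lead(1)
# always lands on the next different (and, ascending, bigger) salary
nexts = (df.dropDuplicates(['employee_name', 'department', 'salary'])
           .withColumn('expected_value', F.lead('salary', 1, 9999).over(w)))

df.join(nexts, on=['employee_name', 'department', 'salary'], how='left').show()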