
Pyspark window lead function with offset value from value of different column

I am trying to derive a new column using the window lead function, but the offset I need for the lead function varies depending on the values in a column. Here is my sample data:

inputdata = (
    ("James", "Sales", 3000),
    ("James", "Sales", 4600),
    ("James", "Sales", 4100),
    ("James", "Finance", 3000),
    ("James", "Sales", 3000),
    ("James", "Finance", 3300),
    ("James", "Finance", 3900),
    ("Kumar", "Marketing", 3000),
    ("Kumar", "Marketing", 2000),
    ("James", "Sales", 4100),
)
  
columns= ["employee_name", "department", "salary"]
df = spark.createDataFrame(data = inputdata, schema = columns)

Input:

+-------------+----------+------+
|employee_name|department|salary|
+-------------+----------+------+
|James        |Sales     |3000  |
|James        |Sales     |3000  |
|James        |Sales     |4100  |
|James        |Sales     |4100  |
|James        |Sales     |4600  |
|James        |Finance   |3000  |
|James        |Finance   |3300  |
|James        |Finance   |3900  |
|Kumar        |Marketing |2000  |
|Kumar        |Marketing |3000  |
+-------------+----------+------+

Expected output:

Here I have a column called expected value.

It needs to be derived from the salary column, and its value should be the next immediately greater value.

If the next value in the column is the same as the current one, it should keep moving to the following values until it finds a different one.

In the example below, the expected value for the first two rows is 4100, which is the next greater value here; 9999 is the default value.
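The rule can be illustrated with a small plain-Python sketch (not PySpark, and `next_greater` is a hypothetical helper name — just the "next strictly greater value, else 9999" logic applied per (employee_name, department) group):

```python
# Plain-Python sketch of the required logic (hypothetical helper, not PySpark):
# for each row, expected = smallest salary in the same group that is
# strictly greater than the current salary, else the default 9999.
def next_greater(rows, default=9999):
    out = []
    for name, dept, salary in rows:
        group = [s for (n, d, s) in rows if (n, d) == (name, dept)]
        greater = [s for s in group if s > salary]
        out.append(min(greater) if greater else default)
    return out

rows = [("James", "Sales", 3000), ("James", "Sales", 3000),
        ("James", "Sales", 4100), ("James", "Sales", 4100),
        ("James", "Sales", 4600)]
print(next_greater(rows))  # [4100, 4100, 4600, 4600, 9999]
```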

I used the window lead function, but the problem is that the offset has to stay constant — I mean lead can only move forward by a fixed number of records. Is there any way to solve this?

Here is what I tried:

I created an offset column indicating how far the lead function needs to move:

from pyspark.sql.functions import col, max, row_number
from pyspark.sql.window import Window

windowSpec = Window.partitionBy("employee_name", "department", "salary").orderBy('employee_name', 'department', 'salary')
dfk = df.withColumn("row_number", row_number().over(windowSpec))
dfk = dfk.withColumn("max_row_number", max(col("row_number") + 1).over(windowSpec))
dfk = dfk.orderBy("employee_name", "department", "salary").withColumn('offset', dfk['max_row_number'] - dfk['row_number'])

+-------------+----------+------+----------+--------------+------+
|employee_name|department|salary|row_number|max_row_number|offset|
+-------------+----------+------+----------+--------------+------+
|        James|   Finance|  3000|         1|             2|     1|
|        James|   Finance|  3300|         2|             3|     1|
|        James|   Finance|  3900|         3|             4|     1|
|        James|     Sales|  3000|         1|             3|     2|
|        James|     Sales|  3000|         2|             3|     1|
|        James|     Sales|  4100|         3|             5|     2|
|        James|     Sales|  4100|         4|             5|     1|
|        James|     Sales|  4600|         5|             6|     1|
|        Kumar| Marketing|  2000|         1|             2|     1|
|        Kumar| Marketing|  3000|         2|             3|     1|
+-------------+----------+------+----------+--------------+------+

But I am not able to pass the offset column as the offset to the lead function, or anything similar:

windowSpec_2 = Window.partitionBy("employee_name", "department").orderBy('employee_name', 'department', 'salary', 'row_number')
dfk.withColumn("expected_value", lead('salary', dfk['offset'], 9999).over(windowSpec_2)).show()

I have also tried using expr/eval, but I still see:

raise TypeError("Column is not iterable")
TypeError: Column is not iterable

Is there any way to achieve the expected output above?

Thanks in advance.

Your use case is a bit tricky, so I don't think lead is suitable here. You are better off using a rangeBetween window and taking the min over that window:
import pyspark.sql.functions as F
from pyspark.sql.window import Window

df2 = df.withColumn(
    'expected_value',
    F.coalesce(
        # min salary over all rows in the partition whose salary is at
        # least current salary + 1, i.e. the next strictly greater salary
        F.min('salary').over(
            Window.partitionBy(
                "employee_name", "department"
            ).orderBy(
                'salary'
            ).rangeBetween(
                1, Window.unboundedFollowing
            )
        ),
        F.lit(9999)  # default when no greater salary exists
    )
)

df2.show()
+-------------+----------+------+--------------+
|employee_name|department|salary|expected_value|
+-------------+----------+------+--------------+
|        James|     Sales|  3000|          4100|
|        James|     Sales|  3000|          4100|
|        James|     Sales|  4100|          4600|
|        James|     Sales|  4100|          4600|
|        James|     Sales|  4600|          9999|
|        Kumar| Marketing|  2000|          3000|
|        Kumar| Marketing|  3000|          9999|
|        James|   Finance|  3000|          3300|
|        James|   Finance|  3300|          3900|
|        James|   Finance|  3900|          9999|
+-------------+----------+------+--------------+
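One way to see why this works: with orderBy('salary'), a range frame of (1, Window.unboundedFollowing) contains every row in the partition whose salary is at least the current salary + 1, so the min over that frame is exactly the next strictly greater salary. A plain-Python imitation of that frame (a sketch only, `range_frame_min` is a made-up name, not part of Spark):

```python
# Imitate the answer's range frame in plain Python (sketch only):
# frame = salaries in the partition with value >= current + 1 (lower bound 1),
# expected_value = min over the frame, or the 9999 default if it is empty.
def range_frame_min(salaries, default=9999):
    return [min([s for s in salaries if s >= cur + 1], default=default)
            for cur in salaries]

sales = [3000, 3000, 4100, 4100, 4600]
print(range_frame_min(sales))  # [4100, 4100, 4600, 4600, 9999]
```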