如何在pyspark中的多列中循环多个衰减率

How to loop multiple decay rate in multiple columns in pyspark

我尝试在我的函数的参数中传递一个列表。

我的列表由不同的系数组成,适用于滞后众多列。

但是,我只设法在我的数据框中为列表的第一个值生成列。

这是我的实际结果:

"col1"、"col2"、"col1_0.2"、"col2_0.2"

预期结果:

"col1"、"col2"、"col1_0.2"、"col2_0.2"、"col1_0.4"、"col2_0.4"、"col1_0.6"、"col2_0.6"

我一定是在我的循环中遗漏了一些东西?

selected_col = col_selector(df, ["col1", "col2"])


w = Window.partitionBy("student").orderBy("date")
coef = (.1,.4,.6)

def custom_coef(col, w, coef):
    for x in coef:
        return sum(
            pow(i, x) * F.lag(F.col(col), i, default=0).over(w)
            for i in range(1)
        ).alias(col +"_"+str(x))

new_df = df.select(
    F.col("*"),
    *[custom_coef(col, w, coef) for col in selected_col]
)

谢谢

custom_coef函数中的return语句在第一次循环coef后结束函数。这意味着 custom_coef 将始终 return 第一个列定义,这是 coef 0.1 的列定义。由于该函数在 selected_col 中每列调用一次,您将获得所描述的结果。

在不更改代码结构的情况下解决问题的一种方法是将 return 替换为 yield。这样 custom_coefselected_col 的每个元素创建一个生成器。这些生成器可以用 itertools.chain 链接起来,这个结果可以用作 select 语句的参数:

def custom_coef(col, w, coef):
    for x in coef:
        yield sum(  #use yield instead of return
            pow(i, x) * F.lag(F.col(col), i, default=0).over(w)
            for i in range(1)
        ).alias(col +"_"+str(x))

new_df = df.select(
    F.col("*"),
    *chain(*[custom_coef(col, w, coef) for col in selected_col]) #chain the generators
)
new_df.show()