如何在pyspark中的多列中循环多个衰减率
How to loop multiple decay rate in multiple columns in pyspark
我尝试在我的函数的参数中传递一个列表。
我的列表由不同的系数组成,适用于滞后众多列。
但是,我只设法在我的数据框中为列表的第一个值生成列。
这是我的实际结果:
"col1"、"col2"、"col1_0.2"、"col2_0.2"
预期结果:
"col1"、"col2"、"col1_0.2"、"col2_0.2"、"col1_0.4"、"col2_0.4"、"col1_0.6"、"col2_0.6"
我一定是在我的循环中遗漏了一些东西?
selected_col = col_selector(df, ["col1", "col2"])
w = Window.partitionBy("student").orderBy("date")
coef = (.1,.4,.6)
def custom_coef(col, w, coef):
for x in coef:
return sum(
pow(i, x) * F.lag(F.col(col), i, default=0).over(w)
for i in range(1)
).alias(col +"_"+str(x))
new_df = df.select(
F.col("*"),
*[custom_coef(col, w, coef) for col in selected_col]
)
谢谢
custom_coef
函数中的return
语句在第一次循环coef
后结束函数。这意味着 custom_coef
将始终 return 第一个列定义,这是 coef 0.1 的列定义。由于该函数在 selected_col
中每列调用一次,您将获得所描述的结果。
在不更改代码结构的情况下解决问题的一种方法是将 return
替换为 yield
。这样 custom_coef
为 selected_col
的每个元素创建一个生成器。这些生成器可以用 itertools.chain 链接起来,这个结果可以用作 select
语句的参数:
def custom_coef(col, w, coef):
for x in coef:
yield sum( #use yield instead of return
pow(i, x) * F.lag(F.col(col), i, default=0).over(w)
for i in range(1)
).alias(col +"_"+str(x))
new_df = df.select(
F.col("*"),
*chain(*[custom_coef(col, w, coef) for col in selected_col]) #chain the generators
)
new_df.show()
我尝试在我的函数的参数中传递一个列表。
我的列表由不同的系数组成,适用于滞后众多列。
但是,我只设法在我的数据框中为列表的第一个值生成列。
这是我的实际结果:
"col1"、"col2"、"col1_0.2"、"col2_0.2"
预期结果:
"col1"、"col2"、"col1_0.2"、"col2_0.2"、"col1_0.4"、"col2_0.4"、"col1_0.6"、"col2_0.6"
我一定是在我的循环中遗漏了一些东西?
selected_col = col_selector(df, ["col1", "col2"])
w = Window.partitionBy("student").orderBy("date")
coef = (.1,.4,.6)
def custom_coef(col, w, coef):
for x in coef:
return sum(
pow(i, x) * F.lag(F.col(col), i, default=0).over(w)
for i in range(1)
).alias(col +"_"+str(x))
new_df = df.select(
F.col("*"),
*[custom_coef(col, w, coef) for col in selected_col]
)
谢谢
custom_coef
函数中的return
语句在第一次循环coef
后结束函数。这意味着 custom_coef
将始终 return 第一个列定义,这是 coef 0.1 的列定义。由于该函数在 selected_col
中每列调用一次,您将获得所描述的结果。
在不更改代码结构的情况下解决问题的一种方法是将 return
替换为 yield
。这样 custom_coef
为 selected_col
的每个元素创建一个生成器。这些生成器可以用 itertools.chain 链接起来,这个结果可以用作 select
语句的参数:
def custom_coef(col, w, coef):
for x in coef:
yield sum( #use yield instead of return
pow(i, x) * F.lag(F.col(col), i, default=0).over(w)
for i in range(1)
).alias(col +"_"+str(x))
new_df = df.select(
F.col("*"),
*chain(*[custom_coef(col, w, coef) for col in selected_col]) #chain the generators
)
new_df.show()