Convert Pandas best fit function into pyspark
I've been using this function to create time series features in Pandas; it returns the (OLS?) best-fit slope over a given range of points:
def best_fit(X, Y):
    # OLS slope: b = (sum(x*y) - n*xbar*ybar) / (sum(x^2) - n*xbar^2)
    xbar = sum(X)/len(X)
    ybar = sum(Y)/len(Y)
    n = len(X)
    numer = sum([xi*yi for xi, yi in zip(X, Y)]) - n * xbar * ybar
    denum = sum([xi**2 for xi in X]) - n * xbar**2
    b = numer / denum
    return b
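(For reference, that is the textbook OLS slope estimator, b = (Σxy − n·x̄·ȳ) / (Σx² − n·x̄²), so yes, it is the OLS slope. A quick sanity check against numpy.polyfit, just a sketch with made-up points, confirms it:)

# Sanity check (sketch, made-up points): best_fit should agree with the
# degree-1 coefficient from numpy.polyfit, which is the OLS slope.
import numpy as np

X = [0, 1, 2, 3, 4]
Y = [0.1, 0.9, 2.1, 2.9, 4.2]
print(best_fit(X, Y))          # ~1.02
print(np.polyfit(X, Y, 1)[0])  # same value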
Here's a simple example showing the result (see the final df below):
import pandas as pd
import numpy as np
import random

cols = ['x_vals', 'y_vals']
df = pd.DataFrame(columns=cols)
for i in range(0, 20):
    df.loc[i, 'x_vals'] = i
    df.loc[i, 'y_vals'] = 0.05 * i**2 + 0.1 * i + random.uniform(-1, 1)  # some random parabolic points
I then apply the best_fit function to get the slope over the previous 5 points:
for i, row in df.iterrows():
    if i >= 5:
        X = df['x_vals'][i-5:i]
        Y = df['y_vals'][i-5:i]
        df.loc[i, 'slope'] = best_fit(X, Y)
df
That gives me this:
x_vals y_vals slope
0 -0.648205 NaN
1 0.282729 NaN
2 0.785474 NaN
3 1.48546 NaN
4 0.408165 NaN
5 1.61244 0.331548
6 2.60868 0.228211
7 3.77621 0.377338
8 4.08937 0.678201
9 4.34625 0.952618
10 5.47554 0.694832
11 7.90902 0.630377
12 8.83912 0.965180
13 9.01195 1.306227
14 11.8244 1.269497
15 13.3199 1.380057
16 15.2751 1.380692
17 15.3959 1.717981
18 18.454 1.621861
19 20.0773 1.533528
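(As an aside, the iterrows loop can be avoided entirely with pandas rolling sums. A sketch of an alternative, not part of the original approach; the names rolling_slope and slope_roll are just for illustration. Note the slice df['x_vals'][i-5:i] excludes the current row, hence the shift(1):)

# Sketch: vectorized equivalent of the loop above using rolling sums.
# slope = (sum(xy) - sum(x)*sum(y)/n) / (sum(x^2) - sum(x)^2/n), n = 5
def rolling_slope(x, y, n=5):
    sxy = (x * y).rolling(n).sum()
    sx, sy = x.rolling(n).sum(), y.rolling(n).sum()
    sxx = (x * x).rolling(n).sum()
    return (sxy - sx * sy / n) / (sxx - sx * sx / n)

x = df['x_vals'].astype(float)
y = df['y_vals'].astype(float)
df['slope_roll'] = rolling_slope(x, y).shift(1)  # shift so the window ends at i-1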
I need to get the same slope column from a pyspark dataframe rather than Pandas, only I'm struggling to find a starting point (a pyspark window? built-in OLS functionality? a udf?).
Using a Pyspark window, collect the previous 5 column values as lists and call best_fit_udf:
from pyspark.sql import Window
from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType
import pyspark.sql.functions as F

# modified this function to handle division by zero and the number of elements
def best_fit(X, Y):
    n = len(X)
    if n < 6:  # rowsBetween(-5, 0) yields 6 rows for a complete window
        return None
    xbar = sum(X)/len(X)
    ybar = sum(Y)/len(Y)
    numer = sum([xi*yi for xi, yi in zip(X, Y)]) - n * xbar * ybar
    denum = sum([xi**2 for xi in X]) - n * xbar**2
    if denum == 0:
        return None
    return numer / denum

best_fit_udf = udf(best_fit, DoubleType())

cols = ['x_vals', 'y_vals']
df = pd.DataFrame(columns=cols)
for i in range(0, 20):
    df.loc[i, 'x_vals'] = i
    df.loc[i, 'y_vals'] = 0.05 * i**2 + 0.1 * i + random.uniform(-1, 1)  # some random parabolic points

spark_df = spark.createDataFrame(df)

w = Window.orderBy("x_vals").rowsBetween(-5, 0)
df = spark_df.select("x_vals", "y_vals",
                     F.collect_list('x_vals').over(w).alias("x_list"),
                     F.collect_list('y_vals').over(w).alias("y_list"))
df.withColumn("slope", best_fit_udf('x_list', 'y_list')).drop('x_list', 'y_list').show()
This gives me this:
+------+--------------------+------------------+
|x_vals| y_vals| slope|
+------+--------------------+------------------+
| 0|-0.05626232194330516| null|
| 1| 1.0626613654187942| null|
| 2|-0.18870622421238525| null|
| 3| 1.7106172105001147| null|
| 4| 1.9398571272258158| null|
| 5| 2.3632022124308474| 0.475092382628695|
| 6| 1.7264493731921893|0.3201115790149247|
| 7| 3.298712278452215|0.5116552596172641|
| 8| 4.3179382280764305|0.4707547914949186|
| 9| 4.00691449276564|0.5077645079970263|
| 10| 6.085792506183289|0.7563877936316236|
| 11| 7.272669055040746|1.0223232959178614|
| 12| 8.70598472345308| 1.085126649123283|
| 13| 10.141576882812515|1.2686365861314373|
| 14| 11.170519757896672| 1.411962717827295|
| 15| 11.999868557507794|1.2199864149871311|
| 16| 14.86294824152797|1.3960568659909833|
| 17| 16.698964370210007| 1.570238888844051|
| 18| 18.71951724368806|1.7810890092953742|
| 19| 20.428078271618062|1.9509358501665701|
+------+--------------------+------------------+
Thanks @Ranga Vure. I tested your function against the original best_fit function (with your values, of course), and I get different slope values. This is what the best_fit function gives (the y_vals values look rounded, but they actually aren't):
x_vals y_vals slope_py
0 -0.0562623 NaN
1 1.06266 NaN
2 -0.188706 NaN
3 1.71062 NaN
4 1.93986 NaN
5 2.3632 0.464019
6 1.72645 0.472965
7 3.29871 0.448290
8 4.31794 0.296278
9 4.00691 0.569167
10 6.08579 0.587891
11 7.27267 0.942689
12 8.70598 0.971577
13 10.1416 1.204185
14 11.1705 1.488952
15 11.9999 1.303836
16 14.8629 1.191893
17 16.699 1.417222
18 18.7195 1.680720
19 20.4281 1.979709
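The difference appears to come from the window bounds: rowsBetween(-5, 0) includes the current row (six points per window), while my original pandas slice df['x_vals'][i-5:i] covers only the five preceding rows. A sketch of the adjusted window that should reproduce the pandas numbers, reusing the UDF setup above:

# Sketch: end the frame one row before the current one, matching df[i-5:i]
w = Window.orderBy("x_vals").rowsBetween(-5, -1)

# with 5 points per complete window, the size guard becomes n < 5
def best_fit(X, Y):
    n = len(X)
    if n < 5:
        return None
    xbar, ybar = sum(X)/n, sum(Y)/n
    numer = sum(xi*yi for xi, yi in zip(X, Y)) - n * xbar * ybar
    denum = sum(xi**2 for xi in X) - n * xbar**2
    return numer / denum if denum != 0 else None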
In the end I translated the best_fit function into sqlContext instead, which gives me the same values as the original best_fit function:
spark_df.createOrReplaceTempView('tempsql')
df_sql = sqlContext.sql("""
SELECT *,
(((sum(x_vals*y_vals) over (order by x_vals rows between 5 preceding and 1 preceding)) - 5 * (sum(x_vals) over (order by x_vals rows between 5 preceding and 1 preceding))/5 * (sum(y_vals) over (order by x_vals rows between 5 preceding and 1 preceding))/5) /
((sum(x_vals*x_vals) over (order by x_vals rows between 5 preceding and 1 preceding)) - 5 * (sum(x_vals) over (order by x_vals rows between 5 preceding and 1 preceding))/5 * (sum(x_vals) over (order by x_vals rows between 5 preceding and 1 preceding))/5)) as slope_sql
from tempsql
""")
This gives the same values as the original best_fit function, except for points 3-5, since it calculates slopes before the expected start (i.e. the 6th point). A small quirk, but one I can live with:
+------+-------------------+-------------------+--------------------+
|x_vals| y_vals| slope_py| slope_sql|
+------+-------------------+-------------------+--------------------+
| 0|-0.0562623219433051| NaN| null|
| 1| 1.06266136541879| NaN| null|
| 2| -0.188706224212385| NaN| 1.0767269459046163|
| 3| 1.71061721050011| NaN|0.060822882948800006|
| 4| 1.93985712722581| NaN| 0.4092836048203674|
| 5| 2.36320221243084| 0.4640194743419549| 0.4640194743419549|
| 6| 1.72644937319218| 0.4729645045462295| 0.4729645045462295|
| 7| 3.29871227845221|0.44828961967398656| 0.4482896196739862|
| 8| 4.31793822807643| 0.2962782381870575| 0.29627823818705823|
| 9| 4.00691449276564| 0.569167226772261| 0.5691672267722595|
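For completeness, the same windowed expression can also be written with the DataFrame API instead of a temp view. A sketch, assuming spark_df and the pyspark imports from the answer above; w5 and df_slope are just illustrative names:

# Sketch: DataFrame-API version of the slope_sql expression
w5 = Window.orderBy("x_vals").rowsBetween(-5, -1)

sxy = F.sum(F.col("x_vals") * F.col("y_vals")).over(w5)
sx  = F.sum("x_vals").over(w5)
sy  = F.sum("y_vals").over(w5)
sxx = F.sum(F.col("x_vals") * F.col("x_vals")).over(w5)

df_slope = spark_df.withColumn("slope_sql", (sxy - sx * sy / 5) / (sxx - sx * sx / 5))
df_slope.show()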