Pyspark:将 UDF 的结果迭代写回数据框不会产生预期的结果
Pyspark: writing results of UDF iteratively back to dataframe does not produce expected results
我仍然是 pyspark 的新手,我正在尝试在 UDF 的帮助下评估函数并迭代地创建列。以下是函数:
def get_temp(df):
l=['temp1','temp2','temp3']
s=[0]
pt = [0]
start = [0]
end = [0]
cummulative_stat = [0]
for p in xrange(1,4):
def func(p):
if p==1:
pass
elif p >1:
start[0] = end[0]
s[0]=2
pt[0] =4
end[0] = start[0] + pt[0] - s[0]
return end[0]
func_udf=udf(func,IntegerType())
df=df.withColumn(l[p-1],func_udf(lit(p)))
return df
df=get_temp(df)
df.show()
以上产生结果:
+---+---+---+-----+-----+-----+
| a| b| c|temp1|temp2|temp3|
+---+---+---+-----+-----+-----+
| 2| 12| 5| 0| 2| 2|
| 8| 5| 7| 0| 4| 4|
| 9| 4| 3| 0| 2| 2|
| 3| 8| 2| 0| 4| 4|
+---+---+---+-----+-----+-----+
预期结果是:
+---+---+---+-----+-----+-----+
| a| b| c|temp1|temp2|temp3|
+---+---+---+-----+-----+-----+
| 2| 12| 5| 0| 2| 4|
| 8| 5| 7| 0| 2| 4|
| 9| 4| 3| 0| 2| 4|
| 3| 8| 2| 0| 2| 4|
+---+---+---+-----+-----+-----+
如果我单独查看内部函数的输出,结果符合预期,即:
s=[0]
pt = [0]
start = [0]
end = [0]
cummulative_stat = [0]
for p in xrange(1,4):
def func():
if p==1:
pass
elif p >1:
start[0] = end[0]
s[0]=2
pt[0] =4
end[0] = start[0] + pt[0] - s[0]
return end[0]
e=func()
print e
output:
0
2
4
不确定将这些结果从 UDF 写回到 df 的正确方法是什么。发布的数据框只是一个示例数据框,我需要使用 for 循环,因为在我的原始代码中,我在下面的 for loop.For 示例中调用了其他函数(谁的输出取决于迭代器的值):
def get_temp(df):
l=['temp1','temp2','temp3']
s=[0]
pt = [0]
start = [0]
end = [0]
q=[]
cummulative_stat = [0]
for p in xrange(1,4):
def func(p):
if p < a:
cummulative_stat[0]=cummulative_stat[0]+52
pass
elif p >=a:
if p==1:
pass
elif p >1:
start[0] = end[0]
s[0]=2
pt[0] =4
if cummulative_stat and p >1:
var1=func2(p,3000)
var2=func3(var1)
cummulative_stat=np.nan
else:
var1=func2(p,3000)
var2=func3(var1)
end[0] = start[0] + pt[0] - s[0]
q.append(end[0],var1,var2)
return q
func_udf=udf(func,ArrayType(ArrayType(IntegerType())))
df=df.withColumn(l[p-1],func_udf(lit(p)))
return df
df=get_temp(df)
df.show()
我正在使用 pyspark 2.2。任何帮助深表感谢。
要创建此数据框:
rdd = sc.parallelize([(2,12,5),(8,5,7),
(9,4,3),
(3,8,2)])
df = sqlContext.createDataFrame(rdd, ('a', 'b','c'))
df.show()
据我了解,查看您的代码后,您的下一列值取决于前一列。如果我的理解是正确的,那么我可以说 你的 udf 函数定义放在了错误的地方 。 您需要对代码进行微小的更改 才能使其正常工作。
一步步来
您已经有
+---+---+---+
| a| b| c|
+---+---+---+
| 2| 12| 5|
| 8| 5| 7|
| 9| 4| 3|
| 3| 8| 2|
+---+---+---+
我们需要 一个初始化列,我看到它是 0
from pyspark.sql import functions as F
from pyspark.sql import types as T
df=df.withColumn('temp0', F.lit(0))
应该是
+---+---+---+-----+
| a| b| c|temp0|
+---+---+---+-----+
| 2| 12| 5| 0|
| 8| 5| 7| 0|
| 9| 4| 3| 0|
| 3| 8| 2| 0|
+---+---+---+-----+
我们应该将udf
函数移出循环作为
def func(p, end):
start = 0
s = 0
pt = 0
if p==1:
pass
elif p >1:
start = end
s=2
pt =4
end = start + pt - s
return end
func_udf=F.udf(func, T.IntegerType())
和在循环中调用udf
函数为
def get_temp(df):
l=['temp1','temp2','temp3']
for p in xrange(1,4):
df=df.withColumn(l[p-1],func_udf(F.lit(p), F.col('temp'+str(p-1))))
return df
df=get_temp(df)
最后 删除初始化列
df=df.drop('temp0')
这应该会给你你想要的输出
+---+---+---+-----+-----+-----+
| a| b| c|temp1|temp2|temp3|
+---+---+---+-----+-----+-----+
| 2| 12| 5| 0| 2| 4|
| 8| 5| 7| 0| 2| 4|
| 9| 4| 3| 0| 2| 4|
| 3| 8| 2| 0| 2| 4|
+---+---+---+-----+-----+-----+
希望回答对你有帮助
我仍然是 pyspark 的新手,我正在尝试在 UDF 的帮助下评估函数并迭代地创建列。以下是函数:
def get_temp(df):
l=['temp1','temp2','temp3']
s=[0]
pt = [0]
start = [0]
end = [0]
cummulative_stat = [0]
for p in xrange(1,4):
def func(p):
if p==1:
pass
elif p >1:
start[0] = end[0]
s[0]=2
pt[0] =4
end[0] = start[0] + pt[0] - s[0]
return end[0]
func_udf=udf(func,IntegerType())
df=df.withColumn(l[p-1],func_udf(lit(p)))
return df
df=get_temp(df)
df.show()
以上产生结果:
+---+---+---+-----+-----+-----+
| a| b| c|temp1|temp2|temp3|
+---+---+---+-----+-----+-----+
| 2| 12| 5| 0| 2| 2|
| 8| 5| 7| 0| 4| 4|
| 9| 4| 3| 0| 2| 2|
| 3| 8| 2| 0| 4| 4|
+---+---+---+-----+-----+-----+
预期结果是:
+---+---+---+-----+-----+-----+
| a| b| c|temp1|temp2|temp3|
+---+---+---+-----+-----+-----+
| 2| 12| 5| 0| 2| 4|
| 8| 5| 7| 0| 2| 4|
| 9| 4| 3| 0| 2| 4|
| 3| 8| 2| 0| 2| 4|
+---+---+---+-----+-----+-----+
如果我单独查看内部函数的输出,结果符合预期,即:
s=[0]
pt = [0]
start = [0]
end = [0]
cummulative_stat = [0]
for p in xrange(1,4):
def func():
if p==1:
pass
elif p >1:
start[0] = end[0]
s[0]=2
pt[0] =4
end[0] = start[0] + pt[0] - s[0]
return end[0]
e=func()
print e
output:
0
2
4
不确定将这些结果从 UDF 写回到 df 的正确方法是什么。发布的数据框只是一个示例数据框,我需要使用 for 循环,因为在我的原始代码中,我在下面的 for loop.For 示例中调用了其他函数(谁的输出取决于迭代器的值):
def get_temp(df):
l=['temp1','temp2','temp3']
s=[0]
pt = [0]
start = [0]
end = [0]
q=[]
cummulative_stat = [0]
for p in xrange(1,4):
def func(p):
if p < a:
cummulative_stat[0]=cummulative_stat[0]+52
pass
elif p >=a:
if p==1:
pass
elif p >1:
start[0] = end[0]
s[0]=2
pt[0] =4
if cummulative_stat and p >1:
var1=func2(p,3000)
var2=func3(var1)
cummulative_stat=np.nan
else:
var1=func2(p,3000)
var2=func3(var1)
end[0] = start[0] + pt[0] - s[0]
q.append(end[0],var1,var2)
return q
func_udf=udf(func,ArrayType(ArrayType(IntegerType())))
df=df.withColumn(l[p-1],func_udf(lit(p)))
return df
df=get_temp(df)
df.show()
我正在使用 pyspark 2.2。任何帮助深表感谢。 要创建此数据框:
rdd = sc.parallelize([(2,12,5),(8,5,7),
(9,4,3),
(3,8,2)])
df = sqlContext.createDataFrame(rdd, ('a', 'b','c'))
df.show()
据我了解,查看您的代码后,您的下一列值取决于前一列。如果我的理解是正确的,那么我可以说 你的 udf 函数定义放在了错误的地方 。 您需要对代码进行微小的更改 才能使其正常工作。
一步步来
您已经有
+---+---+---+
| a| b| c|
+---+---+---+
| 2| 12| 5|
| 8| 5| 7|
| 9| 4| 3|
| 3| 8| 2|
+---+---+---+
我们需要 一个初始化列,我看到它是 0
from pyspark.sql import functions as F
from pyspark.sql import types as T
df=df.withColumn('temp0', F.lit(0))
应该是
+---+---+---+-----+
| a| b| c|temp0|
+---+---+---+-----+
| 2| 12| 5| 0|
| 8| 5| 7| 0|
| 9| 4| 3| 0|
| 3| 8| 2| 0|
+---+---+---+-----+
我们应该将udf
函数移出循环作为
def func(p, end):
start = 0
s = 0
pt = 0
if p==1:
pass
elif p >1:
start = end
s=2
pt =4
end = start + pt - s
return end
func_udf=F.udf(func, T.IntegerType())
和在循环中调用udf
函数为
def get_temp(df):
l=['temp1','temp2','temp3']
for p in xrange(1,4):
df=df.withColumn(l[p-1],func_udf(F.lit(p), F.col('temp'+str(p-1))))
return df
df=get_temp(df)
最后 删除初始化列
df=df.drop('temp0')
这应该会给你你想要的输出
+---+---+---+-----+-----+-----+
| a| b| c|temp1|temp2|temp3|
+---+---+---+-----+-----+-----+
| 2| 12| 5| 0| 2| 4|
| 8| 5| 7| 0| 2| 4|
| 9| 4| 3| 0| 2| 4|
| 3| 8| 2| 0| 2| 4|
+---+---+---+-----+-----+-----+
希望回答对你有帮助