无法在 Databricks 中使用 Pandas UDF
Unable to use Pandas UDF in Databricks
我必须 运行 一个脚本,它接受一些参数作为输入,returns 一些结果作为输出,所以首先我在我的本地机器上开发它 - 工作正常 - 现在我的目标运行在 Databricks 中对其进行并行处理。
当我尝试将其并行化时,问题就出现了。我正在从已经安装的 Datalake 中获取数据(问题不存在,因为我可以在读取数据后打印 DataFrame),将其转换为 Spark DataFrame 并将每一行传递给按 [=18 分组的主函数=]:
import pandas as pd
import os
import numpy as np
import scipy.stats as stats
from pyspark.sql import SparkSession
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import StructType,StructField,IntegerType,FloatType
# Pandas udf
schema = StructType([StructField('Material', IntegerType(), True),
StructField('Alpha', IntegerType(), True),
StructField('Beta', IntegerType(), True),
StructField('Sales', IntegerType(), True),
StructField('SL', FloatType(), True)])
@pandas_udf(schema, PandasUDFType.GROUPED_MAP)
def main(data):
material = data['Material'].iloc[0]
print(material) #<-------- THIS IS NOT PRINTING
print('Hello world') #<------ NEITHER IS THIS
start = data['start '].iloc[0]
end = data['end '].iloc[0]
mu_lt = data['mu_lt'].iloc[0]
sigma_lt = data['sigma_lt'].iloc[0]
df = pd.DataFrame(columns=('Material', 'Alpha', 'Beta', 'Sales', 'SL'))
for beta in range(1, 2):
for alpha in range(3, 5):
# Do stuff
return df
if __name__ == '__main__':
spark = SparkSession.builder.getOrCreate()
params = pd.read_csv('/dbfs/mnt/input/params_input.csv')
params_spark = spark.createDataFrame(params)
params_spark.groupby('Material').apply(main).show()
我不确定我是否正确地将 DF 传递给了主函数,甚至声明它是正确的,但是打印的 none 和主函数中定义的 DF 似乎是 运行宁。代码没有抛出任何错误,但也没有返回任何输出。
尝试 this:
@pandas_udf('y int, ds int, store_id string, product_id string, log string', PandasUDFType.GROUPED_MAP)
def train_predict(pdf):
return pd.DataFrame([3, 5, 'store123', 'product123', 'My log message'], columns=['y', 'ds','store_id','product_id', 'log'])
我必须 运行 一个脚本,它接受一些参数作为输入,returns 一些结果作为输出,所以首先我在我的本地机器上开发它 - 工作正常 - 现在我的目标运行在 Databricks 中对其进行并行处理。
当我尝试将其并行化时,问题就出现了。我正在从已经安装的 Datalake 中获取数据(问题不存在,因为我可以在读取数据后打印 DataFrame),将其转换为 Spark DataFrame 并将每一行传递给按 [=18 分组的主函数=]:
import pandas as pd
import os
import numpy as np
import scipy.stats as stats
from pyspark.sql import SparkSession
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import StructType,StructField,IntegerType,FloatType
# Pandas udf
schema = StructType([StructField('Material', IntegerType(), True),
StructField('Alpha', IntegerType(), True),
StructField('Beta', IntegerType(), True),
StructField('Sales', IntegerType(), True),
StructField('SL', FloatType(), True)])
@pandas_udf(schema, PandasUDFType.GROUPED_MAP)
def main(data):
material = data['Material'].iloc[0]
print(material) #<-------- THIS IS NOT PRINTING
print('Hello world') #<------ NEITHER IS THIS
start = data['start '].iloc[0]
end = data['end '].iloc[0]
mu_lt = data['mu_lt'].iloc[0]
sigma_lt = data['sigma_lt'].iloc[0]
df = pd.DataFrame(columns=('Material', 'Alpha', 'Beta', 'Sales', 'SL'))
for beta in range(1, 2):
for alpha in range(3, 5):
# Do stuff
return df
if __name__ == '__main__':
spark = SparkSession.builder.getOrCreate()
params = pd.read_csv('/dbfs/mnt/input/params_input.csv')
params_spark = spark.createDataFrame(params)
params_spark.groupby('Material').apply(main).show()
我不确定我是否正确地将 DF 传递给了主函数,甚至声明它是正确的,但是打印的 none 和主函数中定义的 DF 似乎是 运行宁。代码没有抛出任何错误,但也没有返回任何输出。
尝试 this:
@pandas_udf('y int, ds int, store_id string, product_id string, log string', PandasUDFType.GROUPED_MAP)
def train_predict(pdf):
return pd.DataFrame([3, 5, 'store123', 'product123', 'My log message'], columns=['y', 'ds','store_id','product_id', 'log'])