无法在 Databricks 中使用 Pandas UDF

Unable to use Pandas UDF in Databricks

我必须 运行 一个脚本,它接受一些参数作为输入,returns 一些结果作为输出,所以首先我在我的本地机器上开发它 - 工作正常 - 现在我的目标运行在 Databricks 中对其进行并行处理。

当我尝试将其并行化时,问题就出现了。我正在从已经安装的 Datalake 中获取数据(问题不存在,因为我可以在读取数据后打印 DataFrame),将其转换为 Spark DataFrame 并将每一行传递给按 [=18 分组的主函数=]:

import pandas as pd
import os
import numpy as np
import scipy.stats as stats

from pyspark.sql import SparkSession
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import StructType,StructField,IntegerType,FloatType

# Pandas udf
schema = StructType([StructField('Material', IntegerType(), True),
                    StructField('Alpha', IntegerType(), True),
                    StructField('Beta', IntegerType(), True),
                    StructField('Sales', IntegerType(), True),
                    StructField('SL', FloatType(), True)])

@pandas_udf(schema, PandasUDFType.GROUPED_MAP)
def main(data):
    material = data['Material'].iloc[0]
    print(material)      #<-------- THIS IS NOT PRINTING
    print('Hello world')   #<------ NEITHER IS THIS

    start = data['start '].iloc[0]
    end = data['end '].iloc[0]
    mu_lt = data['mu_lt'].iloc[0]
    sigma_lt = data['sigma_lt'].iloc[0]
    
    df = pd.DataFrame(columns=('Material', 'Alpha', 'Beta', 'Sales', 'SL'))
    
    for beta in range(1, 2):
        for alpha in range(3, 5):
            # Do stuff
    
    return df


if __name__ == '__main__':
  spark = SparkSession.builder.getOrCreate()
  params = pd.read_csv('/dbfs/mnt/input/params_input.csv')
  params_spark = spark.createDataFrame(params) 

  params_spark.groupby('Material').apply(main).show()

我不确定我是否正确地将 DF 传递给了主函数,甚至声明它是正确的,但是打印的 none 和主函数中定义的 DF 似乎是 运行宁。代码没有抛出任何错误,但也没有返回任何输出。

尝试 this:

@pandas_udf('y int, ds int, store_id string, product_id string, log string', PandasUDFType.GROUPED_MAP)
def train_predict(pdf):
    return pd.DataFrame([3, 5, 'store123', 'product123', 'My log message'], columns=['y', 'ds','store_id','product_id', 'log'])