使用 pickled MLFlow 模型和 pandas_udf 在 PySpark 中进行预测

Predictions in PySpark using pickled MLFlow model and pandas_udf

我找到了一个使用 MLFlow 保存到 .pkl 文件的随机搜索的 LightGBM 模型。目标是将该腌制模型加载到 Pyspark 中并在那里进行预测。这是否可能通过简单的 unpickling:

with open(path, 'rb') as f:
    model = pickle.load(f)

然后应用 pandas_udf:

import pyspark.sql.functions as F

@F.pandas_udf(returnType=DoubleType())
def predict_udf(*cols):
    df = pd.concat(cols, axis=1)
    return pd.Series(model.predict(df))

cols = columns_list

y_pred = X_pred.select(F.col('id'), predict_udf3(*cols).alias('prediction'))

如果我尝试显示、计算或保存输出,它 returns:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 18.0 failed 1 times, most recent failure: Lost task 1.0 in stage 18.0 (TID 217, localhost, executor driver): org.apache.spark.SparkException: Python worker exited unexpectedly (crashed) at org.apache.spark.api.python.BasePythonRunner$ReaderIterator$$anonfun.applyOrElse(PythonRunner.scala:486)

model.predict() 部分本身在 python 中运行良好。如果我用更简单的东西替换上面 pandas_udf 函数中的 model.predict() ,例如设置一个常量,然后它工作正常,所以我认为问题不应该在版本中,因为一些人在一两年前报告说 pyarrow、numpy 和 pandas 之间存在特定的工作组合,主要是由 pyarrow (0.15+) 的变化引起。我目前正在使用:

pandas==0.25.1 (downgraded from 1.2.4)
numpy==1.17.2 (downgraded from 1.20.3)
pyarrow==0.14.0 (tried all versions between 0.14.0 and 4.0.0)

之前有报道说旧的 numpy(例如 0.14)可以解决这个问题,但这现在对我不起作用,因为如果 numpy 太旧,解开模型会失败。那么,几个问题:

  1. 是否可以将 MLFlow 模型解开到 Spark 中,然后执行 model.predict()?
  2. 以上pandas_udf是否做对了?
  3. 实际上 pandas_udf 是最有效的方法吗?预计会做出数百万次预测,运行时间将至关重要。

提前致谢!

我自己解决了这个问题 - 如果 unpickled 对象太复杂(例如随机搜索、管道和模型),它就会失败。如果我只是声明模型,拟合它然后 pickle,那么 unpickling 和做出预测就没有问题。 事实上,库的版本之间存在一些不兼容,但以下确实有效:

pyspark==2.4.3
pyarrow==0.14.0
pandas==1.2.4
numpy==1.17.2