用于 PySpark 的 Pickling monkey-patched Keras 模型

Pickling monkey-patched Keras model for use in PySpark

我想要实现的总体目标是向每个 spark worker 发送一个 Keras 模型,以便我可以在应用于 DataFrame 的列的 UDF 中使用该模型。为此,Keras 模型需要是可腌制的。

似乎很多人都成功地通过猴子修补模型 class 来腌制 keras 模型,如下面的 link 所示:

http://zachmoshe.com/2017/04/03/pickling-keras-models.html

但是,我还没有看到任何关于如何与 Spark 一起执行此操作的示例。我的第一次尝试只是 运行 驱动程序中的 make_keras_picklable() 函数允许我在驱动程序中 pickle 和 unpickle 模型,但我无法在 UDF 中 pickle 模型。

def make_keras_picklable():
    "Source: https://zachmoshe.com/2017/04/03/pickling-keras-models.html"
    ...

make_keras_picklable()

model = Sequential() # etc etc

def score(case):
    ....
    score = model.predict(case)
    ...

def scoreUDF = udf(score, ArrayType(FloatType()))

我得到的错误表明在 UDF 中解开模型没有使用 monkey-patched 模型 class。

AttributeError: 'Sequential' object has no attribute '_built'

看起来另一个用户 运行 在 this SO post 中遇到了类似的错误,答案是 "run make_keras_picklable() on each worker as well." 没有给出如何执行此操作的示例。

我的问题是:调用所有 worker make_keras_picklable() 的合适方法是什么?

我尝试使用 broadcast()(见下文)但得到了与上述相同的错误。

def make_keras_picklable():
    "Source: https://zachmoshe.com/2017/04/03/pickling-keras-models.html"
    ...

make_keras_picklable()
spark.sparkContext.broadcast(make_keras_picklable())

model = Sequential() # etc etc

def score(case):
    ....
    score = model.predict(case)
    ...

def scoreUDF = udf(score, ArrayType(FloatType()))
Spark 用户邮件列表上的

Khaled Zaouk 通过建议将 make_keras_picklable() 更改为包装器 class 帮助了我。这太棒了!

import tempfile

import tensorflow as tf


class KerasModelWrapper:
    """Source: https://zachmoshe.com/2017/04/03/pickling-keras-models.html"""

    def __init__(self, model):
        self.model = model

    def __getstate__(self):
        model_str = ""
        with tempfile.NamedTemporaryFile(suffix=".hdf5", delete=True) as fd:
            tf.keras.models.save_model(self.model, fd.name, overwrite=True)
            model_str = fd.read()
        d = {"model_str": model_str}
        return d

    def __setstate__(self, state):
        with tempfile.NamedTemporaryFile(suffix=".hdf5", delete=True) as fd:
            fd.write(state["model_str"])
            fd.flush()
            self.model = tf.keras.models.load_model(fd.name)

当然,通过将其实现为 Keras 模型 class 或 PySpark.ML transformer/estimator 的子class,这可能会变得更优雅一些。

与 Erp12 相同的想法,您可以使用此 class 包装 keras 模型,动态创建其所有属性,具有装饰器模式的相同精神并扩展 keras 模型,如 Erp12 所建议的.

import tempfile
import tensorflow as tf

class PicklableKerasModel(tf.keras.models.Model):

    def __init__(self, model):
        self._model = model

    def __getstate__(self):
        with tempfile.NamedTemporaryFile(suffix='.hdf5', delete=True) as fd:
            tf.keras.models.save_model(self._model, fd.name, overwrite=True)
            model_str = fd.read()
        d = {'model_str': model_str}
        return d

    def __setstate__(self, state):
        with tempfile.NamedTemporaryFile(suffix='.hdf5', delete=True) as fd:
            fd.write(state['model_str'])
            fd.flush()
            model = tf.keras.models.load_model(fd.name)
        self._model = model

    def __getattr__(self, name):
        return getattr(self.__dict__['_model'], name)

    def __setattr__(self, name, value):
        if name == '_model':
            self.__dict__['_model'] = value
        else:
            setattr(self.__dict__['_model'], name, value)

    def __delattr__(self, name):
        delattr(self.__dict__['_model'], name)

然后你就可以像这样使用包装你的keras模型的模型:

model = Sequential() # etc etc

picklable_model = PicklableKerasModel(model)