用于 PySpark 的 Pickling monkey-patched Keras 模型
Pickling monkey-patched Keras model for use in PySpark
我想要实现的总体目标是向每个 spark worker 发送一个 Keras 模型,以便我可以在应用于 DataFrame 的列的 UDF 中使用该模型。为此,Keras 模型需要是可腌制的。
似乎很多人都成功地通过猴子修补模型 class 来腌制 keras 模型,如下面的 link 所示:
http://zachmoshe.com/2017/04/03/pickling-keras-models.html
但是,我还没有看到任何关于如何与 Spark 一起执行此操作的示例。我的第一次尝试只是 运行 驱动程序中的 make_keras_picklable()
函数允许我在驱动程序中 pickle 和 unpickle 模型,但我无法在 UDF 中 pickle 模型。
def make_keras_picklable():
"Source: https://zachmoshe.com/2017/04/03/pickling-keras-models.html"
...
make_keras_picklable()
model = Sequential() # etc etc
def score(case):
....
score = model.predict(case)
...
def scoreUDF = udf(score, ArrayType(FloatType()))
我得到的错误表明在 UDF 中解开模型没有使用 monkey-patched 模型 class。
AttributeError: 'Sequential' object has no attribute '_built'
看起来另一个用户 运行 在 this SO post 中遇到了类似的错误,答案是 "run make_keras_picklable()
on each worker as well." 没有给出如何执行此操作的示例。
我的问题是:调用所有 worker make_keras_picklable()
的合适方法是什么?
我尝试使用 broadcast()
(见下文)但得到了与上述相同的错误。
def make_keras_picklable():
"Source: https://zachmoshe.com/2017/04/03/pickling-keras-models.html"
...
make_keras_picklable()
spark.sparkContext.broadcast(make_keras_picklable())
model = Sequential() # etc etc
def score(case):
....
score = model.predict(case)
...
def scoreUDF = udf(score, ArrayType(FloatType()))
Spark 用户邮件列表上的 Khaled Zaouk 通过建议将 make_keras_picklable()
更改为包装器 class 帮助了我。这太棒了!
import tempfile
import tensorflow as tf
class KerasModelWrapper:
"""Source: https://zachmoshe.com/2017/04/03/pickling-keras-models.html"""
def __init__(self, model):
self.model = model
def __getstate__(self):
model_str = ""
with tempfile.NamedTemporaryFile(suffix=".hdf5", delete=True) as fd:
tf.keras.models.save_model(self.model, fd.name, overwrite=True)
model_str = fd.read()
d = {"model_str": model_str}
return d
def __setstate__(self, state):
with tempfile.NamedTemporaryFile(suffix=".hdf5", delete=True) as fd:
fd.write(state["model_str"])
fd.flush()
self.model = tf.keras.models.load_model(fd.name)
当然,通过将其实现为 Keras 模型 class 或 PySpark.ML transformer/estimator 的子class,这可能会变得更优雅一些。
与 Erp12 相同的想法,您可以使用此 class 包装 keras 模型,动态创建其所有属性,具有装饰器模式的相同精神并扩展 keras 模型,如 Erp12 所建议的.
import tempfile
import tensorflow as tf
class PicklableKerasModel(tf.keras.models.Model):
def __init__(self, model):
self._model = model
def __getstate__(self):
with tempfile.NamedTemporaryFile(suffix='.hdf5', delete=True) as fd:
tf.keras.models.save_model(self._model, fd.name, overwrite=True)
model_str = fd.read()
d = {'model_str': model_str}
return d
def __setstate__(self, state):
with tempfile.NamedTemporaryFile(suffix='.hdf5', delete=True) as fd:
fd.write(state['model_str'])
fd.flush()
model = tf.keras.models.load_model(fd.name)
self._model = model
def __getattr__(self, name):
return getattr(self.__dict__['_model'], name)
def __setattr__(self, name, value):
if name == '_model':
self.__dict__['_model'] = value
else:
setattr(self.__dict__['_model'], name, value)
def __delattr__(self, name):
delattr(self.__dict__['_model'], name)
然后你就可以像这样使用包装你的keras模型的模型:
model = Sequential() # etc etc
picklable_model = PicklableKerasModel(model)
我想要实现的总体目标是向每个 spark worker 发送一个 Keras 模型,以便我可以在应用于 DataFrame 的列的 UDF 中使用该模型。为此,Keras 模型需要是可腌制的。
似乎很多人都成功地通过猴子修补模型 class 来腌制 keras 模型,如下面的 link 所示:
http://zachmoshe.com/2017/04/03/pickling-keras-models.html
但是,我还没有看到任何关于如何与 Spark 一起执行此操作的示例。我的第一次尝试只是 运行 驱动程序中的 make_keras_picklable()
函数允许我在驱动程序中 pickle 和 unpickle 模型,但我无法在 UDF 中 pickle 模型。
def make_keras_picklable():
"Source: https://zachmoshe.com/2017/04/03/pickling-keras-models.html"
...
make_keras_picklable()
model = Sequential() # etc etc
def score(case):
....
score = model.predict(case)
...
def scoreUDF = udf(score, ArrayType(FloatType()))
我得到的错误表明在 UDF 中解开模型没有使用 monkey-patched 模型 class。
AttributeError: 'Sequential' object has no attribute '_built'
看起来另一个用户 运行 在 this SO post 中遇到了类似的错误,答案是 "run make_keras_picklable()
on each worker as well." 没有给出如何执行此操作的示例。
我的问题是:调用所有 worker make_keras_picklable()
的合适方法是什么?
我尝试使用 broadcast()
(见下文)但得到了与上述相同的错误。
def make_keras_picklable():
"Source: https://zachmoshe.com/2017/04/03/pickling-keras-models.html"
...
make_keras_picklable()
spark.sparkContext.broadcast(make_keras_picklable())
model = Sequential() # etc etc
def score(case):
....
score = model.predict(case)
...
def scoreUDF = udf(score, ArrayType(FloatType()))
Khaled Zaouk 通过建议将 make_keras_picklable()
更改为包装器 class 帮助了我。这太棒了!
import tempfile
import tensorflow as tf
class KerasModelWrapper:
"""Source: https://zachmoshe.com/2017/04/03/pickling-keras-models.html"""
def __init__(self, model):
self.model = model
def __getstate__(self):
model_str = ""
with tempfile.NamedTemporaryFile(suffix=".hdf5", delete=True) as fd:
tf.keras.models.save_model(self.model, fd.name, overwrite=True)
model_str = fd.read()
d = {"model_str": model_str}
return d
def __setstate__(self, state):
with tempfile.NamedTemporaryFile(suffix=".hdf5", delete=True) as fd:
fd.write(state["model_str"])
fd.flush()
self.model = tf.keras.models.load_model(fd.name)
当然,通过将其实现为 Keras 模型 class 或 PySpark.ML transformer/estimator 的子class,这可能会变得更优雅一些。
与 Erp12 相同的想法,您可以使用此 class 包装 keras 模型,动态创建其所有属性,具有装饰器模式的相同精神并扩展 keras 模型,如 Erp12 所建议的.
import tempfile
import tensorflow as tf
class PicklableKerasModel(tf.keras.models.Model):
def __init__(self, model):
self._model = model
def __getstate__(self):
with tempfile.NamedTemporaryFile(suffix='.hdf5', delete=True) as fd:
tf.keras.models.save_model(self._model, fd.name, overwrite=True)
model_str = fd.read()
d = {'model_str': model_str}
return d
def __setstate__(self, state):
with tempfile.NamedTemporaryFile(suffix='.hdf5', delete=True) as fd:
fd.write(state['model_str'])
fd.flush()
model = tf.keras.models.load_model(fd.name)
self._model = model
def __getattr__(self, name):
return getattr(self.__dict__['_model'], name)
def __setattr__(self, name, value):
if name == '_model':
self.__dict__['_model'] = value
else:
setattr(self.__dict__['_model'], name, value)
def __delattr__(self, name):
delattr(self.__dict__['_model'], name)
然后你就可以像这样使用包装你的keras模型的模型:
model = Sequential() # etc etc
picklable_model = PicklableKerasModel(model)