将 Keras ModelCheckpoints 保存在 Google Cloud Bucket 中
Save Keras ModelCheckpoints in Google Cloud Bucket
我正在使用 Keras 和 TensorFlow 后端在 Google 云机器学习引擎上训练 LSTM 网络。在对 gcloud 和我的 python 脚本进行一些调整后,我设法部署了我的模型并成功执行了训练任务。
然后我尝试使用 Keras modelCheckpoint callback 让我的模型在每个纪元之后保存检查点。 运行 使用 Google Cloud 的本地训练工作按预期完美运行。在每个纪元之后,权重都存储在指定的路径中。但是,当我尝试在 Google 云机器学习引擎上在线 运行 相同的作业时, weights.hdf5
不会写入我的 Google 云存储桶。相反,我收到以下错误:
...
File "h5f.pyx", line 71, in h5py.h5f.open (h5py/h5f.c:1797)
IOError: Unable to open file (Unable to open file: name =
'gs://.../weights.hdf5', errno = 2, error message = 'no such file or
directory', flags = 0, o_flags = 0)
我调查了这个问题,结果证明,Bucket 本身没有问题,因为 Keras Tensorboard callback 工作正常并将预期的输出写入同一个 bucket。我还确保 h5py
包含在 setup.py
中,位于:
├── setup.py
└── trainer
├── __init__.py
├── ...
setup.py
中的实际包含如下所示:
# setup.py
from setuptools import setup, find_packages
setup(name='kerasLSTM',
version='0.1',
packages=find_packages(),
author='Kevin Katzke',
install_requires=['keras','h5py','simplejson'],
zip_safe=False)
我想问题归结为 GCS 无法使用 Python open
for I/O 访问,因为它提供了自定义实现:
import tensorflow as tf
from tensorflow.python.lib.io import file_io
with file_io.FileIO("gs://...", 'r') as f:
f.write("Hi!")
在检查了 Keras modelCheckpoint 回调如何实现实际文件写入后,结果发现它使用 h5py.File() for I/O:
with h5py.File(filepath, mode='w') as f:
f.attrs['keras_version'] = str(keras_version).encode('utf8')
f.attrs['backend'] = K.backend().encode('utf8')
f.attrs['model_config'] = json.dumps({
'class_name': model.__class__.__name__,
'config': model.get_config()
}, default=get_json_type).encode('utf8')
并且由于 h5py package
是 HDF5 binary data format
的 Pythonic 接口,据我所知,h5py.File()
似乎调用了用 Fortran 编写的底层 HDF5
功能: source, documentation.
如何解决此问题并使 modelCheckpoint 回调写入我的 GCS 存储桶?有没有办法 "monkey patching" 以某种方式覆盖 hdf5 文件的打开方式以使其使用 GCS file_io.FileIO()
?
一个 hacky 解决方法是保存到本地文件系统,然后使用 TF IO API 进行复制。我在 GoogleCloudPlatform ML 样本上的 Keras 示例中添加了一个示例。
基本上它会检查目标目录是否是 GCS 路径 ("gs://") 并将强制将 h5py 写入本地文件系统,然后使用 TF file_io API 复制到 GCS .参见示例:https://github.com/GoogleCloudPlatform/cloudml-samples/blob/master/census/keras/trainer/task.py#L146
问题可以通过以下代码解决:
# Save Keras ModelCheckpoints locally
model.save('model.h5')
# Copy model.h5 over to Google Cloud Storage
with file_io.FileIO('model.h5', mode='r') as input_f:
with file_io.FileIO('model.h5', mode='w+') as output_f:
output_f.write(input_f.read())
print("Saved model.h5 to GCS")
model.h5 保存在本地文件系统中并复制到 GCS。正如 Jochen 指出的那样,目前还没有简单的支持将 HDF5 模型检查点写入 GCS。使用此 hack 可以写入数据,直到提供更简单的解决方案。
我遇到了类似的问题,上面的解决方案对我不起作用。必须以二进制形式读取和写入文件。否则会抛出这个错误。
UnicodeDecodeError: 'utf-8' codec can't decode byte 0x89 in position 0: invalid start byte
所以代码将是
def copy_file_to_gcs(job_dir, file_path):
with file_io.FileIO(file_path, mode='rb') as input_f:
with file_io.FileIO(os.path.join(job_dir, file_path), mode='wb+') as output_f:
output_f.write(input_f.read())
我可能有点晚了,但为了未来的访问者,我将描述如何调整以前在本地 运行 的代码以从 IO 中识别 GoogleML 的整个过程观点。
- Python 标准
open(file_name, mode)
不适用于存储桶 (gs://...../file_name
)。需要 from tensorflow.python.lib.io import file_io
并将对 open(file_name, mode)
的所有调用更改为 file_io.FileIO(file_name, mode=mode)
(注意命名的 mode
参数)。打开手柄的界面是一样的
- Keras and/or 其他库大多在内部使用标准
open(file_name, mode)
。也就是说,对 3rd 方库的 trained_model.save(file_path)
调用将无法将结果存储到存储桶中。作业成功完成后检索模型的唯一方法是 将其存储在本地然后移动到存储桶 .
下面的代码效率很低,因为它会一次加载整个模型,然后将其转储到存储桶中,但它适用于相对较小的模型:
model.save(file_path)
with file_io.FileIO(file_path, mode='rb') as if:
with file_io.FileIO(os.path.join(model_dir, file_path), mode='wb+') as of:
of.write(if.read())
模式必须设置为二进制以进行读写。
当文件比较大时,分块读写以减少内存消耗是有意义的。
- 在 运行 执行真正的任务之前,我建议 运行 一个简单地将文件保存到远程存储桶的存根。
这个实现,暂时代替真正的 train_model
调用,应该做:
if __name__ == '__main__':
parser = argparse.ArgumentParser()
parser.add_argument(
'--job-dir',
help='GCS location with read/write access',
required=True
)
args = parser.parse_args()
arguments = args.__dict__
job_dir = arguments.pop('job_dir')
with file_io.FileIO(os.path.join(job_dir, "test.txt"), mode='wb+') as of:
of.write("Test passed.")
成功执行后,您应该会在存储桶中看到内容为 "Test passed."
的文件 test.txt
。
这是我为在每个时期后保存模型而编写的代码。
import os
import numpy as np
import warnings
from keras.callbacks import ModelCheckpoint
class ModelCheckpointGC(ModelCheckpoint):
"""Taken from and modified:
https://github.com/keras-team/keras/blob/tf-keras/keras/callbacks.py
"""
def on_epoch_end(self, epoch, logs=None):
logs = logs or {}
self.epochs_since_last_save += 1
if self.epochs_since_last_save >= self.period:
self.epochs_since_last_save = 0
filepath = self.filepath.format(epoch=epoch, **logs)
if self.save_best_only:
current = logs.get(self.monitor)
if current is None:
warnings.warn('Can save best model only with %s available, '
'skipping.' % (self.monitor), RuntimeWarning)
else:
if self.monitor_op(current, self.best):
if self.verbose > 0:
print('Epoch %05d: %s improved from %0.5f to %0.5f,'
' saving model to %s'
% (epoch, self.monitor, self.best,
current, filepath))
self.best = current
if self.save_weights_only:
self.model.save_weights(filepath, overwrite=True)
else:
if is_development():
self.model.save(filepath, overwrite=True)
else:
self.model.save(filepath.split(
"/")[-1])
with file_io.FileIO(filepath.split(
"/")[-1], mode='rb') as input_f:
with file_io.FileIO(filepath, mode='wb+') as output_f:
output_f.write(input_f.read())
else:
if self.verbose > 0:
print('Epoch %05d: %s did not improve' %
(epoch, self.monitor))
else:
if self.verbose > 0:
print('Epoch %05d: saving model to %s' % (epoch, filepath))
if self.save_weights_only:
self.model.save_weights(filepath, overwrite=True)
else:
if is_development():
self.model.save(filepath, overwrite=True)
else:
self.model.save(filepath.split(
"/")[-1])
with file_io.FileIO(filepath.split(
"/")[-1], mode='rb') as input_f:
with file_io.FileIO(filepath, mode='wb+') as output_f:
output_f.write(input_f.read())
有一个函数is_development()
可以检查是本地环境还是gcloud环境。在本地环境中,我确实设置了变量 LOCAL_ENV=1
:
def is_development():
"""check if the environment is local or in the gcloud
created the local variable in bash profile
export LOCAL_ENV=1
Returns:
[boolean] -- True if local env
"""
try:
if os.environ['LOCAL_ENV'] == '1':
return True
else:
return False
except:
return False
然后就可以使用了:
ModelCheckpointGC(
'gs://your_bucket/models/model.h5',
monitor='loss',
verbose=1,
save_best_only=True,
mode='min'))
我希望这对某人有所帮助并节省一些时间。
我不确定为什么没有提到这一点,但有一个解决方案,您不需要在代码中添加复制功能。
按照以下步骤安装 gcsfuse:
export GCSFUSE_REPO=gcsfuse-`lsb_release -c -s`
echo "deb http://packages.cloud.google.com/apt $GCSFUSE_REPO main" | sudo tee /etc/apt/sources.list.d/gcsfuse.list
curl https://packages.cloud.google.com/apt/doc/apt-key.gpg | sudo apt-key add -
sudo apt-get update
sudo apt-get install gcsfuse
然后在本地安装您的存储桶:
mkdir bucket
gcsfuse <cloud_bucket_name> bucket
然后使用本地目录bucket/
作为模型的日志目录。
云和本地目录的同步将为您自动进行,您的代码可以保持干净。
希望对您有所帮助:)
对我来说,最简单的方法是使用 gsutil。
model.save('model.h5')
!gsutil -m cp model.h5 gs://name-of-cloud-storage/model.h5
tf.keras.models.save_model(model, filepath, save_format="tf")
save_format:'tf'或'h5',表示将模型保存到Tensorflow SavedModel还是HDF5。在 TF 2.X 中默认为 'tf',在 TF 1.X 中默认为 'h5'。
我正在使用 Keras 和 TensorFlow 后端在 Google 云机器学习引擎上训练 LSTM 网络。在对 gcloud 和我的 python 脚本进行一些调整后,我设法部署了我的模型并成功执行了训练任务。
然后我尝试使用 Keras modelCheckpoint callback 让我的模型在每个纪元之后保存检查点。 运行 使用 Google Cloud 的本地训练工作按预期完美运行。在每个纪元之后,权重都存储在指定的路径中。但是,当我尝试在 Google 云机器学习引擎上在线 运行 相同的作业时, weights.hdf5
不会写入我的 Google 云存储桶。相反,我收到以下错误:
...
File "h5f.pyx", line 71, in h5py.h5f.open (h5py/h5f.c:1797)
IOError: Unable to open file (Unable to open file: name =
'gs://.../weights.hdf5', errno = 2, error message = 'no such file or
directory', flags = 0, o_flags = 0)
我调查了这个问题,结果证明,Bucket 本身没有问题,因为 Keras Tensorboard callback 工作正常并将预期的输出写入同一个 bucket。我还确保 h5py
包含在 setup.py
中,位于:
├── setup.py
└── trainer
├── __init__.py
├── ...
setup.py
中的实际包含如下所示:
# setup.py
from setuptools import setup, find_packages
setup(name='kerasLSTM',
version='0.1',
packages=find_packages(),
author='Kevin Katzke',
install_requires=['keras','h5py','simplejson'],
zip_safe=False)
我想问题归结为 GCS 无法使用 Python open
for I/O 访问,因为它提供了自定义实现:
import tensorflow as tf
from tensorflow.python.lib.io import file_io
with file_io.FileIO("gs://...", 'r') as f:
f.write("Hi!")
在检查了 Keras modelCheckpoint 回调如何实现实际文件写入后,结果发现它使用 h5py.File() for I/O:
with h5py.File(filepath, mode='w') as f:
f.attrs['keras_version'] = str(keras_version).encode('utf8')
f.attrs['backend'] = K.backend().encode('utf8')
f.attrs['model_config'] = json.dumps({
'class_name': model.__class__.__name__,
'config': model.get_config()
}, default=get_json_type).encode('utf8')
并且由于 h5py package
是 HDF5 binary data format
的 Pythonic 接口,据我所知,h5py.File()
似乎调用了用 Fortran 编写的底层 HDF5
功能: source, documentation.
如何解决此问题并使 modelCheckpoint 回调写入我的 GCS 存储桶?有没有办法 "monkey patching" 以某种方式覆盖 hdf5 文件的打开方式以使其使用 GCS file_io.FileIO()
?
一个 hacky 解决方法是保存到本地文件系统,然后使用 TF IO API 进行复制。我在 GoogleCloudPlatform ML 样本上的 Keras 示例中添加了一个示例。
基本上它会检查目标目录是否是 GCS 路径 ("gs://") 并将强制将 h5py 写入本地文件系统,然后使用 TF file_io API 复制到 GCS .参见示例:https://github.com/GoogleCloudPlatform/cloudml-samples/blob/master/census/keras/trainer/task.py#L146
问题可以通过以下代码解决:
# Save Keras ModelCheckpoints locally
model.save('model.h5')
# Copy model.h5 over to Google Cloud Storage
with file_io.FileIO('model.h5', mode='r') as input_f:
with file_io.FileIO('model.h5', mode='w+') as output_f:
output_f.write(input_f.read())
print("Saved model.h5 to GCS")
model.h5 保存在本地文件系统中并复制到 GCS。正如 Jochen 指出的那样,目前还没有简单的支持将 HDF5 模型检查点写入 GCS。使用此 hack 可以写入数据,直到提供更简单的解决方案。
我遇到了类似的问题,上面的解决方案对我不起作用。必须以二进制形式读取和写入文件。否则会抛出这个错误。
UnicodeDecodeError: 'utf-8' codec can't decode byte 0x89 in position 0: invalid start byte
所以代码将是
def copy_file_to_gcs(job_dir, file_path):
with file_io.FileIO(file_path, mode='rb') as input_f:
with file_io.FileIO(os.path.join(job_dir, file_path), mode='wb+') as output_f:
output_f.write(input_f.read())
我可能有点晚了,但为了未来的访问者,我将描述如何调整以前在本地 运行 的代码以从 IO 中识别 GoogleML 的整个过程观点。
- Python 标准
open(file_name, mode)
不适用于存储桶 (gs://...../file_name
)。需要from tensorflow.python.lib.io import file_io
并将对open(file_name, mode)
的所有调用更改为file_io.FileIO(file_name, mode=mode)
(注意命名的mode
参数)。打开手柄的界面是一样的 - Keras and/or 其他库大多在内部使用标准
open(file_name, mode)
。也就是说,对 3rd 方库的trained_model.save(file_path)
调用将无法将结果存储到存储桶中。作业成功完成后检索模型的唯一方法是 将其存储在本地然后移动到存储桶 .
下面的代码效率很低,因为它会一次加载整个模型,然后将其转储到存储桶中,但它适用于相对较小的模型:
model.save(file_path)
with file_io.FileIO(file_path, mode='rb') as if:
with file_io.FileIO(os.path.join(model_dir, file_path), mode='wb+') as of:
of.write(if.read())
模式必须设置为二进制以进行读写。
当文件比较大时,分块读写以减少内存消耗是有意义的。
- 在 运行 执行真正的任务之前,我建议 运行 一个简单地将文件保存到远程存储桶的存根。
这个实现,暂时代替真正的 train_model
调用,应该做:
if __name__ == '__main__':
parser = argparse.ArgumentParser()
parser.add_argument(
'--job-dir',
help='GCS location with read/write access',
required=True
)
args = parser.parse_args()
arguments = args.__dict__
job_dir = arguments.pop('job_dir')
with file_io.FileIO(os.path.join(job_dir, "test.txt"), mode='wb+') as of:
of.write("Test passed.")
成功执行后,您应该会在存储桶中看到内容为 "Test passed."
的文件 test.txt
。
这是我为在每个时期后保存模型而编写的代码。
import os
import numpy as np
import warnings
from keras.callbacks import ModelCheckpoint
class ModelCheckpointGC(ModelCheckpoint):
"""Taken from and modified:
https://github.com/keras-team/keras/blob/tf-keras/keras/callbacks.py
"""
def on_epoch_end(self, epoch, logs=None):
logs = logs or {}
self.epochs_since_last_save += 1
if self.epochs_since_last_save >= self.period:
self.epochs_since_last_save = 0
filepath = self.filepath.format(epoch=epoch, **logs)
if self.save_best_only:
current = logs.get(self.monitor)
if current is None:
warnings.warn('Can save best model only with %s available, '
'skipping.' % (self.monitor), RuntimeWarning)
else:
if self.monitor_op(current, self.best):
if self.verbose > 0:
print('Epoch %05d: %s improved from %0.5f to %0.5f,'
' saving model to %s'
% (epoch, self.monitor, self.best,
current, filepath))
self.best = current
if self.save_weights_only:
self.model.save_weights(filepath, overwrite=True)
else:
if is_development():
self.model.save(filepath, overwrite=True)
else:
self.model.save(filepath.split(
"/")[-1])
with file_io.FileIO(filepath.split(
"/")[-1], mode='rb') as input_f:
with file_io.FileIO(filepath, mode='wb+') as output_f:
output_f.write(input_f.read())
else:
if self.verbose > 0:
print('Epoch %05d: %s did not improve' %
(epoch, self.monitor))
else:
if self.verbose > 0:
print('Epoch %05d: saving model to %s' % (epoch, filepath))
if self.save_weights_only:
self.model.save_weights(filepath, overwrite=True)
else:
if is_development():
self.model.save(filepath, overwrite=True)
else:
self.model.save(filepath.split(
"/")[-1])
with file_io.FileIO(filepath.split(
"/")[-1], mode='rb') as input_f:
with file_io.FileIO(filepath, mode='wb+') as output_f:
output_f.write(input_f.read())
有一个函数is_development()
可以检查是本地环境还是gcloud环境。在本地环境中,我确实设置了变量 LOCAL_ENV=1
:
def is_development():
"""check if the environment is local or in the gcloud
created the local variable in bash profile
export LOCAL_ENV=1
Returns:
[boolean] -- True if local env
"""
try:
if os.environ['LOCAL_ENV'] == '1':
return True
else:
return False
except:
return False
然后就可以使用了:
ModelCheckpointGC(
'gs://your_bucket/models/model.h5',
monitor='loss',
verbose=1,
save_best_only=True,
mode='min'))
我希望这对某人有所帮助并节省一些时间。
我不确定为什么没有提到这一点,但有一个解决方案,您不需要在代码中添加复制功能。
按照以下步骤安装 gcsfuse:
export GCSFUSE_REPO=gcsfuse-`lsb_release -c -s`
echo "deb http://packages.cloud.google.com/apt $GCSFUSE_REPO main" | sudo tee /etc/apt/sources.list.d/gcsfuse.list
curl https://packages.cloud.google.com/apt/doc/apt-key.gpg | sudo apt-key add -
sudo apt-get update
sudo apt-get install gcsfuse
然后在本地安装您的存储桶:
mkdir bucket
gcsfuse <cloud_bucket_name> bucket
然后使用本地目录bucket/
作为模型的日志目录。
云和本地目录的同步将为您自动进行,您的代码可以保持干净。
希望对您有所帮助:)
对我来说,最简单的方法是使用 gsutil。
model.save('model.h5')
!gsutil -m cp model.h5 gs://name-of-cloud-storage/model.h5
tf.keras.models.save_model(model, filepath, save_format="tf")
save_format:'tf'或'h5',表示将模型保存到Tensorflow SavedModel还是HDF5。在 TF 2.X 中默认为 'tf',在 TF 1.X 中默认为 'h5'。