将 Keras ModelCheckpoints 保存在 Google Cloud Bucket 中

Save Keras ModelCheckpoints in Google Cloud Bucket

我正在使用 Keras 和 TensorFlow 后端在 Google 云机器学习引擎上训练 LSTM 网络。在对 gcloud 和我的 python 脚本进行一些调整后,我设法部署了我的模型并成功执行了训练任务。

然后我尝试使用 Keras modelCheckpoint callback 让我的模型在每个纪元之后保存检查点。 运行 使用 Google Cloud 的本地训练工作按预期完美运行。在每个纪元之后,权重都存储在指定的路径中。但是,当我尝试在 Google 云机器学习引擎上在线 运行 相同的作业时, weights.hdf5 不会写入我的 Google 云存储桶。相反,我收到以下错误:

...
File "h5f.pyx", line 71, in h5py.h5f.open (h5py/h5f.c:1797)
IOError: Unable to open file (Unable to open file: name = 
'gs://.../weights.hdf5', errno = 2, error message = 'no such file or
directory', flags = 0, o_flags = 0)

我调查了这个问题,结果证明,Bucket 本身没有问题,因为 Keras Tensorboard callback 工作正常并将预期的输出写入同一个 bucket。我还确保 h5py 包含在 setup.py 中,位于:

├── setup.py
    └── trainer
    ├── __init__.py
    ├── ...

setup.py中的实际包含如下所示:

# setup.py
from setuptools import setup, find_packages

setup(name='kerasLSTM',
      version='0.1',
      packages=find_packages(),
      author='Kevin Katzke',
      install_requires=['keras','h5py','simplejson'],
      zip_safe=False)

我想问题归结为 GCS 无法使用 Python open for I/O 访问,因为它提供了自定义实现:

import tensorflow as tf
from tensorflow.python.lib.io import file_io

with file_io.FileIO("gs://...", 'r') as f:
    f.write("Hi!")

在检查了 Keras modelCheckpoint 回调如何实现实际文件写入后,结果发现它使用 h5py.File() for I/O:

 with h5py.File(filepath, mode='w') as f:
    f.attrs['keras_version'] = str(keras_version).encode('utf8')
    f.attrs['backend'] = K.backend().encode('utf8')
    f.attrs['model_config'] = json.dumps({
        'class_name': model.__class__.__name__,
        'config': model.get_config()
 }, default=get_json_type).encode('utf8')

并且由于 h5py packageHDF5 binary data format 的 Pythonic 接口,据我所知,h5py.File() 似乎调用了用 Fortran 编写的底层 HDF5 功能: source, documentation.

如何解决此问题并使 modelCheckpoint 回调写入我的 GCS 存储桶?有没有办法 "monkey patching" 以某种方式覆盖 hdf5 文件的打开方式以使其使用 GCS file_io.FileIO()

一个 hacky 解决方法是保存到本地文件系统,然后使用 TF IO API 进行复制。我在 GoogleCloudPlatform ML 样本上的 Keras 示例中添加了一个示例。

基本上它会检查目标目录是否是 GCS 路径 ("gs://") 并将强制将 h5py 写入本地文件系统,然后使用 TF file_io API 复制到 GCS .参见示例:https://github.com/GoogleCloudPlatform/cloudml-samples/blob/master/census/keras/trainer/task.py#L146

问题可以通过以下代码解决:

# Save Keras ModelCheckpoints locally
model.save('model.h5')

# Copy model.h5 over to Google Cloud Storage
with file_io.FileIO('model.h5', mode='r') as input_f:
    with file_io.FileIO('model.h5', mode='w+') as output_f:
        output_f.write(input_f.read())
        print("Saved model.h5 to GCS")

model.h5 保存在本地文件系统中并复制到 GCS。正如 Jochen 指出的那样,目前还没有简单的支持将 HDF5 模型检查点写入 GCS。使用此 hack 可以写入数据,直到提供更简单的解决方案。

我遇到了类似的问题,上面的解决方案对我不起作用。必须以二进制形式读取和写入文件。否则会抛出这个错误。

UnicodeDecodeError: 'utf-8' codec can't decode byte 0x89 in position 0: invalid start byte

所以代码将是

def copy_file_to_gcs(job_dir, file_path):
    with file_io.FileIO(file_path, mode='rb') as input_f:
        with file_io.FileIO(os.path.join(job_dir, file_path), mode='wb+') as output_f:
            output_f.write(input_f.read())

我可能有点晚了,但为了未来的访问者,我将描述如何调整以前在本地 运行 的代码以从 IO 中识别 GoogleML 的整个过程观点。

  1. Python 标准 open(file_name, mode) 不适用于存储桶 (gs://...../file_name)。需要 from tensorflow.python.lib.io import file_io 并将对 open(file_name, mode) 的所有调用更改为 file_io.FileIO(file_name, mode=mode)(注意命名的 mode 参数)。打开手柄的界面是一样的
  2. Keras and/or 其他库大多在内部使用标准 open(file_name, mode)。也就是说,对 3rd 方库的 trained_model.save(file_path) 调用将无法将结果存储到存储桶中。作业成功完成后检索模型的唯一方法是 将其存储在本地然后移动到存储桶 .

下面的代码效率很低,因为它会一次加载整个模型,然后将其转储到存储桶中,但它适用于相对较小的模型:

model.save(file_path)

with file_io.FileIO(file_path, mode='rb') as if:
    with file_io.FileIO(os.path.join(model_dir, file_path), mode='wb+') as of:
        of.write(if.read())

模式必须设置为二进制以进行读写。

当文件比较大时,分块读写以减少内存消耗是有意义的。

  1. 在 运行 执行真正的任务之前,我建议 运行 一个简单地将文件保存到远程存储桶的存根。

这个实现,暂时代替真正的 train_model 调用,应该做:

if __name__ == '__main__':
    parser = argparse.ArgumentParser()

    parser.add_argument(
        '--job-dir',
        help='GCS location with read/write access',
        required=True
    )

    args = parser.parse_args()
    arguments = args.__dict__
    job_dir = arguments.pop('job_dir')

    with file_io.FileIO(os.path.join(job_dir, "test.txt"), mode='wb+') as of:
        of.write("Test passed.")

成功执行后,您应该会在存储桶中看到内容为 "Test passed." 的文件 test.txt

这是我为在每个时期后保存模型而编写的代码。

import os
import numpy as np
import warnings
from keras.callbacks import ModelCheckpoint

class ModelCheckpointGC(ModelCheckpoint):
"""Taken from and modified:
https://github.com/keras-team/keras/blob/tf-keras/keras/callbacks.py
"""

def on_epoch_end(self, epoch, logs=None):
    logs = logs or {}
    self.epochs_since_last_save += 1
    if self.epochs_since_last_save >= self.period:
        self.epochs_since_last_save = 0
        filepath = self.filepath.format(epoch=epoch, **logs)
        if self.save_best_only:
            current = logs.get(self.monitor)
            if current is None:
                warnings.warn('Can save best model only with %s available, '
                              'skipping.' % (self.monitor), RuntimeWarning)
            else:
                if self.monitor_op(current, self.best):
                    if self.verbose > 0:
                        print('Epoch %05d: %s improved from %0.5f to %0.5f,'
                              ' saving model to %s'
                              % (epoch, self.monitor, self.best,
                                 current, filepath))
                    self.best = current
                    if self.save_weights_only:
                        self.model.save_weights(filepath, overwrite=True)
                    else:
                        if is_development():
                            self.model.save(filepath, overwrite=True)
                        else:
                            self.model.save(filepath.split(
                                "/")[-1])
                            with file_io.FileIO(filepath.split(
                                    "/")[-1], mode='rb') as input_f:
                                with file_io.FileIO(filepath, mode='wb+') as output_f:
                                    output_f.write(input_f.read())
                else:
                    if self.verbose > 0:
                        print('Epoch %05d: %s did not improve' %
                              (epoch, self.monitor))
        else:
            if self.verbose > 0:
                print('Epoch %05d: saving model to %s' % (epoch, filepath))
            if self.save_weights_only:
                self.model.save_weights(filepath, overwrite=True)
            else:
                if is_development():
                    self.model.save(filepath, overwrite=True)
                else:
                    self.model.save(filepath.split(
                        "/")[-1])
                    with file_io.FileIO(filepath.split(
                            "/")[-1], mode='rb') as input_f:
                        with file_io.FileIO(filepath, mode='wb+') as output_f:
                            output_f.write(input_f.read())

有一个函数is_development()可以检查是本地环境还是gcloud环境。在本地环境中,我确实设置了变量 LOCAL_ENV=1:

def is_development():
    """check if the environment is local or in the gcloud
    created the local variable in bash profile
    export LOCAL_ENV=1

    Returns:
        [boolean] -- True if local env
    """
    try:
        if os.environ['LOCAL_ENV'] == '1':
            return True
        else:
            return False
    except:
        return False

然后就可以使用了:

 ModelCheckpointGC(
            'gs://your_bucket/models/model.h5',
            monitor='loss',
            verbose=1,
            save_best_only=True,
            mode='min'))

我希望这对某人有所帮助并节省一些时间。

我不确定为什么没有提到这一点,但有一个解决方案,您不需要在代码中添加复制功能。

按照以下步骤安装 gcsfuse

export GCSFUSE_REPO=gcsfuse-`lsb_release -c -s`
echo "deb http://packages.cloud.google.com/apt $GCSFUSE_REPO main" | sudo tee /etc/apt/sources.list.d/gcsfuse.list
curl https://packages.cloud.google.com/apt/doc/apt-key.gpg | sudo apt-key add -
sudo apt-get update
sudo apt-get install gcsfuse

然后在本地安装您的存储桶:

mkdir bucket
gcsfuse <cloud_bucket_name> bucket

然后使用本地目录bucket/作为模型的日志目录。

云和本地目录的同步将为您自动进行,您的代码可以保持干净。

希望对您有所帮助:)

对我来说,最简单的方法是使用 gsutil。

model.save('model.h5')
!gsutil -m cp model.h5 gs://name-of-cloud-storage/model.h5
tf.keras.models.save_model(model, filepath, save_format="tf")

save_format:'tf'或'h5',表示将模型保存到Tensorflow SavedModel还是HDF5。在 TF 2.X 中默认为 'tf',在 TF 1.X 中默认为 'h5'。