将 Estimator 转换为 TPUEstimator

Convert Estimator to TPUEstimator

是否可以在 TensorFlow 中将 Estimator 转换为 TPUEstimator 而无需重写其函数?我有一个 Estimator 形式的模型,它在 CPU 上工作得很好,但我不知道有什么方便的方法可以将它转换为 TPUEstimator 而不必重写 model_fn ] 和 input_fn

之所以需要手动完成大量工作,是因为我使用 Keras 创建我的模型,然后使用以下辅助函数创建 Estimator

   my_keras_model.compile(
                optimizer=tf.keras.optimizers.SGD(lr=0.0001, momentum=0.9),
                loss='categorical_crossentropy',
                metric='accuracy')
   estimator = tf.keras.estimator.model_to_estimator(keras_model=my_keras_model)

如果我能做类似 estimator.to_TPU_estimator() 或类似的事情就太好了——也许有人知道解决方案?

不可能有这样的函数,因为model_fn两个估计器的规格不同。有些差异非常大,例如这个(来自 TPU tutorial):

When training on a cloud TPU you must wrap the optimizer in a tf.contrib.tpu.CrossShardOptimizer, which uses an allreduce to aggregate gradients and broadcast the result to each shard (each TPU core).

这意味着修补 keras 优化器的内部结构并更新操作。

推荐的方法是为 GPU 和 TPU 模型使用不同的 model_fn 包装器,这对您来说似乎是最快的方法。在您的情况下,这意味着为 TPU 估计器重写 keras model_to_estimator 函数。


第一个也是最简单的近似值是这样的:

def model_to_estimator(keras_model=None,
                       keras_model_path=None,
                       custom_objects=None,
                       model_dir=None,
                       config=None):
  keras_weights = keras_model.get_weights()
  keras_model_fn = _create_keras_tpu_model_fn(keras_model, custom_objects)
  est = tf.contrib.tpu.TPUEstimator(keras_model_fn, model_dir=model_dir, config=config)
  _save_first_checkpoint(keras_model, est, custom_objects, keras_weights)
  return est

这里,_save_first_checkpoint调用实际上是可选的,但如果你想保留它,从tensorflow.python.keras._impl.keras.estimator导入这个函数。


真正的工作发生在 _create_keras_tpu_model_fn 函数中,它取代了 _create_keras_model_fn。变化是:

  • 内部tensorflow优化器必须用前面提到的CrossShardOptimizer包装,

  • 内部函数必须returnTPUEstimatorSpec.

可能还需要修补几行,但我觉得没问题。完整版本如下:

from tensorflow.python.keras._impl.keras.estimator import _save_first_checkpoint, _clone_and_build_model

def model_to_estimator(keras_model=None,
                       keras_model_path=None,
                       custom_objects=None,
                       model_dir=None,
                       config=None):
  keras_weights = keras_model.get_weights()
  keras_model_fn = _create_keras_tpu_model_fn(keras_model, custom_objects)
  est = tf.contrib.tpu.TPUEstimator(keras_model_fn, model_dir=model_dir, config=config)
  _save_first_checkpoint(keras_model, est, custom_objects, keras_weights)
  return est


def _create_keras_tpu_model_fn(keras_model, custom_objects=None):

  def model_fn(features, labels, mode):
    """model_fn for keras Estimator."""
    model = _clone_and_build_model(mode, keras_model, custom_objects, features,
                                   labels)
    predictions = dict(zip(model.output_names, model.outputs))

    loss = None
    train_op = None
    eval_metric_ops = None

    # Set loss and metric only during train and evaluate.
    if mode is not tf.estimator.ModeKeys.PREDICT:
      model.optimizer.optimizer = tf.contrib.tpu.CrossShardOptimizer(model.optimizer.optimizer)

      model._make_train_function()  # pylint: disable=protected-access
      loss = model.total_loss

      if model.metrics:
        eval_metric_ops = {}
        # When each metric maps to an output
        if isinstance(model.metrics, dict):
          for i, output_name in enumerate(model.metrics.keys()):
            metric_name = model.metrics[output_name]
            if callable(metric_name):
              metric_name = metric_name.__name__
            # When some outputs use the same metric
            if list(model.metrics.values()).count(metric_name) > 1:
              metric_name += '_' + output_name
            eval_metric_ops[metric_name] = tf.metrics.mean(
                model.metrics_tensors[i - len(model.metrics)])
        else:
          for i, metric_name in enumerate(model.metrics):
            if callable(metric_name):
              metric_name = metric_name.__name__
            eval_metric_ops[metric_name] = tf.metrics.mean(
                model.metrics_tensors[i])

    if mode is tf.estimator.ModeKeys.TRAIN:
      train_op = model.train_function.updates_op

    return tf.contrib.tpu.TPUEstimatorSpec(
        mode=mode,
        predictions=predictions,
        loss=loss,
        train_op=train_op,
        eval_metric_ops=eval_metric_ops)

  return model_fn