Unable to use core Estimator with contrib Predictor

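I'm using a canned estimator and struggling with poor prediction performance, so I'm trying tf.contrib.predictor to speed up inference. I wrote this minimal example to reproduce my problem: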

import tensorflow as tf
from tensorflow.contrib import predictor

def serving_input_fn():
  x = tf.placeholder(dtype=tf.string, shape=[1], name='x')
  inputs = {'x': x }
  return tf.estimator.export.ServingInputReceiver(inputs, inputs)

input_feature_column = tf.feature_column.numeric_column('x', shape=[1])
estimator = tf.estimator.DNNRegressor(
    feature_columns=[input_feature_column],
    hidden_units=[10, 20, 10],
    model_dir="model_dir/predictor-test")

estimator_predictor = predictor.from_estimator(estimator, serving_input_fn)

estimator_predictor({"inputs": ["1.0"]})

This raises the following exception:

UnimplementedError (see above for traceback): Cast string to float is not supported
[[Node: dnn/input_from_feature_columns/input_layer/x/ToFloat = Cast[DstT=DT_FLOAT, SrcT=DT_STRING, _device="/job:localhost/replica:0/task:0/device:CPU:0"](dnn/input_from_feature_columns/input_layer/x/ExpandDims)]]

I tried using tf.estimator.export.TensorServingInputReceiver instead of ServingInputReceiver in my serving_input_fn(), so that I could feed my model the numeric tensor I want:

def serving_input_fn():
  x = tf.placeholder(dtype=tf.float32, shape=[1], name='x')
  return tf.estimator.export.TensorServingInputReceiver(x, x)

But then I get the following exception from the predictor.from_estimator() call:

ValueError: features should be a dictionary of Tensors. Given type: <class 'tensorflow.python.framework.ops.Tensor'>

Any ideas?

The error is in the following line:

estimator_predictor({"inputs": ["1.0"]})

Pass 1.0 without the quotes. As written, it is a string, not a number.

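My understanding of all this isn't very solid, but I did get it working, and given the size of the community, I'll try to share what I did.

First, I'm running the TensorFlow 1.5 binaries with this patch applied manually.

The exact code I'm running is: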

def serving_input_fn():
    x = tf.placeholder(dtype=tf.float32, shape=[3500], name='x')
    inputs = {'x': x }

    return tf.estimator.export.ServingInputReceiver(inputs, inputs)

estimator = tf.estimator.Estimator(
    model_fn=model_fn,
    model_dir="{}/model_dir_{}/model.ckpt-103712".format(script_dir, 3))

estimator_predictor = tf.contrib.predictor.from_estimator(
                            estimator, serving_input_fn)

p = estimator_predictor(
        {"x": np.array(sample.normalized.input_data)})

My case is a bit different from your example because I'm using a custom Estimator, but for your case I think you should try something like this:

def serving_input_fn():
  x = tf.placeholder(dtype=tf.float32, shape=[1], name='x')
  inputs = {'x': x }

  return tf.estimator.export.ServingInputReceiver(inputs, inputs)

estimator = ...

estimator_predictor = tf.contrib.predictor.from_estimator(
                            estimator, serving_input_fn)

estimator_predictor({"x": [1.0]})
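The snippet above changes two things relative to the question: the value is fed under the key 'x' (the key used in serving_input_fn) rather than "inputs", and it is a float rather than a string. Below is a minimal sketch of a pre-flight check for both. check_feed is a hypothetical helper, not part of tf.contrib.predictor, and it assumes each feed value is a flat list.

```python
import numbers

def check_feed(feed, expected_keys):
    """Hypothetical helper: validate a feed dict before calling a predictor.

    Assumes each value in `feed` is a flat list of scalars.
    Raises ValueError on mismatched keys or non-numeric values.
    """
    unexpected = set(feed) - set(expected_keys)
    missing = set(expected_keys) - set(feed)
    if unexpected or missing:
        raise ValueError("unexpected keys {}, missing keys {}".format(
            sorted(unexpected), sorted(missing)))
    for key, values in feed.items():
        for value in values:
            # Strings such as "1.0" are rejected; bool is excluded explicitly
            # because it is a subclass of int.
            if isinstance(value, bool) or not isinstance(value, numbers.Real):
                raise ValueError("feed[{!r}] contains non-numeric value {!r}".format(
                    key, value))
    return feed

# Accepted: key matches the serving input and the value is a float.
feed = check_feed({"x": [1.0]}, expected_keys=["x"])
print(feed)

# Rejected: wrong key and a string value, as in the question.
try:
    check_feed({"inputs": ["1.0"]}, expected_keys=["x"])
except ValueError as err:
    print("rejected:", err)
```

With a real predictor, the validated dict would then be passed on, for example estimator_predictor(check_feed({"x": [1.0]}, ["x"])).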

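After a few days of work, I'd like to share what I did. The code below is also available at https://github.com/dage/tensorflow-estimator-predictor-example

TL;DR: Predictor works best with a custom Estimator, and the performance gain is huge.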

import tensorflow as tf
import numpy as np
import datetime
import time

FEATURES_RANK = 3   # The number of inputs
LABELS_RANK = 2     # The number of outputs

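# Returns a numpy array of shape (num_samples, LABELS_RANK) computed from the features argument:
# column i holds (i+1) times the sum of each feature row.
# Can be used when creating a training dataset.
def features_to_labels(features):
    sum_column = features.sum(1).reshape(features.shape[0], 1)
    # Pass a list rather than a generator: generators are deprecated as input to np.hstack since NumPy 1.16.
    labels = np.hstack([sum_column*i for i in range(1, LABELS_RANK+1)])
    return labels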

def serving_input_fn():
    x = tf.placeholder(dtype=tf.float32, shape=[None, FEATURES_RANK], name='x')     # match dtype in input_fn
    inputs = {'x': x }
    return tf.estimator.export.ServingInputReceiver(inputs, inputs)

def model_fn(features, labels, mode):
    net = features["x"]         # input
    for units in [4, 8, 4]:     # hidden units
        net = tf.layers.dense(net, units=units, activation=tf.nn.relu)
        net = tf.layers.dropout(net, rate=0.1)
    output = tf.layers.dense(net, LABELS_RANK, activation=None)

    if mode == tf.estimator.ModeKeys.PREDICT:
        return tf.estimator.EstimatorSpec(mode, predictions=output, export_outputs={"out": tf.estimator.export.PredictOutput(output)})

    loss = tf.losses.mean_squared_error(labels, output)

    if mode == tf.estimator.ModeKeys.EVAL:
        return tf.estimator.EstimatorSpec(mode, loss=loss)

    optimizer = tf.train.AdagradOptimizer(learning_rate=0.1)
    train_op = optimizer.minimize(loss, global_step=tf.train.get_global_step())
    return tf.estimator.EstimatorSpec(mode, loss=loss, train_op=train_op)

# Expects a numpy array of shape (1, FEATURES_RANK) for the constant_feature argument.
def input_fn(num_samples, constant_feature=None, is_infinite=True):
    if isinstance(constant_feature, np.ndarray):
        feature_values = np.full((num_samples, FEATURES_RANK), constant_feature)
    else:
        feature_values = np.random.rand(num_samples, FEATURES_RANK)
    feature_values = np.float32(feature_values) # match dtype in serving_input_fn
    labels = features_to_labels(feature_values)
    dataset = tf.data.Dataset.from_tensors(({"x": feature_values}, labels))
    if is_infinite:
        dataset = dataset.repeat()
    return dataset.make_one_shot_iterator().get_next()

estimator = tf.estimator.Estimator(
    model_fn=model_fn,
    # Forward slash avoids the invalid "\e" escape sequence and also works on Windows.
    model_dir="model_dir/estimator-predictor-test-{date:%Y-%m-%d %H.%M.%S}".format(date=datetime.datetime.now()))

train = estimator.train(input_fn=lambda : input_fn(50), steps=500)
evaluate = estimator.evaluate(input_fn=lambda : input_fn(20), steps=1)

predictor = tf.contrib.predictor.from_estimator(estimator, serving_input_fn)

consistency_check_features = np.random.rand(1, FEATURES_RANK)
consistency_check_labels = features_to_labels(consistency_check_features)

# time.perf_counter() replaces time.clock(), which was removed in Python 3.8.
num_calls_predictor = 100
predictor_input = {"x": consistency_check_features}
start_time_predictor = time.perf_counter()
for i in range(num_calls_predictor):
    predictor_prediction = predictor(predictor_input)
delta_time_predictor = 1./num_calls_predictor*(time.perf_counter() - start_time_predictor)

num_calls_estimator_predict = 10
estimator_input = lambda : input_fn(1, consistency_check_features, False)
start_time_estimator_predict = time.perf_counter()
for i in range(num_calls_estimator_predict):
    estimator_prediction = list(estimator.predict(input_fn=estimator_input))
delta_time_estimator = 1./num_calls_estimator_predict*(time.perf_counter() - start_time_estimator_predict)

print("{} --> {}\n  predictor={}\n  estimator={}.\n".format(consistency_check_features, consistency_check_labels, predictor_prediction, estimator_prediction))
print("Time used per estimator.predict() call: {:.5f}s, predictor(): {:.5f}s ==> predictor is {:.0f}x faster!".format(delta_time_estimator, delta_time_predictor, delta_time_estimator/delta_time_predictor))
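The two timing loops in the script follow the same pattern, so they could be factored into one helper. This is only a sketch: time_per_call and dummy_workload are hypothetical names that do not appear in the original script, and the dummy workload stands in for the predictor so the snippet runs without TensorFlow.

```python
import time

def time_per_call(fn, num_calls):
    """Return the average wall-clock seconds per call of fn over num_calls calls."""
    start = time.perf_counter()
    for _ in range(num_calls):
        fn()
    return (time.perf_counter() - start) / num_calls

# Stand-in workload (an assumption for illustration, not the real predictor).
def dummy_workload():
    return sum(range(1000))

avg_seconds = time_per_call(dummy_workload, 100)
print("{:.6f}s per call".format(avg_seconds))
```

In the script, the same helper could be called as time_per_call(lambda: predictor(predictor_input), num_calls_predictor), and likewise with a lambda wrapping list(estimator.predict(input_fn=estimator_input)).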

On my laptop, the full script above prints the following:

[[0.55424854 0.98057611 0.98604857]] --> [[2.52087322 5.04174644]]
  predictor={'output': array([[2.5221248, 5.049496 ]], dtype=float32)}
  estimator=[array([2.5221248, 5.049496 ], dtype=float32)].

Time used per estimator.predict() call: 0.30071s, predictor(): 0.00057s ==> predictor is 530x faster!
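As a sanity check on the output above, the expected labels can be recomputed by hand: the first label is the sum of the three features and the second is twice that sum. The sketch below redoes this in plain Python for the printed row. features_to_labels_row is a hypothetical single-row stand-in for features_to_labels, and the numbers are copied from the output above.

```python
def features_to_labels_row(row, labels_rank=2):
    """Pure-Python version of features_to_labels for one row: label i is (i+1) * sum(row)."""
    total = sum(row)
    return [total * i for i in range(1, labels_rank + 1)]

# Values copied from the printed output above.
features = [0.55424854, 0.98057611, 0.98604857]
predicted = [2.5221248, 5.049496]

expected = features_to_labels_row(features)
abs_errors = [abs(p - e) for p, e in zip(predicted, expected)]

print("expected labels:", expected)
print("absolute errors:", abs_errors)
```

Both absolute errors come out below 0.01, so the trained model tracks the target function closely for this input, and the predictor and estimator return the same values.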