TensorFlow:帮助创建服务输入函数

TensorFlow: Help creating a serving input function

我是 TensorFlow Serving 的新手。我使用估算器训练了一个广泛而深入的模型。现在我想为我的模型服务。我创建我的服务输入接收器函数并保存模型。当我尝试使用保存的模型进行预测时,我总是收到 InternalError: Unable to get element as bytes。我真的不明白服务函数中应该包含什么或者我应该发送什么格式类型。谁能先给我解释一下服务函数的概念以及如何正确创建函数。

可重现的例子:https://github.com/dangz90/wide_and_deep_debugging/blob/master/wide%20and%20deep%20debug.ipynb

服务函数:

def serving_input_receiver_fn():
  """Build a ServingInputReceiver that accepts one serialized tf.train.Example.

  Exposes a single scalar string placeholder named 'input_tensor' (fed under
  the request key 'inputs') and parses it into the feature tensors the model
  consumes.

  Returns:
    A tf.estimator.export.ServingInputReceiver wiring the placeholder to the
    parsed feature dict.
  """
  # Scalar string placeholder: exactly one serialized Example per request.
  example_proto = tf.placeholder(dtype=tf.string,
                                 shape=[],
                                 name='input_tensor')

  # Parsing spec for the Example payload; no defaults, so all keys are required.
  feature_spec = {
      'var1': tf.FixedLenFeature(shape=[1], dtype=tf.string),
      'var2': tf.FixedLenFeature(shape=[1], dtype=tf.string),
      'var3': tf.FixedLenFeature(shape=[1], dtype=tf.string),
      'var4': tf.VarLenFeature(dtype=tf.string),
  }
  features = tf.parse_single_example(example_proto, features=feature_spec)

  return tf.estimator.export.ServingInputReceiver(features,
                                                  {'inputs': example_proto})

# Build an in-process predictor from the trained estimator `m` (defined
# earlier in the notebook) and the serving input function above.
estimator_predictor = tf.contrib.predictor.from_estimator(m, serving_input_receiver_fn)
# Feed one serialized tf.train.Example under the receiver key 'inputs'.
# NOTE(review): `examples` here presumably holds a tf.train.Example built
# earlier — confirm it matches the parsing spec in the receiver fn.
estimator_predictor({ 'inputs': examples.SerializeToString() })

我试过发送 pandas DataFrame、tf.train.Example 等,但真的不知道数据应该是什么格式。训练时,输入数据被保存为 tfrecord,然后作为数据集加载。注意:如果我直接使用模型的 m.predict(),我能够得到正确的预测。

完整错误:

--------------------------------------------------------------------------- InternalError                             Traceback (most recent call last) /databricks/python/lib/python3.6/site-packages/tensorflow/python/client/session.py in _do_call(self, fn, *args)    1333     try:
-> 1334       return fn(*args)    1335     except errors.OpError as e:

/databricks/python/lib/python3.6/site-packages/tensorflow/python/client/session.py in _run_fn(feed_dict, fetch_list, target_list, options, run_metadata)  1318       return self._call_tf_sessionrun(
-> 1319           options, feed_dict, fetch_list, target_list, run_metadata)    1320 

/databricks/python/lib/python3.6/site-packages/tensorflow/python/client/session.py in _call_tf_sessionrun(self, options, feed_dict, fetch_list, target_list, run_metadata)    1406         self._session, options, feed_dict, fetch_list, target_list,
-> 1407         run_metadata)    1408 

InternalError: Unable to get element as bytes.

During handling of the above exception, another exception occurred:

InternalError                             Traceback (most recent call last) <command-364073753108128> in <module>()
      3 
      4 estimator_predictor = tf.contrib.predictor.from_estimator(m, serving_input_receiver_fn)
----> 5 estimator_predictor({ 'inputs': examples_ })

/databricks/python/lib/python3.6/site-packages/tensorflow/contrib/predictor/predictor.py in __call__(self, input_dict)
     75       if value is not None:
     76         feed_dict[self.feed_tensors[key]] = value
---> 77     return self._session.run(fetches=self.fetch_tensors, feed_dict=feed_dict)

/databricks/python/lib/python3.6/site-packages/tensorflow/python/training/monitored_session.py in run(self, fetches, feed_dict, options, run_metadata)
    674                           feed_dict=feed_dict,
    675                           options=options,
--> 676                           run_metadata=run_metadata)
    677 
    678   def run_step_fn(self, step_fn):

/databricks/python/lib/python3.6/site-packages/tensorflow/python/training/monitored_session.py in run(self, fetches, feed_dict, options, run_metadata)    1169        feed_dict=feed_dict,    1170                               options=options,
-> 1171                               run_metadata=run_metadata)    1172       except _PREEMPTION_ERRORS as e:    1173         logging.info('An error was raised. This may be due to a preemption in '

/databricks/python/lib/python3.6/site-packages/tensorflow/python/training/monitored_session.py in run(self, *args, **kwargs)    1268         raise six.reraise(*original_exc_info)    1269       else:
-> 1270         raise six.reraise(*original_exc_info)    1271     1272 

/databricks/python/lib/python3.6/site-packages/six.py in reraise(tp, value, tb)
    691             if value.__traceback__ is not tb:
    692                 raise value.with_traceback(tb)
--> 693             raise value
    694         finally:
    695             value = None

/databricks/python/lib/python3.6/site-packages/tensorflow/python/training/monitored_session.py in run(self, *args, **kwargs)    1253   def run(self, *args,
**kwargs):    1254     try:
-> 1255       return self._sess.run(*args, **kwargs)    1256     except _PREEMPTION_ERRORS:    1257       raise

/databricks/python/lib/python3.6/site-packages/tensorflow/python/training/monitored_session.py in run(self, fetches, feed_dict, options, run_metadata)    1325        feed_dict=feed_dict,    1326                                   options=options,
-> 1327                                   run_metadata=run_metadata)    1328     1329     for hook in self._hooks:

/databricks/python/lib/python3.6/site-packages/tensorflow/python/training/monitored_session.py in run(self, *args, **kwargs)    1089     1090   def run(self, *args,
**kwargs):
-> 1091     return self._sess.run(*args, **kwargs)    1092     1093   def run_step_fn(self, step_fn, raw_session, run_with_hooks):

/databricks/python/lib/python3.6/site-packages/tensorflow/python/client/session.py in run(self, fetches, feed_dict, options, run_metadata)
    927     try:
    928       result = self._run(None, fetches, feed_dict, options_ptr,
--> 929                          run_metadata_ptr)
    930       if run_metadata:
    931         proto_data = tf_session.TF_GetBuffer(run_metadata_ptr)

/databricks/python/lib/python3.6/site-packages/tensorflow/python/client/session.py in _run(self, handle, fetches, feed_dict, options, run_metadata)    1150     if final_fetches or final_targets or (handle and feed_dict_tensor):    1151       results = self._do_run(handle, final_targets, final_fetches,
-> 1152                              feed_dict_tensor, options, run_metadata)    1153     else:    1154       results = []

/databricks/python/lib/python3.6/site-packages/tensorflow/python/client/session.py in _do_run(self, handle, target_list, fetch_list, feed_dict, options, run_metadata)    1326     if handle is None:    1327       return self._do_call(_run_fn, feeds, fetches, targets, options,
-> 1328                            run_metadata)    1329     else:    1330       return self._do_call(_prun_fn, handle, feeds, fetches)

/databricks/python/lib/python3.6/site-packages/tensorflow/python/client/session.py in _do_call(self, fn, *args)    1346           pass    1347       message = error_interpolation.interpolate(message, self._graph)
-> 1348       raise type(e)(node_def, op, message)    1349     1350   def _extend_graph(self):

InternalError: Unable to get element as bytes.

以下是示例在序列化之前的样子。

features {
  feature {
    key: "var4"
    value {
      bytes_list {
        value: "43"
        value: "65"
        value: "89"
        value: "02"
      }
    }
  }
  feature {
    key: "var3"
    value {
      bytes_list {
        value: "0123194"
      }
    }
  }
  feature {
    key: "var2"
    value {
      bytes_list {
        value: "1243"
      }
    }
  }
  feature {
    key: "var1"
    value {
      bytes_list {
        value: "54"
      }
    }
  }
}

要获取序列化的示例,我运行以下脚本:

def serialise_input(data):
  """Encode a name -> value mapping into a serialized tf.train.Example.

  'var4' is treated as a list of strings (each element becomes one
  bytes_list entry); every other feature is a single string. All values
  are stored UTF-8 encoded.

  Returns:
    The Example serialized to a bytes string.
  """
  feature_map = {}
  for name, value in data.items():
    if name == "var4":
      encoded = [item.encode('utf-8') for item in value]
    else:
      encoded = [value.encode()]
    feature_map[name] = Feature(bytes_list=BytesList(value=encoded))

  return Example(features=Features(feature=feature_map)).SerializeToString()

# Serialize Input
# Take the first complete row of the Spark test set (dropping the label
# column via `:-1`) and serialize it into a single tf.train.Example string.
# NOTE(review): `test` is a Spark DataFrame defined elsewhere — `.toPandas()`
# collects it to the driver.
raw_data = test.toPandas().dropna().iloc[0,:-1]
examples_ = serialise_input(raw_data)

提前致谢。

您的 serialized_tf_example 占位符的 shape=[],即一个标量——它一次只对应单个示例。您应该传入序列化为字符串的单个 tf.train.Example:

# I assume here examples_ is a list of tf.train.Examples
example = examples_[0].SerializeToString()
estimator_predictor({ 'inputs': example })

如果您想提供一批示例而不是单个序列化示例,则需要使用 shape=[None] 的占位符和 tf.io.parse_example。

此外,您提供的示例应该定义了您在特征字典中引用的特征(例如 var1),以便它们可以被正确解析。


服务函数指定您的模型在预测时应如何接收输入——它在您将训练好的 TensorFlow 图/模型导出为 SavedModel 时使用。与训练时不同(训练时使用占位符或 tf.data.Dataset,并依赖一个有状态的 Python/TF 运行时和活动的 TF 会话),您希望 SavedModel 能被序列化并写入磁盘,以便用 TF Serving 部署、在移动设备上运行等。

所以当您导出这个 SavedModel 时,服务输入函数就是其中定义「如何解析发送给它的请求并将其连接到模型」的部分。您通常希望以 tf.train.Example 协议缓冲区的形式发送请求(序列化为字符串,以便可以通过 RPC 传输),然后将其解析成一个特征字典,这样您的估算器就能像理解训练数据一样理解它们。

问题出在我构建输入和输入服务函数的方式上。这是解决方案:

# Maps each feature name to the Python type used to pick the Example field
# (int -> int64_list, float -> float_list, str -> bytes_list,
#  list -> bytes_list of several encoded strings).
feat_name_type = {'var1':str, 'var2':str, 'var3':str, 'var4':list}

def input_fn(df):
    """Convert each row of `df` into a serialized tf.train.Example.

    Args:
        df: pandas DataFrame whose columns include every key of
            `feat_name_type`.

    Returns:
        A dict {"inputs": [serialized_example, ...]} with one entry per row,
        in row order — the format the 'inputs' serving signature expects.
    """
    examples = [None] * len(df)
    # BUG FIX: iterrows() yields the DataFrame *label* index, which is not
    # 0..len(df)-1 after operations like dropna()/filtering; using it to
    # index `examples` raises IndexError or scatters entries. Use a dense
    # positional counter instead.
    for pos, (_, sample) in enumerate(df.iterrows()):
        ex = tf.train.Example()
        for feat_name, feat_type in feat_name_type.items():
            feat = ex.features.feature[feat_name]
            if feat_type == int:
                feat.int64_list.value.extend([sample[feat_name]])
            elif feat_type == float:
                feat.float_list.value.extend([sample[feat_name]])
            elif feat_type == str:
                feat.bytes_list.value.extend([sample[feat_name].encode()])
            elif feat_type == list:
                # List-valued feature: encode every element separately.
                feat.bytes_list.value.extend([s.encode() for s in sample[feat_name]])
        examples[pos] = ex.SerializeToString()
    return {"inputs": examples}

'''Service input function'''
# Combine the feature columns used by both halves of the wide-and-deep model.
# NOTE(review): deep_columns / wide_columns are defined earlier in the notebook.
tf_feat_cols = deep_columns + wide_columns #The feature columns created for feeding into the model

# Build the serving receiver directly from the feature columns: the helper
# derives the Example parsing spec, so it always matches what the model
# was trained on (this is what fixed the original InternalError).
serve_rcvr_fn = tf.estimator.export.build_parsing_serving_input_receiver_fn(
    tf.feature_column.make_parse_example_spec(tf_feat_cols)
)
# Map the PREDICT mode to the receiver fn (e.g. for export_all_saved_models).
rcvr_fn_map = {
    tf.estimator.ModeKeys.PREDICT: serve_rcvr_fn,
}